
Kafka : Kafka integrated with Spark Structured Streaming

Apache Kafka :

Apache Kafka is an open-source data streaming platform that stores, processes, and analyzes large amounts of real-time data. It's used to build real-time data pipelines and applications that can adapt to data streams. 

Event streaming : 

Event streaming is the digital equivalent of the human body's central nervous system. It is the technological foundation for the 'always-on' world where businesses are increasingly software-defined and automated, and where the user of software is more software.

Technically speaking, event streaming is the practice of capturing data in real-time from event sources like databases, sensors, mobile devices, cloud services, and software applications in the form of streams of events; storing these event streams durably for later retrieval; manipulating, processing, and reacting to the event streams in real-time as well as retrospectively; and routing the event streams to different destination technologies as needed. Event streaming thus ensures a continuous flow and interpretation of data so that the right information is at the right place, at the right time.

Apache Kafka is an event streaming platform. What does that mean?

Kafka combines three key capabilities so you can implement your use cases for event streaming end-to-end with a single battle-tested solution:

  • To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.
  • To store streams of events durably and reliably for as long as you want.
  • To process streams of events as they occur or retrospectively.

And all this functionality is provided in a distributed, highly scalable, elastic, fault-tolerant, and secure manner. Kafka can be deployed on bare-metal hardware, virtual machines, and containers, and on-premises as well as in the cloud. You can choose between self-managing your Kafka environments and using fully managed services offered by a variety of vendors. 

For more information on Kafka, please visit : https://kafka.apache.org/documentation/#gettingStarted

Examples of use :

  • Tracking user activity data to see how people use a website in real-time 
  • Feeding an application that tracks product sales in real-time 

Download and installation :

  • Search the web for "download Kafka index", or go directly to the Apache archive below
  • Click on the following website : https://archive.apache.org/dist/kafka/
  • Recommended version at this time : https://archive.apache.org/dist/kafka/2.8.1/
  • Download the largest tgz file : kafka_2.12-2.8.1.tgz
  • Go to Downloads on your local computer and extract the file (you may need to extract twice, as the extracted folder is itself another archive)
  • Put this Kafka folder in your local bigdata folder
  • Copy the path : C:\bigdata\kafka_2.12-2.8.1
  • Add it to your system environment variables (for example as KAFKA_HOME, which the commands below rely on)

  • Installation is Done. 

Kafka life cycle :
  • Source (where the data gets generated)
  • Topic (a named category/feed to which related events are published)
  • Messages (the actual event records in the stream)
  • Producers (client applications that publish streams of messages to topics on the brokers)
  • Brokers/Servers (the Kafka servers that store the streams and serve them to consumers)
  • Consumers (client applications that read streams from topics and deliver them to a sink)
  • Sink (receiver end)
  • Zookeeper (a distributed coordination service that manages and synchronizes the Kafka brokers)
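
To make these roles concrete, below is a minimal sketch using the kafka-python client (installed later on this page). The topic name 'demo' is just an illustration, and it assumes a broker is already running on localhost:9092.

from kafka import KafkaProducer, KafkaConsumer

# Producer: the source publishes messages (event streams) to a topic on the broker
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('demo', value=b'Arun,36')   # 'demo' is the topic, b'Arun,36' is the message
producer.flush()

# Consumer: reads the same topic from the broker and hands records to a sink
consumer = KafkaConsumer('demo',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',     # read from the oldest available message
                         consumer_timeout_ms=5000)         # stop iterating after 5s of inactivity
for record in consumer:
    print(record.value)                   # here the "sink" is just the console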


Getting started with a sample POC using Kafka:

To get started, we need to run the below 3 commands in 3 different command prompts (assuming you are using Windows). These steps are mandatory in any environment (learning/dev/testing):
  • Start Zookeeper service 
    • Zookeeper is a distributed coordination service used to manage and synchronize distributed systems.
    • %KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper.properties
  • Start Kafka server
    • %KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server.properties
  • Create a Kafka topic
    • %KAFKA_HOME%\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <topic_name>
    • Note : 2181 is the default ZooKeeper port that Kafka connects to

After running the above 3 commands successfully, we can test the Producer and Consumer using the below commands :
  • Start Producer with a topic name
    • %KAFKA_HOME%\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic <topic_name>
  • Start Consumer with the same topic name as used for the Producer
    • %KAFKA_HOME%\bin\windows\kafka-console-consumer.bat --topic <topic_name> --from-beginning --bootstrap-server localhost:9092
Please note the above 2 commands are just for testing, to check whether the data we generate at the Producer's end is actually received by the Consumer. We won't do this in production; there we would use an IDE like PyCharm or Visual Studio Code, or a tool like PuTTY.


Reference information for working with Kafka using an IDE :
  • While working in an IDE such as PyCharm, please refer to the following website for more information : https://kafka-python.readthedocs.io/en/master/usage.html
  • Use the following command to install the kafka-python client from the PyCharm terminal
    • pip install kafka-python

PyCharm terminal log :
(.venv) PS C:\Users\arunk\PyCharmMiscProject> pip install kafka-python
Collecting kafka-python
  Obtaining dependency information for kafka-python from https://files.pythonhosted.org/packages/75/68/dcb0db055309f680ab2931a3eeb22d865604b638acf8c914bedf4c1a0c8c/kafka_python-2.0.2-py2.py3-none-any.whl.metadata
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl.metadata (7.8 kB)
Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 246.5/246.5 kB 2.5 MB/s eta 0:00:00
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2

  • Now, let's start the Zookeeper service and the Kafka server, and create a topic using the commands mentioned above (in Windows, run each command in a separate command prompt window)



  • Now, in PyCharm, run the below Kafka Consumer code
    • 'feb8' is the topic name
from kafka import KafkaConsumer

consumer = KafkaConsumer('feb8')   # subscribes to the 'feb8' topic (broker defaults to localhost:9092)
for msg in consumer:               # blocks and yields messages as they arrive from the Producer
    print(msg)
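
The one-liner above relies on kafka-python's defaults. If you need them explicit, a small hedged variant is shown below; the broker address is just the local default, and auto_offset_reset='earliest' mimics the --from-beginning flag of the console consumer.

from kafka import KafkaConsumer

# Explicit configuration instead of relying on defaults
consumer = KafkaConsumer(
    'feb8',
    bootstrap_servers='localhost:9092',   # broker address (this is also the default)
    auto_offset_reset='earliest'          # start from the oldest available messages
)
for msg in consumer:
    print(msg)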
  • Now, let's start the Kafka Producer in a new command prompt, using the console-producer command shown earlier, and type a few messages.

  • Since the Kafka Consumer code is already running, it will keep iterating and receiving the live data generated by the Kafka Producer
  • The consumer iterator returns ConsumerRecords, which are simple namedtuples that expose basic message attributes: topic, partition, offset, key, and value as below: 
C:\Users\arunk\PyCharmMiscProject\.venv\Scripts\python.exe C:\Users\arunk\PyCharmMiscProject\kafka_streaming.py 
ConsumerRecord(topic='feb8', partition=0, offset=2, timestamp=1739006035480, timestamp_type=0, key=None, value=b'', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=0, serialized_header_size=-1)
ConsumerRecord(topic='feb8', partition=0, offset=3, timestamp=1739006047331, timestamp_type=0, key=None, value=b'Thoshi,01', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=9, serialized_header_size=-1)
ConsumerRecord(topic='feb8', partition=0, offset=4, timestamp=1739013897518, timestamp_type=0, key=None, value=b'', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=0, serialized_header_size=-1)
ConsumerRecord(topic='feb8', partition=0, offset=5, timestamp=1739013903631, timestamp_type=0, key=None, value=b'anu, 35', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=7, serialized_header_size=-1)
ConsumerRecord(topic='feb8', partition=0, offset=6, timestamp=1739022625346, timestamp_type=0, key=None, value=b'', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=0, serialized_header_size=-1)
ConsumerRecord(topic='feb8', partition=0, offset=7, timestamp=1739022635440, timestamp_type=0, key=None, value=b'Arun, xyz', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=9, serialized_header_size=-1)
  • If you want just the value, then instead of print(msg), use print(msg.value)
C:\Users\arunk\PyCharmMiscProject\.venv\Scripts\python.exe C:\Users\arunk\PyCharmMiscProject\kafka_streaming.py 
b'Arun,36'
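
Since the value comes back as raw bytes (note the b'...' prefix above), you will usually decode it before using it. Below is a small sketch that decodes the value and splits the comma-separated fields, mirroring the "name,age" records sent from the Producer; the parsing logic is just an illustration of that assumed format.

from kafka import KafkaConsumer

consumer = KafkaConsumer('feb8')
for msg in consumer:
    text = msg.value.decode("utf-8")          # bytes -> str, e.g. "Arun,36"
    if "," not in text:                       # skip the empty records seen in the log above
        continue
    name, age = [p.strip() for p in text.split(",", 1)]
    print(name, age)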

Note : 
I ran into the below exception/error while running the Consumer code.

The issue is documented on the following GitHub page : https://github.com/dpkp/kafka-python/issues/2412
The solution is to run the below pip install command in the PyCharm terminal (it installs kafka-python directly from the project's master branch):
pip install git+https://github.com/dpkp/kafka-python.git

Keep in mind that whatever we did until now is ONLY Kafka; we haven't integrated Kafka with Spark yet. Let's see how to do it. By now, you should have a basic understanding of what Kafka is and how it works. If not, I would recommend re-reading this page from the beginning.
  • Download the dependency jar from the below link :
    • https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.1.2

  • Click on "jar" in the "Files" section and download it.
  • Copy it into your Spark\jars folder
    • My local location is : C:\bigdata\spark-3.1.2-bin-hadoop3.2\jars\
    • All important Spark dependencies live in this folder
  • Document to follow : https://spark.apache.org/docs/3.5.1/structured-streaming-kafka-integration.html
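
If the machine has internet access, an alternative to copying the jar into Spark\jars manually is to let Spark resolve the connector at startup through the spark.jars.packages configuration. A minimal sketch, assuming Spark 3.1.2 built for Scala 2.12 (which matches the jar version above):

from pyspark.sql import SparkSession

# Spark downloads the Kafka connector (and its dependencies) from Maven at startup
spark = (SparkSession.builder
         .appName("kafka_test")
         .master("local[*]")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2")
         .getOrCreate())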

Please find the Spark Structured Streaming Kafka Consumer code below :

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, current_timestamp

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()

# Kafka Consumer code: read the 'feb8' topic as a streaming DataFrame
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "feb8") \
    .load()

df.printSchema()

# Kafka delivers the payload as bytes; cast it to a readable string
df = df.selectExpr("CAST(value AS STRING)")

# df.writeStream.outputMode("append").format("console").start().awaitTermination()

# Split the comma-separated value ("name,age") into separate columns
res = (df.withColumn("name", split(col("value"), ',')[0])
         .withColumn("age", split(col("value"), ',')[1]))

def for_each_batch_function(df, epoch_id):
    # Add a timestamp column and append each micro-batch to a MySQL table
    # (the MySQL JDBC driver, mysql-connector-java, must also be on Spark's classpath)
    df = df.withColumn("ts", current_timestamp())
    host = "jdbc:mysql://diwakarmysql.cf08m8g4ysxd.ap-south-1.rds.amazonaws.com:3306/mysqldb"
    df.write.mode("append").format("jdbc") \
        .option("url", host) \
        .option("user", "admin") \
        .option("password", "xxxxxxxxx") \
        .option("dbtable", "livekafkafeb8") \
        .save()
    df.show()

res.writeStream.foreachBatch(for_each_batch_function).start().awaitTermination()

Points to note :
  • The above Python code is an example of Spark Structured Streaming + Kafka Consumer
  • We need to start the Zookeeper service, start the Kafka server and create a Topic (feb8 in the above code) before running this code in PyCharm/VS Code
  • Then run this code; the Consumer will start listening and receiving data from the Producer
  • Start the Kafka Producer in a separate command prompt and start entering some data to send to the Consumer
  • I will add more information about Spark Structured Streaming + Kafka Producer; a minimal sketch of that direction is shown below
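
For reference, here is a minimal sketch of the Producer direction: reading one topic with Spark and writing the stream back to another Kafka topic. This is only an illustration; the output topic name 'feb8_out' and the checkpoint path are hypothetical, Kafka expects the outgoing column to be named 'value' (optionally 'key'), a checkpointLocation is mandatory when writing a stream to Kafka, and the same spark-sql-kafka jar mentioned above is required.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka_producer_test").master("local[*]").getOrCreate()

# Read a stream from one topic ...
src = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "feb8") \
    .load() \
    .selectExpr("CAST(value AS STRING) AS value")

# ... and write the stream back to Kafka on another (hypothetical) topic.
# The DataFrame must expose a string/binary column named 'value'.
query = src.writeStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "feb8_out") \
    .option("checkpointLocation", "C:/bigdata/checkpoints/feb8_out") \
    .start()

query.awaitTermination()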

I will be adding more information and code to make this more precise, and will keep updating this page on integrating Spark Structured Streaming with Kafka.

Arun Mathe

Gmail ID : arunkumar.mathe@gmail.com

Contact No : +91 9704117111
