Apache Kafka :
Apache Kafka is an open-source distributed event streaming platform that stores, processes, and analyzes large amounts of real-time data. It is used to build real-time data pipelines and streaming applications that react to data as it arrives.
Event streaming :
Event streaming is the digital equivalent of the human body's central nervous system. It is the technological foundation for the 'always-on' world where businesses are increasingly software-defined and automated, and where the user of software is more software.
Technically speaking, event streaming is the practice of capturing data in real-time from event sources like databases, sensors, mobile devices, cloud services, and software applications in the form of streams of events; storing these event streams durably for later retrieval; manipulating, processing, and reacting to the event streams in real-time as well as retrospectively; and routing the event streams to different destination technologies as needed. Event streaming thus ensures a continuous flow and interpretation of data so that the right information is at the right place, at the right time.
Apache Kafka is an event streaming platform. What does that mean?
Kafka combines three key capabilities so you can implement your use cases for event streaming end-to-end with a single battle-tested solution:
- To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.
- To store streams of events durably and reliably for as long as you want.
- To process streams of events as they occur or retrospectively.
And all this functionality is provided in a distributed, highly scalable, elastic, fault-tolerant, and secure manner. Kafka can be deployed on bare-metal hardware, virtual machines, and containers, and on-premises as well as in the cloud. You can choose between self-managing your Kafka environments and using fully managed services offered by a variety of vendors.
For more information on Kafka, please visit : https://kafka.apache.org/documentation/#gettingStarted
Examples of use :
- Tracking user activity data to see how people use a website in real-time
- Feeding an application that tracks product sales in real-time
Download and installation :
- Search the web for the Apache Kafka download archive, or open the link below directly
- Open the following website : https://archive.apache.org/dist/kafka/
- Recommended version at the time of writing : https://archive.apache.org/dist/kafka/2.8.1/
- Download the binary archive (built with Scala 2.12) : kafka_2.12-2.8.1.tgz
- Go to the Downloads folder on your local computer and extract the archive (you need to extract twice, since the .tgz first unpacks to a .tar file)
- Put this Kafka folder in your local bigdata folder
- Copy the path : C:\bigdata\kafka_2.12-2.8.1
- Add it to the system environment variables as KAFKA_HOME (System Properties > Environment Variables)
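- For example, assuming the path above, the variable can also be set from a command prompt (then open a new command prompt for it to take effect) :
- setx KAFKA_HOME "C:\bigdata\kafka_2.12-2.8.1"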
Key Kafka concepts :
- Source (from where the data is generated)
- Topic (a named category/feed to which related events are written)
- Messages (the actual event records/streams)
- Producers (client applications that publish/write event streams to Kafka topics)
- Brokers/Servers (the Kafka servers that store the event streams and serve them to consumers)
- Consumers (client applications that subscribe to/read the event streams and deliver them to the sink)
- Sink (the receiving end / destination system)
- Zookeeper (a distributed coordination service used to manage and synchronize the Kafka brokers)
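- In short, data flows as Source → Producer → Topic (stored on Brokers) → Consumer → Sink, with Zookeeper coordinating the brokers.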
Running Kafka on Windows :
- Start Zookeeper service
- Zookeeper is a distributed coordination service used to manage and synchronize distributed systems.
- %KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper.properties
- Start Kafka server
- %KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server.properties
- Create a Kafka topic
- %KAFKA_HOME%\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <topic_name>
- Note : 2181 is the default Zookeeper port used by Kafka
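- To verify the topic was created, you can list all topics (same Zookeeper address as above) :
- %KAFKA_HOME%\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181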
- Start Producer with a topic name
- %KAFKA_HOME%\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic <topic_name>
- Start Consumer with the same topic name as used for the Producer; anything you type in the producer window will appear in the consumer window
- %KAFKA_HOME%\bin\windows\kafka-console-consumer.bat --topic <topic_name> --from-beginning --bootstrap-server localhost:9092
Kafka with Python (kafka-python) :
- When working in an IDE such as PyCharm, please refer to the following website for more information : https://kafka-python.readthedocs.io/en/master/usage.html
- Use the following command to install the kafka-python client in the PyCharm terminal
- pip install kafka-python
- Now, let's start the Zookeeper service and the Kafka server, and create a topic using the commands mentioned above (on Windows, run each command in a separate command prompt window). Then run the following Kafka Consumer code:
from kafka import KafkaConsumer

# Subscribe to the 'feb8' topic on the local broker (localhost:9092 by default)
consumer = KafkaConsumer('feb8')

# The consumer blocks and yields messages as they arrive on the topic
for msg in consumer:
    print(msg)
- Now, let's start the Kafka console Producer in a new command prompt (using the producer command shown earlier).
- As the Kafka Consumer code is already running, it will keep iterating and receiving the live messages generated by the Kafka Producer.
- The consumer iterator returns ConsumerRecords, which are simple namedtuples exposing basic message attributes: topic, partition, offset, key, and value, as shown below:
C:\Users\arunk\PyCharmMiscProject\.venv\Scripts\python.exe C:\Users\arunk\PyCharmMiscProject\kafka_streaming.py
ConsumerRecord(topic='feb8', partition=0, offset=2, timestamp=1739006035480, timestamp_type=0, key=None, value=b'', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=0, serialized_header_size=-1)
ConsumerRecord(topic='feb8', partition=0, offset=3, timestamp=1739006047331, timestamp_type=0, key=None, value=b'Thoshi,01', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=9, serialized_header_size=-1)
ConsumerRecord(topic='feb8', partition=0, offset=4, timestamp=1739013897518, timestamp_type=0, key=None, value=b'', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=0, serialized_header_size=-1)
ConsumerRecord(topic='feb8', partition=0, offset=5, timestamp=1739013903631, timestamp_type=0, key=None, value=b'anu, 35', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=7, serialized_header_size=-1)
ConsumerRecord(topic='feb8', partition=0, offset=6, timestamp=1739022625346, timestamp_type=0, key=None, value=b'', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=0, serialized_header_size=-1)
ConsumerRecord(topic='feb8', partition=0, offset=7, timestamp=1739022635440, timestamp_type=0, key=None, value=b'Arun, xyz', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=9, serialized_header_size=-1)
- If you want just the value, use print(msg.value) instead of print(msg)
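- If you prefer to send messages from Python instead of the console producer, here is a minimal kafka-python producer sketch (the topic name 'feb8' and broker localhost:9092 are taken from the examples above; the sample messages are just placeholders):
from kafka import KafkaProducer

# Connect to the local Kafka broker started earlier
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Send a couple of comma-separated "name,age" style messages to the 'feb8' topic
producer.send('feb8', b'Thoshi,01')
producer.send('feb8', b'anu,35')

# Block until all buffered messages are delivered, then close the connection
producer.flush()
producer.close()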
If the released kafka-python package gives errors in your environment, the solution is to install the latest version directly from GitHub by running the below pip install command in the PyCharm terminal :
pip install git+https://github.com/dpkp/kafka-python.git
Keep in mind that everything we have done until now is ONLY Kafka; we haven't integrated Kafka with Spark yet. Let's see how to do that. By now, you should have a basic understanding of what Kafka is and how it works. If not, I would recommend reading this page again from the beginning.
Integrating Kafka with Spark structured streaming :
- Download the dependency jar from the link below :
- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.1.2
- Click on "jar" in the "Files" section and download it.
- Copy it into your Spark\jars folder
- My local location is : C:\bigdata\spark-3.1.2-bin-hadoop3.2\jars\
- All of Spark's dependency jars are kept in this folder
- Documentation to follow : https://spark.apache.org/docs/3.5.1/structured-streaming-kafka-integration.html
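- Alternatively, if you prefer not to copy jars manually, you can let Spark download the connector at launch time with the --packages flag (a sketch; replace your_script.py with your own PySpark script) :
- spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 your_script.py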
Please find the Spark structured streaming Kafka Consumer code below :
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, current_timestamp

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()

# Kafka Consumer Code: subscribe to the 'feb8' topic on the local broker as a streaming source
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "feb8") \
    .load()

df.printSchema()

# Kafka delivers the payload as binary; cast it to a readable string
df = df.selectExpr("CAST(value AS STRING)")

# Uncomment to simply dump each micro-batch to the console instead of writing to MySQL
# df.writeStream.outputMode("append").format("console").start().awaitTermination()

# Split the comma-separated "name,age" value into separate columns
res = (df.withColumn("name", split(col("value"), ',')[0])
         .withColumn("age", split(col("value"), ',')[1]))

# foreachBatch lets us reuse the normal batch JDBC writer on every micro-batch
def for_each_batch_function(df, epoch_id):
    df = df.withColumn("ts", current_timestamp())
    host = "jdbc:mysql://diwakarmysql.cf08m8g4ysxd.ap-south-1.rds.amazonaws.com:3306/mysqldb"
    df.write.mode("append").format("jdbc").option("url", host).option("user", "admin") \
        .option("password", "xxxxxxxxx").option("dbtable", "livekafkafeb8").save()
    df.show()

res.writeStream.foreachBatch(for_each_batch_function).start().awaitTermination()
Points to note :
- The above Python code is an example of Spark structured streaming acting as a Kafka Consumer
- We need to start the Zookeeper service, start the Kafka server, and create a Topic (feb8 in the above code) before running this code in PyCharm (or any other IDE)
- Then run this code; the Consumer will start listening and receiving data from the Producer
- Start the Kafka Producer in a separate command prompt and start entering some data to send to the Consumer
- I will add more information about Spark structured streaming + Kafka Producer; a minimal sketch of the write side follows below in the meantime
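Here is a minimal sketch of that write side, based on the Spark structured streaming + Kafka integration guide linked above (the "rate" source, the topic name "feb8_out", and the checkpoint path are illustrative assumptions, not part of the original setup):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka_writer").master("local[*]").getOrCreate()

# Any streaming source works; the built-in "rate" source is used here purely for illustration
src = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# The Kafka sink expects a string/binary column named "value" (and optionally "key")
out = src.selectExpr("CAST(value AS STRING) AS value")

# Write the stream to a Kafka topic; a checkpointLocation is mandatory for the Kafka sink
query = (out.writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("topic", "feb8_out")
            .option("checkpointLocation", "C:/bigdata/checkpoints/feb8_out")
            .start())

query.awaitTermination()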
I will keep adding more information and code on integrating Spark structured streaming with Kafka, and will keep updating this page.
Arun Mathe
Gmail ID : arunkumar.mathe@gmail.com
Contact No : +91 9704117111
Thank you