
Kafka Structured streaming : Using Nifi, Spark & Snowflake

This is an end-to-end data flow pipeline created using Apache Nifi, Kafka, Spark structured streaming, and Snowflake.


Flow of data in this pipeline :

Server (https://randomuser.me/api/) -----> Nifi (using REST API) -----> Kafka (Kafka brokers) -----> Consumer (Spark structured streaming) -----> Snowflake (to store data)

  • For this project, I used the online data generation website above (randomuser.me) to produce the data stream. It acts as the server.
  • Configure Nifi to pull this data stream using the InvokeHTTP processor.
  • Configure Nifi to publish the data stream to Kafka using another processor, PublishKafkaRecord_2_6 (Nifi acts as the Kafka producer here).
  • With that in place, data flows continuously from the online website into the Kafka brokers.
  • Using PyCharm/VSS, write the Kafka consumer code that reads this data with Spark structured streaming.
  • Store it in Snowflake (after any required transformations).


What knowledge is required to understand this pipeline? We need to understand the following concepts before creating it.


Apache Nifi : Nifi was built to automate the flow of data between systems. With Nifi, data flows between multiple systems in a managed, automated way.

Apache Spark structured streaming : Spark structured streaming is a stream processing engine that allows users to process data in near real time.

Snowflake : Snowflake is a cloud-based data warehouse and database management system (DBMS) that stores, processes, and analyses data.

Now let's see how to create an end-to-end pipeline for data transformation using the concepts above.


Getting started with Apache Nifi :

  • Download "nifi-1.28.1-bin.zip" from the link (https://downloads.apache.org/nifi/1.28.1/?C=S;O=D)
  • Extract it and put it inside C:\bigdata
  • Run the "nifi" batch file available in the path "C:\bigdata\nifi-1.28.1\bin"
  • It will open a command prompt and take some time for the initial configuration
  • Once it is done, you can find the generated credentials to log in to Nifi in the "nifi-app.log" file available in the path C:\bigdata\nifi-1.28.1\logs (see the findstr sketch after this list)
  • Save these credentials to reuse them later
  • URL to open Nifi : Nifi URL (https://localhost:8443/nifi/)
  • Log in using the credentials above
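
If you don't want to scan the log file manually, the generated credentials can usually be pulled out of nifi-app.log with a findstr search (a quick sketch, assuming the install path used above; the exact log wording can vary between Nifi versions):

findstr /C:"Generated Username" /C:"Generated Password" C:\bigdata\nifi-1.28.1\logs\nifi-app.log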

Nifi InvokeHTTP configuration :

  • Add the source website URL under the HTTP URL property, as shown in the screenshot below
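
Before configuring the processor, it is worth looking at the JSON this endpoint returns, because its two top-level keys (results and info) are exactly what the Spark schema later in this post expects. A quick standard-library sketch to inspect it:

import json
from urllib.request import urlopen

# Fetch one record from the same endpoint that Nifi's InvokeHTTP processor will call
with urlopen("https://randomuser.me/api/") as resp:
    payload = json.loads(resp.read().decode("utf-8"))

# Top-level keys of the response: "results" (generated users) and "info" (request metadata)
print(list(payload.keys()))
print(payload["info"])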


Nifi PublishKafkaRecord_2_6 configuration :

  • Set the Record Reader and Record Writer details (the Kafka Brokers and Topic Name are also configured here)
  • Since I am using JSON data, I selected JsonTreeReader as the Record Reader and JsonRecordSetWriter as the Record Writer



After the above configuration, create a connection between the InvokeHTTP and PublishKafkaRecord_2_6 processors, then start the workflow by right-clicking on the canvas and pressing the Start button.


Now the initial flow of data from the online web server to Kafka is ready. If you want to test it, you can start the services below, create a topic, and run a Kafka console consumer in a command prompt to see the incoming data. Make sure to use the same topic name that you configured in the PublishKafkaRecord_2_6 processor.

  • Start Zookeeper service 
    • Zookeeper is a distributed coordination service used to manage and synchronize distributed systems.
    • %KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper.properties
  • Start Kafka server
    • %KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server.properties
  • Start a Kafka topic
    • %KAFKA_HOME%\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <topic_name>
    • Note: 2181 is the default Zookeeper port used by Kafka
  • Start Producer with a topic name
    • %KAFKA_HOME%\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic <topic_name>
  • Start a Consumer with the same topic name used when starting the Producer
    • %KAFKA_HOME%\bin\windows\kafka-console-consumer.bat --topic <topic_name> --from-beginning --bootstrap-server localhost:9092
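  • To confirm the topic exists (and that the name matches what Nifi is publishing to), you can list all topics; a quick check, assuming the same Kafka 2.x setup as above:
    • %KAFKA_HOME%\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181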


Below is the Kafka Consumer code :

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Note: running this requires the spark-sql-kafka and spark-snowflake connector
# packages on the classpath (see the spark-submit sketch after this block)
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()

# Read the stream published by Nifi from the Kafka topic (here "feb11")
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "feb11") \
    .load()

# Kafka delivers the payload in the binary "value" column; cast it to a JSON string
df = df.selectExpr("CAST(value AS STRING) as json_file")
# df.writeStream.outputMode("append").format("console").start().awaitTermination()

# randomuser.me responses have two top-level keys: "results" and "info"
sch = StructType([
    StructField("results", StringType(), True),
    StructField("info", StringType(), True)
])

# Parse the JSON string and flatten the two fields into columns
df1 = df.withColumn("parsed_json", from_json(col("json_file"), sch)).select("parsed_json.*")
# df1.writeStream.outputMode("append").format("console").start().awaitTermination()

# Snowflake connection options
sfOptions = {
    "sfURL": "hfeasqz-ys73889.snowflakecomputing.com",
    "sfUser": "arunkumarmathe",
    "sfPassword": "Arun@9704117111",
    "sfDatabase": "arundb",
    "sfSchema": "public",
    "sfWarehouse": "compute_wh"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

def foreach_batch_function(df, epoch_id):
    # Add an ingestion timestamp and append each micro-batch to the livekafka table
    df = df.withColumn("ts", current_timestamp())
    df.write.mode("append").format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "livekafka").save()

df1.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()
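
To run this script, both the Kafka source and the Snowflake Spark connector have to be available to Spark. Below is a minimal spark-submit sketch; the script name kafka_consumer.py and the package versions are placeholders, so match them to your own Spark/Scala build and connector versions:

spark-submit ^
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3,net.snowflake:snowflake-jdbc:3.13.30 ^
  kafka_consumer.py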


Explanation of above code :
  • The import statements, Spark session creation, and the initial DataFrame are the same as we have seen previously
  • Make sure the subscribe option in the DataFrame uses your own topic name ("feb11" in the code above)
  • Transformation 
    • df1 = df.withColumn("parsed_json",from_json(col("json_file"),sch)).select("parsed_json.*")
  • Below are the Snowflake connection details

        # Set options below
        sfOptions = {
            "sfURL" : "hfeasqz-ys73889.snowflakecomputing.com",
            "sfUser" : "arunkumarmathe",
            "sfPassword" : "Arun@9704117111",
            "sfDatabase" : "arundb",
            "sfSchema" : "public",
            "sfWarehouse" : "compute_wh"
        }
        SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

  • For each micro-batch that arrives in regular time intervals, add a timestamp column and append it to a new Snowflake table named livekafka

        def foreach_batch_function(df, epoch_id):
            df = df.withColumn("ts", current_timestamp())
            df.write.mode("append").format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "livekafka").save()

  • awaitTermination() keeps the streaming query running, so the write above is repeated automatically for every new micro-batch

        df1.writeStream.foreachBatch(foreach_batch_function).start().awaitTermination()
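
  • If rows are not reaching Snowflake, a useful intermediate check (already hinted at by the commented-out lines in the code) is to write the parsed stream to the console first and confirm the JSON is being parsed:

        df1.writeStream.outputMode("append").format("console").start().awaitTermination()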


Snowflake screenshot :

  • Observe that there is a table called LIVEKAFKA inside the database ARUNDB
  • You have to create this database in Snowflake and reference it in the sfOptions of the Spark code above (see the verification sketch below)
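
Once data starts landing, you can also read the table back through the same connector to verify the pipeline end to end (a small sketch reusing the sfOptions and SNOWFLAKE_SOURCE_NAME defined above, run from the same Spark session):

# Read the livekafka table back via the Snowflake Spark connector
check_df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("dbtable", "livekafka") \
    .load()
check_df.show(5)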



That's all for this blog! See you again with another topic in Cloud.


Thanks,
Arun Mathe
Email ID : arunkumar.mathe@gmail.com
Contact ID : 9704117111
