
Spark : Spark Streaming using Databricks, EC2 & RDS

In this blog, we are going to see how Spark Streaming works, how to write Spark code to read streaming data, and how to store that data in some other place, let's say AWS RDS.


Spark Architecture : 

We have the below layers in the Spark architecture:

  • Data Storage (HDFS, HBase, Cassandra, Amazon S3)
  • Resource Management (Hadoop Yarn, Apache Mesos, Kubernetes)
  • Processing Engine (Spark Core)
  • Libraries (Spark streaming, Spark SQL, GraphX, MLlib)
  • APIs (Scala, Python, Java, R)


Spark core : 

Spark Core is the heart of the Spark architecture. By default, it processes only batch (historical) data; on top of it, we use the libraries listed above to perform other kinds of work. For example, Spark Core does not let you run SQL queries directly, so we use Spark SQL for that.
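
As a quick illustration, the sketch below shows how a SQL query can be run on top of Spark through Spark SQL. This is only a minimal example; the employee rows here are made up for illustration.

from pyspark.sql import SparkSession

# Minimal sketch: running SQL on top of Spark via the Spark SQL library.
# The rows below are made up purely for illustration.
spark = SparkSession.builder.appName("sparksql-demo").getOrCreate()

df = spark.createDataFrame([("bob", 36), ("sri", 35)], ["name", "age"])
df.createOrReplaceTempView("employees")   # expose the DataFrame to SQL
spark.sql("SELECT name FROM employees WHERE age > 35").show()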

We also have to understand a limitation of Spark Streaming: it cannot process each record the instant it arrives. Incoming traffic has to wait for a few seconds and is then read and processed as a small batch (this is called micro-batching). The batch interval is configurable through a property in the Spark code, as we will see in the sections below.

Apache Flink is an open source Apache tool that is a better fit for true real-time (event-at-a-time) processing of live streams, whereas Spark Streaming works in micro-batches and is a better fit for batch-style processing. To make it simple: use Spark for historical/batch data and micro-batch streaming, and Apache Flink for low-latency live data processing.

More information about Apache Flink at : https://www.tutorialspoint.com/apache_flink/apache_flink_batch_realtime_processing.htm


Let's get into the use case now. Below are the prerequisites:

  • Create an EC2 instance to send some live data using the netcat command
  • Using Databricks/EMR/a local computer, create a Spark environment
  • Write Spark code to read incoming data


Use the below command in the terminal after connecting to the Ubuntu EC2 instance :

This acts as the source that will be sending the live data.

ubuntu@ip-172-31-12-58:~$ nc -lk 1111

bob,36,hyd

sri,35,hyd

thoshi,01,hyd

ashu,34,hyd

ram,60,hyd
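
Before pointing Spark at this listener, it helps to confirm that the port is reachable from the machine that will run the Spark driver. Below is a small, hypothetical check in Python; the hostname is the EC2 public DNS used later in this post.

import socket

# Quick reachability check for the netcat listener on port 1111.
host = "ec2-65-2-69-46.ap-south-1.compute.amazonaws.com"
with socket.create_connection((host, 1111), timeout=5) as sock:
    print("Connected to", host)
    data = sock.recv(1024)   # blocks until something is typed into nc
    print(data.decode("utf-8", errors="replace"))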


Now log in to Databricks, create a Spark environment, and run the below pieces of code :

  • To create or get the Spark session

spark = SparkSession.builder.appName("StopStreamingContext").getOrCreate()

  • To create the streaming context; '10' is the batch interval, which tells Spark Streaming to collect the incoming stream into a new batch every 10 seconds

ssc = StreamingContext(spark.sparkContext, 10)

  • To receive the stream from the EC2 instance above using port number '1111' (this port must be opened in the instance's security group, under the Security tab in the AWS console; see the sketch below)

lines = ssc.socketTextStream("ec2-65-2-69-46.ap-south-1.compute.amazonaws.com", 1111)
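
If the port is not already open, the same inbound rule can also be added programmatically with Boto3. This is only a sketch; the security group ID below is a placeholder for the group attached to your EC2 instance.

import boto3

# Sketch: open TCP port 1111 in the EC2 instance's security group so that
# socketTextStream can connect. The group ID is a placeholder; the real one
# is listed under the instance's Security tab in the AWS console.
ec2 = boto3.client("ec2", region_name="ap-south-1")
ec2.authorize_security_group_ingress(
    GroupId="sg-0123456789abcdef0",   # placeholder security group ID
    IpProtocol="tcp",
    FromPort=1111,
    ToPort=1111,
    CidrIp="0.0.0.0/0",               # open to the world only for testing
)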

  • To print the received stream in the Databricks console

lines.pprint()

  • Create a DataFrame from the RDD, using a lambda to split each comma-separated line
  • Filter the rows where city == "hyd" (a standalone sketch of this step follows the snippet below)

df = rdd.map(lambda x: x.split(",")).toDF(["name", "age", "city"])
hydf = df.where(col("city") == "hyd")
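
To make this transformation easier to follow, here is a standalone (batch, non-streaming) sketch of the same split-and-filter step, using the sample rows typed into netcat above.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Standalone sketch of the map/split/toDF step, outside of streaming.
spark = SparkSession.builder.appName("split-demo").getOrCreate()
rdd = spark.sparkContext.parallelize(["bob,36,hyd", "sri,35,hyd", "ram,60,hyd"])
df = rdd.map(lambda x: x.split(",")).toDF(["name", "age", "city"])
hydf = df.where(col("city") == "hyd")
hydf.show()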

  • Connect to AWS RDS (MySQL)
  • Create a table named livefeb4
  • Append the filtered records into this table (a quick verification sketch follows the snippet below)

host="jdbc:mysql://diwakarmysql.cf08m8g4ysxd.ap-south-1.rds.amazonaws.com:3306/mysqldb"
        df.write.mode("append").format("jdbc").option("url",host).option("user","admin").option("password","Mypassword.1").option("dbtable","livefeb4").save()
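
Once records have been appended, a quick way to verify the write is to read the same table back over JDBC. This sketch assumes the MySQL JDBC driver (connector jar) is available on the cluster, for example installed as a Databricks cluster library.

# Sketch: read the livefeb4 table back over the same JDBC URL to verify
# that the appended records landed in RDS.
check_df = (spark.read.format("jdbc")
            .option("url", host)
            .option("user", "admin")
            .option("password", "xxxxxxxxx")
            .option("dbtable", "livefeb4")
            .load())
check_df.show()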



Spark code for this activity :

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.streaming import StreamingContext

# Create or get the existing Spark session
# (.master("local[*]") is only needed when running outside Databricks)
spark = SparkSession.builder.appName("StopStreamingContext").master("local[*]").getOrCreate()

# Streaming context with a 10-second batch interval
ssc = StreamingContext(spark.sparkContext, 10)

lines = ssc.socketTextStream("ec2-65-2-69-46.ap-south-1.compute.amazonaws.com", 1111)
lines.pprint()
# foreachRDD: each micro-batch RDD is passed to the process() function defined below

# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]


def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] -> RDD[list of fields] -> DataFrame with named columns
        df = rdd.map(lambda x: x.split(",")).toDF(["name", "age", "city"])
        hydf = df.where(col("city") == "hyd")

        # Append the filtered rows to the livefeb4 table in RDS (MySQL);
        # the MySQL JDBC driver jar must be available on the cluster
        host = "jdbc:mysql://diwakarmysql.cf08m8g4ysxd.ap-south-1.rds.amazonaws.com:3306/mysqldb"
        hydf.write.mode("append").format("jdbc").option("url", host).option("user", "admin").option("password", "xxxxxxxxx").option("dbtable", "livefeb4").save()

        hydf.show()
    except Exception:
        # Empty micro-batches cannot be converted to a DataFrame, so skip them
        pass

lines.foreachRDD(process)
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
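
Since awaitTermination() blocks, the streaming job has to be stopped explicitly, for example from another notebook cell. A minimal sketch using the PySpark StreamingContext API:

# Sketch: stop the streaming job from another cell when you are done.
# stopGraceFully=True lets the in-flight micro-batch finish first, and
# stopSparkContext=False keeps the underlying Spark session alive.
ssc.stop(stopSparkContext=False, stopGraceFully=True)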


Now, after this job has run, you can see a table named livefeb4 created in RDS, with the streamed records appended to it.

Let's learn more information in coming blogs. Have a great day!


Arun Mathe

Gmail ID : arunkumar.mathe@gmail.com

Contact No : +91 9704117111






