AWS : Running Spark code using Databricks

In this blog, we are going to understand how to prepare Databricks to read and write data from relational databases in a cloud (AWS) environment. Databricks is a company that provides a cloud-based platform for data analytics and AI. We first have to create an account there, which is free of cost.


What is the use case ? 

We want to read data from a table in a MySQL database created in the AWS RDS service. Once the table is read, we apply a simple transformation and load the result into a table in another database, Microsoft SQL Server (MSSQL), which also resides in AWS RDS. So, first we need to log in to the AWS account and create two databases: one MySQL and one MSSQL.

Once these two databases are ready, we need to install SQL Workbench to interact with them from our local system (please refer to the blog "How to setup SQL workbench").


How to create Databricks account ?

  • Log in to https://community.cloud.databricks.com/ using your email ID
  • You will receive an OTP by email; enter it in the prompt to log in
  • Reference screenshot 


How to create a cluster with Spark environment (refer above screenshot) ?
  • Click on the "Compute" option
  • Click on the "Create compute" button
  • Select any of the available Spark versions and click the "Create compute" button
  • That's all, the cluster with a Spark environment is created

How to open a new notebook to write Spark code ?
  • Click on "New" button on the top and then create a new notebook


Scenario 1 :  Read table from MySQL
  • Open SQL Workbench and connect to the MySQL database inside AWS RDS (refer to the section "How to open a new workbench and connect to a Database ?" in my blog "How to setup SQL workbench")
Spark code :

# Below line of code is the connection URL of MySQL database inside AWS RDS service
data="jdbc:mysql://sparkpoc1.ctucw2ay6r46.ap-south-1.rds.amazonaws.com:3306/arundb"

# This is how we connect to a specific table using spark, check query section in spark website : https://spark.apache.org/docs/3.5.1/sql-data-sources-jdbc.html
df=spark.read.format("jdbc").option("url", data).option("user","admin").option("password","Mypassword.01").option("dbtable","emp").load()

# To display the data that we read using above line of code
df.show()
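
The read above puts the password directly in the notebook. As a minimal sketch of an alternative, the JDBC options can be collected in a dictionary and the password taken from an environment variable. The helper name jdbc_options and the variable name MYSQL_PASSWORD are assumptions made for illustration, they are not part of the original code.

```python
import os


def jdbc_options(url, table, user, password_env="MYSQL_PASSWORD"):
    """Build the option dictionary for a Spark JDBC read or write.

    The password is looked up in an environment variable (hypothetical
    name, chosen for this sketch) instead of being typed into the code.
    """
    password = os.environ.get(password_env)
    if password is None:
        raise KeyError(f"environment variable {password_env} is not set")
    return {"url": url, "dbtable": table, "user": user, "password": password}


# In a Databricks notebook, where `spark` already exists, this could be used as:
# opts = jdbc_options("jdbc:mysql://<your-rds-host>:3306/arundb", "emp", "admin")
# df = spark.read.format("jdbc").options(**opts).load()
# df.show()
```

options(**opts) passes all key/value pairs at once, so it is equivalent to chaining the four option(...) calls used above.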


Scenario 2 :  Read table from MySQL and load same table in another database, MSSQL

Spark code :

# Import only the function used in the transformation below
from pyspark.sql.functions import current_date
# MySQL URL
data="jdbc:mysql://sparkpoc1.ctucw2ay6r46.ap-south-1.rds.amazonaws.com:3306/arundb"
# reading emp table from MySQL
df=spark.read.format("jdbc").option("url", data).option("user","admin").option("password","Mypassword.01").option("dbtable","emp").load()
# displaying information from data frame
#df.show()
# Transformation
df=df.withColumn("today", current_date())
# MSSQL URL
mssqlurl="jdbc:sqlserver://sparkpoc2.ctucw2ay6r46.ap-south-1.rds.amazonaws.com:1433;trustServerCertificate=true;databaseName=arundb"
# Loading emp table read from MySQL DB into arun1emp table in MSSQL DB
df.write.format("jdbc").option("url", mssqlurl).option("user","admin").option("password","Mypassword.02").option("dbtable","arun1emp").save()
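
The MSSQL URL above is a host and port followed by semicolon-separated properties. A small sketch of how such a URL could be assembled from its parts is shown below; the helper name sqlserver_url is hypothetical and only mirrors the format already used in this post.

```python
def sqlserver_url(host, database, port=1433, **properties):
    """Assemble a SQL Server JDBC URL in the same layout as the one above.

    Extra keyword arguments become key=value properties, and databaseName
    is appended last.
    """
    parts = [f"jdbc:sqlserver://{host}:{port}"]
    for key, value in properties.items():
        parts.append(f"{key}={value}")
    parts.append(f"databaseName={database}")
    return ";".join(parts)


# Example with a placeholder host name:
# mssqlurl = sqlserver_url("<your-rds-host>", "arundb", trustServerCertificate="true")
#
# Note: by default Spark's save() raises an error if the target table already
# exists. Adding .mode("append") or .mode("overwrite") before .save() changes that.
```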


Scenario 3 :  Read all tables from a DB in MySQL, transform them and load them into the destination DB, MSSQL

Spark code :

from pyspark.sql.functions import *

#tabs=["emp","dept","orders"]

data="jdbc:mysql://walmart2025.cdyg20qqucui.ap-south-1.rds.amazonaws.com:3306/pavanidb"
qry="(SELECT table_name FROM information_schema.tables WHERE table_schema = 'pavanidb') aaa"
# Read the list of table names in the pavanidb schema (named tables_df so that Python's built-in all() is not shadowed)
tables_df=spark.read.format("jdbc").option("url",data).option("user","myuser").option("password","Mypassword.1").option("dbtable",qry).load()
print(tables_df)
# collect() returns Row objects; take the first column (table_name) of each row
tabs = [t[0] for t in tables_df.collect()]

print(tabs)
# The body of the for loop must be indented
for t in tabs:
    print("table:",t)
    df=spark.read.format("jdbc").option("url",data).option("user","myuser").option("password","Mypassword.1").option("dbtable",t).load()
    df.show(4)
    # To load each table into MSSQL, first set mshost to the MSSQL JDBC URL (as in Scenario 2), then uncomment the line below
    #df.write.format("jdbc").option("url",mshost).option("user","myuser").option("password","Mypassword.1").option("dbtable",t+"_mysql123").save()
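
The two strings that drive this scenario, the information_schema subquery and the destination table name, can be built by small helpers. The sketch below is only an illustration: the function names tables_subquery and target_table are hypothetical, and the check on the schema name is an added safeguard that is not in the original code.

```python
def tables_subquery(schema, alias="aaa"):
    """Return the aliased subquery that lists all table names in a schema.

    Spark's "dbtable" option accepts a parenthesised subquery with an alias.
    """
    # Allow only letters, digits and underscores, since the name is
    # placed directly into the SQL text.
    if not schema.replace("_", "").isalnum():
        raise ValueError(f"unexpected schema name: {schema!r}")
    return (
        "(SELECT table_name FROM information_schema.tables "
        f"WHERE table_schema = '{schema}') {alias}"
    )


def target_table(source_table, suffix="_mysql123"):
    """Name of the destination table in MSSQL for a given source table."""
    return source_table + suffix


# qry = tables_subquery("pavanidb")
# for t in tabs:
#     print(t, "->", target_table(t))
```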



Arun Mathe

Gmail ID : arunkumar.mathe@gmail.com

Contact No : +91 9704117111
