This is one of the important concepts, where we will see how an end-to-end pipeline works in AWS.
We are going to see how to continuously monitor a common source like S3/Redshift from Lambda (using Boto3 code), initiate a trigger to start a Glue job (Spark code), and perform some action.
Let's assume that AWS Lambda should trigger another AWS service, Glue, as soon as a file gets uploaded into an AWS S3 bucket. Lambda should also pass this file's information to Glue, so that the Glue job can perform some transformation and load the transformed data into AWS RDS (MySQL).
Understanding the above flow chart :
- Let's assume one of your clients is uploading files (say .csv/.json) to an AWS storage location, for example S3
- As soon as a file gets uploaded to S3, we need to initiate a TRIGGER in AWS Lambda using Boto3 code
- Once this trigger is initiated, another AWS service called GLUE (an ETL tool) will start a PySpark job to receive this file's details from Lambda, perform some transformation and then load the data into AWS RDS (MySQL)
- To keep it simple: when a file gets uploaded to S3, Lambda triggers Glue, telling it to start its PySpark job, pick up the file, transform it and load it into S3/RDS
- Finally, the sales team will use Power BI to visualize this final data (this is not in our scope today; it is mentioned just for our understanding)
Let's start this activity in AWS :
AWS Lambda :
AWS Lambda is a serverless compute service that runs code in response to events. It automatically manages the compute resources, so you don't need to provision or manage servers.
First we need to go to the Lambda page in AWS and click the Create function button at the top right-hand side. Give the function a name, under Runtime select Python (preferably Python 3.9 or later) as the language, under Change default execution role click Use an existing role and select the appropriate role (it should have full permission to S3/Glue), and finally click Create function.
Now we will be on the page shown in the screenshot :
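For reference, the same function creation can also be scripted with Boto3 instead of clicking through the console. Below is only a rough sketch; the function name, role ARN, zip file and handler path are placeholders you would replace with your own:

import boto3

lambda_client = boto3.client('lambda')

# The zip file is assumed to contain lambda_function.py with a lambda_handler inside
with open('function.zip', 'rb') as f:
    zipped_code = f.read()

response = lambda_client.create_function(
    FunctionName='s3-glue-trigger',                                # placeholder name
    Runtime='python3.9',
    Role='arn:aws:iam::123456789012:role/lambda-s3-glue-role',     # placeholder role ARN
    Handler='lambda_function.lambda_handler',
    Code={'ZipFile': zipped_code},
)
print(response['FunctionArn'])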
Note : Lambda has a number of limitations. One of them is that a function times out after 3 seconds by default, which means our code has to finish within 3 seconds; we can change this setting from the Configuration tab as shown in the below screenshot. The deployment package size is also limited (roughly 50 MB for a zipped direct upload, 250 MB unzipped). In practice, Lambda is often used mainly for lightweight work such as alerting and triggering; for longer-running logic we can pair Lambda with Step Functions, or push the heavy processing to Glue.
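If you want to change the timeout (and memory) without going through the console, a minimal Boto3 sketch is shown below; the function name is a placeholder:

import boto3

lambda_client = boto3.client('lambda')

# Raise the timeout to 5 minutes and give the function a bit more memory.
# 's3-glue-trigger' is a placeholder; use your own function name.
lambda_client.update_function_configuration(
    FunctionName='s3-glue-trigger',
    Timeout=300,       # seconds (default is 3)
    MemorySize=256     # MB
)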
Also note that there is an option called + Add trigger. Click it and it will take you to the next page. Under Trigger configuration select S3 (this means you want to monitor S3 and initiate a trigger). Now select a bucket (it will list the available buckets in S3). Under Event types select the event to monitor (knowing which event to select comes with experience; we need to monitor a file upload, so we have selected the event types shown in the below screenshot). Under Prefix, mention the S3 bucket folder name which we want to monitor (I have mentioned a folder called landingzone), and under Suffix I have mentioned .csv (it means that if a .csv file is uploaded into this folder, then Lambda should initiate a trigger). Select the I acknowledge option and click the Add button at the end. Now the trigger is successfully added.
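When this trigger fires, Lambda receives an S3 event notification as the event argument of the handler. A trimmed example of what that payload roughly looks like is shown below (the bucket name is illustrative; the folder and file names follow the example above); the Boto3 code in the next step simply walks this structure:

# A simplified S3 event, roughly what lambda_handler receives.
# Only the keys our code actually reads are shown; 'mybucket' is illustrative.
sample_event = {
    "Records": [
        {
            "eventSource": "aws:s3",
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "mybucket"},
                "object": {"key": "landingzone/1000records.csv"}
            }
        }
    ]
}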
Now we have to write code under the Code tab to perform an action once that trigger is initiated. This action/block of code is nothing but what Lambda should tell Glue as soon as a .csv file is uploaded to S3.
Below is the code in this scenario :
import json
import boto3

glue_client = boto3.client('glue')

def lambda_handler(event, context):
    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        object_key = record['s3']['object']['key']
        print(f"New file added: s3://{bucket_name}/{object_key}")

        # Start AWS Glue job with arguments
        response = glue_client.start_job_run(
            JobName='firstjob',  # Replace with your Glue job name
            Arguments={
                '--s3p': bucket_name,
                '--fn': object_key
            }
        )
Explanation of code :
- We need to import json and boto3, as we have to use the Boto3 SDK to work with AWS services from the code
- Create a Glue client using boto3
- Inside the lambda_handler() definition we write the code for what we want to happen
- Note that we need to pass the file details to Glue as soon as a file is uploaded to the S3 location we are monitoring with the trigger
- We need the S3 bucket name and the file name; let's collect them using the below 2 lines of code.
- bucket_name = record['s3']['bucket']['name']
- object_key = record['s3']['object']['key']
- Now print it using a Python f-string
- print(f"New file added: s3://{bucket_name}/{object_key}")
- This is a simple logging mechanism; the message will show up in the Lambda function's CloudWatch logs
- Now we have to start the Glue job and pass the bucket & file names as job arguments (see the sketch after this list)
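For troubleshooting, the JobRunId returned by start_job_run can be used to poll the run's status with the same Glue client. Below is a minimal, self-contained sketch; the job name firstjob matches the code above, while the bucket and key values are illustrative:

import boto3

glue_client = boto3.client('glue')

# Start the job the same way lambda_handler does (illustrative arguments)
run = glue_client.start_job_run(
    JobName='firstjob',
    Arguments={'--s3p': 'mybucket', '--fn': 'landingzone/1000records.csv'}
)

# Look up the state of that run (RUNNING, SUCCEEDED, FAILED, ...)
status = glue_client.get_job_run(JobName='firstjob', RunId=run['JobRunId'])
print("Glue job run state:", status['JobRun']['JobRunState'])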
AWS Glue : ETL service (Extract - Transform - Load)
AWS Glue is a serverless data integration service that makes it easy for analytics users to discover, prepare, move, and integrate data from multiple sources. You can use it for analytics, machine learning, and application development. It also includes additional productivity and data ops tooling for authoring, running jobs, and implementing business workflows.
With AWS Glue, you can discover and connect to more than 70 diverse data sources and manage your data in a centralized data catalog. You can visually create, run, and monitor extract, transform, and load (ETL) pipelines to load data into your data lakes. Also, you can immediately search and query cataloged data using Amazon Athena, Amazon EMR, and Amazon Redshift Spectrum.
AWS Glue consolidates major data integration capabilities into a single service. These include data discovery, modern ETL, cleansing, transforming, and centralized cataloging. It's also serverless, which means there's no infrastructure to manage. With flexible support for all workloads like ETL, ELT, and streaming in one service, AWS Glue supports users across various workloads and types of users.
Also, AWS Glue makes it easy to integrate data across your architecture. It integrates with AWS analytics services and Amazon S3 data lakes. AWS Glue has integration interfaces and job-authoring tools that are easy to use for all users, from developers to business users, with tailored solutions for varied technical skill sets.
Step-By-Step process to create a Glue Job :
Go to the AWS Glue home page in your AWS account; on the left side of the screen, under Data Integration and ETL, click on ETL jobs.
Click on the Script editor option on the right side of the screen, select Spark and click on the Create script option.
It will load some basic boilerplate code with a few imports, which we can adapt as needed. Now give the job a name at the top of the page. Once done, your job will show up on the home screen as below.
Below is the code in this scenario :
# Below all are import statements
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import re
# Operating system's python module
# More information at https://www.geeksforgeeks.org/os-module-python-examples/
import os
# We are receiving the command line arguments passed from the Lambda job and assigning them to variables
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['s3p','fn'])
bucket=args['s3p']
ob_name=args['fn']
# Location of input file from S3 bucket's folder
input=f"s3://{bucket}/{ob_name}"
print(input)
# Remember our file is something like 1000records.csv
# Using OS module of Python, we are splitting that file name into 2 parts ("1000records" , "csv")
# We will use this to create the table name in RDS at destination, hence collected it like this
tab = os.path.splitext(os.path.basename(input))[0]
# Using Python regular expressions, we keep only alphanumeric characters (everything else is removed)
tab = re.sub(r'[^a-zA-Z0-9]', '', tab)
# Check if the filename starts with a digit
# then prefix 'a_' to file name, this would be our final table name in RDS
if tab[0].isdigit():
    tab = f"a_{tab}"
print("tabname",tab)
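# For example, with the file 1000records.csv mentioned above:
#   os.path.splitext(os.path.basename(input))[0]  ->  "1000records"
#   re.sub(r'[^a-zA-Z0-9]', '', ...)              ->  "1000records" (nothing to strip)
#   starts with a digit, so prefix 'a_'           ->  "a_1000records"
# "a_1000records" is the table name that will be created in RDS.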
# importing all from pyspark.sql.functions
from pyspark.sql.functions import *
# PySpark code to create the Spark context, the Glue context, and a Spark session
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
#data="s3://arun2025/input/asl/asl.csv"
#output ="s3://arun2025/output/asl/aslres"
# Creating a data frame by reading the files data from 'input'
df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(input)
# Cleaning up column names in the received file (keeping only alphanumeric characters)
df=df.toDF(*[re.sub('[^A-Za-z0-9]','',c) for c in df.columns])
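# For example, CSV headers like "Order ID" or "unit-price" (illustrative names)
# would become "OrderID" and "unitprice" after this cleanup.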
# Applying transformation by adding a column to add current date in each record
df=df.withColumn("today", current_date())
df.show()
# URL of destination RDS (mysql in this case)
host="jdbc:mysql://diwakarmysql.cf08m8g4ysxd.ap-south-1.rds.amazonaws.com:3306/mysqldb"
# Creating a table in MySQL using the table name formatted in "tab"
# Connecting to MySQL(AWS RDS) using credentials
# Appending all records
df.write.mode("append").format("jdbc").option("url",host).option("user","admin").option("password","Mypassword.1").option("dbtable",tab).save()
##df.write.mode("append").format("csv").option("header","true").save(output)
Also, on your Glue job page, click on the Job details tab and fill in the details as shown in the below screenshot. Make sure you have an IAM role with full access to S3, Lambda, RDS and CloudWatch, and assign it here.
Finally, once we upload a .csv file into the S3 bucket folder mentioned in the trigger configuration, Lambda will run the Boto3 code, which will collect the bucket and file information, pass it to Glue and ask Glue to start its Spark code. Glue will then run its Spark code to receive the file from S3, transform it and load it into MySQL (AWS RDS). See the below screenshot for the same: Job succeeded. You can cross-verify the created table in MySQL by connecting through SQL Workbench.
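If you prefer to verify from a script instead of SQL Workbench, a minimal check is sketched below. It assumes the mysql-connector-python package is installed, reuses the RDS endpoint and credentials from the Glue script, and queries the table name that the example file 1000records.csv would produce (a_1000records):

import mysql.connector

# Endpoint, credentials and database are the same as in the Glue script above
conn = mysql.connector.connect(
    host='diwakarmysql.cf08m8g4ysxd.ap-south-1.rds.amazonaws.com',
    user='admin',
    password='Mypassword.1',
    database='mysqldb',
)
cur = conn.cursor()

# 'a_1000records' is the table name produced for the example file 1000records.csv
cur.execute("SELECT COUNT(*) FROM a_1000records")
print("rows loaded:", cur.fetchone()[0])

cur.close()
conn.close()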
Arun Mathe
Gmail ID : arunkumar.mathe@gmail.com
Contact No : +91 9704117111