
AWS : Spark, HIVE using EMR in AWS

What is the use case ? 

Let's see how to read data from AWS S3 or RDS, apply transformations, and load the result into HIVE using the EMR service in Amazon Web Services.


Pre-requisites :

  • We need an AWS account
  • Create an EMR cluster with Hadoop, Hive & Spark installed on it
  • Create an S3 bucket and upload some .csv files with sample data
  • Create an RDS instance in AWS (preferably MySQL, as it's free)

A sample EMR cluster will look as shown below :


Now, click on the "Connect to the primary node using SSH" option as shown in the below screenshot.



Now copy the SSH command as shown below. 



Now, from your local desktop/laptop, open a command prompt, paste the copied SSH command and press Enter; type "yes" when prompted and press Enter again. It will connect to the EMR primary node as shown below.


Use the below command to cross-check the installed software; it will display the install location of each of them.





To enter the HIVE shell, just type hive and press Enter; it will take you to the hive prompt.



If you want to go back to the Hadoop prompt from HIVE, just press Ctrl+C. From there we can run HDFS commands.


Points to remember :

  • HIVE is an OLAP system (not OLTP)
  • It is not a database; it is a layer on top of Hadoop to process data
  • The 2 main modules in Hadoop are the Hadoop Distributed File System (HDFS) & MapReduce (the programming module)
  • HDFS stores the data
  • MapReduce (MR) processes the data stored in HDFS
  • The MR module is a programming module where we have to write Java code to process data
  • Since that can be complicated for non-coders, HIVE came into existence to process data using SQL-style queries instead of Java
  • The HIVE language is HQL (Hive Query Language)
  • HIVE doesn't have its own storage; in general, it stores schema information in a relational database (the metastore) and the data in HDFS


How to run a HIVE query inside AWS EMR ?

Once you have opened a command prompt, connect to EMR using the below SSH command.

C:\Users\arunk>ssh -i ~/sparkkeypair.pem hadoop@ec2-35-154-30-239.ap-south-1.compute.amazonaws.com

Last login: Fri Jan 24 12:45:55 2025


Type hive and press Enter to get into the HIVE prompt, then execute the below HIVE query to create a table.

Points to note :
  • Make sure to create the below-mentioned S3 bucket in AWS
  • Upload some csv files to read data from (a sample layout is shown right after this list)
  • In this example, the csv has name, age and city columns
  • Make sure your EMR service has S3 full access assigned under its IAM role (or else it won't connect to S3)
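
For reference, a csv file in that bucket is expected to look like the below sample (the first line is the header; the values are illustrative and match the data shown later in this post) :

name,age,city
venu,32,hyd
anu,49,mas
jyo,12,blr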

HIVE query :
CREATE EXTERNAL TABLE asltab (
  name STRING,
  age INT,
  city STRING
)
LOCATION 's3://arunkumar2025/input1/'
TBLPROPERTIES ("skip.header.line.count"="1");

Terminal output :



Set the below property to see the header as well (it applies only to the current session) :

set hive.cli.print.header=true;




If you want to set this property permanently, then we have to set it in hive-site.xml at the below location.

/usr/lib/hive/conf : hive-site.xml


Property :

<property>
  <name>hive.cli.print.header</name>
  <value>true</value>
  <description>whether to print names of the columns in query output</description>
</property>


hive> describe formatted asltab;
OK
# col_name              data_type               comment
name                    string
age                     int
city                    string

# Detailed Table Information
Database:               default
OwnerType:              USER
Owner:                  hadoop
CreateTime:             Fri Jan 24 12:52:48 UTC 2025
LastAccessTime:         UNKNOWN
Retention:              0
Location:               s3://arunkumar2025/input1
Table Type:             EXTERNAL_TABLE
Table Parameters:
        EXTERNAL                TRUE
        bucketing_version       2
        numFiles                2
        skip.header.line.count  1
        totalSize               10631
        transient_lastDdlTime   1737723168

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
        serialization.format    1
Time taken: 0.121 seconds, Fetched: 32 row(s)
hive>


For reference, below are all the properties in hive-site.xml :

[hadoop@ip-172-31-16-51 conf]$ sudo cat hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Licensed to the Apache Software Foundation (ASF) under one or more       -->
<!-- contributor license agreements.  See the NOTICE file distributed with    -->
<!-- this work for additional information regarding copyright ownership.      -->
<!-- The ASF licenses this file to You under the Apache License, Version 2.0  -->
<!-- (the "License"); you may not use this file except in compliance with     -->
<!-- the License.  You may obtain a copy of the License at                    -->
<!--                                                                          -->
<!--     http://www.apache.org/licenses/LICENSE-2.0                           -->
<!--                                                                          -->
<!-- Unless required by applicable law or agreed to in writing, software      -->
<!-- distributed under the License is distributed on an "AS IS" BASIS,        -->
<!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
<!-- See the License for the specific language governing permissions and      -->
<!-- limitations under the License.                                           -->

<configuration>

<!-- Hive Configuration can either be stored in this file or in the hadoop configuration files  -->
<!-- that are implied by Hadoop setup variables.                                                -->
<!-- Aside from Hadoop setup variables - this file is provided as a convenience so that Hive    -->
<!-- users do not have to edit hadoop configuration files (that may be managed as a centralized -->
<!-- resource).                                                                                 -->

<!-- Hive Execution Parameters -->

<property>
  <name>hbase.master</name>
  <value></value>
  <description>http://wiki.apache.org/hadoop/Hive/HBaseIntegration</description>
</property>

<property>
  <name>hive.zookeeper.quorum</name>
  <value>ip-172-31-16-51.ap-south-1.compute.internal:2181</value>
</property>

<property>
  <name>hive.llap.zk.sm.connectionString</name>
  <value>ip-172-31-16-51.ap-south-1.compute.internal:2181</value>
</property>

<property>
  <name>hbase.zookeeper.quorum</name>
  <value>ip-172-31-16-51.ap-south-1.compute.internal</value>
  <description>http://wiki.apache.org/hadoop/Hive/HBaseIntegration</description>
</property>

<property>
  <name>hive.execution.engine</name>
  <value>tez</value>
</property>

  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://ip-172-31-16-51.ap-south-1.compute.internal:8020</value>
  </property>


  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://ip-172-31-16-51.ap-south-1.compute.internal:9083</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://ip-172-31-16-51.ap-south-1.compute.internal:3306/hive?createDatabaseIfNotExist=true</value>
    <description>username to use against metastore database</description>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.mariadb.jdbc.Driver</value>
    <description>username to use against metastore database</description>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
    <description>username to use against metastore database</description>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>AExeP1djqFBrA4Td</value>
    <description>password to use against metastore database</description>
  </property>

<property>
   <name>hive.server2.allow.user.substitution</name>
   <value>true</value>
</property>

<property>
   <name>hive.server2.enable.doAs</name>
   <value>true</value>
</property>

<property>
   <name>hive.server2.thrift.port</name>
   <value>10000</value>
</property>

<property>
   <name>hive.server2.thrift.http.port</name>
   <value>10001</value>
</property>



<property>
  <name>hive.optimize.ppd.input.formats</name>
  <value>com.amazonaws.emr.s3select.hive.S3SelectableTextInputFormat</value>
</property>

<property>
  <name>s3select.filter</name>
  <value>false</value>
</property>

<property>
    <name>hive.server2.in.place.progress</name>
    <value>false</value>
</property>

<property>
    <name>hive.llap.zk.registry.user</name>
    <value>hadoop</value>
</property>

<property>
    <name>hive.security.metastore.authorization.manager</name>
    <value>org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider</value>
</property>

<property>
    <name>hive.log.explain.output</name>
    <value>false</value>
</property>

  <property>
    <name>datanucleus.fixedDatastore</name>
    <value>true</value>
  </property>

  <property>
    <name>mapred.reduce.tasks</name>
    <value>-1</value>
  </property>

  <property>
    <name>mapred.max.split.size</name>
    <value>256000000</value>
  </property>

  <property>
    <name>hive.mapjoin.hybridgrace.hashtable</name>
    <value>false</value>
  </property>

  <property>
    <name>hive.merge.nway.joins</name>
    <value>false</value>
  </property>

  <property>
    <name>hive.metastore.connect.retries</name>
    <value>15</value>
  </property>

  <property>
    <name>hive.optimize.joinreducededuplication</name>
    <value>false</value>
  </property>

  <property>
    <name>hive.optimize.sort.dynamic.partition.threshold</name>
    <value>1</value>
  </property>

  <property>
    <name>hive.server2.materializedviews.registry.impl</name>
    <value>DUMMY</value>
  </property>

  <property>
    <name>hive.tez.auto.reducer.parallelism</name>
    <value>true</value>
  </property>

  <property>
    <name>hive.vectorized.execution.mapjoin.minmax.enabled</name>
    <value>true</value>
  </property>

  <property>
    <name>hive.vectorized.execution.mapjoin.native.fast.hashtable.enabled</name>
    <value>true</value>
  </property>

  <property>
    <name>hive.optimize.dynamic.partition.hashjoin</name>
    <value>true</value>
  </property>

  <property>
    <name>hive.compactor.initiator.on</name>
    <value>true</value>
  </property>

  <property>
    <name>hive.blobstore.use.output-committer</name>
    <value>true</value>
  </property>

  <property>
    <name>hive.llap.daemon.service.hosts</name>
    <value>@llap0</value>
  </property>

  <property>
    <name>hive.llap.execution.mode</name>
    <value>only</value>
  </property>

  <property>
    <name>hive.optimize.metadataonly</name>
    <value>true</value>
  </property>

  <property>
    <name>hive.tez.bucket.pruning</name>
    <value>true</value>
  </property>

  <property>
    <name>hive.exec.mode.local.auto</name>
    <value>true</value>
  </property>

  <property>
    <name>hive.exec.mode.local.auto.inputbytes.max</name>
    <value>50000000</value>
  </property>

  <property>
    <name>hive.query.reexecution.stats.persist.scope</name>
    <value>hiveserver</value>
  </property>

  <property>
    <name>hive.auto.convert.join.noconditionaltask.size</name>
    <value>1073741824</value>
  </property>

  <property>
    <name>hive.stats.fetch.column.stats</name>
    <value>true</value>
  </property>

  <property>
    <name>hive.compactor.worker.threads</name>
    <value>1</value>
  </property>

</configuration>
[hadoop@ip-172-31-16-51 conf]$



How to do it in production ?

We need to create a Python script containing the Spark code to execute in production.



Sample code just to read some data from an AWS S3 bucket :

[hadoop@ip-172-31-16-51 ~]$ cat pyspark-demo.py

# importing required packages
from pyspark.sql import *
from pyspark.sql.functions import *

# Building a Spark session
spark=SparkSession.builder.appName("test").master("local[*]").getOrCreate()

# Below is the input file with its full path in the AWS S3 bucket
# The "r" prefix makes this a raw string, so Python doesn't treat backslashes as escape characters
# (not strictly required here since the path only uses forward slashes)
input_location = r"s3://arunkumar2025/input1/asl.csv"

# reading data from the above file, using the first line as the header and inferring column types
df = spark.read.format("csv").option("header","true").option("inferschema","true").load(input_location)

# show only the first 5 rows
df.show(5)

[hadoop@ip-172-31-16-51 ~]$


How to run the above spark code ?

[hadoop@ip-172-31-16-51 ~]$ spark-submit --master local --deploy-mode client pyspark-demo.py


Terminal output :

[hadoop@ip-172-31-16-51 ~]$ spark-submit --master local --deploy-mode client pyspark-demo.py
25/01/24 13:45:22 INFO SparkContext: Running Spark version 3.4.0-amzn-0
25/01/24 13:45:22 INFO ResourceUtils: ==============================================================
25/01/24 13:45:22 INFO ResourceUtils: No custom resources configured for spark.driver.
25/01/24 13:45:22 INFO ResourceUtils: ==============================================================
25/01/24 13:45:22 INFO SparkContext: Submitted application: test
25/01/24 13:45:22 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 2, script: , vendor: , memory -> name: memory, amount: 4743, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/01/24 13:45:22 INFO ResourceProfile: Limiting resource is cpus at 2 tasks per executor
25/01/24 13:45:22 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/01/24 13:45:22 INFO SecurityManager: Changing view acls to: hadoop
25/01/24 13:45:22 INFO SecurityManager: Changing modify acls to: hadoop
25/01/24 13:45:22 INFO SecurityManager: Changing view acls groups to:
25/01/24 13:45:22 INFO SecurityManager: Changing modify acls groups to:
25/01/24 13:45:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: hadoop; groups with view permissions: EMPTY; users with modify permissions: hadoop; groups with modify permissions: EMPTY
25/01/24 13:45:23 INFO Utils: Successfully started service 'sparkDriver' on port 46205.
25/01/24 13:45:23 INFO SparkEnv: Registering MapOutputTracker
25/01/24 13:45:23 INFO SparkEnv: Registering BlockManagerMaster
25/01/24 13:45:23 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
25/01/24 13:45:23 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
25/01/24 13:45:23 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/01/24 13:45:23 INFO DiskBlockManager: Created local directory at /mnt/tmp/blockmgr-5c738a25-4ddb-44a6-a026-3782c50c554a
25/01/24 13:45:23 INFO MemoryStore: MemoryStore started with capacity 912.3 MiB
25/01/24 13:45:23 INFO SparkEnv: Registering OutputCommitCoordinator
25/01/24 13:45:23 INFO SubResultCacheManager: Sub-result caches are disabled.
25/01/24 13:45:23 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
25/01/24 13:45:23 INFO Utils: Successfully started service 'SparkUI' on port 4040.
25/01/24 13:45:23 INFO Executor: Starting executor ID driver on host ip-172-31-16-51.ap-south-1.compute.internal
25/01/24 13:45:23 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): 'file:/usr/lib/hadoop-lzo/lib/*,file:/usr/lib/hadoop/hadoop-aws.jar,file:/usr/share/aws/aws-java-sdk/*,file:/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar,file:/usr/share/aws/emr/security/conf,file:/usr/share/aws/emr/security/lib/*,file:/usr/share/aws/redshift/jdbc/RedshiftJDBC.jar,file:/usr/share/aws/redshift/spark-redshift/lib/*,file:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar,file:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar,file:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar,file:/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar,file:/docker/usr/lib/hadoop-lzo/lib/*,file:/docker/usr/lib/hadoop/hadoop-aws.jar,file:/docker/usr/share/aws/aws-java-sdk/*,file:/docker/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar,file:/docker/usr/share/aws/emr/security/conf,file:/docker/usr/share/aws/emr/security/lib/*,file:/docker/usr/share/aws/redshift/jdbc/RedshiftJDBC.jar,file:/docker/usr/share/aws/redshift/spark-redshift/lib/*,file:/docker/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar,file:/docker/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar,file:/docker/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar,file:/docker/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar,file:/home/hadoop/conf,file:/home/hadoop/emr-spark-goodies.jar,file:/home/hadoop/*,file:/home/hadoop/aws-glue-datacatalog-spark-client.jar,file:/home/hadoop/hive-openx-serde.jar,file:/home/hadoop/sagemaker-spark-sdk.jar,file:/home/hadoop/hadoop-aws.jar,file:/home/hadoop/RedshiftJDBC.jar,file:/home/hadoop/emr-s3-select-spark-connector.jar'
25/01/24 13:45:23 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43751.
25/01/24 13:45:23 INFO NettyBlockTransferService: Server created on ip-172-31-16-51.ap-south-1.compute.internal:43751
25/01/24 13:45:23 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
25/01/24 13:45:23 INFO BlockManager: external shuffle service port = 7337
25/01/24 13:45:23 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ip-172-31-16-51.ap-south-1.compute.internal, 43751, None)
25/01/24 13:45:23 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-31-16-51.ap-south-1.compute.internal:43751 with 912.3 MiB RAM, BlockManagerId(driver, ip-172-31-16-51.ap-south-1.compute.internal, 43751, None)
25/01/24 13:45:23 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ip-172-31-16-51.ap-south-1.compute.internal, 43751, None)
25/01/24 13:45:23 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, ip-172-31-16-51.ap-south-1.compute.internal, 43751, None)
25/01/24 13:45:24 INFO SingleEventLogFileWriter: Logging events to hdfs:/var/log/spark/apps/local-1737726323654.inprogress
25/01/24 13:45:25 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
25/01/24 13:45:25 INFO SharedState: Warehouse path is 'hdfs://ip-172-31-16-51.ap-south-1.compute.internal:8020/user/spark/warehouse'.
25/01/24 13:45:26 INFO ClientConfigurationFactory: Set initial getObject socket timeout to 2000 ms.
25/01/24 13:45:27 INFO InMemoryFileIndex: It took 38 ms to list leaf files for 1 paths.
25/01/24 13:45:27 INFO InMemoryFileIndex: It took 2 ms to list leaf files for 1 paths.
25/01/24 13:45:31 INFO FileSourceStrategy: Pushed Filters:
25/01/24 13:45:31 INFO FileSourceStrategy: Post-Scan Filters: (length(trim(value#0, None)) > 0)
25/01/24 13:45:32 INFO CodeGenerator: Code generated in 551.615874 ms
25/01/24 13:45:32 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 426.0 KiB, free 911.9 MiB)
25/01/24 13:45:32 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 42.8 KiB, free 911.8 MiB)
25/01/24 13:45:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-172-31-16-51.ap-south-1.compute.internal:43751 (size: 42.8 KiB, free: 912.3 MiB)
25/01/24 13:45:32 INFO SparkContext: Created broadcast 0 from load at NativeMethodAccessorImpl.java:0
25/01/24 13:45:32 INFO GPLNativeCodeLoader: Loaded native gpl library
25/01/24 13:45:32 INFO LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 049362b7cf53ff5f739d6b1532457f2c6cd495e8]
25/01/24 13:45:32 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes, number of split files: 1, prefetch: false
25/01/24 13:45:32 INFO FileSourceScanExec: relation: None, fileSplitsInPartitionHistogram: Vector((1 fileSplits,1))
25/01/24 13:45:33 INFO SparkContext: Starting job: load at NativeMethodAccessorImpl.java:0
25/01/24 13:45:33 INFO DAGScheduler: Got job 0 (load at NativeMethodAccessorImpl.java:0) with 1 output partitions
25/01/24 13:45:33 INFO DAGScheduler: Final stage: ResultStage 0 (load at NativeMethodAccessorImpl.java:0)
25/01/24 13:45:33 INFO DAGScheduler: Parents of final stage: List()
25/01/24 13:45:33 INFO DAGScheduler: Missing parents: List()
25/01/24 13:45:33 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at load at NativeMethodAccessorImpl.java:0), which has no missing parents
25/01/24 13:45:33 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 15.2 KiB, free 911.8 MiB)
25/01/24 13:45:33 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 7.7 KiB, free 911.8 MiB)
25/01/24 13:45:33 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-172-31-16-51.ap-south-1.compute.internal:43751 (size: 7.7 KiB, free: 912.3 MiB)
25/01/24 13:45:33 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1592
25/01/24 13:45:33 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at load at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
25/01/24 13:45:33 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
25/01/24 13:45:33 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (ip-172-31-16-51.ap-south-1.compute.internal, executor driver, partition 0, ANY, 8058 bytes)
25/01/24 13:45:33 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
25/01/24 13:45:33 INFO FileScanRDD: TID: 0 - Reading current file: path: s3://arunkumar2025/input1/asl.csv, range: 0-827, partition values: [empty row], isDataPresent: false, eTag: 2414c4d8a4ccb1d651f01d3ad898f818
25/01/24 13:45:33 INFO CodeGenerator: Code generated in 23.458352 ms
25/01/24 13:45:33 INFO S3NativeFileSystem: Opening 's3://arunkumar2025/input1/asl.csv' for reading
25/01/24 13:45:33 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2035 bytes result sent to driver
25/01/24 13:45:33 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 616 ms on ip-172-31-16-51.ap-south-1.compute.internal (executor driver) (1/1)
25/01/24 13:45:33 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
25/01/24 13:45:33 INFO DAGScheduler: ResultStage 0 (load at NativeMethodAccessorImpl.java:0) finished in 0.856 s
25/01/24 13:45:34 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
25/01/24 13:45:34 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
25/01/24 13:45:34 INFO DAGScheduler: Job 0 finished: load at NativeMethodAccessorImpl.java:0, took 0.981137 s
25/01/24 13:45:34 INFO CodeGenerator: Code generated in 11.66323 ms
25/01/24 13:45:34 INFO FileSourceStrategy: Pushed Filters:
25/01/24 13:45:34 INFO FileSourceStrategy: Post-Scan Filters:
25/01/24 13:45:34 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 426.0 KiB, free 911.4 MiB)
25/01/24 13:45:34 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 42.8 KiB, free 911.4 MiB)
25/01/24 13:45:34 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on ip-172-31-16-51.ap-south-1.compute.internal:43751 (size: 42.8 KiB, free: 912.2 MiB)
25/01/24 13:45:34 INFO SparkContext: Created broadcast 2 from load at NativeMethodAccessorImpl.java:0
25/01/24 13:45:34 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes, number of split files: 1, prefetch: false
25/01/24 13:45:34 INFO FileSourceScanExec: relation: None, fileSplitsInPartitionHistogram: Vector((1 fileSplits,1))
25/01/24 13:45:34 INFO SparkContext: Starting job: load at NativeMethodAccessorImpl.java:0
25/01/24 13:45:34 INFO DAGScheduler: Got job 1 (load at NativeMethodAccessorImpl.java:0) with 1 output partitions
25/01/24 13:45:34 INFO DAGScheduler: Final stage: ResultStage 1 (load at NativeMethodAccessorImpl.java:0)
25/01/24 13:45:34 INFO DAGScheduler: Parents of final stage: List()
25/01/24 13:45:34 INFO DAGScheduler: Missing parents: List()
25/01/24 13:45:34 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[9] at load at NativeMethodAccessorImpl.java:0), which has no missing parents
25/01/24 13:45:34 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 31.5 KiB, free 911.3 MiB)
25/01/24 13:45:34 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 14.0 KiB, free 911.3 MiB)
25/01/24 13:45:34 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on ip-172-31-16-51.ap-south-1.compute.internal:43751 (size: 14.0 KiB, free: 912.2 MiB)
25/01/24 13:45:34 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1592
25/01/24 13:45:34 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[9] at load at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
25/01/24 13:45:34 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks resource profile 0
25/01/24 13:45:34 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1) (ip-172-31-16-51.ap-south-1.compute.internal, executor driver, partition 0, ANY, 8058 bytes)
25/01/24 13:45:34 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
25/01/24 13:45:34 INFO FileScanRDD: TID: 1 - Reading current file: path: s3://arunkumar2025/input1/asl.csv, range: 0-827, partition values: [empty row], isDataPresent: false, eTag: 2414c4d8a4ccb1d651f01d3ad898f818
25/01/24 13:45:34 INFO S3NativeFileSystem: Opening 's3://arunkumar2025/input1/asl.csv' for reading
25/01/24 13:45:34 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1747 bytes result sent to driver
25/01/24 13:45:34 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 137 ms on ip-172-31-16-51.ap-south-1.compute.internal (executor driver) (1/1)
25/01/24 13:45:34 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
25/01/24 13:45:34 INFO DAGScheduler: ResultStage 1 (load at NativeMethodAccessorImpl.java:0) finished in 0.181 s
25/01/24 13:45:34 INFO DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job
25/01/24 13:45:34 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
25/01/24 13:45:34 INFO DAGScheduler: Job 1 finished: load at NativeMethodAccessorImpl.java:0, took 0.189476 s
25/01/24 13:45:34 INFO FileSourceStrategy: Pushed Filters:
25/01/24 13:45:34 INFO FileSourceStrategy: Post-Scan Filters:
25/01/24 13:45:34 INFO CodeGenerator: Code generated in 40.744369 ms
25/01/24 13:45:34 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 425.9 KiB, free 910.9 MiB)
25/01/24 13:45:34 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 42.8 KiB, free 910.9 MiB)
25/01/24 13:45:34 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on ip-172-31-16-51.ap-south-1.compute.internal:43751 (size: 42.8 KiB, free: 912.2 MiB)
25/01/24 13:45:34 INFO SparkContext: Created broadcast 4 from showString at NativeMethodAccessorImpl.java:0
25/01/24 13:45:34 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes, number of split files: 1, prefetch: false
25/01/24 13:45:34 INFO FileSourceScanExec: relation: None, fileSplitsInPartitionHistogram: Vector((1 fileSplits,1))
25/01/24 13:45:34 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
25/01/24 13:45:34 INFO DAGScheduler: Got job 2 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
25/01/24 13:45:34 INFO DAGScheduler: Final stage: ResultStage 2 (showString at NativeMethodAccessorImpl.java:0)
25/01/24 13:45:34 INFO DAGScheduler: Parents of final stage: List()
25/01/24 13:45:34 INFO DAGScheduler: Missing parents: List()
25/01/24 13:45:34 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[13] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
25/01/24 13:45:34 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 17.0 KiB, free 910.8 MiB)
25/01/24 13:45:34 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 8.7 KiB, free 910.8 MiB)
25/01/24 13:45:34 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on ip-172-31-16-51.ap-south-1.compute.internal:43751 (size: 8.7 KiB, free: 912.1 MiB)
25/01/24 13:45:34 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1592
25/01/24 13:45:34 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[13] at showString at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
25/01/24 13:45:34 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks resource profile 0
25/01/24 13:45:34 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2) (ip-172-31-16-51.ap-south-1.compute.internal, executor driver, partition 0, ANY, 8058 bytes)
25/01/24 13:45:34 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
25/01/24 13:45:34 INFO FileScanRDD: TID: 2 - Reading current file: path: s3://arunkumar2025/input1/asl.csv, range: 0-827, partition values: [empty row], isDataPresent: false, eTag: 2414c4d8a4ccb1d651f01d3ad898f818
25/01/24 13:45:34 INFO CodeGenerator: Code generated in 17.255603 ms
25/01/24 13:45:34 INFO S3NativeFileSystem: Opening 's3://arunkumar2025/input1/asl.csv' for reading
25/01/24 13:45:34 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 2110 bytes result sent to driver
25/01/24 13:45:34 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 158 ms on ip-172-31-16-51.ap-south-1.compute.internal (executor driver) (1/1)
25/01/24 13:45:34 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
25/01/24 13:45:34 INFO DAGScheduler: ResultStage 2 (showString at NativeMethodAccessorImpl.java:0) finished in 0.188 s
25/01/24 13:45:34 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
25/01/24 13:45:34 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
25/01/24 13:45:34 INFO DAGScheduler: Job 2 finished: showString at NativeMethodAccessorImpl.java:0, took 0.201108 s
25/01/24 13:45:34 INFO CodeGenerator: Code generated in 22.026247 ms
+------+---+----+
|  name|age|city|
+------+---+----+
|  venu| 32| hyd|
|   anu| 49| mas|
|   jyo| 12| blr|
|  koti| 29| blr|
|mastan| 30| blr|
+------+---+----+
only showing top 5 rows

25/01/24 13:45:35 INFO SparkContext: Invoking stop() from shutdown hook
25/01/24 13:45:35 INFO SparkContext: SparkContext is stopping with exitCode 0.
25/01/24 13:45:35 INFO SparkUI: Stopped Spark web UI at http://ip-172-31-16-51.ap-south-1.compute.internal:4040
25/01/24 13:45:35 INFO BlockManagerInfo: Removed broadcast_2_piece0 on ip-172-31-16-51.ap-south-1.compute.internal:43751 in memory (size: 42.8 KiB, free: 912.2 MiB)
25/01/24 13:45:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
25/01/24 13:45:35 INFO MemoryStore: MemoryStore cleared
25/01/24 13:45:35 INFO BlockManager: BlockManager stopped
25/01/24 13:45:35 INFO BlockManagerMaster: BlockManagerMaster stopped
25/01/24 13:45:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
25/01/24 13:45:35 INFO SparkContext: Successfully stopped SparkContext
25/01/24 13:45:35 INFO ShutdownHookManager: Shutdown hook called
25/01/24 13:45:35 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-490e265c-7e93-4ec2-ac06-d24953cdfd9c/pyspark-639a6e7e-ad30-4acf-bec4-fc4b7d810086
25/01/24 13:45:35 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-490e265c-7e93-4ec2-ac06-d24953cdfd9c
25/01/24 13:45:35 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-66d17b5a-49fc-404b-9848-a21889b3e274
[hadoop@ip-172-31-16-51 ~]$


Note : The above spark code only reads data from the S3 bucket and prints 5 records.
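
If you just want to sanity-check what Spark actually read, the same job can be pointed at the whole input1/ prefix instead of a single file and asked to print the schema it inferred. Below is a minimal sketch of that variant (same bucket and options as above; printSchema() and count() are only there to confirm the columns, the types picked by inferschema and the row count) :

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()

# read every csv under the input1/ prefix instead of a single file
df = spark.read.format("csv").option("header","true").option("inferschema","true").load("s3://arunkumar2025/input1/")

# confirm the column names and the types that inferschema picked
df.printSchema()

# total number of rows read across all files under the prefix
print(df.count())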



Spark code to create a table in HIVE and load processed data :

[hadoop@ip-172-31-16-51 ~]$ cat pyspark-demo.py
from pyspark.sql import *
from pyspark.sql.functions import *

# enableHiveSupport() lets Spark talk to the Hive metastore configured on this EMR cluster
spark=SparkSession.builder.appName("test").master("local[*]").enableHiveSupport().getOrCreate()

# read the csv from S3
input_location = r"s3://arunkumar2025/input1/asl.csv"
df = spark.read.format("csv").option("header","true").option("inferschema","true").load(input_location)
#df.show(5)

# transformation: add today's date as a column and keep only rows with age >= 30
ndf = df.withColumn("today", current_date()).where(col("age")>=30)

# create a HIVE table named asl30plus and load the transformed data into it as parquet
ndf.write.format("parquet").saveAsTable("asl30plus")
ndf.show()
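
To double-check that saveAsTable really registered the table in the Hive metastore, the table can be queried back either from the hive shell (select * from asl30plus limit 5;) or from another PySpark session. The below is a minimal sketch of the PySpark check, assuming the script above has already been run once :

from pyspark.sql import SparkSession

# enableHiveSupport() points Spark at the Hive metastore configured in hive-site.xml
spark = SparkSession.builder.appName("verify").master("local[*]").enableHiveSupport().getOrCreate()

# read back the table created by saveAsTable above
spark.sql("select * from asl30plus limit 5").show()

# table details (location, format, etc.) as Hive sees them
spark.sql("describe formatted asl30plus").show(truncate=False)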




Spark code to read data from a relational database (the RDS service in AWS), apply a transformation, create a new table in HIVE and load the transformed data into it :

[hadoop@ip-172-31-16-51 ~]$ cat pyspark-demo.py
from pyspark.sql import *
from pyspark.sql.functions import *

spark=SparkSession.builder.appName("test").master("local[*]").enableHiveSupport().getOrCreate()

# JDBC URL of the MySQL (RDS) instance and the database name
host="jdbc:mysql://newpoc.ctucw2ay6r46.ap-south-1.rds.amazonaws.com:3306/arundb"

# read the emp table from RDS over JDBC
df = spark.read.format("jdbc").option("url",host).option("user","admin").option("password","Mypassword.01").option("dbtable","emp").load()

# transformation: add today's date and keep only employees with salary >= 60000
ndf = df.withColumn("today", current_date()).where(col("salary")>=60000)

# create a HIVE table named emptable1 and load the transformed data into it as ORC
ndf.write.format("orc").saveAsTable("emptable1")

ndf.show()

From HIVE shell :
    > select * from emptable1 limit 5;
OK
1       John Doe        1       Manager 75000.00        2025-01-24
2       Jane Smith      2       Accountant      65000.00        2025-01-24
3       Bob Johnson     3       Engineer        85000.00        2025-01-24
5       Tom Clark       5       Marketing Specialist    60000.00        2025-01-24
7       Steve Adams     7       IT Administrator        70000.00        2025-01-24
Time taken: 1.125 seconds, Fetched: 5 row(s)
hive>
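
One point worth calling out for the JDBC read above : Spark needs a MySQL-compatible JDBC driver on its classpath. On this EMR cluster the bundled jars were enough, but if you ever hit a "No suitable driver" error, the driver class can be named explicitly and the connector jar passed with --jars at spark-submit time. The below is only a sketch of that variant; the driver class name (com.mysql.cj.jdbc.Driver, from MySQL Connector/J) and the jar path are assumptions, so adjust them to the connector you actually have :

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rds-read").master("local[*]").enableHiveSupport().getOrCreate()

host = "jdbc:mysql://newpoc.ctucw2ay6r46.ap-south-1.rds.amazonaws.com:3306/arundb"

# same JDBC read as above, but naming the driver class explicitly (an assumption; match it to your connector jar)
df = (spark.read.format("jdbc")
      .option("url", host)
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("user", "admin")
      .option("password", "Mypassword.01")
      .option("dbtable", "emp")
      .load())

df.show(5)

It would then be submitted with something like spark-submit --jars /path/to/mysql-connector-j.jar pyspark-demo.py, where the jar path is just a placeholder for wherever the connector jar sits on your cluster.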



Arun Mathe

Gmail ID : arunkumar.mathe@gmail.com

Contact No : +91 9704117111








