Spark : Internals of Spark Job

It is very important to understand the internals of a Spark job, such as the stages involved when a Spark Job runs. This helps us reason about the performance of the job, identify which particular steps are hurting performance, and eliminate such steps in case they are not needed.

Our entire job will be divided into stages, and there are 2 types of dependencies between RDDs. We need to understand how stages are formed, why they matter, and what the dependencies are. All of these are interconnected.

When we trigger an action, a Spark Job is started.

Spark Job 

  • Stages
    • Tasks (every stage has a set of tasks which execute in parallel, one task per partition; see the sketch after this list)
  • Dependencies
    • Narrow dependency
    • Shuffle dependency
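
The number of tasks in a stage follows the number of partitions of the RDD being processed. Below is a minimal sketch (assuming a running spark-shell so that sc is available; names are illustrative):

// A minimal sketch: tasks per stage follow the partition count
val nums = sc.parallelize(1 to 100, 4)   // RDD with 4 partitions
println(nums.getNumPartitions)           // 4
nums.count()                             // triggers a job whose stage runs 4 tasks in parallel (visible at http://localhost:4040)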
    
Let's understand the dependencies. Broadly, there are 4 types of mappings between elements for any operation in Spark.
  1. One to One (one element maps to one element - each element is independent)
  2. One to Many (each element is still independent of the others)
  3. Many to One (elements depend on one another)
  4. Many to Many (elements depend on one another)
Important point :
  • One to One and One to Many come under Narrow dependency.
  • Many to One and Many to Many come under Shuffle dependency.
For example, filter(), map() and union() are 1-to-1 / 1-to-many operations and hence create a Narrow dependency, whereas sortBy() and groupBy() are many-to-1 / many-to-many operations and hence create a Shuffle dependency.
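
We can verify this in the shell by inspecting an RDD's dependencies. Below is a minimal sketch (assuming a running spark-shell; the RDD names are illustrative):

// A minimal sketch: inspecting dependency types
val base    = sc.parallelize(1 to 10, 2)
val mapped  = base.map(x => x + 1)       // narrow: each output partition reads exactly one input partition
val grouped = base.groupBy(x => x % 2)   // shuffle: each output partition reads from many input partitions

println(mapped.dependencies)    // expect a OneToOneDependency (Narrow)
println(grouped.dependencies)   // expect a ShuffleDependency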

Let's say we have the below operations executing one after another (N is Narrow, S is Shuffle):

N,    N,    N,                    S,    N,    N,                    S,                    S

Whenever an operation introduces a Shuffle dependency, a new Stage starts. From the above sequence of operations, 4 different stages will be created for that Spark Job.


Please see below Spark Job :

// Created a RDD, r1
scala> val r1 = sc.parallelize(1 to 6, 2)
r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:23

// From the above RDD r1, we create 2 other RDDs: r11, r12
scala> val r11 = r1.filter(x => x % 2 == 0)
r11: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[24] at filter at <console>:23

scala> val r12 = r1.filter(x => x % 2 == 1)
r12: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[25] at filter at <console>:23

// Again, create 2 more RDDs, one from each of r11, r12
scala> val r111 = r11.map(x => x + 1)
r111: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[26] at map at <console>:23

scala> val r121 = r12.map(x => x + 1)
r121: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[27] at map at <console>:23

// Now use union() to combine the 2 RDDs r111, r121
scala> val r2 = r111.union(r121)
r2: org.apache.spark.rdd.RDD[Int] = UnionRDD[28] at union at <console>:24

// Now we use sortBy(), which comes under Shuffle dependency, hence a new stage will start here
// Understand that in a sortBy() operation elements are interdependent on each other, hence it is costly
scala> val r3 = r2.sortBy(x => x)
r3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at sortBy at <console>:23

// Using map() which is a Narrow dependency
scala> val r31 = r3.map(x => x + 1)
r31: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[34] at map at <console>:23

// Using filter() which is also a Narrow dependency
scala> val r311 = r31.filter(x => x % 2 == 1)
r311: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[35] at filter at <console>:23

// Here, groupBy() comes under Shuffle dependency as elements are interdependent
// Hence a new stage will be created
scala> val r4 = r311.groupBy(x => x % 2 == 0)
r4: org.apache.spark.rdd.RDD[(Boolean, Iterable[Int])] = ShuffledRDD[37] at groupBy at <console>:23

// Finally printing RDD r4
scala> r4.collect()
res4: Array[(Boolean, Iterable[Int])] = Array((false,CompactBuffer(3, 5, 7)))

Now, from the above Spark Job, Spark should create 3 different stages: one boundary at sortBy() and another at groupBy(). We can visualize it on our local system using the URL : http://localhost:4040/jobs/

From the same URL, we can see the Tasks for each stage as well.


We need to explore and understand more about Jobs, Stages, Dependencies and Tasks to understand the architecture. We can use the local system if we have an environment set up to execute Spark jobs, or else please create an account on Databricks Community Edition and explore there.


toDebugString : Observe that when we use toDebugString, it displays the lineage information of all 3 stages (each new indentation level marked with "+-" corresponds to a shuffle, i.e. a stage boundary).

scala> r4.toDebugString
res5: String =
(4) ShuffledRDD[37] at groupBy at <console>:23 []
 +-(4) MapPartitionsRDD[36] at groupBy at <console>:23 []
    |  MapPartitionsRDD[35] at filter at <console>:23 []
    |  MapPartitionsRDD[34] at map at <console>:23 []
    |  MapPartitionsRDD[33] at sortBy at <console>:23 []
    |  ShuffledRDD[32] at sortBy at <console>:23 []
    +-(4) MapPartitionsRDD[29] at sortBy at <console>:23 []
       |  UnionRDD[28] at union at <console>:24 []
       |  MapPartitionsRDD[26] at map at <console>:23 []
       |  MapPartitionsRDD[24] at filter at <console>:23 []
       |  ParallelCollectionRDD[23] at parallelize at <console>:23 []
       |  MapPartitionsRDD[27] at map at <console>:23 []
       |  MapPartitionsRDD[25] at filter at <console>:23 []
       |  Par...

scala> r4.dependencies
res6: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@4bc0f78e)

dependencies returns the list of dependencies that an RDD has on its parent RDD(s).
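
For comparison, a narrow-dependency RDD from the same job reports a different dependency type. A quick sketch, continuing the same spark-shell session (outputs omitted here; the comments show what to expect):

// r111 was built with map(), so its dependency on r11 should be a narrow OneToOneDependency
r111.dependencies

// r2 was built with union(), which is also narrow (a RangeDependency on each parent RDD)
r2.dependencies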


That's all for this blog. See you again!

Thanks,
Arun Mathe
Email ID : arunkumar.mathe@gmail.com
