
Spark Core : Understanding RDD & Partitions in Spark

Let us see how to create an RDD in Spark.

 

RDD (Resilient Distributed Dataset): We can create an RDD in 2 ways.

  1. From Collections
    • For small amounts of data
    • Not suitable for large amounts of data
  2. From Datasets
    • For huge amounts of data
    • Text, CSV, JSON, PDF, images, etc.
    • When the data is large, we should go with the Dataset approach (a small sketch of this follows the next list)
How to create an RDD ?
  • Using collections
    • val list = List(1, 2, 3, 4, 5, 6)
    • val rdd = sc.parallelize(list)
    • sc is the SparkContext
    • The parallelize() method converts the input (a collection in this case) into an RDD
    • The type of the RDD is inferred from the values in the collection; if we assign integers, the RDD will be of type Int
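
For the Dataset approach mentioned above, a minimal sketch looks like the following (the file path is a placeholder, not from an actual example; replace it with your own local, HDFS or S3 path):

// Dataset approach (sketch): create an RDD from a text file
// Each element of the resulting RDD is one line of the file
val fileRdd = sc.textFile("data/sample.txt")

// Peek at the first few lines
fileRdd.take(5).foreach(println)
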
Let's see the Scala code below:

# Created an RDD by providing a Collection(List) as input
scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23

# Printing RDD using collect() method
scala> rdd.collect()
res0: Array[Int] = Array(1, 2, 3, 4, 5)                                         

# Creating one more RDD(rdd1) using existing RDD(rdd) : Transformation
# map(x => x + 1) will increment each element in the given list by 1
# map() is a kind of transformation
scala> val rdd1 = rdd.map(x => x + 1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:23

# Printing the first RDD (rdd); note that it doesn't change (immutability)
scala> rdd.collect()
res1: Array[Int] = Array(1, 2, 3, 4, 5)

# Printing second RDD(rdd1)
scala> rdd1.collect()
res2: Array[Int] = Array(2, 3, 4, 5, 6)


Let's see more about RDDs in the code below:

# Created rdd1 with a list of values 1, 2, 3, 4, 5
scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:23

# Printed rdd1 using collect() 
scala> rdd1.collect()
res5: Array[Int] = Array(1, 2, 3, 4, 5)                                         

# Created another RDD, rdd2, by applying a transformation on top of rdd1
scala> val rdd2 = rdd1.map(x => x + 1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:23

# Printed rdd2; note that the values in the new RDD have changed as per the map() operation
scala> rdd2.collect()
res6: Array[Int] = Array(2, 3, 4, 5, 6)


# Applied 3 transformations on top of rdd2 and collected the resulting RDD
scala> rdd2.map(x => x + 1).map(x => x +1).map(x => x + 1).collect()
res7: Array[Int] = Array(5, 6, 7, 8, 9)


Remember that the starting point is an RDD and the ending point is an action like collect(); in between we can chain any number of transformations.

Example :
rdd
.map((x : Int) => {x + 1})
.map((x : Int) => {x + 1})
.map((x : Int) => {x + 1})
.map((x : Int) => {x + 1})
.map((x : Int) => {x + 1})
.collect()


We can apply any transformation; here, for example, we have concatenated a string to each element.

scala> val rdd3 = rdd2.map(x => x + " Hi " + x)
rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at map at <console>:23

scala> rdd3.collect()
res8: Array[String] = Array(2 Hi 2, 3 Hi 3, 4 Hi 4, 5 Hi 5, 6 Hi 6)


Scala provides the following shorthand to simplify this code.

rdd.map((x : Int) => {x + 1}).collect()  ==> Scala infers the type, so we can remove Int
rdd.map(x => {x + 1}).collect()  ==> {x + 1} is a small expression, so the { } can be removed
rdd.map(x => x + 1).collect()  ==> we can use _ and remove x
rdd.map(_ + 1).collect()  ==> "_" is a placeholder; each element of the RDD is substituted here

This is called a lambda expression (anonymous function) in Scala.

The same logic in Python :
  • We can use a lambda to perform this operation
  • Or we can write a named function (so that we can reuse it)

>>> rdd1 = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd1.collect()
[1, 2, 3, 4, 5]                                                                 
>>> 
>>> rdd1.map(lambda x : x + 1).collect()
[2, 3, 4, 5, 6]                                                                 
>>> 

>>> rdd1.map(lambda _: _ + 1).collect()
[2, 3, 4, 5, 6]                                                                 
>>>

(OR)

def myfunc(x):
    return x + 1

# Pass the function itself to map(); it is applied to every element of rdd1
rdd1.map(myfunc).collect()


Partitions :
Partitioning is an integral concept in Spark that controls how the data is physically distributed across various nodes in the cluster during data processing.
  • By default, the number of partitions depends on the number of processor cores available in your environment
  • My local system currently has 2 cores, hence 2 partitions
  • We can configure the number of partitions

Use the methods below to get partition details.
scala> rdd3.getNumPartitions
res10: Int = 2

scala> rdd3.partitions.length
res11: Int = 2

Use glom().collect() to see the partition information. BTW, glom() is a transformation.
scala> rdd3.glom().collect()
res14: Array[Array[String]] = Array(Array(2 Hi 2, 3 Hi 3), Array(4 Hi 4, 5 Hi 5, 6 Hi 6))
  • Array(2 Hi 2, 3 Hi 3) went to the 1st partition while creating the RDD
  • Array(4 Hi 4, 5 Hi 5, 6 Hi 6) went to the 2nd partition while creating the RDD
  • We can also control which data goes to which partition; let's see that later
How to configure the number of partitions while creating an RDD?

# Created an RDD (specified the number of partitions as 1)
scala> val rdd5 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 1)
rdd5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23

# Printing RDD information
scala> rdd5.collect()
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6)                                      

# Checking how many partitions created
scala> rdd5.getNumPartitions
res2: Int = 1

# Visualizing partition information 
scala> rdd5.glom().collect()
res3: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6))

You can try increasing the partition number to 2, 3, 4, and so on, and see how it works using rdd.glom().collect(). We can always customize it to the number we want.
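
For instance, a quick sketch in the same spark-shell session (rdd6 is just an illustrative name; the outputs in the comments are what you would typically see, since parallelize() slices the collection into contiguous chunks):

// Explicitly asking for 3 partitions and inspecting them with glom()
val rdd6 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 3)

rdd6.getNumPartitions      // Int = 3
rdd6.glom().collect()      // Array(Array(1, 2), Array(3, 4), Array(5, 6))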


Let's say that while you are creating an RDD, Spark decides on 'm' partitions by itself, but you feel that 'm' is too small or too large for your data. In this scenario, we can increase or decrease this number by creating a new RDD with a new number of partitions (since RDDs are immutable, we can't change the existing RDD; we need to create a new RDD with the required number of partitions).


coalesce()  Vs repartition() :

  • We can use the above methods in case we have to change the default number of partitions. In Apache Spark, coalesce() is a transformation used to reduce the number of partitions of a DataFrame or RDD without performing a full shuffle.
  • It is often used for performance optimization, especially in scenarios where you need to decrease the number of partitions for efficient execution.
  • Reduces Partitions – Unlike repartition(), which can increase or decrease partitions and involves a full shuffle, coalesce() only reduces the number of partitions without shuffling data across all nodes.
  • coalesce() only merges partitions when we reduce the number of partitions; it won't move data from one partition to another across the cluster, it just merges existing partitions. Moving data around in a distributed environment is a very costly operation.
  • Efficient for Narrow Dependencies – Since it avoids full shuffling, coalesce() is more efficient when you are simply reducing the number of partitions.
  • Best for Reducing Partitions after a Filter Operation – When many partitions become empty after a filter, coalesce() helps combine non-empty partitions efficiently.
  • coalesce() accepts 2 parameters: the first is the target number of partitions, the second is a Boolean shuffle flag
    • By default this flag is false, meaning don't shuffle (just merge partitions)
    • If we set it to true, the data will be shuffled instead of merged, which has a huge performance impact
    • DO NOT set it to true
  • repartition() will shuffle the data across the partitions. It is a more time-consuming process, so try to avoid using repartition(). In case you need to reduce the number of partitions, use coalesce() instead of repartition() (see the sketch after the next list)
When to Use coalesce() ?
  • After filtering a DataFrame where many partitions are empty.
  • When writing to disk to avoid small file issues (e.g., df.coalesce(1).write.csv("output")).
  • When improving performance by reducing unnecessary partitions.
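
A minimal sketch of the behaviour described above, run in spark-shell (where sc is already available); only the partition counts are being checked here:

// Start with 8 partitions
val rdd = sc.parallelize(1 to 100, 8)
rdd.getNumPartitions                      // Int = 8

// coalesce() reduces partitions without a full shuffle (the shuffle flag defaults to false)
val merged = rdd.coalesce(2)
merged.getNumPartitions                   // Int = 2

// repartition() always shuffles; it can increase or decrease the partition count
val reshuffled = rdd.repartition(16)
reshuffled.getNumPartitions               // Int = 16

// Typical pattern after a filter: many partitions end up sparse, so merge them before writing out
val evens = rdd.filter(_ % 2 == 0).coalesce(1)
evens.getNumPartitions                    // Int = 1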

Basic Actions and Transformations :

  • We can use all basic actions like min(), max(), count() etc.
  • Instead of calling each of these actions separately, we can just use stats() as shown below.

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("Spark Basics").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
rdd1.collect()

// Basic actions
rdd1.min()
rdd1.max()
rdd1.count()

// stats() is an action that computes count, mean, stdev, max and min in one pass
rdd1.stats()

Output :

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@31a471b9
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3ac95f39
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at command-4386469042563736:7
res1: org.apache.spark.util.StatCounter = (count: 10, mean: 5.500000, stdev: 2.872281, max: 10.000000, min: 1.000000)


map() transformation using lambda functions :
  • The map() operation is one-to-one (1-1): it takes one input element and produces one output element

// Databricks notebook source
// Spark context is an entry point for any operation in Spark
// We need to create it explicitly when using the Databricks notebook approach (in the spark-shell terminal, the SparkContext is created automatically as sc)

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("Spark Basics").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd1.collect()

// Applying a basic transformation
val rdd2 = rdd1.map( x => x + 1)
rdd2.collect()

// Another way of applying transformation using lambda expression
val rdd3 = rdd1.map( _ + 1)
rdd3.collect()

// Multiple transformations
val rdd4 = rdd1.map(x => x + 1).map(x => x + 1).map(x => x + 1).map(x => x + 1).map(x => x + 1)
rdd4.collect()

// Getting partition information
rdd4.getNumPartitions


map() Vs filter() :

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd1.collect()
// Output rdd1 = Array[Int] = Array(1, 2, 3, 4, 5, 6)

rdd1.map(x => x + 1).collect()
// Output = res5: Array[Int] = Array(2, 3, 4, 5, 6, 7)

rdd1.filter(x => x % 2 == 1).collect()
// Output = res6: Array[Int] = Array(1, 3, 5)



That's all for this blog. See you again!

Thanks,
Arun Mathe
Email ID : arunkumar.mathe@gmail.com
Contact ID : 9704117111
