
Spark Core : RDD operations

Spark provides different types of transformations and actions that we can perform on top of RDDs. This blog explains most of the important functions we can use on RDDs. Focus on the Scala code to understand what each function (action/transformation) does.

aggregate() :

Aggregate the elements of each partition, and then the results of all the partitions, using the given combine functions and a neutral "zero value" (initial value). This function can return a different result type, U, than the type of this RDD, T.

This is a very important function that we regularly use in real-time projects. Once you have a clear understanding of aggregate(), you will understand how distributed parallel processing works in a distributed environment.

Syntax :

def aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

  • zeroValue 
    • The initial value of the accumulated result for each partition in the seqOp
    • Also the initial value used when combining the results from different partitions in the combOp
    • This will typically be the neutral element (Nil for list concatenation, 0 for sum) - see the sketch below
  • seqOp
    • An operator used to accumulate results within a partition
  • combOp
    • An associative operator used to combine results from different partitions
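
A quick sketch (not from the original post) of why the zero value should be neutral, assuming the 6-element RDD used below split into 2 partitions: Spark applies zeroValue once per partition for the seqOp and once more when combining the partition results with the combOp, so a non-neutral value skews the result.

val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
rdd.aggregate(0)(_ + _, _ + _)  // 21 : 0 is neutral for addition
rdd.aggregate(1)(_ + _, _ + _)  // 24 : the extra 1 is added once in each of the 2 partitions and once more at the combine step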

Let's take a problem :

  • Sum of all numbers in RDD
val initialValue : Int  = 0

def seqOp(a: Int, b: Int) : Int = {
    a + b
}
def combOp(x: Int, y: Int): Int = {
    x + y
}
rdd.aggregate(initialValue) (seqOp, combOp)


Databricks execution :

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))

val initialValue : Int  = 0

def seqOp(a: Int, b: Int) : Int = {
    a + b
}
def combOp(x: Int, y: Int): Int = {
    x + y
}
rdd.aggregate(initialValue) (seqOp, combOp)

Output :

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@15248f9e
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[43] at parallelize at command-240706818333614:7
initialValue: Int = 0
seqOp: (a: Int, b: Int)Int
combOp: (x: Int, y: Int)Int
res1: Int = 21

Same code as above but executed in Scala prompt :

scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at parallelize at <console>:23

scala> val initialValue : Int  = 0
initialValue: Int = 0

scala> def seqOp(a: Int, b: Int) : Int = {
     |     a + b
     | }
seqOp: (a: Int, b: Int)Int

scala> def combOp(x: Int, y: Int): Int = {
     |     x + y
     | }
combOp: (x: Int, y: Int)Int

scala> rdd.aggregate(initialValue) (seqOp, combOp)
res62: Int = 21


Another problem  : Multiplication of all elements 

Databricks execution :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))

val initialValue : Int  = 1

def seqOp(a: Int, b: Int) : Int = {
    a * b
}
def combOp(x: Int, y: Int): Int = {
    x * y
}
rdd.aggregate(initialValue) (seqOp, combOp)

Output :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@158beb21
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[44] at parallelize at command-240706818333614:7
initialValue: Int = 1
seqOp: (a: Int, b: Int)Int
combOp: (x: Int, y: Int)Int
res2: Int = 720


Another problem : Minimum of all elements

Databricks execution :

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))

val initialValue : Int  = Int.MaxValue

def seqOp(a: Int, b: Int) : Int = {
    a min b
}
def combOp(x: Int, y: Int): Int = {
    x min y
}
rdd.aggregate(initialValue) (seqOp, combOp)

Output :

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@4298bf3f
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[46] at parallelize at command-240706818333614:7
initialValue: Int = 2147483647
seqOp: (a: Int, b: Int)Int
combOp: (x: Int, y: Int)Int
res4: Int = 1


Another problem : Count of total elements in the RDD

Databricks execution : 
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))

val initialValue : Int  = 0

def seqOp(result: Int, data: Int) : Int = {
    result + 1
}
def combOp(result1: Int, result2: Int): Int = {
    result1 + result2
}
rdd.aggregate(initialValue) (seqOp, combOp)

Output :

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2e3acc68
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[48] at parallelize at command-240706818333614:7
initialValue: Int = 0
seqOp: (result: Int, data: Int)Int
combOp: (result1: Int, result2: Int)Int
res7: Int = 6



Let's find the average :  aggregate() Vs fold() Vs reduce()

  • We need to understand that an operation can be performed in a distributed environment ONLY if it follows both the associative & commutative laws
    • Example : Addition follows both laws, so we can perform addition in distributed parallel processing
    • (a + b) + c = a + (b + c)
  • We CAN'T compute an average in a distributed environment directly
    • (((a + b)/2) + c)/2 is not equal to (a + ((b + c)/2))/2


Databricks execution :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))

// We need a Tuple to hold both the sum of the elements & the total number of elements
// (sum, count_of_elements)
val initialValue : (Int, Int)  = (0, 0)

def seqOp(result: (Int, Int), data: Int) : (Int, Int) = {
// (running_sum + data, count_of_elements + 1)
// (We add each element to the running sum of the partition and increment count_of_elements by 1)
    (result._1 + data, result._2 + 1)
}
def combOp(result1: (Int, Int), result2: (Int, Int)): (Int, Int) = {
    (result1._1 + result2._1, result1._2 + result2._2)
}
val rdd1 = rdd.aggregate(initialValue) (seqOp, combOp)
val avg = rdd1._1.toDouble / rdd1._2

Output :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@446b580d
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at command-240706818333614:7
initialValue: (Int, Int) = (0,0)
seqOp: (result: (Int, Int), data: Int)(Int, Int)
combOp: (result1: (Int, Int), result2: (Int, Int))(Int, Int)
rdd1: (Int, Int) = (21,6)
avg: Double = 3.5

Note :
In case the sequence & combine operations do the same thing (like adding two values in the above example), then instead of using aggregate() we can use fold()

Databricks execution :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))

val initialValue : Int  = 0

def op(result: Int, data: Int) : Int = {
  result + data
}
rdd.fold(initialValue)(op)

Output :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2bec7618
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[55] at parallelize at command-240706818333614:7
initialValue: Int = 0
op: (result: Int, data: Int)Int
res12: Int = 21


We can also use reduce(), which doesn't need an initial value at all.

Databricks execution :

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))

def op(result: Int, data: Int) : Int = {
  result + data
}

rdd.reduce(op)

Output :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2af4aa80
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at command-240706818333614:7
op: (result: Int, data: Int)Int
res14: Int = 21


Note :
  • In real time, use take() instead of collect(): collect() brings the entire RDD to the driver, and if this code runs on a client machine that doesn't have enough memory to hold all of that data, it will be a problem. take() is like a LIMIT in SQL, it returns only the requested number of records.
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)

val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))

rdd.take(2)

Output :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@664dfec4
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[57] at parallelize at command-240706818333614:7
res15: Array[Int] = Array(1, 2)


context() Vs sparkContext() : Both return the SparkContext the RDD was created with, which helps us see which Spark context is in use.
  • context() is the older way
  • sparkContext() is the newer way

scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> rdd.context
res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@d95b04e

scala> rdd.sparkContext
res3: org.apache.spark.SparkContext = org.apache.spark.SparkContext@d95b04e



count() Vs countApprox() : Both give the count of elements in an RDD (countApprox() is mostly useful in Spark ML)
  • Created an RDD with 1000000 elements
  • rdd.count() gave us the exact number
  • countApprox() returns a result within the given timeout in milliseconds
    • If we give it enough time, it returns the exact value
    • If we give it less time, it returns an approximate value

scala> val rdd1 = sc.parallelize(1 to 1000000)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:23

scala> rdd1.count()
res4: Long = 1000000   

scala> rdd1.countApprox()
<console>:24: error: not enough arguments for method countApprox: (timeout: Long, confidence: Double)org.apache.spark.partial.PartialResult[org.apache.spark.partial.BoundedDouble].
Unspecified value parameter timeout.
       rdd1.countApprox()
                       ^
scala> rdd1.countApprox(40)
res12: org.apache.spark.partial.PartialResult[org.apache.spark.partial.BoundedDouble] = (partial: [998615.000, 1001386.000])

scala> rdd1.countApprox(50)
res13: org.apache.spark.partial.PartialResult[org.apache.spark.partial.BoundedDouble] = (final: [1000000.000, 1000000.000])


count() Vs countByValue()  :  This is one of the important interview questions
  • count() will display the total count of elements in the RDD
  • countByValue() will display the count of each element in the RDD, i.e., how many times each record occurs

scala> val rdd = sc.parallelize(List(1, 2, 3, 4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> rdd.count()
res0: Long = 4 

scala> val rdd = sc.parallelize(List(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:23

scala> rdd.countByValue()
res2: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 1, 6 -> 1, 2 -> 2, 3 -> 3, 4 -> 4)


distinct() :
  • It will return the unique elements from the given RDD

scala> val rdd = sc.parallelize(List(1, 2, 2, 3, 4, 4, 4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:23

scala> rdd.distinct()
res8: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[14] at distinct at <console>:24

scala> rdd.distinct().collect()
res9: Array[Int] = Array(4, 2, 1, 3)


filter() : returns a new RDD containing only the elements that satisfy the given condition.

scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> rdd1.filter(x => x % 2 == 0)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:24

scala> rdd1.filter(x => x % 2 == 0).collect()
res2: Array[Int] = Array(2, 4, 6, 8)                                            

scala> rdd1.filter(x => x < 3 | x > 6).collect()
res4: Array[Int] = Array(1, 2, 7, 8)


first() : It will get the first element

scala> rdd1.first()
res5: Int = 1


map() Vs flatMap()   : Both are transformations.

  • map() is 1-1 operation
  • flatMap() is 1-many operation
    • One element as input
    • Can have multiple elements as output
    • The function passed to flatMap() must return an iterable, e.g., a list of elements
  • When to Use map() vs flatMap()
    • Use map() when you want a one-to-one transformation.
    • Use flatMap() when you want a one-to-many transformation and need to flatten the results.

Example 1 :

val rdd = sc.parallelize(Seq(1, 2, 3))
val result = rdd.map(x => Seq(x, x * 2))
println(result.collect().mkString(", "))

The result is an RDD of sequences (a nested structure), printed as List(1, 2), List(2, 4), List(3, 6) - similar to a list of lists in Python.


val rdd = sc.parallelize(Seq(1, 2, 3))
val result = rdd.flatMap(x => Seq(x, x * 2))
println(result.collect().mkString(", "))

The nested structure is flattened into a single sequence: 1, 2, 2, 4, 3, 6.


Example 2 :

scala> val rdd2 = sc.parallelize(List("I am Arun", "Good morning", "How are you", "Have a good day"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:23

scala> rdd2.collect()
res6: Array[String] = Array(I am Arun, Good morning, How are you, Have a good day)

scala> rdd2.collect().foreach(println)
I am Arun
Good morning
How are you
Have a good day

scala> rdd2.map(line => line.split(" ")).collect()
res9: Array[Array[String]] = Array(Array(I, am, Arun), Array(Good, morning), Array(How, are, you), Array(Have, a, good, day))

scala> rdd2.map(line => line.split(" ")).count()
res10: Long = 4

scala> rdd2.flatMap(line => line.split(" ")).collect()
res11: Array[String] = Array(I, am, Arun, Good, morning, How, are, you, Have, a, good, day)

scala> rdd2.flatMap(line => line.split(" ")).count()
res12: Long = 12

scala> rdd2.flatMap(line => line).collect()
res14: Array[Char] = Array(I,  , a, m,  , A, r, u, n, G, o, o, d,  , m, o, r, n, i, n, g, H, o, w,  , a, r, e,  , y, o, u, H, a, v, e,  , a,  , g, o, o, d,  , d, a, y)


foreach() Vs foreachPartition() :

  • foreach() : applies the given function to each element of the RDD
  • foreachPartition() : applies the given function to each partition of the RDD
scala> val rdd = sc.parallelize(List("I am going", "to hyd", "I am learning", "hadoop course"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> rdd.collect()
res0: Array[String] = Array(I am going, to hyd, I am learning, hadoop course)   

scala> rdd.foreach()
<console>:24: error: not enough arguments for method foreach: (f: String => Unit)Unit.
Unspecified value parameter f.
       rdd.foreach()
                  ^

scala> rdd.foreach(x => println(x))
I am learning
hadoop course
I am going
to hyd

scala> rdd.foreach(println(_))
I am going
to hyd
I am learning
hadoop course

scala> rdd.foreach(println)
I am going
to hyd
I am learning
hadoop course

scala> val rdd1 =sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:23

scala> rdd1.collect()
res6: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd1.glom().collect()
res7: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6))

scala> rdd1.foreachPartition()
<console>:24: error: not enough arguments for method foreachPartition: (f: Iterator[Int] => Unit)Unit.
Unspecified value parameter f.
       rdd1.foreachPartition()
                            ^

scala> rdd1.foreachPartition(x => println(x.mkString(",")))
4,5,6
1,2,3

scala> rdd1.foreachPartition(x => println(x.size))
3
3


groupBy() :

  • How you want to groupBy your data ?
    • By name wise ?
    • By first character wise ?


scala> val rdd4 = sc.parallelize(List("raj", "venkat", "sunil", "kalyan", "anvith", "raju", "dev", "hari"), 2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:23

scala> rdd4.glom().collect()
res15: Array[Array[String]] = Array(Array(raj, venkat, sunil, kalyan), Array(anvith, raju, dev, hari))

scala> rdd4.groupBy()
<console>:24: error: overloaded method value groupBy with alternatives:
  [K](f: String => K, p: org.apache.spark.Partitioner)(implicit kt: scala.reflect.ClassTag[K], implicit ord: Ordering[K])org.apache.spark.rdd.RDD[(K, Iterable[String])] <and>
  [K](f: String => K, numPartitions: Int)(implicit kt: scala.reflect.ClassTag[K])org.apache.spark.rdd.RDD[(K, Iterable[String])] <and>
  [K](f: String => K)(implicit kt: scala.reflect.ClassTag[K])org.apache.spark.rdd.RDD[(K, Iterable[String])]
 cannot be applied to ()
       rdd4.groupBy()
            ^

scala> rdd4.groupBy(name => name)
res17: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[8] at groupBy at <console>:24

scala> rdd4.groupBy(name => name).collect()
res18: Array[(String, Iterable[String])] = Array((kalyan,CompactBuffer(kalyan)), (anvith,CompactBuffer(anvith)), (raju,CompactBuffer(raju)), (hari,CompactBuffer(hari)), (venkat,CompactBuffer(venkat)), (sunil,CompactBuffer(sunil)), (dev,CompactBuffer(dev)), (raj,CompactBuffer(raj)))

scala> rdd4.groupBy(name => name).collect().foreach(println)
(kalyan,CompactBuffer(kalyan))
(anvith,CompactBuffer(anvith))
(raju,CompactBuffer(raju))
(hari,CompactBuffer(hari))
(venkat,CompactBuffer(venkat))
(sunil,CompactBuffer(sunil))
(dev,CompactBuffer(dev))
(raj,CompactBuffer(raj))

scala> rdd4.groupBy(name => name.chatAt(0)).collect().foreach(println)
<console>:24: error: value chatAt is not a member of String
       rdd4.groupBy(name => name.chatAt(0)).collect().foreach(println)
                                 ^

scala> rdd4.groupBy(name => name.charAt(0)).collect().foreach(println)
(d,CompactBuffer(dev))
(h,CompactBuffer(hari))
(v,CompactBuffer(venkat))
(r,CompactBuffer(raj, raju))
(s,CompactBuffer(sunil))
(a,CompactBuffer(anvith))
(k,CompactBuffer(kalyan))

Here we are grouping by using the first character in each name in the list.

scala> rdd4.groupBy(name => name.charAt(1)).collect().foreach(println)
(n,CompactBuffer(anvith))
(e,CompactBuffer(venkat, dev))
(a,CompactBuffer(raj, kalyan, raju, hari))
(u,CompactBuffer(sunil))

scala> rdd4.groupBy(name => name.length).getNumPartitions
res23: Int = 2


We can re-partition while doing groupBy as below.
  • This is an advantage in real-time work: we can reduce the number of partitions when the data volume is small but spread across many partitions
scala> rdd4.groupBy(name => name.length, 3).getNumPartitions
res24: Int = 3


rdd.id :

  • For every RDD one ID will be assigned
  • Note: this is not the id() function in Python; that is a different concept, where id(<variable_name>) returns the identity (address) of a variable
scala> rdd.id
res12: Int = 0


rdd.name :
  • We can assign a name to RDD
  • By default it is null
scala> rdd.name
res13: String = null

scala> rdd.setName("First RDD")
res15: rdd.type = First RDD ParallelCollectionRDD[0] at parallelize at <console>:23

scala> rdd.name
res16: String = First RDD


union() Vs intersection() Vs subtract() :

  • union() will combine all the elements of both the RDDs (duplicates are kept)
  • intersection() will return the common elements of both RDDs
  • subtract() will remove from RDD1 the elements that are also present in RDD2


scala> val rdd1 = sc.parallelize(1 to 6, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:23

scala> val rdd2 = sc.parallelize(4 to 9, 2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:23

scala> val rdd3 = sc.parallelize(7 to 12, 2)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:23

scala> rdd1.intersection(rdd2).collect()
res20: Array[Int] = Array(4, 6, 5)

scala> rdd1.union(rdd2).collect()
res21: Array[Int] = Array(1, 2, 3, 4, 5, 6, 4, 5, 6, 7, 8, 9)

scala> rdd1.union(rdd2).distinct().collect()
res22: Array[Int] = Array(4, 8, 1, 9, 5, 6, 2, 3, 7)

scala> rdd1.subtract(rdd2).collect()
res24: Array[Int] = Array(2, 1, 3)

scala> rdd2.subtract(rdd1).collect()
res25: Array[Int] = Array(8, 7, 9)


keyBy() :
  • Based on the given function, we generate a key for each element
  • keyBy() is a transformation that converts an existing RDD into a pair RDD using the given function
  • In the example below, we use the length of each element as the KEY and the actual element as the VALUE

scala> val rdd4 = sc.parallelize(List("raj", "venkat", "sunil", "kalyan", "anvith", "raju", "dev", "hari"))
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at parallelize at <console>:23

scala> rdd4.glom().collect()
res26: Array[Array[String]] = Array(Array(raj, venkat, sunil, kalyan), Array(anvith, raju, dev, hari))

scala> rdd4.keyBy(name => name.length)
res28: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[30] at keyBy at <console>:24

scala> rdd4.keyBy(name => name.length).collect()
res29: Array[(Int, String)] = Array((3,raj), (6,venkat), (5,sunil), (6,kalyan), (6,anvith), (4,raju), (3,dev), (4,hari))


Note :
  • The main difference between groupBy() and keyBy() is
    • groupBy() groups elements based on a function applied to each element
      • It returns a pair RDD of (key, iterable)
      • key is the group key
      • iterable contains all values belonging to that key
    • keyBy() converts each value into a (key, value) pair
      • It doesn't group elements; instead it assigns a key to each element
      • The result is a pair RDD of (key, value)
Example for groupBy() :

val rdd = sc.parallelize(Seq("apple", "banana", "avocado", "blueberry"))
val groupedRdd = rdd.groupBy(word => word.charAt(0)) // Group by first letter
groupedRdd.collect().foreach(println)

Output :
(a, CompactBuffer(apple, avocado))
(b, CompactBuffer(banana, blueberry))


Example for keyBy() :

val rdd = sc.parallelize(Seq("apple", "banana", "avocado", "blueberry"))
val keyedRdd = rdd.keyBy(word => word.charAt(0)) // Assign key as first letter
keyedRdd.collect().foreach(println)

Output :
(a, apple)
(a, avocado)
(b, banana)
(b, blueberry)




map()   Vs    mapPartitions()    Vs     mapPartitionsWithIndex() :

  • map() operates element by element (it has no partition-level control)
  • mapPartitions() gives control at the partition level (see the sketch below)
  • mapPartitionsWithIndex() gives partition-level control and additionally exposes the index of each partition
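
A minimal sketch of mapPartitions() (not from the original post), assuming the same 6-element RDD with 2 partitions: the function receives an Iterator per partition, so per-partition work (for example a partial sum, or opening a database connection) runs once per partition instead of once per element.

val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
val perPartitionSums = rdd.mapPartitions(it => Iterator(it.sum))   // one result per partition
perPartitionSums.collect()   // Array(6, 15) with partitions (1, 2, 3) and (4, 5, 6)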

mapPartitionsWithIndex() 
  • When we use mapPartitionsWithIndex(), we get both the partition index and an iterator over that partition's elements
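
The transcript below calls a helper myfunc1 that is not defined in the post; a minimal definition that would produce the output shown could look like this (an assumption, not the author's original code):

def myfunc1(index: Int, it: Iterator[Int]): Iterator[String] = {
    it.map(data => s"Index = $index, data = $data")
}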

scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:23

scala> rdd.glom().collect()
res31: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6))

scala> rdd.mapPartitionsWithIndex(myfunc1).collect()
res33: Array[String] = Array(Index = 0, data = 1, Index = 0, data = 2, Index = 0, data = 3, Index = 1, data = 4, Index = 1, data = 5, Index = 1, data = 6)

scala> rdd.mapPartitionsWithIndex(myfunc1).collect().foreach(println)
Index = 0, data = 1
Index = 0, data = 2
Index = 0, data = 3
Index = 1, data = 4
Index = 1, data = 5
Index = 1, data = 6


partitions :
  • When we call getNumPartitions, internally it returns rdd.partitions.length

scala> rdd4.glom().collect()
res35: Array[Array[String]] = Array(Array(raj, venkat, sunil, kalyan), Array(anvith, raju, dev, hari))

scala> rdd4.partitions
res36: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@b0d, org.apache.spark.rdd.ParallelCollectionPartition@b0e)

scala> rdd4.partitions.length
res37: Int = 2

scala> rdd4.getNumPartitions
res38: Int = 2


pipe() :
  • pipe() lets us run a Unix command at the partition level; the elements of each partition are fed to the command's standard input, one per line

scala> rdd4.glom().collect()
res39: Array[Array[String]] = Array(Array(raj, venkat, sunil, kalyan), Array(anvith, raju, dev, hari))

scala> rdd4.pipe("head -n 1").collect()
res41: Array[String] = Array(raj, anvith)

scala> rdd4.pipe("head -n 2").collect()
res42: Array[String] = Array(raj, venkat, anvith, raju)

scala> rdd4.pipe("tail -n 1").collect()
res43: Array[String] = Array(kalyan, hari)

scala> rdd4.pipe("tail -n 2").collect()
res44: Array[String] = Array(sunil, kalyan, dev, hari)


sortBy()  :

  • To sort elements 
  • Based on alphabetical order
  • Based on length of elements
scala> rdd4.collect()
res45: Array[String] = Array(raj, venkat, sunil, kalyan, anvith, raju, dev, hari)

scala> rdd4.sortBy(name => name).collect()
res46: Array[String] = Array(anvith, dev, hari, kalyan, raj, raju, sunil, venkat)

scala> rdd4.sortBy(name => name.length).collect()
res47: Array[String] = Array(raj, dev, raju, hari, sunil, venkat, kalyan, anvith)

Sorting in Ascending, Descending order :

scala> rdd4.sortBy(name => name, true).collect()
res48: Array[String] = Array(anvith, dev, hari, kalyan, raj, raju, sunil, venkat)

scala> rdd4.sortBy(name => name, false).collect()
res49: Array[String] = Array(venkat, sunil, raju, raj, kalyan, hari, dev, anvith)

We can also pass a third optional parameter: the number of partitions.

scala> rdd4.sortBy(name => name, false).getNumPartitions
res50: Int = 2

scala> rdd4.sortBy(name => name, false, 3).getNumPartitions
res51: Int = 3

scala> rdd4.take(2)
res57: Array[String] = Array(raj, venkat)

takeOrdered() will pick elements in ascending (sorted) order, unlike take() which returns them in the order they appear.

scala> rdd4.takeOrdered(2)
res58: Array[String] = Array(anvith, dev)

top() : picks the largest elements, in descending order.

scala> rdd4.top(2)
res59: Array[String] = Array(venkat, sunil)

scala> rdd4.sortBy(name => name, false).collect()
res60: Array[String] = Array(venkat, sunil, raju, raj, kalyan, hari, dev, anvith)

scala> rdd4.sortBy(name => name, false).take(2)
res61: Array[String] = Array(venkat, sunil)


treeAggregate() : 

If you know your data is heavy, use treeAggregate() instead of aggregate(): you can define the depth of the tree, which makes the computation faster (compared with the normal aggregate function) as long as resources are available. This works like a merge sort.

  • treeAggregate() is almost the same as aggregate(), but it takes an additional depth parameter as below
    • scala> rdd.treeAggregate(initialValue) (seqOp, combOp, 2)   
    • res64: Int = 21
    • the default depth of the tree is 2


scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at parallelize at <console>:23

scala> val initialValue : Int  = 0
initialValue: Int = 0

scala> def seqOp(a: Int, b: Int) : Int = {
     |     a + b
     | }
seqOp: (a: Int, b: Int)Int

scala> def combOp(x: Int, y: Int): Int = {
     |     x + y
     | }
combOp: (x: Int, y: Int)Int

scala> rdd.aggregate(initialValue) (seqOp, combOp)
res62: Int = 21

scala> rdd.treeAggregate(initialValue) (seqOp, combOp)
res63: Int = 21


zip() 
  • zip() is a transformation in Spark 
  • A zip() method is available in both Python & Scala, but Spark's zip() is a bit different: it expects the same number of elements in each partition
  • We also have a method called zipPartitions(); prefer it over plain zip() when partition sizes may differ (see the sketch after the zip() examples below)
scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at parallelize at <console>:23

scala> val rdd2 = sc.parallelize(List('a', 'b', 'c', 'd'), 2)
rdd2: org.apache.spark.rdd.RDD[Char] = ParallelCollectionRDD[89] at parallelize at <console>:23

scala> val rdd3 = sc.parallelize(List(1.1, 2.2, 3.3, 4.4, 5.5, 6.6), 2)
rdd3: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[90] at parallelize at <console>:23

scala> rdd1.zip(rdd2)
res65: org.apache.spark.rdd.RDD[(Int, Char)] = ZippedPartitionsRDD2[91] at zip at <console>:25

scala> rdd1.zip(rdd3)
res66: org.apache.spark.rdd.RDD[(Int, Double)] = ZippedPartitionsRDD2[92] at zip at <console>:25

scala> rdd1.zip(rdd2).collect()
res68: Array[(Int, Char)] = Array((1,a), (2,b), (3,c), (4,d))


Note : Both RDDs should have the same number of elements in each PARTITION (this condition does not exist for zip() on plain Scala & Python collections, because those are not distributed)

scala> rdd1.zip(rdd3).collect()
25/03/27 17:55:54 ERROR Executor: Exception in task 0.0 in stage 69.0 (TID 138)
org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
at org.apache.spark.errors.SparkCoreErrors$.canOnlyZipRDDsWithSamePartitionSizeError(SparkCoreErrors.scala:142)
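
Since zipPartitions() was recommended above, here is a minimal sketch (not from the original post) of how it can cope with partitions of different sizes: inside the function we decide how to combine the two iterators, for example by stopping at the shorter one.

val a = sc.parallelize(List(1, 2, 3, 4), 2)
val b = sc.parallelize(List(1.1, 2.2, 3.3, 4.4, 5.5, 6.6), 2)
val zipped = a.zipPartitions(b) { (itA, itB) =>
    itA.zip(itB)   // Scala's Iterator.zip stops at the shorter iterator, so unequal partition sizes don't fail
}
zipped.collect()   // pairs per partition; the exact pairing depends on how the elements are distributed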


zipWithIndex() & zipWithUniqueId() 


scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> val rdd2 = sc.parallelize(List("raj", "venkat", "Arun", "Thoshi", "anupama", "sumathi"), 2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:23

scala> rdd1.collect()
res2: Array[Int] = Array(1, 2, 3, 4, 5, 6)                                      

scala> rdd2.collect()
res3: Array[String] = Array(raj, venkat, Arun, Thoshi, anupama, sumathi)

scala> rdd1.zipWithIndex().collect().foreach(println)
(1,0)
(2,1)
(3,2)
(4,3)
(5,4)
(6,5)

scala> rdd2.zipWithIndex().collect().foreach(println)
(raj,0)
(venkat,1)
(Arun,2)
(Thoshi,3)
(anupama,4)
(sumathi,5)

Note : zipWithUniqueId() can assign IDs larger than the number of elements in the list, and it generates the result faster (it doesn't need a separate job to compute partition offsets), but the IDs won't be in sequential order.

Question : Can we get RDD data based on an index ?
Answer : Yes, if we go with zipWithIndex() we can fetch data based on the index (see the sketch below). We will use this in Spark SQL.
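
A minimal sketch of such an index lookup (not from the original post), assuming the rdd2 of names defined above: attach an index with zipWithIndex() and then filter on it.

val indexed = rdd2.zipWithIndex()                  // RDD[(String, Long)]
indexed.filter { case (_, idx) => idx == 3L }      // keep the element at index 3
       .map { case (value, _) => value }
       .collect()                                  // Array(Thoshi) for the data above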

scala> rdd2.zipWithUniqueId().collect().foreach(println)
(raj,0)
(venkat,2)
(Arun,4)
(Thoshi,1)
(anupama,3)
(sumathi,5)



STATISTICAL OPERATIONS 

==> randomSplit()
  • In Machine Learning, we use randomSplit() to divide the data based on percentages
  • In the example below, we split the RDD into 2 RDDs with roughly 50% of the data in each
    • rdd.randomSplit(Array(0.5, 0.5))
  • The output won't be consistent because the split is random; the examples below return different elements in the two splits even when the same command is run multiple times
  • We can also split into more pieces by passing more weights, as shown in the last example below.


scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:23

scala> rdd.glom().collect()
res8: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6))

scala> rdd.randomSplit(Array(0.5, 0.5))
res9: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[7] at randomSplit at <console>:24, MapPartitionsRDD[8] at randomSplit at <console>:24)

scala> rdd.randomSplit(Array(0.5, 0.5)).foreach(r => println(r.collect()))
[I@21fb98a1
[I@299a5e10

scala> rdd.randomSplit(Array(0.5, 0.5)).foreach(r => println(r.collect().mkString(",")))
3,4,6
1,2,5

scala> rdd.randomSplit(Array(0.5, 0.5)).foreach(r => println(r.collect().mkString(",")))
2,3,4
1,5,6

scala> rdd.randomSplit(Array(0.5, 0.5)).foreach(r => println(r.collect().mkString(",")))
1,2,3
4,5,6

scala> rdd.randomSplit(Array(0.5, 0.5)).foreach(r => println(r.collect().mkString(",")))
2,3,5
1,4,6

scala> rdd.randomSplit(Array(0.5, 0.5)).foreach(r => println(r.collect().mkString(",")))
2,3
1,4,5,6

scala> rdd.randomSplit(Array(0.4, 0.3, 0.3)).foreach(r => println(r.collect().mkString(",")))
2,3,6
1,5
4

==> sample()
  • sample() gets a random sample from the given RDD
  • We can mention the fraction of elements we want, and whether sampling is done with replacement (the first argument)

scala> rdd.sample(false, 1).collect()
res33: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd.sample(true, 1).collect()
res37: Array[Int] = Array(2, 3, 4, 4, 5, 6, 6, 6)


Some other statistical operations on RDDs : these are mostly for Data Scientists, not for Data Engineers.

scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> rdd.stats()
res2: org.apache.spark.util.StatCounter = (count: 6, mean: 3.500000, stdev: 1.707825, max: 6.000000, min: 1.000000)

scala> rdd.stdev()
res4: Double = 1.707825127659933

scala> rdd.variance()
res5: Double = 2.9166666666666665

// pop* (population) methods treat the RDD as the entire data set
scala> rdd.popStdev()
res6: Double = 1.707825127659933

scala> rdd.popVariance()
res7: Double = 2.9166666666666665

// sample* methods treat the RDD as a sample of a larger data set (they divide by n-1 instead of n)
scala> rdd.sampleStdev()
res8: Double = 1.8708286933869707

scala> rdd.sampleVariance()
res9: Double = 3.5


That's all for this blog. See you again!

Thanks,
Arun Mathe
Email ID : arunkumar.mathe@gmail.com
Contact ID : 9704117111
