We have different types of transformations and actions that we can perform on top of RDDs. This blog explains most of the important functions we can use on RDDs. Please focus on the Scala code to understand what each particular function (action/transformation) does.
aggregate() :
Aggregate the elements of each partition, and then the results of all the partitions, using given combine functions and a neutral "zero value" (initial value). This function can return a different result type, U, than the type of this RDD, T.
This is a very important function which we regularly use in real-time projects. If you have a clear understanding of aggregate(), you will understand how distributed parallel processing works.
Syntax :
def aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
- zeroValue
- The initial value of the accumulated result of each partition for the seqOp
- And also the initial value for combining the results from different partitions for the combOp
- This will typically be the neutral element (Nil for list concatenation, 0 for sum)
- seqOp
- An operator used to accumulate results within a partition
- combOp
- An associative operator used to combine results from different partitions
Let's take a problem :
- Sum of all numbers in RDD
val initialValue : Int = 0
def seqOp(a: Int, b: Int) : Int = {
a + b
}
def combOp(x: Int, y: Int): Int = {
x + y
}
rdd.aggregate(initialValue) (seqOp, combOp)
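Before running it, here is a rough trace (my own illustration, assuming the six numbers land in two partitions as (1, 2, 3) and (4, 5, 6)) of how the work gets split:
// Partition 1 : seqOp starts from the zero value : 0+1 = 1, 1+2 = 3, 3+3 = 6
// Partition 2 : seqOp starts from the zero value : 0+4 = 4, 4+5 = 9, 9+6 = 15
// Driver      : combOp merges the partial results : 0+6 = 6, 6+15 = 21
val rddTwoPartitions = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2) // force 2 partitions for the trace
rddTwoPartitions.aggregate(initialValue)(seqOp, combOp) // 21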
Databricks execution :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
val initialValue : Int = 0
def seqOp(a: Int, b: Int) : Int = {
a + b
}
def combOp(x: Int, y: Int): Int = {
x + y
}
rdd.aggregate(initialValue) (seqOp, combOp)
Output :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@15248f9e
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[43] at parallelize at command-240706818333614:7
initialValue: Int = 0
seqOp: (a: Int, b: Int)Int
combOp: (x: Int, y: Int)Int
res1: Int = 21
Same code as above, but executed at the Scala prompt :
scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at parallelize at <console>:23
scala> val initialValue : Int = 0
initialValue: Int = 0
scala> def seqOp(a: Int, b: Int) : Int = {
| a + b
| }
seqOp: (a: Int, b: Int)Int
scala> def combOp(x: Int, y: Int): Int = {
| x + y
| }
combOp: (x: Int, y: Int)Int
scala> rdd.aggregate(initialValue) (seqOp, combOp)
res62: Int = 21
Another problem : Multiplication of all elements
Databricks execution :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
val initialValue : Int = 1
def seqOp(a: Int, b: Int) : Int = {
a * b
}
def combOp(x: Int, y: Int): Int = {
x * y
}
rdd.aggregate(initialValue) (seqOp, combOp)
Output :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@158beb21
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[44] at parallelize at command-240706818333614:7
initialValue: Int = 1
seqOp: (a: Int, b: Int)Int
combOp: (x: Int, y: Int)Int
res2: Int = 720
Another problem : Minimum of all elements
Databricks execution :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
val initialValue : Int = Int.MaxValue
def seqOp(a: Int, b: Int) : Int = {
a min b
}
def combOp(x: Int, y: Int): Int = {
x min y
}
rdd.aggregate(initialValue) (seqOp, combOp)
Output :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@4298bf3f
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[46] at parallelize at command-240706818333614:7
initialValue: Int = 2147483647
seqOp: (a: Int, b: Int)Int
combOp: (x: Int, y: Int)Int
res4: Int = 1
Another problem : Count of total elements in the RDD
Databricks execution :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
val initialValue : Int = 0
def seqOp(result: Int, data: Int) : Int = {
result + 1
}
def combOp(result1: Int, result2: Int): Int = {
result1 + result2
}
rdd.aggregate(initialValue) (seqOp, combOp)
Output :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2e3acc68
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[48] at parallelize at command-240706818333614:7
initialValue: Int = 0
seqOp: (result: Int, data: Int)Int
combOp: (result1: Int, result2: Int)Int
res7: Int = 6
Let's find the average : aggregate() Vs fold() Vs reduce()
- We need to understand that an operation can be performed in a distributed environment ONLY if it follows both the associative & commutative laws
- Example : Addition follows both laws, so we can perform addition in distributed parallel processing
- (a + b) + c = a + (b + c)
- We CAN'T compute an average in a distributed environment directly
- (((a + b) / 2) + c) / 2 is not equal to (a + ((b + c) / 2)) / 2
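A quick worked check (my own numbers) with a = 1, b = 2, c = 3, whose true average is 2.0:
// Grouping from the left vs. grouping from the right gives different answers
val left = (((1.0 + 2.0) / 2) + 3.0) / 2 // 2.25
val right = (1.0 + ((2.0 + 3.0) / 2)) / 2 // 1.75
// Neither equals 2.0, so we cannot average partition by partition directly;
// instead we carry a (sum, count) pair and divide only once at the very end.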
Databricks execution :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
// We need to have a Tuple, to have both sum of elements & total number of elements
// (sum, count_of_elements)
val initialValue : (Int, Int) = (0, 0)
def seqOp(result: (Int, Int), data: Int) : (Int, Int) = {
// (running sum + data, count_of_elements + 1)
// (We add all the elements within the partition and increment count_of_elements by 1)
(result._1 + data, result._2 + 1)
}
def combOp(result1: (Int, Int), result2: (Int, Int)): (Int, Int) = {
(result1._1 + result2._1, result1._2 + result2._2)
}
val rdd1 = rdd.aggregate(initialValue) (seqOp, combOp)
val avg = rdd1._1.toDouble / rdd1._2
Output :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@446b580d
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at command-240706818333614:7
initialValue: (Int, Int) = (0,0)
seqOp: (result: (Int, Int), data: Int)(Int, Int)
combOp: (result1: (Int, Int), result2: (Int, Int))(Int, Int)
rdd1: (Int, Int) = (21,6)
avg: Double = 3.5
Note :
In case the sequence & combine operations do the same thing (like adding two values in the above example), then instead of using aggregate() we can use fold().
Databricks execution :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
val initialValue : Int = 0
def op(result: Int, data: Int) : Int = {
result + data
}
rdd.fold(initialValue)(op)
Output :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2bec7618
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[55] at parallelize at command-240706818333614:7
initialValue: Int = 0
op: (result: Int, data: Int)Int
res12: Int = 21
We can use reduce() and drop the initial value as well.
Databricks execution :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
def op(result: Int, data: Int) : Int = {
result + data
}
rdd.reduce(op)
Output :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2af4aa80
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at command-240706818333614:7
op: (result: Int, data: Int)Int
res14: Int = 21
Note :
- In real time, use take() instead of collect(), because collect() brings the entire RDD back to the driver; if this code is executing on a client machine that does not have enough memory to hold all that data, it will be an issue. Hence use take(); it is like using LIMIT in SQL to fetch only a few records as per the given input.
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd.take(2)
Output :
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@664dfec4
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3633dc3a
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[57] at parallelize at command-240706818333614:7
res15: Array[Int] = Array(1, 2)
context Vs sparkContext : Both do the same thing; they help us see which SparkContext the RDD belongs to.
- context is the older way
- sparkContext is the newer way
scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23
scala> rdd.context
res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@d95b04e
scala> rdd.sparkContext
res3: org.apache.spark.SparkContext = org.apache.spark.SparkContext@d95b04e
count() Vs countApprox() : Both give the count of elements in an RDD (countApprox() is mostly useful in Spark ML)
- Created an RDD with 1000000 elements
- Tried rdd.count(); it gave us the exact number
- Tried countApprox(); it gives a result within the given timeout in milliseconds
- If we give it enough time, it returns the exact value
- If we give it less time, it returns an approximate value
scala> val rdd1 = sc.parallelize(1 to 1000000)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:23
scala> rdd1.count()
res4: Long = 1000000
scala> rdd1.countApprox()
<console>:24: error: not enough arguments for method countApprox: (timeout: Long, confidence: Double)org.apache.spark.partial.PartialResult[org.apache.spark.partial.BoundedDouble].
Unspecified value parameter timeout.
rdd1.countApprox()
^
scala> rdd1.countApprox(40)
res12: org.apache.spark.partial.PartialResult[org.apache.spark.partial.BoundedDouble] = (partial: [998615.000, 1001386.000])
scala> rdd1.countApprox(50)
res13: org.apache.spark.partial.PartialResult[org.apache.spark.partial.BoundedDouble] = (final: [1000000.000, 1000000.000])
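The second parameter is the confidence level (default 0.95). A small sketch (my own, parameter names taken from the error message above) passing both explicitly:
rdd1.countApprox(timeout = 40, confidence = 0.90) // ask for a 90% confidence interval instead of the default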
count() Vs countByValue() : This is one of the important interview questions
- count() will display the total count of elements in the RDD
- countByValue() will display the count of each element in the RDD, i.e., how many times each record occurs
scala> val rdd = sc.parallelize(List(1, 2, 3, 4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23
scala> rdd.count()
res0: Long = 4
scala> val rdd = sc.parallelize(List(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:23
scala> rdd.countByValue()
res2: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 1, 6 -> 1, 2 -> 2, 3 -> 3, 4 -> 4)
distinct() :
- It returns the unique elements of the given RDD
scala> val rdd = sc.parallelize(List(1, 2, 2, 3, 4, 4, 4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:23
scala> rdd.distinct()
res8: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[14] at distinct at <console>:24
scala> rdd.distinct().collect()
res9: Array[Int] = Array(4, 2, 1, 3)
filter() : It returns a new RDD containing only the elements that satisfy the given condition.
scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23
scala> rdd1.filter(x => x % 2 == 0)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:24
scala> rdd1.filter(x => x % 2 == 0).collect()
res2: Array[Int] = Array(2, 4, 6, 8)
scala> rdd1.filter(x => x < 3 | x > 6).collect()
res4: Array[Int] = Array(1, 2, 7, 8)
first() : It will get the first element
scala> rdd1.first()
res5: Int = 1
map() Vs flatMap() : Both are transformations.
- map() is 1-1 operation
- flatMap() is 1-many operation
- One element as input
- Can have multiple elements as output
- flatMap() must return an iterable, i.e., a list/sequence of elements
- When to Use map() vs flatMap()
- Use map() when you want a one-to-one transformation.
- Use flatMap() when you want a one-to-many transformation and need to flatten the results.
Example 1 :
val rdd = sc.parallelize(Seq(1, 2, 3))
val result = rdd.map(x => Seq(x, x * 2))
println(result.collect().mkString(", "))
The result is a collection of nested lists (one per input element), similar to the Python version.
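For reference, the printed line should look like this (Scala renders Seq as List by default):
List(1, 2), List(2, 4), List(3, 6)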
val rdd = sc.parallelize(Seq(1, 2, 3))
val result = rdd.flatMap(x => Seq(x, x * 2))
println(result.collect().mkString(", "))
The nested structure is flattened into a single sequence.
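For reference, the flattened output should look like:
1, 2, 2, 4, 3, 6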
Example 2 :
scala> val rdd2 = sc.parallelize(List("I am Arun", "Good morning", "How are you", "Have a good day"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:23
scala> rdd2.collect()
res6: Array[String] = Array(I am Arun, Good morning, How are you, Have a good day)
scala> rdd2.collect().foreach(println)
I am Arun
Good morning
How are you
Have a good day
scala> rdd2.map(line => line.split(" ")).collect()
res9: Array[Array[String]] = Array(Array(I, am, Arun), Array(Good, morning), Array(How, are, you), Array(Have, a, good, day))
scala> rdd2.map(line => line.split(" ")).count()
res10: Long = 4
scala> rdd2.flatMap(line => line.split(" ")).collect()
res11: Array[String] = Array(I, am, Arun, Good, morning, How, are, you, Have, a, good, day)
scala> rdd2.flatMap(line => line.split(" ")).count()
res12: Long = 12
scala> rdd2.flatMap(line => line).collect()
res14: Array[Char] = Array(I, , a, m, , A, r, u, n, G, o, o, d, , m, o, r, n, i, n, g, H, o, w, , a, r, e, , y, o, u, H, a, v, e, , a, , g, o, o, d, , d, a, y)
foreach() Vs foreachPartition() :
- foreach() : operates on each element of the RDD
- foreachPartition() : operates on each partition of the RDD
scala> val rdd = sc.parallelize(List("I am going", "to hyd", "I am learning", "hadoop course"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:23
scala> rdd.collect()
res0: Array[String] = Array(I am going, to hyd, I am learning, hadoop course)
scala> rdd.foreach()
<console>:24: error: not enough arguments for method foreach: (f: String => Unit)Unit.
Unspecified value parameter f.
rdd.foreach()
^
scala> rdd.foreach(x => println(x))
I am learning
hadoop course
I am going
to hyd
scala> rdd.foreach(println(_))
I am going
to hyd
I am learning
hadoop course
scala> rdd.foreach(println)
I am going
to hyd
I am learning
hadoop course
scala> val rdd1 =sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:23
scala> rdd1.collect()
res6: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> rdd1.glom().collect()
res7: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6))
scala> rdd1.foreachPartition()
<console>:24: error: not enough arguments for method foreachPartition: (f: Iterator[Int] => Unit)Unit.
Unspecified value parameter f.
rdd1.foreachPartition()
^
scala> rdd1.foreachPartition(x => println(x.mkString(",")))
4,5,6
1,2,3
scala> rdd1.foreachPartition(x => println(x.size))
3
3
groupBy() :
- How do you want to group your data?
- By name?
- By first character?
scala> val rdd4 = sc.parallelize(List("raj", "venkat", "sunil", "kalyan", "anvith", "raju", "dev", "hari"), 2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:23
scala> rdd4.glom().collect()
res15: Array[Array[String]] = Array(Array(raj, venkat, sunil, kalyan), Array(anvith, raju, dev, hari))
scala> rdd4.groupBy()
<console>:24: error: overloaded method value groupBy with alternatives:
[K](f: String => K, p: org.apache.spark.Partitioner)(implicit kt: scala.reflect.ClassTag[K], implicit ord: Ordering[K])org.apache.spark.rdd.RDD[(K, Iterable[String])] <and>
[K](f: String => K, numPartitions: Int)(implicit kt: scala.reflect.ClassTag[K])org.apache.spark.rdd.RDD[(K, Iterable[String])] <and>
[K](f: String => K)(implicit kt: scala.reflect.ClassTag[K])org.apache.spark.rdd.RDD[(K, Iterable[String])]
cannot be applied to ()
rdd4.groupBy()
^
scala> rdd4.groupBy(name => name)
res17: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[8] at groupBy at <console>:24
scala> rdd4.groupBy(name => name).collect()
res18: Array[(String, Iterable[String])] = Array((kalyan,CompactBuffer(kalyan)), (anvith,CompactBuffer(anvith)), (raju,CompactBuffer(raju)), (hari,CompactBuffer(hari)), (venkat,CompactBuffer(venkat)), (sunil,CompactBuffer(sunil)), (dev,CompactBuffer(dev)), (raj,CompactBuffer(raj)))
scala> rdd4.groupBy(name => name).collect().foreach(println)
(kalyan,CompactBuffer(kalyan))
(anvith,CompactBuffer(anvith))
(raju,CompactBuffer(raju))
(hari,CompactBuffer(hari))
(venkat,CompactBuffer(venkat))
(sunil,CompactBuffer(sunil))
(dev,CompactBuffer(dev))
(raj,CompactBuffer(raj))
scala> rdd4.groupBy(name => name.chatAt(0)).collect().foreach(println)
<console>:24: error: value chatAt is not a member of String
rdd4.groupBy(name => name.chatAt(0)).collect().foreach(println)
^
scala> rdd4.groupBy(name => name.charAt(0)).collect().foreach(println)
(d,CompactBuffer(dev))
(h,CompactBuffer(hari))
(v,CompactBuffer(venkat))
(r,CompactBuffer(raj, raju))
(s,CompactBuffer(sunil))
(a,CompactBuffer(anvith))
(k,CompactBuffer(kalyan))
Here we are grouping by using the first character in each name in the list.
scala> rdd4.groupBy(name => name.charAt(1)).collect().foreach(println)
(n,CompactBuffer(anvith))
(e,CompactBuffer(venkat, dev))
(a,CompactBuffer(raj, kalyan, raju, hari))
(u,CompactBuffer(sunil))
scala> rdd4.groupBy(name => name.length).getNumPartitions
res23: Int = 2
We can re-partition while doing groupBy as below.
- This is an advantage in real-time work: it lets us reduce the number of partitions when the data volume is small but spread across multiple partitions
scala> rdd4.groupBy(name => name.length, 3).getNumPartitions
res24: Int = 3
rdd.id :
- Every RDD gets an ID assigned to it
- Note this is not the id() function in Python; that is a different concept where we get the address of a variable using id(<variable_name>)
scala> rdd.id
res12: Int = 0
rdd.name :
- We can assign a name to RDD
- By default it is null
scala> rdd.name
res13: String = null
scala> rdd.setName("First RDD")
res15: rdd.type = First RDD ParallelCollectionRDD[0] at parallelize at <console>:23
scala> rdd.name
res16: String = First RDD
union() Vs intersection() Vs subtract() :
- union() combines all the elements of both RDDs
- intersection() returns the common elements of both RDDs
- subtract() removes from RDD1 the elements that are also present in RDD2
scala> val rdd1 = sc.parallelize(1 to 6, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:23
scala> val rdd2 = sc.parallelize(4 to 9, 2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:23
scala> val rdd3 = sc.parallelize(7 to 12, 2)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:23
scala> rdd1.intersection(rdd2).collect()
res20: Array[Int] = Array(4, 6, 5)
scala> rdd1.union(rdd2).collect()
res21: Array[Int] = Array(1, 2, 3, 4, 5, 6, 4, 5, 6, 7, 8, 9)
scala> rdd1.union(rdd2).distinct().collect()
res22: Array[Int] = Array(4, 8, 1, 9, 5, 6, 2, 3, 7)
scala> rdd1.subtract(rdd2).collect()
res24: Array[Int] = Array(2, 1, 3)
scala> rdd2.subtract(rdd1).collect()
res25: Array[Int] = Array(8, 7, 9)
keyBy() :
- Based on the given value, we generate a key
- keyBy() is a transformation which converts the existing RDD as per the mentioned condition
- In the example below, we use the length of each element as the KEY and the actual element as the VALUE
scala> val rdd4 = sc.parallelize(List("raj", "venkat", "sunil", "kalyan", "anvith", "raju", "dev", "hari"))
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at parallelize at <console>:23
scala> rdd4.glom().collect()
res26: Array[Array[String]] = Array(Array(raj, venkat, sunil, kalyan), Array(anvith, raju, dev, hari))
scala> rdd4.keyBy(name => name.length)
res28: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[30] at keyBy at <console>:24
scala> rdd4.keyBy(name => name.length).collect()
res29: Array[(Int, String)] = Array((3,raj), (6,venkat), (5,sunil), (6,kalyan), (6,anvith), (4,raju), (3,dev), (4,hari))
Note :
- The main difference between groupBy() and keyBy() is:
- groupBy() groups elements based on a function applied to each element
- It returns a pair RDD (key, iterable)
- key is the group key
- iterable contains all the values belonging to that key
- keyBy() converts each value into a (key, value) pair
- It doesn't group elements; instead it assigns a key to each element
- The result is a pair RDD of (key, value)
Example for groupBy() :
val rdd = sc.parallelize(Seq("apple", "banana", "avocado", "blueberry"))
val groupedRdd = rdd.groupBy(word => word.charAt(0)) // Group by first letter
groupedRdd.collect().foreach(println)
Output :
(a, CompactBuffer(apple, avocado))
(b, CompactBuffer(banana, blueberry))
Example for keyBy() :
val rdd = sc.parallelize(Seq("apple", "banana", "avocado", "blueberry"))
val keyedRdd = rdd.keyBy(word => word.charAt(0)) // Assign key as first letter
keyedRdd.collect().foreach(println)
Output :
(a, apple)
(a, avocado)
(b, banana)
(b, blueberry)
map() Vs mapPartitions() Vs mapPartitionsWithIndex() :
- map() works at the element level (no partition-level control)
- mapPartitions() works at the partition level (one iterator per partition)
- mapPartitionsWithIndex() works at the partition level and also exposes the partition index
mapPartitionsWithIndex()
- When we use mapPartitionsWithIndex(), we get the partition index along with an iterator over that partition's data (see the helper sketch below)
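The transcript below calls a helper named myfunc1 whose definition is not shown; a definition consistent with the printed output (my own sketch) would be:
def myfunc1(index: Int, it: Iterator[Int]): Iterator[String] = {
  // index is the partition number, it iterates over that partition's data
  it.map(data => s"Index = $index, data = $data")
}
// For comparison, mapPartitions() receives only the iterator (no partition index), e.g.
// rdd.mapPartitions(it => Iterator(it.sum)).collect() would return one sum per partition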
scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:23
scala> rdd.glom().collect()
res31: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6))
scala> rdd.mapPartitionsWithIndex(myfunc1).collect()
res33: Array[String] = Array(Index = 0, data = 1, Index = 0, data = 2, Index = 0, data = 3, Index = 1, data = 4, Index = 1, data = 5, Index = 1, data = 6)
scala> rdd.mapPartitionsWithIndex(myfunc1).collect().foreach(println)
Index = 0, data = 1
Index = 0, data = 2
Index = 0, data = 3
Index = 1, data = 4
Index = 1, data = 5
Index = 1, data = 6
partitions :
- When we call getNumPartitions, internally it runs rdd.partitions.length
scala> rdd4.glom().collect()
res35: Array[Array[String]] = Array(Array(raj, venkat, sunil, kalyan), Array(anvith, raju, dev, hari))
scala> rdd4.partitions
res36: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@b0d, org.apache.spark.rdd.ParallelCollectionPartition@b0e)
scala> rdd4.partitions.length
res37: Int = 2
scala> rdd4.getNumPartitions
res38: Int = 2
pipe() :
- pipe() helps us run Unix commands at the partition level
scala> rdd4.glom().collect()
res39: Array[Array[String]] = Array(Array(raj, venkat, sunil, kalyan), Array(anvith, raju, dev, hari))
scala> rdd4.pipe("head -n 1").collect()
res41: Array[String] = Array(raj, anvith)
scala> rdd4.pipe("head -n 2").collect()
res42: Array[String] = Array(raj, venkat, anvith, raju)
scala> rdd4.pipe("tail -n 1").collect()
res43: Array[String] = Array(kalyan, hari)
scala> rdd4.pipe("tail -n 2").collect()
res44: Array[String] = Array(sunil, kalyan, dev, hari)
sortBy() :
- To sort elements
- Based on alphabetical order
- Based on length of elements
scala> rdd4.collect()
res45: Array[String] = Array(raj, venkat, sunil, kalyan, anvith, raju, dev, hari)
scala> rdd4.sortBy(name => name).collect()
res46: Array[String] = Array(anvith, dev, hari, kalyan, raj, raju, sunil, venkat)
scala> rdd4.sortBy(name => name.length).collect()
res47: Array[String] = Array(raj, dev, raju, hari, sunil, venkat, kalyan, anvith)
Sorting in Ascending, Descending order :
scala> rdd4.sortBy(name => name, true).collect()
res48: Array[String] = Array(anvith, dev, hari, kalyan, raj, raju, sunil, venkat)
scala> rdd4.sortBy(name => name, false).collect()
res49: Array[String] = Array(venkat, sunil, raju, raj, kalyan, hari, dev, anvith)
We can also pass a third optional parameter: the number of partitions.
scala> rdd4.sortBy(name => name, false).getNumPartitions
res50: Int = 2
scala> rdd4.sortBy(name => name, false, 3).getNumPartitions
res51: Int = 3
take() : It will pick the first n elements (in partition order, without sorting).
scala> rdd4.take(2)
res57: Array[String] = Array(raj, venkat)
takeOrdered() will pick elements based on their natural (ascending) order.
scala> rdd4.takeOrdered(2)
res58: Array[String] = Array(anvith, dev)
top() : It will pick the top n elements based on the default (descending) ordering.
scala> rdd4.top(2)
res59: Array[String] = Array(venkat, sunil)
scala> rdd4.sortBy(name => name, false).collect()
res60: Array[String] = Array(venkat, sunil, raju, raj, kalyan, hari, dev, anvith)
scala> rdd4.sortBy(name => name, false).take(2)
res61: Array[String] = Array(venkat, sunil)
treeAggregate() :
In case you know that your data is heavy, instead of going for aggregate() use treeAggregate(), because you can define the depth of the tree, which makes the computation faster (compared with the normal aggregate function) - make sure resources are available. This works like a merge sort.
- treeAggregate() is almost the same as the aggregate function, but we have an additional parameter (the depth) as below
- scala> rdd.treeAggregate(initialValue) (seqOp, combOp, 2)
- res64: Int = 21
- The default value is 2, which is the depth of the tree
scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at parallelize at <console>:23
scala> val initialValue : Int = 0
initialValue: Int = 0
scala> def seqOp(a: Int, b: Int) : Int = {
| a + b
| }
seqOp: (a: Int, b: Int)Int
scala> def combOp(x: Int, y: Int): Int = {
| x + y
| }
combOp: (x: Int, y: Int)Int
scala> rdd.aggregate(initialValue) (seqOp, combOp)
res62: Int = 21
scala> rdd.treeAggregate(initialValue) (seqOp, combOp)
res63: Int = 21
zip()
- zip() is a transformation in Spark
- A zip() method is available in both Python & Scala, but Spark's zip() is a bit different, as it expects the same number of elements in each partition
- We have one more method called zipPartitions(); prefer it over plain zip() (see the sketch after the zip examples below)
scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at parallelize at <console>:23
scala> val rdd2 = sc.parallelize(List('a', 'b', 'c', 'd'), 2)
rdd2: org.apache.spark.rdd.RDD[Char] = ParallelCollectionRDD[89] at parallelize at <console>:23
scala> val rdd3 = sc.parallelize(List(1.1, 2.2, 3.3, 4.4, 5.5, 6.6), 2)
rdd3: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[90] at parallelize at <console>:23
scala> rdd1.zip(rdd2)
res65: org.apache.spark.rdd.RDD[(Int, Char)] = ZippedPartitionsRDD2[91] at zip at <console>:25
scala> rdd1.zip(rdd3)
res66: org.apache.spark.rdd.RDD[(Int, Double)] = ZippedPartitionsRDD2[92] at zip at <console>:25
scala> rdd1.zip(rdd2).collect()
res68: Array[(Int, Char)] = Array((1,a), (2,b), (3,c), (4,d))
Note : We should have the same number of elements in each PARTITION (this condition does not exist for plain Scala & Python zip because they are not distributed systems).
scala> rdd1.zip(rdd3).collect()
25/03/27 17:55:54 ERROR Executor: Exception in task 0.0 in stage 69.0 (TID 138)
org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
at org.apache.spark.errors.SparkCoreErrors$.canOnlyZipRDDsWithSamePartitionSizeError(SparkCoreErrors.scala:142)
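As mentioned above, zipPartitions() lets us decide what to do inside each pair of partitions instead of failing. A minimal sketch (my own, reusing rdd1 and rdd3 from this example) that simply stops at the shorter side:
rdd1.zipPartitions(rdd3) { (it1, it3) =>
  it1.zip(it3) // Scala's Iterator.zip drops the unmatched extra elements
}.collect()
// With the partitioning shown above this would give something like Array((1,1.1), (2,2.2), (3,4.4), (4,5.5))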
zipWithIndex() & zipWithUniqueId()
scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23
scala> val rdd2 = sc.parallelize(List("raj", "venkat", "Arun", "Thoshi", "anupama", "sumathi"), 2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:23
scala> rdd1.collect()
res2: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> rdd2.collect()
res3: Array[String] = Array(raj, venkat, Arun, Thoshi, anupama, sumathi)
scala> rdd1.zipWithIndex().collect().foreach(println)
(1,0)
(2,1)
(3,2)
(4,3)
(5,4)
(6,5)
scala> rdd2.zipWithIndex().collect().foreach(println)
(raj,0)
(venkat,1)
(Arun,2)
(Thoshi,3)
(anupama,4)
(sumathi,5)
Note : zipWithUniqueId() can assign IDs larger than the length of the list, and it generates the response very fast. It allocates the indexes quickly, but the IDs won't be in order.
Question : Can we get RDD data based on an index ?
Answer : If we go with zipWithIndex(), we can get data based on the index (a small sketch follows). We will use this in Spark SQL.
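A minimal sketch (my own illustration) of such a lookup, fetching the element at index 2:
rdd2.zipWithIndex().filter { case (_, idx) => idx == 2 }.map(_._1).collect() // Array(Arun)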
scala> rdd2.zipWithUniqueId().collect().foreach(println)
(raj,0)
(venkat,2)
(Arun,4)
(Thoshi,1)
(anupama,3)
(sumathi,5)
STATISTICAL OPERATIONS
==> randomSplit()
- In Machine Learning, we use randomSplit() to divide the data based on percentages
- In the example below, we divide the RDD into 2 splits with 50% of the data in each
- rdd.randomSplit(Array(0.5, 0.5))
- The output won't be consistent because the split is random; see below how the same command gives different elements in the two splits when run multiple times
- We can increase the number of splits and divide the percentages accordingly, as below
scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:23
scala> rdd.glom().collect()
res8: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6))
scala> rdd.randomSplit(Array(0.5, 0.5))
res9: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[7] at randomSplit at <console>:24, MapPartitionsRDD[8] at randomSplit at <console>:24)
scala> rdd.randomSplit(Array(0.5, 0.5)).foreach(r => println(r.collect()))
[I@21fb98a1
[I@299a5e10
scala> rdd.randomSplit(Array(0.5, 0.5)).foreach(r => println(r.collect().mkString(",")))
3,4,6
1,2,5
scala> rdd.randomSplit(Array(0.5, 0.5)).foreach(r => println(r.collect().mkString(",")))
2,3,4
1,5,6
scala> rdd.randomSplit(Array(0.5, 0.5)).foreach(r => println(r.collect().mkString(",")))
1,2,3
4,5,6
scala> rdd.randomSplit(Array(0.5, 0.5)).foreach(r => println(r.collect().mkString(",")))
2,3,5
1,4,6
scala> rdd.randomSplit(Array(0.5, 0.5)).foreach(r => println(r.collect().mkString(",")))
2,3
1,4,5,6
scala> rdd.randomSplit(Array(0.4, 0.3, 0.3)).foreach(r => println(r.collect().mkString(",")))
2,3,6
1,5
4
==> sample()
- sample() gets a random sample from the given RDD
- We can mention the fraction of elements that we want to get randomly
scala> rdd.sample(false, 1).collect()
res33: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> rdd.sample(true, 1).collect()
res37: Array[Int] = Array(2, 3, 4, 4, 5, 6, 6, 6)
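The calls above use a fraction of 1; a more typical call (my own sketch) samples roughly half of the elements, with a seed for reproducibility:
rdd.sample(withReplacement = false, fraction = 0.5, seed = 42L).collect() // roughly half of the elements; the exact subset depends on the seed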
Sample RDDs for some other statistical operations : These are for Data Scientists, not for Data Engineers.
scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23
scala> rdd.stats()
res2: org.apache.spark.util.StatCounter = (count: 6, mean: 3.500000, stdev: 1.707825, max: 6.000000, min: 1.000000)
scala> rdd.stdev()
res4: Double = 1.707825127659933
scala> rdd.variance()
res5: Double = 2.9166666666666665
// Pop means the population, i.e., the entire data set
scala> rdd.popStdev()
res6: Double = 1.707825127659933
scala> rdd.popVariance()
res7: Double = 2.9166666666666665
// Sample means a subset of the actual data; it won't take the entire data from the RDD
scala> rdd.sampleStdev()
res8: Double = 1.8708286933869707
scala> rdd.sampleVariance()
res9: Double = 3.5
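A quick check (my own arithmetic) of where those two numbers come from, for the values 1 to 6 with mean 3.5:
// sum of squared deviations = (1-3.5)^2 + ... + (6-3.5)^2 = 17.5
// population variance = 17.5 / 6 = 2.9167 -> variance() / popVariance()
// sample variance     = 17.5 / 5 = 3.5    -> sampleVariance() (divides by n - 1)
// the stdev values are just the square roots: 1.7078 and 1.8708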
That's all for this blog. See you again!
Thanks,
Arun Mathe
Email ID : arunkumar.mathe@gmail.com
Contact ID : 9704117111