
Understanding Co-partitions and Co-Grouping In Spark

RDDs in Spark are split into partitions, and key-value RDDs are partitioned with HashPartitioner by default whenever an operation needs a partitioner. Co-partitioned RDDs use the same partitioner and therefore have their data distributed across partitions in the same manner.

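import org.apache.spark.HashPartitioner
val data = Array(1, 2, 3, 4, 5)
val rdd1 = sc.parallelize(data, 10).map(x => (x, x)).partitionBy(new HashPartitioner(10))
val data2 = Array(5, 8, 9, 10, 2)
val rdd2 = sc.parallelize(data2, 10).map(x => (x, x)).partitionBy(new HashPartitioner(10))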

Both of the RDDs defined above use the same partitioner, i.e. HashPartitioner, and the same number of partitions. HashPartitioner partitions the data in the same way for both RDDs, because the same key in two different RDDs gives the same hash value. Co-partitioned RDDs like these reduce network shuffling to a great extent, because all the keys required by key-based transformations such as join are present in the same partition of both RDDs.
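The placement rule can be sketched in plain Java without Spark. This is only an illustration: the class and method names are hypothetical, and the rule (non-negative hashCode modulo the number of partitions) is the one HashPartitioner applies.

```java
import java.util.ArrayList;
import java.util.List;

class CoPartitionSketch {
    // Non-negative hashCode modulo the partition count, as HashPartitioner does.
    static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    // Distribute the keys into numPartitions buckets.
    static List<List<Integer>> partition(int[] keys, int numPartitions) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (int k : keys) {
            parts.get(partitionFor(k, numPartitions)).add(k);
        }
        return parts;
    }

    public static void main(String[] args) {
        List<List<Integer>> p1 = partition(new int[]{1, 2, 3, 4, 5}, 10);
        List<List<Integer>> p2 = partition(new int[]{5, 8, 9, 10, 2}, 10);
        // Keys 2 and 5 occur in both datasets and land at the same partition index in each.
        System.out.println(p1.get(2) + " " + p2.get(2)); // prints [2] [2]
        System.out.println(p1.get(5) + " " + p2.get(5)); // prints [5] [5]
    }
}
```

Because matching keys already sit at the same partition index, a join can pair partition i of one dataset with partition i of the other without moving data.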

Co-grouping builds on co-partitioning to improve performance when multiple RDDs have to be joined, compared with calling join again and again. With every join operation the resulting RDD has either the supplied or the default number of partitions, and the join may or may not shuffle the two input RDDs, depending on whether they were co-partitioned and had the same number of partitions.


Since rdd1 and rdd2 used the same partitioner and had the same number of partitions, the join of rdd1 and rdd2 that produces rdd3 does not require any shuffle. But if rdd1 and rdd2 had different numbers of partitions, the contents of the RDD with fewer partitions would have been reshuffled. Since the number of partitions is not specified, it depends on the default configuration.

Performing another join, of rdd3 with rdd4 to create rdd5, is likely to cause more shuffling. All this shuffling and the expensive operations that come with it can be avoided by using cogroup when we have multiple RDDs to join.


This is because cogroup creates co-partitioned RDDs.
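To make the idea of grouping several keyed datasets in one pass concrete, here is a small plain-Java sketch. It does not use Spark; the class name, the helper methods and the sample data are all hypothetical, and it only mimics the shape of a cogroup result (for each key, one list of values per input).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CogroupSketch {
    // For every key, collect the values of each input into its own slot,
    // visiting every input once instead of chaining pairwise joins.
    static Map<Integer, List<List<String>>> cogroup(List<Map<Integer, List<String>>> inputs) {
        Map<Integer, List<List<String>>> out = new TreeMap<>();
        for (int i = 0; i < inputs.size(); i++) {
            for (Map.Entry<Integer, List<String>> e : inputs.get(i).entrySet()) {
                List<List<String>> slots = out.get(e.getKey());
                if (slots == null) {
                    slots = new ArrayList<>();
                    for (int j = 0; j < inputs.size(); j++) {
                        slots.add(new ArrayList<>());
                    }
                    out.put(e.getKey(), slots);
                }
                slots.get(i).addAll(e.getValue());
            }
        }
        return out;
    }

    // Hypothetical sample data: three keyed datasets.
    static List<Map<Integer, List<String>>> example() {
        Map<Integer, List<String>> a = new TreeMap<>();
        a.put(1, Arrays.asList("a1"));
        a.put(2, Arrays.asList("a2"));
        Map<Integer, List<String>> b = new TreeMap<>();
        b.put(1, Arrays.asList("b1"));
        Map<Integer, List<String>> c = new TreeMap<>();
        c.put(2, Arrays.asList("c2"));
        return Arrays.asList(a, b, c);
    }

    public static void main(String[] args) {
        // prints {1=[[a1], [b1], []], 2=[[a2], [], [c2]]}
        System.out.println(cogroup(example()));
    }
}
```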

Writing Spark Data Frame to HBase

The community behind Spark has put a lot of effort into making the DataFrame APIs efficient and scalable. Reading data from HBase into a Spark DataFrame, and writing it back, bridges the gap between the complex SQL queries that can be run on Spark and the key-value store pattern of HBase. The shc connector implements the standard Spark Datasource API and leverages the Spark Catalyst engine for query optimization.

To map a table in HBase to a table in Spark, we define a table catalog, which stores the mapping of the row key, column qualifiers and column families in HBase to the table columns in Spark.

To work efficiently and avoid retrieving unwanted data from the region servers, the shc connector supports predicate pushdown, where filter conditions are pushed as close to the data as possible, i.e. to the region server in the case of HBase.

Support for partition pruning splits the Scan/BulkGet into multiple non-overlapping ranges, so that only the region servers that hold the requested data perform the Scan/BulkGet.

Specifying range conditions like Where x > y or Where x < y, for example

session.sql('Select * from sparkHBase table where x>14567 and x<14568')

will result in a scan operation on HBase with key range between 14567 and 14568
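The pruning idea can be illustrated with a small plain-Java sketch. It is not the shc implementation: the Region class, the server names and the region boundaries are hypothetical, and the scan range is treated as the half-open interval [lo, hi).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RegionPruningSketch {
    // A region serving row keys in [startKey, endKey) on one region server.
    static final class Region {
        final String server;
        final long startKey;
        final long endKey;

        Region(String server, long startKey, long endKey) {
            this.server = server;
            this.startKey = startKey;
            this.endKey = endKey;
        }
    }

    // Keep only the servers whose key range overlaps the scan range [lo, hi).
    static List<String> serversToScan(List<Region> regions, long lo, long hi) {
        List<String> hit = new ArrayList<>();
        for (Region r : regions) {
            if (r.startKey < hi && lo < r.endKey) {
                hit.add(r.server);
            }
        }
        return hit;
    }

    // Hypothetical layout: three regions of 10000 keys each.
    static List<Region> exampleRegions() {
        return Arrays.asList(
                new Region("rs1", 0L, 10000L),
                new Region("rs2", 10000L, 20000L),
                new Region("rs3", 20000L, 30000L));
    }

    public static void main(String[] args) {
        System.out.println(serversToScan(exampleRegions(), 14567L, 14568L)); // prints [rs2]
        System.out.println(serversToScan(exampleRegions(), 9000L, 12000L));  // prints [rs1, rs2]
    }
}
```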

timestamp temp pressure
1501844115 24 760
1501844125 28 800

The above table, represented as a Spark DataFrame, can be saved to HBase by providing the mapping for the key, column qualifiers and column names in HBase:

def catalog = s"""{
  |"table":{"namespace":"default", "name":"ToolLogs"},
  |"rowkey":"key",
  |"columns":{
    |"timestamp":{"cf":"rowkey", "col":"key", "type":"long"},
    |"temp":{"cf":"msmt", "col":"temp", "type":"float"},
    |"pressure":{"cf":"msmt", "col":"pressure", "type":"float"}
  |}
|}""".stripMargin

Specifying "cf":"rowkey" for the key column is mandatory, even though msmt is the column family of our HBase table; this is how the API is designed to work. Once we have defined the catalog mapping, we can store the data in the DataFrame (called df here) directly to HBase using

df.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.spark.sql.execution.datasources.hbase").save()

Happy Reading …. ☺

Hidden Secrets while trying SQL Dataframes

There are a few points that one should keep in mind while using DataFrames with Spark SQL, such as where to import sqlContext.implicits._ and how to deal with the "No Type Definition found" error.

Let’s try to understand it with an example

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

/**
 * @author amithora
 */
object SqlContextJsonRDD {
  // Declare the case class here, outside the method; declaring it inside main gives a type-not-found error
  case class Employee(name: String, rank: Int)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ImplicitExampleSQL")
    val sparkContext = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sparkContext)
    // Import the implicits here, once sqlContext exists; importing them right after the package
    // declaration gives a compilation error, and without this import toDF() is not found
    import sqlContext.implicits._
    val textRdd = sparkContext.textFile("Person.txt", 2)
    // Split each "name,rank" line, build an Employee from it, and convert the result to a DataFrame
    val employeeDf = textRdd.map { line => line.split(",") }
      .map { p => Employee(p(0), p(1).trim().toInt) }
      .toDF()
  }
}


Spark updateStateByKey Explained!

Spark Streaming is one of the great components of the Spark engine: it applies computations on the fly to all the micro-batches that are created according to the batch interval, and it also helps in combining streaming results with archived results in a database.

Spark Streaming introduces the concept of the DStream (discretized stream). A DStream is nothing but a stream of RDDs, where one RDD holds the data of one batch interval, i.e. 1 batch = 1 RDD. One of the powerful functions exposed by DStream is updateStateByKey, which helps us maintain state across batches.

The function specified in updateStateByKey is applied to all existing keys in every batch, regardless of whether they have new data or not. If the update function returns None, the key-value pair is eliminated.

The function updates the previous state of every key with the new value, if present. For example:


Batch 1 = 1,1,1 are the values received by the streaming engine; after reducing them by key, the output is (1,3)
Batch 2 = 2,3 are the values received by the streaming engine; the output is (1,3)(2,1)(3,1)
Batch 3 = 1 is the value received by the streaming engine; the output is (1,4)(2,1)(3,1)

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  if (newValues.isEmpty) {
    // The key is not present in the new batch: return the old value
    runningCount
  } else {
    // The key is present in the new batch: add the new values to the previous count,
    // or return just the new values if there is no previous count
    Some(newValues.sum + runningCount.getOrElse(0))
  }
}

// call it as
val reducedRDD=keyValuelines.reduceByKey((x,y)=>(x+y))
val updatedRdd= reducedRDD.updateStateByKey(updateFunction)
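The three batches from the example above can be replayed in plain Java to check the state transitions. This is a sketch without Spark Streaming: the class and method names are hypothetical, a TreeMap stands in for the state that Spark keeps, and counting occurrences per key plays the role of the reduceByKey step.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class StateUpdateSketch {
    // Same logic as the update function: keep the old value when the batch has
    // nothing for the key, otherwise add the new values to the running count.
    static Optional<Integer> update(List<Integer> newValues, Optional<Integer> running) {
        if (newValues.isEmpty()) {
            return running;
        }
        int sum = 0;
        for (int v : newValues) {
            sum += v;
        }
        return Optional.of(sum + running.orElse(0));
    }

    // Apply one batch of raw keys to the state.
    static void applyBatch(Map<Integer, Integer> state, List<Integer> batch) {
        // Count occurrences per key (the reduceByKey step).
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int k : batch) {
            counts.merge(k, 1, Integer::sum);
        }
        // The update function runs for every known key and every new key.
        Set<Integer> keys = new TreeSet<>(state.keySet());
        keys.addAll(counts.keySet());
        for (Integer k : keys) {
            List<Integer> newValues = counts.containsKey(k)
                    ? Collections.singletonList(counts.get(k))
                    : Collections.<Integer>emptyList();
            Optional<Integer> next = update(newValues, Optional.ofNullable(state.get(k)));
            if (next.isPresent()) {
                state.put(k, next.get());
            } else {
                state.remove(k); // an empty result drops the key
            }
        }
    }

    public static void main(String[] args) {
        Map<Integer, Integer> state = new TreeMap<>();
        applyBatch(state, Arrays.asList(1, 1, 1));
        System.out.println(state); // prints {1=3}
        applyBatch(state, Arrays.asList(2, 3));
        System.out.println(state); // prints {1=3, 2=1, 3=1}
        applyBatch(state, Arrays.asList(1));
        System.out.println(state); // prints {1=4, 2=1, 3=1}
    }
}
```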

Understanding Apache Spark’s Fold operation

As described on Apache Spark's website, the definition of fold states:

“Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value”.”

Thus fold operates by first folding the elements of each partition and then folding the results of the partitions, which makes it partition dependent.

Let's examine the code below:

JavaRDD<String> jrdd = jsc.parallelize(Arrays.asList("FirstFile"));
String resultfold = jrdd.fold("0", new Function2<String, String, String>() {
    @Override
    public String call(String arg0, String arg1) throws Exception {
        // Print both arguments to see what fold passes in on each call
        System.out.println(" the arg0 .. >> " + arg0);
        System.out.println(" the arg1 .. >> " + arg1);
        return arg0 + "," + arg1;
    }
});

This gives the following output:

arg0 is initially assigned the value 0 for a partition
arg1 is assigned the value FirstFile
resulting in 0,FirstFile -> the aggregation for one partition
On a machine with more than one core, when Spark is given more than one thread, the above RDD will have more than one partition. Since an empty partition folds down to the zero element, we get the values below for empty partitions:
arg0 is 0
arg1 is 0
result is 0,0

Once each partition is aggregated, the results of all the partitions are aggregated using the same function, again starting from the supplied zero value.

The above result may vary each time we execute it, depending on which partition gets executed first.

Let's repartition the above RDD and explicitly set the number of partitions to 1:

JavaRDD<String> jdpar = jrdd.repartition(1);
String resultfold = jdpar.fold("0", new Function2<String, String, String>() {
    @Override
    public String call(String arg0, String arg1) throws Exception {
        // Print both arguments to see what fold passes in on each call
        System.out.println(" the arg0 .. >> " + arg0);
        System.out.println(" the arg1 .. >> " + arg1);
        return arg0 + "," + arg1;
    }
});

Since there is a single partition with the value FirstFile:

1st iteration (folding inside the partition)
arg0 0
arg1 FirstFile

2nd iteration (folding the partition result into the zero value)
arg0 0
arg1 0,FirstFile
result 0,0,FirstFile
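The two-level behaviour can be reproduced in plain Java without Spark. This is a minimal sketch under stated assumptions: the class and method names are hypothetical, partitions are modelled as lists, and partition results are merged in list order, whereas Spark merges them in the order the partitions finish.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

class FoldSketch {
    // Fold every partition starting from zero, then fold the partition
    // results together, again starting from zero.
    static String fold(List<List<String>> partitions, String zero, BinaryOperator<String> op) {
        String result = zero;
        for (List<String> partition : partitions) {
            String partial = zero;
            for (String element : partition) {
                partial = op.apply(partial, element);
            }
            result = op.apply(result, partial);
        }
        return result;
    }

    public static void main(String[] args) {
        BinaryOperator<String> join = (a, b) -> a + "," + b;

        // One partition holding FirstFile.
        List<List<String>> single = Arrays.asList(Arrays.asList("FirstFile"));
        System.out.println(fold(single, "0", join)); // prints 0,0,FirstFile

        // An empty partition before or after the one holding FirstFile.
        List<List<String>> emptyFirst =
                Arrays.asList(Collections.<String>emptyList(), Arrays.asList("FirstFile"));
        List<List<String>> emptyLast =
                Arrays.asList(Arrays.asList("FirstFile"), Collections.<String>emptyList());
        System.out.println(fold(emptyFirst, "0", join)); // prints 0,0,0,FirstFile
        System.out.println(fold(emptyLast, "0", join));  // prints 0,0,FirstFile,0
    }
}
```

The last two lines differ only in partition order, which is why the result of a fold with a non-commutative function can change between runs.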