Tag Archives: Hadoop

Writing Spark Data Frame to HBase

The Spark community has put a lot of effort into making the DataFrame APIs efficient and scalable. Reading and writing data between HBase and Spark DataFrames bridges the gap between the complex SQL queries that can be run on Spark and the key-value store pattern of HBase. The shc connector implements the standard Spark DataSource API and leverages the Spark Catalyst engine for query optimization.

To map a table in HBase to a table in Spark, we define a table catalog, which stores the mapping of the row key, column families, and column qualifiers in HBase to the table columns in Spark.

To work efficiently and avoid retrieving unwanted data from the region servers, the shc connector supports predicate pushdown, where filter conditions are pushed as close to the data as possible, i.e. to the region server in the case of HBase.

Support for partition pruning splits the Scan/BulkGet into multiple non-overlapping ranges, so that only the region servers that hold the requested data perform the Scan/BulkGet.

Specifying conditions like WHERE x > y or WHERE x < y, for example

session.sql("SELECT * FROM sparkHBaseTable WHERE x > 14567 AND x < 14568")

will result in a scan operation on HBase with a key range between 14567 and 14568.
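As a rough illustration of why a range predicate on the row key is cheap, the sketch below models a table as a map sorted by key and answers the predicate by visiting only the keys inside the range. This is a toy model in plain Java, not the shc or HBase API; the class name, the method names, and the sample rows are assumptions made for the example.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

class RangeScanSketch {
    // Hypothetical stand-in for an HBase table: rows are kept sorted by row key.
    static NavigableMap<Long, String> sampleTable() {
        NavigableMap<Long, String> rows = new TreeMap<>();
        rows.put(1501844115L, "temp=24,pressure=760");
        rows.put(1501844125L, "temp=28,pressure=800");
        return rows;
    }

    // Answers "key > lower AND key < upper" by looking only at keys inside the
    // range instead of filtering every row (both bounds are exclusive).
    static NavigableMap<Long, String> rangeScan(NavigableMap<Long, String> rows, long lower, long upper) {
        return rows.subMap(lower, false, upper, false);
    }

    public static void main(String[] args) {
        System.out.println(rangeScan(sampleTable(), 1501844110L, 1501844120L));
    }
}
```

Because the keys are sorted, the lookup can jump straight to the lower bound and stop at the upper bound, which is the same idea behind turning the filter into a bounded scan.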

timestamp   temp  pressure
1501844115  24    760
1501844125  28    800

The table above, represented as a Spark DataFrame, can be saved to HBase by providing the mapping for the row key, column family, and column qualifiers in HBase.

def catalog = s"""{
|"table":{"namespace":"default", "name":"ToolLogs"},
|"rowkey":"key",
|"columns":{
|"timestamp":{"cf":"rowkey", "col":"key", "type":"long"},
|"temp":{"cf":"msmt", "col":"temp", "type":"float"},
|"pressure":{"cf":"msmt", "col":"pressure", "type":"float"}
|}
|}""".stripMargin

Specifying "cf":"rowkey" for the key column is mandatory even though msmt is the column family of our HBase table; this is how the API is designed to work. Once we have defined the catalog mapping, we can store the data in the DataFrame directly to HBase using

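df.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.spark.sql.execution.datasources.hbase").save() // df (assumed name): the DataFrame shown above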

Happy Reading …. ☺

Producing and Consuming Avro Messages with Kafka

Apache Avro is a data serialization system that relies on a schema for serializing and deserializing objects. The interesting part is that we can use a different schema to deserialize an object than the one that was used to serialize it. This allows us to decouple the system that produces messages from the one that consumes them.

An Avro schema is defined using JSON, and it supports a wide range of data types and collections, including primitive types such as int, string, float, double, and long.

The advantage of using Avro with Kafka becomes clear when building a system bus where we want to decouple the producers from the consumers: the producer is free to change its schema without breaking the system, because Avro's schema resolution takes care of compatible changes.

The following defines an employee schema with FirstName (fname) and LastName (lname) as string types and Salary (sal) as an int.

{
"fields": [
{ "name": "fname", "type": "string" },
{ "name": "lname", "type": "string" },
{ "name": "sal", "type": "int" }
],
"name": "employee",
"type": "record"
}

The beauty of Avro is that we can deserialize an object that was serialized with the schema above using

{
"fields": [
{ "name": "fname", "type": "string" },
{ "name": "lname", "type": "string" }
],
"name": "employee",
"type": "record"
}

Although we have removed the salary field from the schema, this will not break our system: it is handled internally by Avro's schema resolution, which ignores fields in the written data that are absent from the reader's schema.

We will use the Bijection API from Twitter to serialize and deserialize our Avro objects, using the Avro schema defined above in JSON format.

public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"employee\","
+ "\"fields\":["
+ " { \"name\":\"fname\", \"type\":\"string\" },"
+ " { \"name\":\"lname\", \"type\":\"string\" },"
+ " { \"name\":\"sal\", \"type\":\"int\" }"
+ "]}";

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);

We need an Injection object to serialize and deserialize using the defined schema.

Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

Now we can create a record using the dynamic typing feature of Avro, without generating any code, through the GenericData abstraction.
To serialize the object using the defined schema:

GenericData.Record record = new GenericData.Record(schema);
record.put("fname", "James");
record.put("lname", "Bond");
record.put("sal", 4200);

byte[] bytes = recordInjection.apply(record);

To deserialize the object

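// avroRecord is the record received from Kafka; its value() holds the serialized bytes
GenericRecord record = recordInjection.invert(avroRecord.value()).get();
String firstName = record.get("fname").toString();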

Once we have the byte[] representation of our Avro object, we can send it as a message/record to Kafka without changing anything; to Kafka, every message is just a sequence of bytes.

You can find the code for sending and receiving Avro messages with Kafka on GitHub.

Hadoop Map Side Joins

Joins performed on the mapper side are known as map-side joins, while joins performed on the reducer side are known as reduce-side joins.

Map Side join
*) The two data sets to be joined must be sorted on the same key
*) The two data sets to be joined must have the same number of partitions
*) All the records for any given key must be in the same partition
*) One of the two data sets must be small enough to fit in memory

If the data sets are not sorted on the same key, we can run a preparatory Hadoop job that simply outputs the field to sort on as the key. By specifying exactly the same number of reducers for all the data sets, we will have our data ready for a map-side join.

While performing the map-side join, the records are merged before they reach the mapper. We will use CompositeInputFormat with the following configuration to specify:
1) The separator between the key and the value
2) The type of join to perform (inner, outer, …)

Configuration config = new Configuration();
config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", separator);//Separator
String joinExpression = CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, paths);//Key format and paths of file to join
config.set("mapred.join.expr", joinExpression);//join to be done (inner/outer...)

Once the join is done, the mapper is called. It receives a Text that contains the key and a TupleWritable composed of the values joined from our input files for that key.
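To make the idea of merging pre-sorted inputs concrete, here is a minimal sketch of an inner merge join over two lists that are already sorted on the same key. It uses only plain Java collections, not CompositeInputFormat or TupleWritable; the class name, method name, row layout, and sample data are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SortedMergeJoinSketch {
    // Each row is {key, value}. Both inputs must already be sorted by key
    // (string order here), which is the precondition for a map-side join.
    static List<String> innerJoin(List<String[]> left, List<String[]> right) {
        List<String> joined = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.size() && j < right.size()) {
            int cmp = left.get(i)[0].compareTo(right.get(j)[0]);
            if (cmp < 0) {
                i++; // key only on the left side: skipped by an inner join
            } else if (cmp > 0) {
                j++; // key only on the right side: skipped by an inner join
            } else {
                String key = left.get(i)[0];
                int groupStart = j;
                // Pair every left row with every right row that shares the key.
                while (i < left.size() && left.get(i)[0].equals(key)) {
                    j = groupStart;
                    while (j < right.size() && right.get(j)[0].equals(key)) {
                        joined.add(key + "\t[" + left.get(i)[1] + "," + right.get(j)[1] + "]");
                        j++;
                    }
                    i++;
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String[]> names = Arrays.asList(
            new String[] {"1", "James"}, new String[] {"2", "Alice"}, new String[] {"4", "Bob"});
        List<String[]> salaries = Arrays.asList(
            new String[] {"2", "4200"}, new String[] {"3", "3900"}, new String[] {"4", "5100"});
        innerJoin(names, salaries).forEach(System.out::println);
    }
}
```

Each joined line plays the role of what the mapper would see: the key followed by the tuple of values from both inputs. Keys present in only one input are dropped, as expected for an inner join.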

Map Reduce Shuffle Sort Phase (Part 2)

This post continues from Map Reduce Sort Shuffle Phase (Part 1). As we saw there, the mapper passes (K, V) key/value pairs to the reducer. Since sorting, grouping, and partitioning take place in the shuffle-sort phase, the basic requirement is a way to compare keys in order to sort, partition, and group them. For this purpose the key class must implement the WritableComparable interface, and the value class must be of a Writable type.

The method that is being called is

public int compareTo(WritableComparable o){}

Sorting is based on the result of this method; by default, keys are sorted using the compareTo method of the key class.
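As a small illustration of how a compareTo method drives the sort order, the sketch below defines a composite key and sorts a list of such keys. It uses the plain JDK Comparable interface as a stand-in for WritableComparable, so the serialization methods are left out; the SensorKey class and its fields are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical composite key; a real Hadoop key would implement
// WritableComparable and also provide write() and readFields().
class SensorKey implements Comparable<SensorKey> {
    final String sensor;
    final long timestamp;

    SensorKey(String sensor, long timestamp) {
        this.sensor = sensor;
        this.timestamp = timestamp;
    }

    // Order by sensor name first, then by timestamp.
    @Override
    public int compareTo(SensorKey other) {
        int bySensor = sensor.compareTo(other.sensor);
        return bySensor != 0 ? bySensor : Long.compare(timestamp, other.timestamp);
    }

    @Override
    public String toString() {
        return sensor + "@" + timestamp;
    }
}

class KeySortSketch {
    // Returns a sorted copy; the order comes entirely from SensorKey.compareTo.
    static List<SensorKey> sorted(List<SensorKey> keys) {
        List<SensorKey> copy = new ArrayList<>(keys);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<SensorKey> keys = Arrays.asList(
            new SensorKey("b", 1501844125L),
            new SensorKey("a", 1501844125L),
            new SensorKey("a", 1501844115L));
        System.out.println(sorted(keys));
    }
}
```

Changing the body of compareTo is enough to change the order in which keys would reach the reducer.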

Other than sorting, the shuffle-sort phase is also used to distribute the mapper output among the different reducers. This is done by partitioning the keys. The partitioner decides which {K, V} pair is assigned to which reducer for processing. It is implemented as a subclass of the Partitioner class, which has the method:

public int getPartition(K key, V value, int numReduceTasks) {}
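A minimal sketch of hash partitioning is shown below. To the best of my knowledge this is the same arithmetic Hadoop's default HashPartitioner uses, but the class here is a plain-Java stand-in with a hypothetical name and does not extend the real Partitioner class.

```java
class HashPartitionSketch {
    // Mask off the sign bit so the result is never negative, then take the
    // remainder to pick a reducer index in the range [0, numReduceTasks).
    static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"temp", "pressure", "temp"}) {
            System.out.println(key + " -> reducer " + getPartition(key, 4));
        }
    }
}
```

Equal keys always have the same hash code, so they always land on the same reducer, which is what lets a single reduce task see every value for a key.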

While partitioning decides the reduce task, grouping decides which {K, V} pairs are combined into one reduce call. Each reduce task's data is divided into groups, and the reduce() method is called once for each group, so multiple reduce() calls may be made within one reduce task (number of groups = number of reduce calls).

To make sure that the same keys end up in one group, the group comparator class should extend the WritableComparator class and override its compare() method. While grouping, {K, V} pairs are considered to be in the same group as long as the compare() call returns 0. A new group is formed once the group comparator returns a non-zero value. This also implies that the keys should be sorted prior to grouping (which is already taken care of in the sort phase); otherwise grouping may produce incorrect results. By default, the key class's compareTo(), which was used for sorting, is also used for group comparison.
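The grouping rule can be sketched in a few lines of plain Java: walk the sorted keys and start a new group whenever the group comparator returns a non-zero value. This is only a model of the behaviour, not Hadoop's implementation; the class name, the "sensor:timestamp" key format, and the BY_SENSOR comparator are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class GroupingSketch {
    // Hypothetical group comparator: keys look like "sensor:timestamp" and
    // are grouped by the sensor part only.
    static final Comparator<String> BY_SENSOR =
        Comparator.comparing((String k) -> k.substring(0, k.indexOf(':')));

    // Splits the keys into consecutive runs; a new group starts whenever the
    // comparator returns non-zero for two neighbouring keys.
    static <K> List<List<K>> group(List<K> keys, Comparator<K> groupComparator) {
        List<List<K>> groups = new ArrayList<>();
        K previous = null;
        for (K key : keys) {
            if (groups.isEmpty() || groupComparator.compare(previous, key) != 0) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(key);
            previous = key;
        }
        return groups;
    }

    public static void main(String[] args) {
        // Sorted input: one group per sensor, i.e. two reduce calls.
        System.out.println(group(Arrays.asList("a:1", "a:2", "b:1"), BY_SENSOR));
        // Unsorted input: sensor "a" is split across two groups.
        System.out.println(group(Arrays.asList("a:2", "b:1", "a:1"), BY_SENSOR));
    }
}
```

The second call shows why sorting has to come first: only neighbouring keys are compared, so equal keys that are not adjacent end up in separate groups.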

Map Reduce Sort Shuffle Phase (Part 1)

The shuffle and sort phase is a hand-off process that happens after map tasks complete and before the reduce phase begins.

Data from the mappers is moved to the nodes where the reduce tasks will run. When a mapper task completes, its output is sorted and partitioned according to the number of reduce tasks defined, and then written to disk. Data is made available to the reducers as soon as the output of a map task is available, rather than waiting for the last map task to complete. Although reducers may hold the output of many mapper tasks in memory, they cannot start processing it until all mapper tasks have finished. Thus the processing speed is controlled by the slowest mapper task. To avoid such a scenario, Hadoop implements the concept of speculative execution.

When a mapper task is running slower than expected, the application master (the JobTracker in MRv1) spawns a duplicate mapper task; whichever task finishes first has its output stored on disk, and the other is killed.

The outputs are stored on the local disk of the nodes where the mapper tasks ran, not on HDFS.