
ImageSearch with ElasticSearch

ElasticSearch plus LIRE (Lucene Image REtrieval) is all you need to search images and suggest the best-matching ones. It is a very interesting use case, and I am excited to get my hands dirty with this feature.

LIRE (Lucene Image REtrieval) is a plugin for Lucene to index and search images. It performs content-based retrieval: give it an image as input and it returns all the matching images as output. LIRE supports many different image features; see LIRE FEATURES.

Indexing Images
Indexing images is no different from indexing documents in ElasticSearch. We set the data type of the image field to image and its value to the Base64 encoding of the image. Base64 is a transformation that encodes every 3 bytes of data as 4 printable characters.
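As a minimal sketch of the Base64 step, the following Java snippet encodes a few stand-in bytes with the standard java.util.Base64 encoder. The class name ImageBase64Sketch and the sample bytes are assumptions made for illustration; a real caller would read the bytes of the image file instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class ImageBase64Sketch {
    // Encodes raw image bytes as a Base64 string that can be placed in a JSON field.
    static String encode(byte[] imageBytes) {
        return Base64.getEncoder().encodeToString(imageBytes);
    }

    public static void main(String[] args) {
        // Stand-in bytes (hypothetical); real code would load the image file.
        byte[] threeBytes = "Man".getBytes(StandardCharsets.US_ASCII);
        System.out.println(encode(threeBytes)); // 3 bytes become 4 characters: TWFu

        // Input that is not a multiple of 3 bytes is padded with '='.
        byte[] fourBytes = "Man!".getBytes(StandardCharsets.US_ASCII);
        System.out.println(encode(fourBytes)); // TWFuIQ==
    }
}
```

The encoded string is roughly one third larger than the raw image, which is worth keeping in mind when sizing the index.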

You will find a GitHub link at the end of this article hosting the actual code used in this project, along with the ElasticSearch mapping.

To use this feature, we define our image field with the image data type in the ES mapping:

"image": {
  "properties": {
    "name": {
      "type": "string"
    },
    "path": {
      "type": "string"
    },
    "image": {
      "type": "image",
      "feature": {
        "CEDD": {
          "hash": "BIT_SAMPLING"
        },
        "JCD": {
          "hash": ["BIT_SAMPLING", "LSH"]
        },
        "FCTH": {}
      }
    }
  }
}

We define 3 fields here

  • name -> Name of the image
  • path -> Path where the actual image is stored
  • image -> The actual image (Base64)

Application Flow
We store our images in HDFS and index them in ES, converting them to Base64 along the way.
For example, say we have the image MickeyMouse.jpg.
We store MickeyMouse.jpg at hdfs://:/esimage/MickeyMouse.jpg, and our ES document holds the image name, this path, and the Base64-encoded image.



All queries from the UI go to ES, which returns the name and path of each match. When we want to display the actual image, we fetch it from HDFS using the path in the result.
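To make the flow concrete, here is a rough sketch of how the document body for one image could be assembled from the three mapped fields. The class ImageDocumentSketch, its helper methods, and the shortened path are hypothetical and not taken from the project code; sending the body to ES and reading the file from HDFS are left out.

```java
import java.util.Base64;

class ImageDocumentSketch {
    // Builds the JSON body for one image document with the fields name, path and image.
    static String toJson(String name, String path, byte[] imageBytes) {
        String encoded = Base64.getEncoder().encodeToString(imageBytes);
        return "{\"name\":\"" + escape(name) + "\","
             + "\"path\":\"" + escape(path) + "\","
             + "\"image\":\"" + encoded + "\"}";
    }

    // Minimal escaping of backslashes and double quotes only (an assumption
    // that names and paths contain no control characters).
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        byte[] fakeImage = {1, 2, 3}; // placeholder bytes, not a real JPEG
        // The path is a shortened, hypothetical stand-in for the HDFS location.
        System.out.println(toJson("MickeyMouse.jpg", "/esimage/MickeyMouse.jpg", fakeImage));
    }
}
```

Because search results only need name and path, the UI never has to decode the Base64 field; it uses the returned path to fetch the original file.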

GitHub EsImageSearch

Hidden Secrets while trying SQL Dataframes

There are a few points to keep in mind when using DataFrames with Spark SQL, such as where to import sqlContext.implicits._ and how to deal with the type-not-found error.

Let’s try to understand it with an example

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

/**
 * @author amithora
 */
object SqlContextJsonRDD {
  // Declare the case class outside the method. If you declare it inside
  // main, you will get a type-not-found error.
  case class Employee(name: String, rank: Int)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ImplicitExampleSQL")
    val sparkContext = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sparkContext)
    // Import the implicits here, once sqlContext exists. Importing them right
    // after the package declaration is a compilation error, and without this
    // import toDF() is not found.
    import sqlContext.implicits._
    val textRdd = sparkContext.textFile("Person.txt", 2)

    // Each line of Person.txt is assumed to look like "name,rank".
    val employeeDF = textRdd
      .map { line => line.split(",") }
      .map { p => Employee(p(0), p(1).trim().toInt) }
      .toDF()
    employeeDF.show()
  }
}


Spark updateStateByKey Explained!

Spark Streaming is one of the great components of the Spark engine: it applies computations on the fly to every micro-batch created at the batch interval, and it helps combine streaming results with archived results in a database.

Spark Streaming introduces the concept of a DStream (discretized stream). A DStream is nothing but a stream of RDDs in which each RDD holds the data of one batch interval, i.e. 1 batch = 1 RDD. One of the powerful functions exposed by DStream is updateStateByKey, which helps us maintain state across batches.

The function passed to updateStateByKey is applied to every existing key in every batch, regardless of whether the key has new data. If the update function returns None, the key-value pair is eliminated.

The function updates the previous state of each key with the new value, if one is present. For example:


Batch 1 = 1,1,1 are the values received by the streaming engine; after reducing by key, the output is (1,3)
Batch 2 = 2,3 are the values received; the output is (1,3)(2,1)(3,1)
Batch 3 = 1 is the value received; the output is (1,4)(2,1)(3,1)

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  if (newValues.isEmpty) {
    // The key is not present in the new batch: return the old value unchanged.
    runningCount
  } else {
    // The key is present in the new batch: add the new values to the previous
    // count, or start from 0 if there is no previous value.
    Some(newValues.sum + runningCount.getOrElse(0))
  }
}

// call it as
val reducedRDD=keyValuelines.reduceByKey((x,y)=>(x+y))
val updatedRdd= reducedRDD.updateStateByKey(updateFunction)
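The three example batches can be replayed without Spark to check the expected state after each one. The sketch below is a plain Java simulation, not Spark code: the class UpdateStateSketch, the method applyBatch, and the use of a TreeMap as the state store are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class UpdateStateSketch {
    // Mirrors the update function: new counts for a key plus its previous state.
    static Optional<Integer> update(List<Integer> newValues, Optional<Integer> running) {
        if (newValues.isEmpty()) {
            return running; // key absent from this batch: keep the old state
        }
        int sum = 0;
        for (int v : newValues) sum += v;
        return Optional.of(sum + running.orElse(0));
    }

    // Simulates one batch: count each key, then call update for every known key.
    static void applyBatch(Map<Integer, Integer> state, int... batch) {
        Map<Integer, Integer> reduced = new TreeMap<>();
        for (int key : batch) reduced.merge(key, 1, Integer::sum);

        // Keys already in the state are visited even when the batch lacks them.
        Set<Integer> allKeys = new TreeSet<>(state.keySet());
        allKeys.addAll(reduced.keySet());
        for (Integer key : allKeys) {
            List<Integer> newValues = new ArrayList<>();
            if (reduced.containsKey(key)) newValues.add(reduced.get(key));
            Optional<Integer> next = update(newValues, Optional.ofNullable(state.get(key)));
            if (next.isPresent()) state.put(key, next.get());
            else state.remove(key); // an empty result drops the key
        }
    }

    public static void main(String[] args) {
        Map<Integer, Integer> state = new TreeMap<>();
        applyBatch(state, 1, 1, 1);
        System.out.println(state); // {1=3}
        applyBatch(state, 2, 3);
        System.out.println(state); // {1=3, 2=1, 3=1}
        applyBatch(state, 1);
        System.out.println(state); // {1=4, 2=1, 3=1}
    }
}
```

Note that keys 2 and 3 survive the third batch even though they receive no new data, because the update function returns their old value instead of an empty result.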

HBase Timeline consistent standby region servers

Before HBase 1.0.0, when a machine in the cluster went down, the data was not lost as long as it was replicated by HDFS, but the outage could still have critical implications. In those versions a single region was served by a single RegionServer for both reads and writes, so if that RegionServer went down, no read or write request for the region could be served until automatic failover recovery completed.
A distributed system relies on timeouts to discover that something has gone wrong. Lowering the heartbeat timeout to a very small value can produce false failure detections, while a larger timeout leaves the system unavailable for longer, until failover recovery finishes.

The good news is that HBase 1.0.0 supports a single region being served by more than one RegionServer. One replica of the region is the primary and accepts writes, while the other replicas share the same data files. Read requests can be served by any replica of the region, so the secondaries act as a backup for reads when the primary is unavailable.

Hadoop Map Side Joins

Joins done on the mapper side are known as map-side joins, while joins done on the reducer side are known as reduce-side joins.

Map Side join
*) The two data sets to be joined must be sorted on the same key
*) The two data sets to be joined must have the same number of partitions, and all records with the same key must be in the same partition
*) One of the two data sets must be small enough to fit in memory

If the data sets are not sorted on the same key, we can run a preparatory Hadoop job that simply outputs the field to sort on as the key. By specifying exactly the same number of reducers for all the data sets, we will have our data ready for a map-side join.

While performing the map-side join, the records are merged before they reach the mapper. We use CompositeInputFormat with the following configuration to specify:
1) The separator between key and value
2) The type of join to perform (inner, outer, …)

Configuration config = new Configuration();
// Separator between key and value in each input line
config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", separator);
// Join type (inner/outer...), input format, and paths of the files to join
String joinExpression = CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, paths);
// Register the join expression
config.set("mapred.join.expr", joinExpression);

Once the join is done, the mapper is called. It receives a Text key and a TupleWritable composed of the values joined from our input files for that key.
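Conceptually, merging two inputs that are already sorted on the same key is a sort-merge join. The plain Java sketch below illustrates that idea on two small in-memory lists; it does not use Hadoop, and the class MergeJoinSketch, the {key, value} array layout, and the sample records are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MergeJoinSketch {
    // Inner-joins two lists of {key, value} pairs that are both sorted by key.
    static List<String> innerJoin(List<String[]> left, List<String[]> right) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            int cmp = left.get(i)[0].compareTo(right.get(j)[0]);
            if (cmp < 0) {
                i++; // left key has no partner yet, advance left
            } else if (cmp > 0) {
                j++; // right key has no partner yet, advance right
            } else {
                // Same key on both sides: pair every left row with every right row.
                String key = left.get(i)[0];
                int groupStart = j;
                while (i < left.size() && left.get(i)[0].equals(key)) {
                    j = groupStart;
                    while (j < right.size() && right.get(j)[0].equals(key)) {
                        out.add(key + " -> [" + left.get(i)[1] + ", " + right.get(j)[1] + "]");
                        j++;
                    }
                    i++;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> employees = Arrays.asList(
            new String[]{"1", "Alice"}, new String[]{"2", "Bob"}, new String[]{"4", "Dan"});
        List<String[]> departments = Arrays.asList(
            new String[]{"1", "HR"}, new String[]{"3", "Ops"}, new String[]{"4", "IT"});
        for (String row : innerJoin(employees, departments)) {
            System.out.println(row); // 1 -> [Alice, HR] then 4 -> [Dan, IT]
        }
    }
}
```

Each output row plays the role of what the mapper sees: one key together with the tuple of values joined for it. The single forward pass over both inputs is why both data sets must be sorted and partitioned identically.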