Category Archives: HBase

Writing Spark Data Frame to HBase

The community behind Spark has put a lot of effort into making the DataFrame API efficient and scalable. Reading and writing data between HBase and a Spark DataFrame bridges the gap between the complex SQL queries that can be performed on Spark and the key-value store pattern of HBase. The shc connector implements the standard Spark Datasource API and leverages the Spark Catalyst engine for query optimization.

To map a table in HBase to a table in Spark, we define a table catalog, which stores the mapping of the row key, column qualifiers and column families in HBase to the table columns in Spark.

To work efficiently and avoid retrieving unwanted data from the region servers, the shc connector supports predicate pushdown, where filter conditions are pushed as close to the data as possible, i.e. to the region server in the case of HBase.

Support for partition pruning splits the Scan/BulkGet into multiple non-overlapping ranges, so only the region servers that hold the requested data perform the Scan/BulkGet.

Specifying conditions like WHERE x > y or WHERE x < y, for example

session.sql("SELECT * FROM sparkHBase WHERE x > 14567 AND x < 14568")

will result in a scan operation on HBase with a key range between 14567 and 14568.

timestamp     temp   pressure
1501844115    24     760
1501844125    28     800
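For illustration, a DataFrame with this shape could be built as follows; this is just a minimal sketch, and the ToolLog case class and the spark session name are assumptions for the example, not part of the connector API:

import org.apache.spark.sql.SparkSession

// Hypothetical case class mirroring the columns of the table above.
case class ToolLog(timestamp: Long, temp: Float, pressure: Float)

val spark = SparkSession.builder().appName("HBaseWriteExample").getOrCreate()
import spark.implicits._

val df = Seq(
  ToolLog(1501844115L, 24f, 760f),
  ToolLog(1501844125L, 28f, 800f)
).toDF()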

The above table, represented as a Spark DataFrame, can be saved to HBase by providing the mapping between the key, column qualifiers and column names in HBase and the DataFrame columns:

def catalog = s"""{
|"table":{"namespace":"default", "name":"ToolLogs"},
|"rowkey":"key",
|"columns":{
|"timestamp":{"cf":"rowkey", "col":"key", "type":"long"},
|"temp":{"cf":"msmt", "col":"temp", "type":"float"},
|"pressure":{"cf":"msmt", "col":"pressure", "type":"float"}
|}
|}""".stripMargin

Specifying "cf":"rowkey" for the key column is mandatory even though msmt is the only column family in our HBase table; this is how the API is designed to work. Once we have defined the Table Catalog mapping, we can write the DataFrame directly to HBase using:


import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

df.write
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()
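To read the data back, the same catalog can be reused; a range filter on the column mapped to the row key is then pushed down and pruned to only the relevant region servers, as described earlier. A minimal sketch, assuming the spark session and the catalog defined above:

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

val toolLogs = spark.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

// The range predicate on the rowkey-mapped column becomes a bounded HBase scan.
toolLogs.filter(toolLogs("timestamp") >= 1501844115L && toolLogs("timestamp") <= 1501844125L).show()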

Happy Reading …. ☺

Nifi+Twitter+Kafka+ElasticSearch+HBase=DataPipeline

Apache Nifi is one of the best tools to go with if you want to create data pipelines that are reliable and very easy to share. It provides around 135 out-of-the-box processors ready to use, be it Kafka, AMQ, HBase, HDFS, Solr or Elasticsearch, while providing a single interface to implement your own custom processor.

Nifi was designed and built to automate the flow of data between systems. A lot is happening in the IoT space, and one of the major challenges is to capture and route the data, i.e. to create a reliable pipeline that ensures data is not lost and is delivered on time and efficiently. Nifi is the answer.

In this post we will see how to use Nifi to capture tweets containing a particular word/phrase, put them on a queue, index them into ElasticSearch and store them in HBase.
The requirement is to capture all the tweets containing the keyword "Hadoop" on Twitter, index the tweets into ElasticSearch and store them in HBase. (It is not a good idea to index everything, but this post is more about sharing the potential Nifi promises in the Big Data space.)
Step 1) Capture Tweet
Nifi ships with the GetTwitter processor for capturing tweets containing a particular word. To use this processor you need to register as a Twitter developer at https://dev.twitter.com/ to obtain consumer/access keys. Once obtained, configure the following properties of GetTwitter:

  • Twitter EndPoint -> Filter Endpoint
  • Consumer Key -> Your consumer key
  • Consumer Secret -> Your Consumer Secret
  • Access Token -> Your Access Token
  • Access Token Secret -> Your Access Token Secret
  • Terms to Filter on -> “Hadoop”

Step 2) Land Tweet To Kafka
If a tweet is captured successfully, route it to Kafka using the PutKafka processor, which again ships with Nifi and provides an easy and convenient way to route all the messages onto a Kafka topic, along with the facility to configure the number of partitions and the key for Kafka messages. Properties to configure:

  • KafkaBrokers -> host:port
  • Topic Name -> Topic on which to send messages

Step 3) Index and Store
On successful receipt of a message we need to index it into ElasticSearch and store it in HBase in parallel. Nifi again ships with two processors to achieve this: PutElasticsearch and PutHBaseJSON.
Configure PutElasticsearch to point to your ElasticSearch cluster, with the index name and the type of document to index. Properties to configure:

  • Cluster Name -> ES Cluster Name
  • ElasticSearch Hosts -> host:9093
  • Index -> Index name on which to index
  • Type -> Document Type

PutHBaseJSON converts every attribute/property in the JSON document to a column qualifier, with its value stored as the value of that cell (see the sketch after the list below). Point it to your HBase instance and specify the table name and column family to use via the following properties:

  • TableName -> Table Name on which to insert
  • Column Family -> ColumnFamily in that table
  • HBaseClientService -> You need to enable this controller service by providing the ZooKeeper quorum and the HBase znode (/hbase, or /hbase-unsecure with Hortonworks); this service is responsible for the integration with HBase
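As a rough illustration of what PutHBaseJSON does with a flattened tweet JSON, the equivalent HBase client calls would look something like the sketch below; the table name, column family, row key and tweet fields here are made-up placeholders, not values from the pipeline above:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

// Each top-level JSON attribute becomes a column qualifier; its value becomes the cell value.
val tweetFields = Map("text" -> "Learning Hadoop today", "lang" -> "en", "user" -> "someone")

val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
val table = conn.getTable(TableName.valueOf("tweets"))            // placeholder table name
val put = new Put(Bytes.toBytes("tweet-id-12345"))                // placeholder row key
tweetFields.foreach { case (qualifier, value) =>
  put.addColumn(Bytes.toBytes("t"), Bytes.toBytes(qualifier), Bytes.toBytes(value)) // "t" = column family
}
table.put(put)
conn.close()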

With every processor you have the option of specifying the batch size to insert/index. Every relationship creates a queue that holds messages, for as long as they are configured to live, until they are consumed by the service at the receiving end of the relationship.

Figure: Nifi + Twitter data pipeline

HBase Timeline consistent standby region servers

Before HBase 1.0.0, when a machine in the cluster went down the data itself was not lost, since it was replicated by HDFS, but the failure could still have critical implications: a single region was served by exactly one region server for both read and write requests, so if that region server went down, no read or write requests for its regions could be served until the automatic failover recovery completed.
A distributed system relies on timeouts to discover that something has gone wrong. Lowering the heartbeat timeout to a very small value can result in false failure detections, while larger timeouts contribute to unavailability of the system until failover recovery finishes.

The good news is that HBase 1.0.0 supports a single region being hosted, through replicas, on more than one region server: one replica of the region is the primary and accepts writes, while the other replicas share the same data files. Read requests can be served by any replica of the region, with timeline consistency.
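To actually read from a secondary replica, the table has to be created with region replication greater than one, and the client has to opt in to timeline consistency per request. A minimal sketch using the HBase Java client from Scala; the table and row key names are placeholders:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Consistency, Get}
import org.apache.hadoop.hbase.util.Bytes

val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
val table = conn.getTable(TableName.valueOf("someTable"))   // placeholder table name

val get = new Get(Bytes.toBytes("someRowKey"))              // placeholder row key
get.setConsistency(Consistency.TIMELINE)                    // default is STRONG, served only by the primary

val result = table.get(get)
println(s"served by a secondary replica: ${result.isStale}")
conn.close()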

HBase Master-Slave Architecture Favourable or Unfavourable

HBase is designed on top of HDFS and thus has a master-slave architecture, the same as the underlying HDFS.
The HMaster is responsible for load balancing, for moving regions from one region server to another whenever required, for assigning regions on start-up, and for many more operations.

Although the client interacts directly with the HRegionServer for data, on a cluster of a few thousand nodes a single HMaster can be a single point of failure. One can also run multiple HMasters on different nodes, but at any single point in time only one HMaster is active, which can lead to a bottleneck.

Designing HBase Schema to Handle TimeSeries Data

In this post I will share the steps and design principles that I have followed to handle time-series data in HBase.
Real Problem
Since keys in HBase are lexicographically sorted, when we insert time-series data all the keys tend to fall into the same region, served by a single region server, creating a hot spot until the region splits into two. During this time we are not utilizing the entire cluster capacity (as one region is served by only a single region server, no load distribution takes place and all keys land on the same region/region server).

To avoid this problem I made use of the design principles of OpenTSDB.
Let's create 5 buckets; every key will then fall into one of these buckets:

NewKey = OriginalKey % NumBuckets + OriginalKey

Say we want to store data for:
7-July 3:00 pm -> 1436261508
7-July 3:01 pm -> 1436261509
Now let's apply our formula:

OriginalKey = 1436261508
NumBuckets = 5
NewKey = 1436261508 % 5 + 1436261508 = 3 + 1436261508 = 1436261511
When OriginalKey = 1436261509:
NewKey = 1436261509 % 5 + 1436261509 = 4 + 1436261509 = 1436261513
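The formula is easy to capture in a small helper; a sketch that reproduces the two example keys:

// NewKey = OriginalKey % NumBuckets + OriginalKey
def newKey(originalKey: Long, numBuckets: Int): Long =
  originalKey % numBuckets + originalKey

newKey(1436261508L, 5)   // 1436261511
newKey(1436261509L, 5)   // 1436261513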

When the user asks for data from T1 to T2:
our T1 = T1 % NumBuckets + T1
our T2 = T2 % NumBuckets + T2

This way we make sure that, even while storing time-series data, the keys do not all end up on a single region/region server. But we lose the original key; to preserve it, we append the original key, separated by some separator, as a suffix.
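A sketch of building such a row key, with "|" as an arbitrarily chosen separator:

// Bucketed key plus the original key as a suffix, so the original timestamp is preserved.
def rowKey(originalKey: Long, numBuckets: Int): String =
  s"${originalKey % numBuckets + originalKey}|$originalKey"

rowKey(1436261508L, 5)   // "1436261511|1436261508"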

This solves the hot region server problem but leads to small (less wide) rows.
To deal with this, i.e. to get wide rows, we can round the key down to minute granularity and then store the timestamp's seconds as the column qualifier:

Minute -> Sec:Header -> Value
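A sketch of deriving that layout from an epoch timestamp in seconds (names are illustrative):

val ts = 1436261508L              // seconds since epoch
val minuteKey = ts - (ts % 60)    // row key at minute granularity: 1436261460
val secondQualifier = ts % 60     // column qualifier within the row: 48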