The community behind Spark has put a lot of effort into making the DataFrame API efficient and scalable. Reading and writing data between HBase and Spark DataFrames bridges the gap between the complex SQL queries Spark can run and the key-value access pattern of HBase. The shc connector implements the standard Spark Data Source API and leverages the Spark Catalyst engine for query optimization.
To map a table in HBase to a table in Spark, we define a table catalog, which stores the mapping of the row key, column qualifiers, and column families in HBase to the columns of the Spark table (a full example follows below).
To work more efficiently and avoid retrieving unwanted data from region servers, the shc connector supports predicate pushdown, where filter conditions are pushed as close to the data as possible, i.e. to the region servers in the case of HBase.
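As a minimal sketch (assuming a SparkSession named session and the catalog string defined later in this post), the filter below on temp is evaluated by the region servers rather than in Spark after a full table scan:

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

// Load the HBase table as a DataFrame through the catalog mapping.
val df = session.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

// The temp > 25 predicate is pushed down to the region servers.
df.filter(df("temp") > 25).select("timestamp", "temp").show()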
Support for partition pruning splits the Scan/BulkGet into multiple non-overlapping ranges, so only the region servers that hold the requested data perform the Scan/BulkGet.
Specifying conditions on the row key such as WHERE x > y or WHERE x < y, for example

session.sql("SELECT * FROM sparkHBaseTable WHERE x > 14567 AND x < 14568")

will result in a scan operation on HBase with a key range between 14567 and 14568.
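Note that this assumes the DataFrame has been registered as a temporary view named sparkHBaseTable (the name is illustrative) and that x is the column mapped to the HBase row key; only predicates on the row key can be pruned into a key-range scan like this. The registration itself is a one-liner:

df.createOrReplaceTempView("sparkHBaseTable")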
timestamp | temp | pressure
--- | --- | ---
1501844115 | 24 | 760
1501844125 | 28 | 800
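A DataFrame with this shape can be built as follows (a minimal sketch; assumes a SparkSession named session, with column types chosen to match the catalog defined below):

import session.implicits._

// Sample measurement data matching the table above.
val df = Seq(
  (1501844115L, 24.0f, 760.0f),
  (1501844125L, 28.0f, 800.0f)
).toDF("timestamp", "temp", "pressure")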
The above table, represented as a Spark DataFrame, can be saved to HBase by providing the mapping between its columns and the row key and column family/qualifier pairs in HBase:
def catalog = s"""{
|"table":{"namespace":"default", "name":"ToolLogs"},
|"rowkey":"key",
|"columns":{
|"timestamp":{"cf":"rowkey", "col":"key", "type":"long"},
|"temp":{"cf":"msmt", "col":"temp", "type":"float"},
|"pressure":{"cf":"msmt", "col":"pressure", "type":"float"}
|}
|}""".stripMargin
Specifying "cf":"rowkey" for the key column is mandatory, even though msmt is the only actual column family in our HBase table; this is simply how the API is designed to work. Once we have defined the mapping in our table catalog, we can store the data in the DataFrame directly to HBase using:
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

df.write
  .options(Map(
    HBaseTableCatalog.tableCatalog -> catalog,
    HBaseTableCatalog.newTable -> "5")) // "5": number of regions if a new table is created
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()
Happy Reading …. ☺