Category Archives: Hadoop

Get Rapid with RAPIDS

RAPIDS is a collection of open source libraries to write, deploy and manage data pipelines end-to-end on GPUs. It uses NVIDIA CUDA® to optimize compute resources, but exposes that parallelism through well-known Python interfaces.

The focus of this post is not to cover RAPIDS in detail but to lay out the steps to get started with it without much difficulty. The RAPIDS team has done a great job compiling the Startup Guide, but if you are someone like me who is very new to the world of GPUs yet has decent experience designing data pipelines, this post will help you get up and running on the AWS platform.

These are the prerequisites mentioned in the Startup Guide.

Container Host Prerequisites

  • NVIDIA Pascal™ GPU architecture or better
  • CUDA 9.2 or 10.0 compatible nvidia driver
  • Ubuntu 16.04 or 18.04
  • Docker CE v18+
  • nvidia-docker v2+

Well, I was not aware of most of these requirements, or what they meant, when I first started working with RAPIDS. To make it easy I have compiled the steps below.
Note: You need to have an AWS account in place for this. GCP and other cloud providers also offer the kind of machines RAPIDS requires, but for the sake of this post I have selected AWS.

Step 1) In the AWS console, launch an instance with the AMI Deep Learning Base AMI (Amazon Linux) Version 16.2 (ami-038f5aa6f8673b785).

Step 2) Once you have selected the AMI, make sure to choose GPU instances in the filter-by option (for example a p2 or p3 instance); otherwise you won't be able to run the RAPIDS Docker image, as the NVIDIA CUDA® framework requires a GPU to be present.

(Screenshot: AWS GPU machine selection)
Step 3) Once the instance is up and running:

      •  Install Docker
         yum install docker
      •  Once Docker is installed, download the RAPIDS image from the Docker repository (there are other places where you can find the image; follow the Startup Guide for more details). Run the command below to pull the RAPIDS image
        $ docker pull nvcr.io/nvidia/rapidsai/rapidsai:latest

      • Once the image is downloaded, run the command below to start the RAPIDS container
        $ docker run --runtime=nvidia \
                        --rm -it \
                        -p 8888:8888 \
                        -p 8787:8787 \
                        -p 8786:8786 \
                        -p 8889:8889 \
                        nvcr.io/nvidia/rapidsai/rapidsai:latest

        Note the ports mentioned in this command: Jupyter must be started on one of these ports, otherwise you won't be able to access the notebook and will get an error like

         File "/conda/envs/gdf/lib/python3.5/site-packages/tornado/netutil.py", line 168, in bind_sockets
            sock.bind(sockaddr)
        OSError: [Errno 99] Cannot assign requested address
        

        The Startup Guide mentions that the above command will start Jupyter, but that was not the case for me; I had to start Jupyter separately. If this happens to you too, use the command below to start Jupyter

        jupyter notebook --ip=0.0.0.0  --port=8889 --allow-root

         

And there you go!! You have a Jupyter notebook running that uses RAPIDS to perform ETL and many other transformations. Follow the Startup Guide for all the ETL operations, as the intent of this post was just to get RAPIDS up and running using the Docker image.

For reference, here is the cheat sheet for RAPIDS.

Lessons Learned the Hard Way with an Elasticsearch Upgrade

In this blog post I want to share my experience and the lessons learned while upgrading our Elasticsearch cluster from 5.4 to 5.6, with TBs of data, without any downtime.
To start, I would first like to introduce the playground and the rules we had to respect while performing the cluster-wide upgrade.

  • Indexing and searching should not be impacted; at the time of the upgrade we were handling an indexing rate of roughly 20k/sec and a search rate of roughly 2,500/sec
  • Watchers/alerts should always be running, as they provide us valuable, actionable insight into the TBs of data we host
  • Security of the cluster should not be compromised
  • Plugins, visualizations and dashboards are all expected to function as they did before the upgrade

As it was a minor, rolling upgrade, we went one node at a time. Things were simple when moving from one data node to the next, as long as we kept the points below in check:

  • Stop cluster-wide allocation of shards. When we stop one of the nodes to upgrade its ES version, the master will mark some primary and replica shards as missing, which would normally trigger reallocation of those shards; we don't want that to happen, because the node is going to rejoin the cluster shortly.

    PUT _cluster/settings
    {
      "persistent": {
        "cluster.routing.allocation.enable": "none"
      }
    }
  • Stopping a node will put the cluster in a yellow state. Wait for the cluster to turn green again before jumping to the next node. Each time you bring a node back after upgrading its ES version, set allocation back to all.

    PUT _cluster/settings
    {
      "persistent": {
        "cluster.routing.allocation.enable": "all"
      }
    }
  • There is always a possibility that, even after re-enabling allocation, ES takes a long time to allocate the shards and turn the status green. If that is the case, look at the setting

     "index.unassigned.node_left.delayed_timeout"

    You may need to alter this setting to get ES to assign the shards; see the example after this list.

  • One of the pain points of a rolling upgrade with ES is that, until the upgrade is over, you will have nodes running different versions of the X-Pack plugin, which makes it impossible to track progress from Kibana. Your best bet is to rely on the REST API endpoints. For instance, assume a cluster with 2 client nodes on which Kibana is running and 5 other nodes acting as data nodes and master nodes. If we upgrade datanode1, its X-Pack plugin version will differ from that of the client nodes, since the client nodes, and Kibana itself, are not yet upgraded. This prevents datanode1 from sending monitoring data points to Kibana and thus makes it impossible to visualize progress in the Kibana UI.
  • One natural instinct is to upgrade Kibana first and the rest of the nodes afterwards to avoid the monitoring problem, but that is a vicious circle: Kibana 5.6 will not start until all the nodes are at 5.6, while Kibana 5.4 can work with some nodes at 5.4 and some at 5.6 but won't be able to display monitoring for the nodes that are already at 5.6.
  • Another pain point is Watcher. Watcher runs on the master node. Always, and I repeat always, remember to upgrade the master nodes last. If you upgrade the master nodes first while some of your data nodes are still running 5.4, your watchers will fail, and they will keep failing until you have upgraded Kibana.
  • Since the watchers use the .monitoring indices, all of them will keep failing, even after you have upgraded all your client, data and master nodes, until and unless Kibana is upgraded as well.
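
For reference on the delayed-allocation point above, here is a sketch of how that index-level setting can be changed, written in the same style as the settings calls above; the _all target and the 5m value are illustrative choices of mine, not values from our actual upgrade.

    PUT _all/_settings
    {
      "settings": {
        "index.unassigned.node_left.delayed_timeout": "5m"
      }
    }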

Understanding Co-partitions and Co-Grouping In Spark

The RDD’s in spark are partitioned, using Hash Partitioner by default. Co-partitioned RDD’s uses same partitioner and thus have their data distributed across partitions in same manner.

import org.apache.spark.HashPartitioner

val data = Array(1, 2, 3, 4, 5)
val rdd1 = sc.parallelize(data, 10).keyBy(x => x).partitionBy(new HashPartitioner(10))
val data2 = Array(5, 8, 9, 10, 2)
val rdd2 = sc.parallelize(data2, 10).keyBy(x => x).partitionBy(new HashPartitioner(10))

Both of the RDDs defined above use the same partitioner, a HashPartitioner with the same number of partitions. The HashPartitioner partitions the data in the same way for both RDDs: the same key in two different RDDs produces the same hash value and therefore lands in the same partition index. Such co-partitioned RDDs greatly reduce network shuffling, because the matching keys needed by key-based transformations (such as the keyBy pairs above) already sit in corresponding partitions of the two RDDs.

Co-grouping builds on the idea of co-partitioning to give an efficient alternative when multiple RDDs have to be joined, instead of calling join again and again. With every join operation, the resulting RDD gets either the supplied or the default number of partitions, and the join may or may not require shuffling the two input RDDs, depending on whether they were co-partitioned with the same number of partitions.

val rdd3 = rdd1.join(rdd2)

Since rdd1 and rdd2 use the same partitioner and have the same number of partitions, the join operation that produces rdd3 does not require a shuffle. If rdd1 and rdd2 had different numbers of partitions, the contents of the RDD with fewer partitions would have been reshuffled. And since no partition count is passed to the join itself, Spark falls back to the default behaviour and reuses the partitioner of the inputs.

Performing yet another join, say between rdd3 and some rdd4, to create rdd5 increases the chances of further shuffling. All of these expensive shuffle operations can be avoided by using cogroup when we have multiple RDDs to combine.

val rdd5 = rdd1.cogroup(rdd2, rdd3)

This works because cogroup combines all of the RDDs in a single pass and produces co-partitioned results, avoiding the repeated shuffles of chained joins.

Producing and Consuming Avro Messages with Kafka

Apache Avro is a data serialization system that relies on a schema for serializing and deserializing objects; the interesting part is that we can use different schemas to serialize and deserialize the same object. This allows us to decouple the system that produces a message from the one that consumes it.

An Avro schema is defined using JSON, and it supports a wide range of data types and collections, including all the primitive types (int, string, float, double, long).

The advantage of using Avro with Kafka is realized when building a system bus where we want to decouple producers from consumers: the producer is free to change the schema without breaking the system, as Avro takes care of backward compatibility.

The following defines an employee schema with FirstName (fname) and LastName (lname) as strings and Salary (sal) as an int:

{
  "fields": [
    { "name": "fname", "type": "string" },
    { "name": "lname", "type": "string" },
    { "name": "sal", "type": "int" }
  ],
  "name": "employee",
  "type": "record"
}

The beauty of Avro is that we can deserialize an object serialized with the above schema using this reduced schema:

{
  "fields": [
    { "name": "fname", "type": "string" },
    { "name": "lname", "type": "string" }
  ],
  "name": "employee",
  "type": "record"
}

Although we have removed the salary field from the schema, this is not going to break our system; the difference is handled internally by the Avro framework.
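
The rest of the post uses the Bijection API, but to make this schema-resolution step concrete, here is a minimal, self-contained sketch using the plain Avro GenericDatumWriter/GenericDatumReader API: it serializes with the full writer schema and deserializes with the reduced reader schema. The class and variable names are my own, not from the original post.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;

public class SchemaEvolutionDemo {

    // Writer schema: the full employee record (fname, lname, sal)
    static final String WRITER_SCHEMA = "{\"type\":\"record\",\"name\":\"employee\",\"fields\":["
            + "{\"name\":\"fname\",\"type\":\"string\"},"
            + "{\"name\":\"lname\",\"type\":\"string\"},"
            + "{\"name\":\"sal\",\"type\":\"int\"}]}";

    // Reader schema: the reduced record without the sal field
    static final String READER_SCHEMA = "{\"type\":\"record\",\"name\":\"employee\",\"fields\":["
            + "{\"name\":\"fname\",\"type\":\"string\"},"
            + "{\"name\":\"lname\",\"type\":\"string\"}]}";

    public static void main(String[] args) throws Exception {
        Schema writerSchema = new Schema.Parser().parse(WRITER_SCHEMA);
        Schema readerSchema = new Schema.Parser().parse(READER_SCHEMA);

        // Serialize with the full (writer) schema
        GenericRecord employee = new GenericData.Record(writerSchema);
        employee.put("fname", "James");
        employee.put("lname", "Bond");
        employee.put("sal", 4200);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(employee, encoder);
        encoder.flush();
        byte[] bytes = out.toByteArray();

        // Deserialize with the reduced (reader) schema; Avro resolves the difference
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        GenericRecord decoded = new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
                .read(null, decoder);

        // The sal field is simply dropped on the reader side
        System.out.println(decoded.get("fname") + " " + decoded.get("lname"));
    }
}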

We will be using Twitter's Bijection API to serialize and deserialize our Avro objects with the Avro schema defined above in JSON format.


public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"employee\","
+ "\"fields\":["
+ " { \"name\":\"fname\", \"type\":\"string\" },"
+ " { \"name\":\"lname\", \"type\":\"string\" },"
+ " { \"name\":\"sal\", \"type\":\"int\" }"
+ "]}";

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);

We need an Injection object to serialize/deserialize using the defined schema.

Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

Now we can create a record using Avro's dynamic typing, via the GenericData abstraction, without generating any code.
To serialize the object using the defined schema:

GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("fname", "James");
avroRecord.put("lname", "Bond");
avroRecord.put("sal", 4200);

byte[] bytes = recordInjection.apply(avroRecord);

To deserialize the object on the consuming side:

// consumerRecord is the ConsumerRecord<String, byte[]> received from Kafka (a hypothetical variable name)
GenericRecord record = recordInjection.invert(consumerRecord.value()).get();
String firstName = record.get("fname").toString();

Once we have the byte[] representation of our Avro object, we can send it as a message/record to Kafka without changing anything else; for Kafka, every message is just a bag of bytes.
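
To make that concrete, here is a hedged sketch of how the bytes could be published and read back with the plain Kafka clients API; the broker address, topic name and group id are placeholders of my own, while recordInjection and bytes are the objects defined above.

import java.util.Collections;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Producer side: publish the raw Avro bytes produced by recordInjection.apply(...)
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");   // placeholder broker
producerProps.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");

KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps);
producer.send(new ProducerRecord<>("employee-topic", bytes));  // 'bytes' from the snippet above
producer.close();

// Consumer side: read the bytes back and invert them with the same Injection
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "employee-consumers");            // placeholder group id
consumerProps.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("employee-topic"));

// In a real consumer this poll would run in a loop
ConsumerRecords<String, byte[]> records = consumer.poll(1000);
for (ConsumerRecord<String, byte[]> consumerRecord : records) {
    GenericRecord employee = recordInjection.invert(consumerRecord.value()).get();
    System.out.println(employee.get("fname") + " " + employee.get("lname"));
}
consumer.close();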

You can find the full code for sending and receiving Avro messages on Kafka on GitHub.

Nifi+Twitter+Kafka+ElasticSearch+HBase=DataPipeline

Apache NiFi is one of the best tools to go with if you want to create data pipelines that are reliable and very easy to share. It provides around 135 out-of-the-box processors ready to use, be it for Kafka, AMQ, HBase, HDFS, Solr or Elasticsearch, while providing a single interface to implement your own custom processor.

NiFi was designed and built to automate the flow of data between systems. A lot is happening in the IoT space, and one of the major challenges is capturing and routing the data, i.e. creating a reliable pipeline that ensures data is not lost and is delivered on time and efficiently; NiFi is the answer.

In this post we will see how to use NiFi to capture tweets containing a particular word/phrase, put them on a queue, index them into Elasticsearch and store them in HBase.
The requirement is to capture all tweets containing the keyword "Hadoop" on Twitter, index the tweets in Elasticsearch and store them in HBase. (It is not a good idea to index everything, but this post is more about sharing the potential NiFi promises in the big data space.)
Step 1) Capture the tweet
NiFi ships with the GetTwitter processor for capturing tweets containing a particular word. To use this processor you need to register yourself as a Twitter developer at https://dev.twitter.com/ to obtain consumer/access keys. Once obtained, configure the following properties of the GetTwitter processor:

  • Twitter EndPoint -> Filter Endpoint
  • Consumer Key -> Your consumer key
  • Consumer Secret -> Your Consumer Secret
  • Access Token -> Your Access Token
  • Access Token Secret -> Your Access Token Secret
  • Terms to Filter on -> “Hadoop”

Step 2) Land the tweet in Kafka
If the tweet is captured successfully, route it to Kafka using the PutKafka processor, which again ships with NiFi and provides an easy and convenient way to route all the messages onto a Kafka topic, along with options to configure the number of partitions and the key for the Kafka messages. Properties to configure:

  • KafkaBrokers -> host:port
  • Topic Name -> Topic on which to send messages

Step 3) Index and store
On successful receipt of a message we need to index it into Elasticsearch and store it in HBase in parallel. NiFi again ships with two processors to achieve this task: PutElasticsearch and PutHBaseJSON.
Configure PutElasticsearch to point to your Elasticsearch cluster, with the index name and the type of document to index with. Properties to configure:

  • Cluster Name -> ES Cluster Name
  • ElasticSearch Hosts -> host:9093
  • Index -> Index name on which to index
  • Type -> Document Type

PutHBaseJSON converts each attribute/property in the JSON document to a column qualifier, with its value as the value of that specific cell. Point it to your HBase cluster and specify the table name and column family to use through these properties:

  • TableName -> Table Name on which to insert
  • Column Family -> ColumnFamily in that table
  • HBase Client Service -> You need to enable this controller service by providing the ZooKeeper quorum and the HBase znode (/hbase, or /hbase-unsecure with Hortonworks); this service is responsible for the integration with HBase

With every processor you have the option of specifying the batch size for inserting/indexing. Every relationship you create backs a queue that holds the messages for the configured time if they are not yet consumed by the destination end of the relationship.

(Flow diagram: Data Pipeline)

(Flow diagram: NiFi + Tweet pipeline)