
Producing and Consuming Avro Messages with Kafka

Apache Avro is a data serialization system which relies on a schema for serializing and deserializing objects, but the interesting part is that we can use different schemas to serialize and deserialize the same object. This allows us to decouple the system that is producing the messages from the one that is consuming them.

An Avro schema is defined using JSON, and it supports a wide range of data types and collections, including all the primitive types (int, string, float, double, long).

The advantage of using Avro with Kafka shows when building a system bus, where we want to decouple producers from consumers and give the producer the freedom to change its schema without breaking the system, as Avro takes care of backward compatibility.

The following defines an employee schema with FirstName (fname) and LastName (lname) as string types and Salary (sal) as int:

{
"fields": [
{ "name": "fname", "type": "string" },
{ "name": "lname", "type": "string" },
{ "name": "sal", "type": "int" }
],
"name": "employee",
"type": "record"
}

The beauty of Avro is that we can deserialize an object serialized with the above schema using this reduced schema as well:

{
"fields": [
{ "name": "fname", "type": "string" },
{ "name": "lname", "type": "string" }
],
"name": "employee",
"type": "record"
}

Even though we have deleted the Salary field from the schema, this is not going to break our system; the mismatch is handled internally by the Avro framework through schema resolution, and the removed field is simply ignored on the reader side.
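To see this schema resolution in action outside of Kafka, here is a minimal sketch using plain Avro (not the Bijection API used later in this post). The constants WRITER_SCHEMA_JSON and READER_SCHEMA_JSON are assumed to hold the two schema definitions shown above:

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Parse the "writer" schema (with sal) and the "reader" schema (without sal)
Schema writerSchema = new Schema.Parser().parse(WRITER_SCHEMA_JSON);
Schema readerSchema = new Schema.Parser().parse(READER_SCHEMA_JSON);

// Build and serialize a record using the writer schema
GenericRecord employee = new GenericData.Record(writerSchema);
employee.put("fname", "James");
employee.put("lname", "Bond");
employee.put("sal", 4200);

ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
new GenericDatumWriter<GenericRecord>(writerSchema).write(employee, encoder);
encoder.flush();

// Deserialize with both schemas: Avro resolves them and silently drops "sal"
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
GenericRecord decoded = reader.read(null, decoder);
System.out.println(decoded.get("fname") + " " + decoded.get("lname")); // James Bond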

We will be using Twitter's Bijection API to serialize and deserialize our Avro objects using the Avro schema defined above in JSON format.


public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"employee\","
+ "\"fields\":["
+ " { \"name\":\"fname\", \"type\":\"string\" },"
+ " { \"name\":\"lname\", \"type\":\"string\" },"
+ " { \"name\":\"sal\", \"type\":\"int\" }"
+ "]}";

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);

We need an Injection object to serialize/deserialize records using the defined schema.

Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

Now we can create a record using Avro's dynamic typing feature, via the GenericData abstraction, without generating any code.
To serialize the object using the defined schema:

GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("fname", "James");
avroRecord.put("lname", "Bond");
avroRecord.put("sal", 4200);

byte[] bytes = recordInjection.apply(avroRecord);

To deserialize the object


// consumerRecord is the ConsumerRecord<String, byte[]> received from Kafka
GenericRecord record = recordInjection.invert(consumerRecord.value()).get();
String firstName = record.get("fname").toString();

Once we have the byte[] representation of our Avro object, we can send it as a message/record to Kafka without changing anything else; for Kafka, every message is just an opaque array of bytes.
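For completeness, here is a minimal producer sketch using the plain kafka-clients API; the topic name avro-employees and the broker address are placeholders, not taken from the original code:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
// "bytes" is the Avro-encoded record produced by recordInjection.apply(avroRecord);
// Kafka treats it as an opaque payload and never inspects its contents.
producer.send(new ProducerRecord<String, byte[]>("avro-employees", bytes));
producer.close();

On the consumer side you would configure a ByteArrayDeserializer for the value and feed each received record's value into recordInjection.invert(...), as shown above.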

You can find the code for sending and receiving Avro messages on Kafka on GitHub.

Nifi+Twitter+Kafka+ElasticSearch+HBase=DataPipeline

Apache NiFi is one of the best tools to go with if you want to create data pipelines that are reliable and very easy to share. It provides around 135 out-of-the-box processors ready to use, be it Kafka, AMQ, HBase, HDFS, Solr, or Elasticsearch, while providing a single interface to implement your own custom processor.

NiFi was designed and built to automate the flow of data between systems. A lot is happening in the IoT space, and one of the major challenges is capturing and routing the data, i.e. creating a reliable pipeline that ensures data is not lost and is delivered on time and efficiently. NiFi is the answer.

In this post we will see how NiFi can help us capture tweets containing a particular word/phrase, put them on a queue, index them into Elasticsearch, and store them in HBase.
The requirement is to capture all tweets containing the keyword "Hadoop" on Twitter, index them in Elasticsearch, and store them in HBase. (It is not a good idea to index everything, but this post is more about sharing the potential NiFi promises in the big data space.)
Step 1) Capture Tweet
NiFi ships with the GetTwitter processor for capturing tweets containing a particular word. To use this processor you need to register yourself as a Twitter developer at https://dev.twitter.com/ to obtain consumer/access keys. Once obtained, configure the following properties of the GetTwitter processor:

  • Twitter EndPoint -> Filter Endpoint
  • Consumer Key -> Your consumer key
  • Consumer Secret -> Your Consumer Secret
  • Access Token -> Your Access Token
  • Access Token Secret -> Your Access Token Secret
  • Terms to Filter on -> “Hadoop”

Step 2) Land Tweet To Kafka
If the tweet is captured successfully, route it to Kafka using the PutKafka processor, which again ships with NiFi and provides an easy and convenient way to route all messages onto a Kafka topic, along with the ability to configure the number of partitions and the key for the Kafka messages. Properties to configure:

  • KafkaBrokers -> host:port
  • Topic Name -> Topic on which to send messages

Step 3) Index and Store
On successful receipt of a message we need to index it in Elasticsearch and store it in HBase, in parallel. NiFi again ships with two processors to achieve this: PutElasticsearch and PutHBaseJSON.
Configure PutElasticsearch to point to your Elasticsearch cluster, with the index name and the type of document to index. Properties to configure:

  • Cluster Name -> ES Cluster Name
  • ElasticSearch Hosts -> host:9093
  • Index -> Index name on which to index
  • Type -> Document Type

PutHBaseJSON converts every attribute/property of the JSON document to a column qualifier, with the attribute's value stored as the value of that cell (see the sketch after the list below). Point it at your HBase cluster and specify the table name and column family to use with these properties:

  • TableName -> Table Name on which to insert
  • Column Family -> ColumnFamily in that table
  • HBaseClientService -> You need to enable this controller service by providing the ZooKeeper quorum and the HBase znode (/hbase, or /hbase-unsecure with Hortonworks); this service is responsible for the integration with HBase
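To illustrate the column mapping, take a made-up, heavily simplified tweet payload (the real tweet JSON has many more, and nested, fields):

{
  "id_str": "100000000000000001",
  "text": "Learning Hadoop with NiFi",
  "screen_name": "some_user"
}

This would end up as one HBase row with the column qualifiers id_str, text, and screen_name in the configured column family, each cell holding the corresponding JSON value; the row key is taken from the processor's row identifier configuration (either a fixed value or a field of the JSON).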

Every processor gives you the option of specifying a batch size for inserting/indexing. Every relationship you create is backed by a queue that holds messages for the configured time whenever they are not yet consumed by the downstream service of the relationship.

[Image: Data Pipeline]

[Image: NiFi + Tweet Pipeline]