Producing and Consuming Avro Messages with Kafka

Apache Avro is a data serialization system that relies on a schema to serialize and deserialize objects. The interesting part is that we can deserialize an object with a different schema than the one used to serialize it. This allows us to decouple the system that produces messages from the one that consumes them.

An Avro schema is defined in JSON and supports a wide range of data types, including the primitive types (int, long, float, double, string) and collections such as arrays and maps.
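As a small illustration of these types, the sketch below assembles a schema in Java with Avro's SchemaBuilder instead of writing the JSON by hand. The record name employeeProfile and the fields skills and scores are hypothetical and exist only to show an array and a map; the class name is also made up, and the Avro library is assumed to be on the classpath.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

class SchemaTypesSketch {
    // Builds a record schema mixing primitive and collection types.
    static Schema build() {
        return SchemaBuilder.record("employeeProfile") // hypothetical record name
            .fields()
            .requiredString("fname")
            .requiredInt("sal")
            .requiredDouble("rating")
            // hypothetical collection fields: array<string> and map<int>
            .name("skills").type().array().items().stringType().noDefault()
            .name("scores").type().map().values().intType().noDefault()
            .endRecord();
    }

    public static void main(String[] args) {
        // Print the equivalent JSON definition, pretty-printed
        System.out.println(build().toString(true));
    }
}
```

Printing the schema with toString(true) gives the JSON form, which can then be stored or shared like any hand-written schema.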

The advantage of using Avro with Kafka shows when building a system bus, where we want to decouple producers from consumers. Producers are free to change the schema without breaking consumers, as long as the change is compatible under Avro's schema resolution rules.

The following defines an employee schema with FirstName (fname) and LastName (lname) as string types and Salary (sal) as an int:

{
"fields": [
{ "name": "fname", "type": "string" },
{ "name": "lname", "type": "string" },
{ "name": "sal", "type": "int" }
],
"name": "employee",
"type": "record"
}

The beauty of Avro is that we can deserialize an object serialized with the schema above using this one:

{
"fields": [
{ "name": "fname", "type": "string" },
{ "name": "lname", "type": "string" }
],
"name": "employee",
"type": "record"
}

Although we have removed Salary from the schema, this does not break our system: during schema resolution, Avro ignores fields that are present in the writer's schema but absent from the reader's.
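To make this concrete, here is a minimal sketch that uses the plain Avro API (not Bijection) to write a record with the three-field schema and read it back with the two-field one. The class and method names are hypothetical, and the Avro library is assumed to be on the classpath. Note that the reader is given both schemas: the writer's, to decode the bytes, and its own, to shape the result.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

class SchemaResolutionSketch {
    static final String WRITER_SCHEMA = "{\"type\":\"record\",\"name\":\"employee\",\"fields\":["
        + "{\"name\":\"fname\",\"type\":\"string\"},"
        + "{\"name\":\"lname\",\"type\":\"string\"},"
        + "{\"name\":\"sal\",\"type\":\"int\"}]}";
    static final String READER_SCHEMA = "{\"type\":\"record\",\"name\":\"employee\",\"fields\":["
        + "{\"name\":\"fname\",\"type\":\"string\"},"
        + "{\"name\":\"lname\",\"type\":\"string\"}]}";

    // Serialize a record to Avro binary using the writer's schema.
    static byte[] write(Schema writerSchema, GenericRecord record) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Decode with the writer's schema and project onto the reader's schema.
    static GenericRecord read(Schema writerSchema, Schema readerSchema, byte[] bytes) {
        try {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            return new GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static GenericRecord roundTrip() {
        // Separate parsers, because both schemas define a type named "employee"
        Schema writerSchema = new Schema.Parser().parse(WRITER_SCHEMA);
        Schema readerSchema = new Schema.Parser().parse(READER_SCHEMA);

        GenericRecord written = new GenericData.Record(writerSchema);
        written.put("fname", "James");
        written.put("lname", "Bond");
        written.put("sal", 4200);

        return read(writerSchema, readerSchema, write(writerSchema, written));
    }

    public static void main(String[] args) {
        GenericRecord result = roundTrip();
        System.out.println(result.get("fname") + " " + result.get("lname"));
        // The reader's schema has no "sal" field, so the value was skipped
        System.out.println("has sal: " + (result.getSchema().getField("sal") != null));
    }
}
```

The record that comes back carries the reader's schema, so the consumer never sees the sal field even though it is present in the serialized bytes.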

We will use the Bijection API by Twitter to serialize and deserialize our Avro objects, using the schema defined above in JSON format:

public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"employee\","
+ "\"fields\":["
+ " { \"name\":\"fname\", \"type\":\"string\" },"
+ " { \"name\":\"lname\", \"type\":\"string\" },"
+ " { \"name\":\"sal\", \"type\":\"int\" }"
+ "]}";

// Requires: import org.apache.avro.Schema;
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);

We need an Injection object to serialize and deserialize using the defined schema:

// Typed so that apply() returns byte[] and invert() yields a GenericRecord
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

Now we can create a record using Avro's dynamic typing, through the GenericData abstraction, without generating any code.

To serialize the object using the defined schema:

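// Build the record dynamically against the parsed schema
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("fname", "James");
avroRecord.put("lname", "Bond");
avroRecord.put("sal", 4200);

// Serialize the record to Avro binary
byte[] bytes = recordInjection.apply(avroRecord);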

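To deserialize the object:

// bytes is the serialized payload; on the consumer side it is the value of the received Kafka record.
// invert() returns a scala.util.Try, and get() throws if decoding failed.
GenericRecord record = recordInjection.invert(bytes).get();
// Avro returns strings as Utf8 objects, so convert explicitly
String firstName = record.get("fname").toString();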
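Put together, the serialize and deserialize steps might look like the following self-contained sketch. It assumes the bijection-avro and Avro libraries (and the Scala library they depend on) are on the classpath; the class and method names are hypothetical. Instead of calling get() directly, it checks the returned Try first so that a decoding failure can be reported explicitly.

```java
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import scala.util.Try;

class BijectionRoundTripSketch {
    static final String USER_SCHEMA = "{"
        + "\"type\":\"record\","
        + "\"name\":\"employee\","
        + "\"fields\":["
        + " { \"name\":\"fname\", \"type\":\"string\" },"
        + " { \"name\":\"lname\", \"type\":\"string\" },"
        + " { \"name\":\"sal\", \"type\":\"int\" }"
        + "]}";

    // Serializes one record and reads its first name back from the bytes.
    static String roundTripFirstName() {
        Schema schema = new Schema.Parser().parse(USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        GenericData.Record avroRecord = new GenericData.Record(schema);
        avroRecord.put("fname", "James");
        avroRecord.put("lname", "Bond");
        avroRecord.put("sal", 4200);

        byte[] bytes = recordInjection.apply(avroRecord);

        // invert() wraps the result in a Try rather than throwing directly
        Try<GenericRecord> attempt = recordInjection.invert(bytes);
        if (attempt.isFailure()) {
            throw new IllegalStateException("could not decode Avro payload");
        }
        return attempt.get().get("fname").toString();
    }

    public static void main(String[] args) {
        System.out.println(roundTripFirstName());
    }
}
```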

Once we have the byte[] representation of our Avro object, we can send it as a message/record to Kafka without changing anything; to Kafka, every message is just an opaque sequence of bytes.
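A rough sketch of that last step with the standard Kafka Java client is shown below. The topic name employee-avro, the broker address, and the group id are assumptions made for illustration, as are the class and method names; the kafka-clients library is assumed to be on the classpath. The important part is that the value serializer and deserializer are the plain byte array ones, so Kafka passes the Avro payload through untouched.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class AvroBytesOverKafkaSketch {
    static final String TOPIC = "employee-avro"; // hypothetical topic name

    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values are already Avro-encoded, so send them as raw bytes
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    // Sends one Avro-encoded payload; closing the producer flushes pending sends.
    static void send(String bootstrapServers, byte[] avroBytes) {
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps(bootstrapServers))) {
            producer.send(new ProducerRecord<>(TOPIC, avroBytes));
        }
    }

    // Polls once and reports the size of each payload received.
    static void pollOnce(String bootstrapServers, String groupId) {
        try (KafkaConsumer<String, byte[]> consumer =
                 new KafkaConsumer<>(consumerProps(bootstrapServers, groupId))) {
            consumer.subscribe(Collections.singletonList(TOPIC));
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, byte[]> received : records) {
                // received.value() is what would be passed to recordInjection.invert(...)
                System.out.println("received " + received.value().length + " bytes");
            }
        }
    }
}
```

A real consumer would poll in a loop, since a single short poll may return nothing while the consumer group is still being assigned partitions.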

You can find the code for sending and receiving Avro messages on Kafka at GitHub.
