Producing and Consuming Avro Messages with Kafka

Apache Avro is a data serialization system that relies on a schema for serializing and deserializing objects. The interesting part is that we can use different schemas to serialize and deserialize the same object. This allows us to decouple the system that produces a message from the systems that consume it.

An Avro schema is defined in JSON, and Avro supports a wide range of data types and collections, including primitive types such as int, string, float, double and long.

The advantage of using Avro with Kafka is realized when building a system bus where we want to decouple producers from consumers, giving producers the freedom to change the schema without breaking the system, as Avro's schema resolution takes care of backward compatibility.

The following defines an employee schema with FirstName (fname) and LastName (lname) as string types and Salary (sal) as int:

{
  "fields": [
    { "name": "fname", "type": "string" },
    { "name": "lname", "type": "string" },
    { "name": "sal", "type": "int" }
  ],
  "name": "employee",
  "type": "record"
}

The beauty of Avro is that we can deserialize an object serialized with the above schema using the following schema:

{
  "fields": [
    { "name": "fname", "type": "string" },
    { "name": "lname", "type": "string" }
  ],
  "name": "employee",
  "type": "record"
}

Even though we have removed the Salary field from the schema, this does not break our system; the difference is handled internally by the Avro framework.

We will be using Twitter's Bijection API to serialize and deserialize our Avro objects using the Avro schema defined above in JSON format.


public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"employee\","
+ "\"fields\":["
+ " { \"name\":\"fname\", \"type\":\"string\" },"
+ " { \"name\":\"lname\", \"type\":\"string\" },"
+ " { \"name\":\"sal\", \"type\":\"int\" }"
+ "]}";

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);

We need an Injection object to serialize/deserialize records using the defined schema.

Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

Now we can create a record using Avro's dynamic typing feature, without generating any code, through the GenericData abstraction.
To serialize the object using the defined schema:

GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("fname", "James");
avroRecord.put("lname", "Bond");
avroRecord.put("sal", 4200);

byte[] bytes = recordInjection.apply(avroRecord);

To deserialize the object on the consumer side (here consumerRecord stands for the Kafka ConsumerRecord<String, byte[]> received by the consumer):


GenericRecord record = recordInjection.invert(consumerRecord.value()).get();
String firstName = record.get("fname").toString();

Once we have the byte[] representation of our Avro object, we can send it as a message/record to Kafka without changing anything; to Kafka, every message is just a sequence of bytes.
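As a minimal sketch of the producer side (the topic name and broker address below are placeholders I am assuming, not taken from the original code), the bytes produced above can be published with the plain Kafka producer and the built-in ByteArraySerializer:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

// Kafka treats the payload as opaque bytes; the Avro schema lives only on the producer/consumer side.
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("employee-topic", bytes));
producer.close();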

You can find the code for sending and receiving Avro messages over Kafka on GitHub

Using nconf to manage configuration

Managing configuration is one of the most important parts of a software release. We usually have separate production, development and quality environments, and the flexibility to point to any of these environments with little or no change in the source code is a highly desirable feature. I was looking for a solution that helps me achieve this and, at the same time, does not break my code if I decide to change the underlying configuration file location.

nconf is a Node.js module that helps organize configuration quickly by integrating environment variables, command-line arguments and a configuration file (config.json). The sources take precedence over each other depending on the order in which they are configured.

This blog post focuses on using nconf with a configuration file and on reading command-line arguments using argv.
Our config.json looks like this:


{
  "DATABASE_HOST": "10.10.2.3",
  "DATABASE_PORT": "8080"
}

We declared two properties in config.json. To read these properties in our .js file, we first need to require the nconf module and then add the config file. Once done, we can access the key/value pairs using the get, set and remove methods.

var nconf = require('nconf');
var fs = require('fs');

//
// Read command-line arguments first so that they take
// precedence over values coming from the file store.
//
nconf.argv();
//
// Setup nconf to use the 'file' store and set a couple of values;
//
nconf.add('file', { file: 'config.json' });
//
// overrides the database host value
//
nconf.set('DATABASE_HOST', '10.10.2.3');
//
// Save the configuration object to disk
//
nconf.save(function (err) {
  fs.readFile('config.json', function (err, data) {
    // console.dir(JSON.parse(data.toString()))
  });
});

console.log(nconf.get('DATABASE_HOST'));

Running the above code using

node <filename>.js

results in 10.10.2.3 being printed on the console. We have also used argv() with nconf so that command-line arguments are picked up. If we run our app using

node <filename>.js --DATABASE_HOST localhost

we get localhost printed on the console. It is that easy to switch between different environments and configuration options using nconf.

Demystifying Supply Chain management with Neo4j

Supply chain management is a very complex topic in itself. Rather than focusing on the details of supply chain management, this blog post targets modelling it with Neo4j (a graph database) to answer questions like:
1) The distance between a retailer and a wholesaler.
2) The best route to get a product, involving the raw-material manufacturer, supplier, wholesaler and retailer.

It’s natural to visualize the entire supply chain as a big graph, with different nodes/vertices representing sellers and products.
SupplyChain
We have different suppliers, products, wholesalers and retailers in our supply chain, each represented by a vertex/node in the graph (refer to the image above):
1) All suppliers are connected to the products they supply.
2) All products are connected to wholesalers.
3) All retailers are connected to wholesalers for the specific products they sell.

Graph databases are a very natural choice for this kind of data, as everything is interrelated through relationships, and graph databases store these relationships like pointers on disk, avoiding extra lookups and disk seeks while traversing. Modelling this kind of data in a relational database or a NoSQL store would require joins or lookups during traversal. For this blog post we will be modelling the data with Neo4j, a graph database.
Neo4j supports the property graph model: a graph with any number of vertices and edges, each of which can have any number of properties (key/value pairs).

In our example we have a Supplier node with the properties
Latitude
Longitude
TimeToSupplyProduct
SupplierID

and a Delivers relationship with the property
km

It’s natural to have suppliers across different geographic locations, represented by Latitude and Longitude in the graph, and connected to wholesalers through the products they supply.
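As a small illustration of this model (the connection URI, credentials, node labels and sample values below are my assumptions for the sketch, using the official Neo4j Java driver), a supplier, a product and the Delivers relationship carrying km could be created like this:

import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Session;

try (Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "password"));
     Session session = driver.session()) {
    // One Supplier node, one Product node and a Delivers edge holding the distance in km.
    session.run("CREATE (s:Supplier {SupplierID: 1, Latitude: 28.61, Longitude: 77.20, TimeToSupplyProduct: 3}) "
              + "CREATE (p:Product {name: 'steel'}) "
              + "CREATE (s)-[:Delivers {km: 1450.0}]->(p)");
}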

For this blog post, what matters is the distance between the supplier, the product and the wholesaler, and then on to the retailer.
If we look at the diagram above, each supplier is connected to the products it supplies, which are then connected to wholesalers and retailers through the Delivers relationship, with km as one of its properties: the distance between their geographic locations, calculated using the Haversine formula (a small sketch of this calculation follows below).
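For reference, here is a sketch of the Haversine calculation that could be used to populate the km property (the method name is illustrative; 6371 is the Earth's mean radius in km):

static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
    double dLat = Math.toRadians(lat2 - lat1);
    double dLon = Math.toRadians(lon2 - lon1);
    // Squared half-chord length between the two points.
    double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
             + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
               * Math.sin(dLon / 2) * Math.sin(dLon / 2);
    return 2 * 6371 * Math.asin(Math.sqrt(a));
}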

Now we can easily traverse the graph to calculate the shortest path and the time between different suppliers and retailers for different products.

If a wholesaler wants to know the nearest supplier for a specific product, we start with that product, find all suppliers supplying it and then sort by the distance between the supplier and the wholesaler, which is the sum of:
distance between Supplier and Product + distance between Product and Wholesaler
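A sketch of such a query, reusing the session from the earlier snippet (the labels, the name/id properties and the sample parameter values are assumptions; only Delivers, km and SupplierID come from the model above):

import org.neo4j.driver.Record;
import org.neo4j.driver.Values;

Record nearest = session.run(
        "MATCH (s:Supplier)-[d1:Delivers]->(p:Product {name: $product})"
      + "-[d2:Delivers]->(w:Wholesaler {id: $wholesalerId}) "
      + "RETURN s.SupplierID AS supplier, d1.km + d2.km AS totalKm "
      + "ORDER BY totalKm ASC LIMIT 1",
        Values.parameters("product", "steel", "wholesalerId", 42))
    .single();
// The nearest supplier and its total distance (supplier -> product -> wholesaler).
System.out.println(nearest.get("supplier").asInt() + " is " + nearest.get("totalKm").asDouble() + " km away");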

Happy reading … ☺

Microservice Registration and Discovery with Spring Cloud

Microservices are at the center of designing any application with multiple parts that need to be scaled horizontally to serve growing demand. They help break a system into small, independent components, making it easier to manage, develop and scale. But before deploying this architecture we need to answer a few questions:
How will these microservices discover each other?
How will we ensure high availability of these microservices?
How will our end service, the one dealing with the user, consume these microservices?

How will these microservices discover each other?
We need a central location where services running on different machines can register themselves and declare the IP address and port at which they are running. For example, if we have an addition service running on IP address 192.2.3.4 and port 8080, this service will register itself with the central location under the name additionservice along with its IP and port:

Name IP/Port
additionservice 192.2.3.4:8080

Any other service that wants to perform addition will discover the details of the addition service by looking it up in this central registry using the service name additionservice.

Spring Cloud Netflix provides many different components to perform service discovery, load balancing and failure handling for these services.
Eureka is the Netflix service discovery server and client, and acts as the central location for the service registry: every service in our ecosystem registers itself with Eureka along with some metadata such as its IP and port (a minimal registration sketch follows the table below).
If we scale our microservice by launching one more instance running on a different IP, say 192.2.3.5 and port 8080, we will have an entry in our Eureka server like:

Name IP/Port No. of instances
additionservice 192.2.3.4:8080,192.2.3.5:8080 2
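As a minimal registration sketch (class and package names are illustrative, assuming the spring-cloud-starter-netflix-eureka-server dependency), the registry is just a Spring Boot application annotated with @EnableEurekaServer; the addition service then registers itself by adding the Eureka client starter and setting spring.application.name=additionservice together with eureka.client.serviceUrl.defaultZone pointing at this server:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

// Central service registry: each additionservice instance registers here with its IP and port.
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}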

We can add smart load balancing on the client side, so that when the client discovers the service endpoints from our Eureka server it gets both IP addresses and distributes the load across these endpoints.
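A sketch of such a client using Spring Cloud's @LoadBalanced RestTemplate (the /add endpoint and its parameters are hypothetical): the logical name additionservice is resolved against Eureka and the calls are spread across the registered instances.

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
@EnableDiscoveryClient
public class AdditionConsumerApplication {

    // Resolves "additionservice" via Eureka and balances requests
    // across 192.2.3.4:8080 and 192.2.3.5:8080.
    @Bean
    @LoadBalanced
    RestTemplate restTemplate() {
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(AdditionConsumerApplication.class, args);
    }
}

// Usage elsewhere in the consumer (hypothetical endpoint):
// String sum = restTemplate.getForObject("http://additionservice/add?a=2&b=3", String.class);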

Having multiple instances makes our service highly available, and at the same time the central service registry decouples clients from knowing where the services are deployed and running; even if we change the location of these services at runtime, it won't impact clients consuming them via Eureka.

The Eureka server does not have its own backing store, and the service instances have to send heartbeats to the server to keep their registrations up to date. Clients also keep an in-memory cache of the Eureka registry, thus avoiding a trip to the server for every request.

There is also the concept of zones in the Eureka server; stay tuned for updates on them !! ☺
Follow the GitHub repo below to get the code for EurekaServer, EurekaClient and EurekaConsumer
Eureka

Boot Says Hello To Azure

Microsoft Azure is the new big kid in cloud computing. I thought I would get hands-on experience by deploying a Spring Boot web service to Microsoft Azure. This blog post is not aimed at explaining how to develop a Spring Boot app/service; rather, the focus is on how to deploy a Spring Boot app on Azure.

Once we have our account created on Azure, we need to create a Web App. Follow the snapshot below for creating one on Azure.

Azure

Once created, we have to configure our Web App environment according to our app's needs, like selecting the Java version and the server.
Azure provides many different ways to deploy the app. We can do it from the Azure plugin for Eclipse/IntelliJ or directly from GitHub, but for this exercise let's stick to the most basic of all: uploading our Spring Boot app jar using FTP.

To upload the jar via FTP we need to configure an FTP user; this can be done easily by going to the deployment credentials of the deployed Web App service.

Since Spring Boot uses an embedded Tomcat server, we have to tell the Azure Web App service about this. To do so, an extra file (web.config) is required that tells Azure where the jar is located and lets the port be assigned at random by the platform. The web.config file goes into the wwwroot folder.
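A sketch of such a web.config using the HttpPlatformHandler (the jar name below is a placeholder; %HTTP_PLATFORM_PORT% is the random port Azure assigns, handed to the embedded Tomcat via -Dserver.port):

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <system.webServer>
    <handlers>
      <add name="httpPlatformHandler" path="*" verb="*" modules="httpPlatformHandler" resourceType="Unspecified" />
    </handlers>
    <!-- Launch the Spring Boot jar from wwwroot and bind it to the port Azure picks -->
    <httpPlatform processPath="%JAVA_HOME%\bin\java.exe"
                  arguments="-Dserver.port=%HTTP_PLATFORM_PORT% -jar &quot;%HOME%\site\wwwroot\myapp.jar&quot;" />
  </system.webServer>
</configuration>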

Once you are connected to the remote FTP server you will see the wwwroot folder, as in the image below.

FileZilla

In our case we have uploaded the jar and the web.config to the same wwwroot folder. Once done, we are good to go and can access our Spring Boot app saying "Hello" to Azure.