Category Archives: Java

Understanding Predicates with Java 8

In mathematics, predicates are functions that return either true or false. In Java 8, Predicate is a functional interface whose single abstract method is test().
Because Predicate is defined as a functional interface in Java 8, it can be used as the assignment target for a lambda expression or a method reference.
We can combine different instances of Predicate with boolean operations such as AND, OR and NOT (negate). These default methods are:

Default Method Name Explanation
and() It does logical AND of the predicate on which it is called with another predicate. Example: predicate1.and(predicate2)
or() It does logical OR of the predicate on which it is called with another predicate. Example: predicate1.or(predicate2)
negate() It does boolean negation of the predicate on which it is invoked. Example: predicate1.negate()
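
For example, here is a small self-contained sketch (the number predicates below are made up purely for illustration) showing how these default methods compose:

// illustrative predicates over Integers (java.util.function.Predicate)
Predicate<Integer> isPositive = n -> n > 0;
Predicate<Integer> isEven = n -> n % 2 == 0;

System.out.println(isPositive.and(isEven).test(4));    // true: positive AND even
System.out.println(isPositive.or(isEven).test(-2));    // true: not positive, but even
System.out.println(isPositive.negate().test(-3));      // true: NOT positive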

The following code uses a Java 8 Predicate and the removeIf method, now available on collections, to filter a list of transactions down to only those that have a value of more than 2 lakh and were done online.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class UnderstandingRemoveIf {

	// true when the transaction amount is below 2 lakh (200000)
	static Predicate<Transaction> checkTransactionAmount = new Predicate<Transaction>() {

		@Override
		public boolean test(Transaction transaction) {
			return Math.round(transaction.amount) < 200000;
		}

	};

	// true when the transaction was NOT done online ('O' marks an online transaction)
	static Predicate<Transaction> isTransactionOffline = new Predicate<Transaction>() {

		@Override
		public boolean test(Transaction transaction) {
			return transaction.transactionMethod != 'O';
		}
	};

	public static void main(String[] args) {

		List<Transaction> transactions = new ArrayList<>();
		Transaction transaction1=new Transaction(10000.8d, 'C');
		Transaction transaction2=new Transaction(150000d, 'O');
		Transaction transaction3=new Transaction(300000d, 'O');
		transactions.add(transaction1);
		transactions.add(transaction2);
		transactions.add(transaction3);
		
		// Remove transactions below 2 lakh or not done online, keeping online transactions of more than 2 lakh
		transactions.removeIf(checkTransactionAmount.or(isTransactionOffline));
		System.out.println(transactions);
	}

}

class Transaction{
	
	double amount;
	char   transactionMethod; // O for online 
	public Transaction(double amount,char transactionMethod) {
		this.amount=amount;
		this.transactionMethod=transactionMethod;
	}
	
	@Override
	public String toString() {
		return amount+","+transactionMethod;
	}
}
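
Since Predicate is a functional interface, the same two predicates can also be written much more concisely as lambda expressions (equivalent to the anonymous classes above):

static Predicate<Transaction> checkTransactionAmount = t -> Math.round(t.amount) < 200000;
static Predicate<Transaction> isTransactionOffline = t -> t.transactionMethod != 'O';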

Producing and Consuming Avro Messages with Kafka

Apache Avro is a data serialization system which relies on a schema for serializing and deserializing objects, but the interesting part is that we can use different schemas to serialize and deserialize the same object. This allows us to decouple the system that is producing the messages from the one that is consuming them.

An Avro schema is defined using JSON, and Avro supports a wide range of data types and collections, including all primitive types (int, string, float, double, long).

The advantage of using Avro with Kafka is realized while building a system bus, where we want to decouple the producers from the consumers, giving the producer the freedom to change the schema without breaking the system, as Avro takes care of backward compatibility.

The following defines an employee schema with FirstName (fname) and LastName (lname) as string types and Salary (sal) as an int:

{
"fields": [
{ "name": "fname", "type": "string" },
{ "name": "lname", "type": "string" },
{ "name": "sal", "type": "int" }
],
"name": "employee",
"type": "record"
}

The beauty of Avro is that we can deserialize an object serialized with the above schema using this reader schema:

{
"fields": [
{ "name": "fname", "type": "string" },
{ "name": "lname", "type": "string" }
],
"name": "employee",
"type": "record"
}

Even though we have deleted the Salary field from the schema, this is not going to break our system; the difference is handled internally by the Avro framework.
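
To make this concrete, here is a minimal sketch using the plain Avro API (independent of Kafka and Bijection; the writerSchema and readerSchema variables are assumed to hold the two schemas above) that serializes with the full schema and deserializes with the reduced one:

// writerSchema = full employee schema (fname, lname, sal), readerSchema = reduced schema (fname, lname)
GenericRecord employee = new GenericData.Record(writerSchema);
employee.put("fname", "James");
employee.put("lname", "Bond");
employee.put("sal", 4200);

// serialize with the writer schema
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(writerSchema);
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(employee, encoder);
encoder.flush();

// deserialize with the reader schema: the missing sal field is simply dropped during schema resolution
DatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
GenericRecord projected = reader.read(null, decoder);
System.out.println(projected.get("fname")); // James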

We will be using Twitter's Bijection API to serialize and deserialize our Avro objects using the Avro schema defined above in JSON format.


public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"employee\","
+ "\"fields\":["
+ " { \"name\":\"fname\", \"type\":\"string\" },"
+ " { \"name\":\"lname\", \"type\":\"string\" },"
+ " { \"name\":\"sal\", \"type\":\"int\" }"
+ "]}";

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);

We need an Injection object to serialize/deserialize using the defined schema:

Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

Now we can create a record using the dynamic typing feature of Avro, without generating any code, through the GenericData abstraction.
To serialize the object using the defined schema:

GenericData.Record record = new GenericData.Record(schema);
record.put("fname", "James");
record.put("lname", "Bond");
record.put("sal", 4200);

byte[] bytes = recordInjection.apply(record);

To deserialize the object (here avroRecord is the Kafka ConsumerRecord whose value is the Avro-encoded byte[]):


GenericRecord record = recordInjection.invert(avroRecord.value()).get();
String firstName = record.get("fname").toString();

Once we have the byte[] representation of our Avro object, we can send it as a message/record to Kafka without changing anything; to Kafka, every message is just a bunch of bytes.
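
For completeness, a minimal sketch of publishing those bytes with the plain Kafka producer API (the topic name and broker address are illustrative):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

// the value is just the Avro-encoded byte[] produced by recordInjection.apply(record)
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("employee-topic", bytes));
producer.close();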

You can find the code for sending and receiving Avro messages on Kafka at GitHub

Microservice Registration and Discovery with Spring Cloud

Microservices are at the center of designing any application with multiple parts that need to be scaled horizontally to serve growing demand. They help break the system into small independent components, making it easier to manage, develop and scale. But before deploying this architecture we need to answer a few questions:
How will these microservices discover each other?
How will we ensure high availability of these microservices?
How will our end service, the one dealing with the user, consume these microservices?

How will these microservices discover each other?
We need a central location where services running on different machines can register themselves and declare the IP address and port at which they are running. For example, say we have an addition service running on IP address 192.2.3.4 and port 8080; this service will register itself with that central location under the name additionservice along with its IP and port:

Name IP/Port
additionservice 192.2.3.4:8080

Any other service that wants to perform addition will discover the details of the addition service by looking it up in this central registry using the service name additionservice.

Spring Cloud Netflix provides many different components to perform service discovery, load balancing and failure handling for these services.
Eureka is the Netflix service discovery server and client, and it acts as the central location for the service registry: every service in our ecosystem registers itself with Eureka along with some metadata such as its IP and port.
If we scale our microservice by launching one more instance on a different IP, say 192.2.3.5, and port 8080, we will have the following entry in our Eureka server:

Name IP/Port No.Ofinstances
additionservice 192.2.3.4:8080,192.2.3.5:8080 2
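
A minimal sketch of what this could look like with Spring Cloud Netflix (class names are illustrative; the additionservice name would be configured via spring.application.name and the Eureka URL via eureka.client.serviceUrl.defaultZone):

// Eureka server: the central service registry
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
	public static void main(String[] args) {
		SpringApplication.run(EurekaServerApplication.class, args);
	}
}

// Addition service: registers itself with Eureka on startup
@SpringBootApplication
@EnableEurekaClient
public class AdditionServiceApplication {
	public static void main(String[] args) {
		SpringApplication.run(AdditionServiceApplication.class, args);
	}
}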

We can deploy some smart load balancing at the client side, so that when the client discovers the service endpoints from our Eureka server it gets both IP addresses and smartly distributes the load across the two endpoints.
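
As a sketch of such a client-side consumer (the URL path is illustrative), Spring Cloud's load-balanced RestTemplate resolves the logical service name against Eureka and spreads the calls across the registered instances:

@SpringBootApplication
@EnableDiscoveryClient
public class AdditionConsumerApplication {

	// Ribbon-backed RestTemplate: "additionservice" is resolved via Eureka, not DNS
	@Bean
	@LoadBalanced
	RestTemplate restTemplate() {
		return new RestTemplate();
	}

	public static void main(String[] args) {
		SpringApplication.run(AdditionConsumerApplication.class, args);
	}
}

// somewhere in the consumer, the service name replaces a hard-coded host:port
// String sum = restTemplate.getForObject("http://additionservice/add?a=2&b=3", String.class);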

Having multiple instances of a service makes it highly available, and at the same time the central service registry decouples clients from having to know where the services are deployed and running; even if we change the location of these services at runtime, it won't impact the clients consuming our services via Eureka.

Eureka Server does not have its own backing store, so the service instances have to send heartbeats to the server to keep their registrations up to date. Clients also keep an in-memory cache of the Eureka registrations, thus avoiding a trip to the server for every request.

There is also a concept of zones in Eureka Server; stay tuned for updates on them!! ☺
Follow the GitHub repo below to get the code for EurekaServer, EurekaClient and EurekaConsumer
Eureka

Method can take data type as parameter

I am in love, and this time in love with Scala. What a great language, very elegantly designed and written. I thought I knew everything about method overloading when I came across this fantastic feature of Scala that allows one to pass the data type as a parameter.

Function syntax

def <function-name>[<type-name>](<parameter-name>: <type-name>): <type-name> = ...


object ParametrizeFuncType {
  def main(args: Array[String]) {
    /**
     * prints true, called with a String
     */
    println(identityfunc[String]("amit": String).isInstanceOf[String])
    /**
     * prints true, called with an Int
     */
    println(identityfunc[Int](1: Int).isInstanceOf[Int])
    /*
     * wow! instead of classic method overloading, the data type itself becomes a parameter
     */
  }
  def identityfunc[T](a: T) = a
}

You no longer need to write multiple methods taking different data types. Just write it once, run it with different types, and you too will fall in love 🙂

ImageSearch with ElasticSearch

ElasticSearch + LIRE (Lucene Image REtrieval) is all you need to perform image search and suggest the best-matched image. It is a very interesting use case and I was very excited to get my hands dirty with this feature.

LIRE (Lucene Image REtrieval) is a plugin for Lucene to index and search images. It does content-based retrieval: give an image as input and get all the matching images as output. LIRE supports many different features: LIRE FEATURES

Indexing Images
Indexing images is no different than indexing a document in ElasticSearch: we need to set the image field's data type to image and its value to the Base64 encoding of the image, where Base64 is a transformation that encodes 3 bytes of data into 4 printable characters.
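
As a small sketch of this step (the file path is illustrative), an image file can be Base64-encoded in plain Java 8 before being put into the document:

// java.nio.file.Files, java.nio.file.Paths and java.util.Base64
// read the raw image bytes and Base64-encode them for the "image" field
byte[] imageBytes = Files.readAllBytes(Paths.get("MickeyMouse.jpg"));
String base64Image = Base64.getEncoder().encodeToString(imageBytes);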

You will find a GitHub link at the end of this article hosting the actual code used in this project along with the ElasticSearch mapping.

To use this feature we need to define our image field with the image data type in the ES mapping:


"image": {
"properties": {
"name": {
"type": "string",
"index":"not_analyzed"
},
"path":{
"type": "string",
"index":"not_analyzed"
}
"image": {
"type": "image",
"feature": {
"CEDD": {
"hash": "BIT_SAMPLING"
},
"JCD": {
"hash": ["BIT_SAMPLING", "LSH"]
},
"FCTH": {}
}
}
}
}

We define three fields here:

  • name -> name of the image
  • path -> path where the actual image is stored
  • image -> the actual image (Base64-encoded)

Application Flow
We will store our images in HDFS and index them in ES after converting them to Base64.
For example, say we have the image MickeyMouse.jpg.
We will store this MickeyMouse.jpg at the location hdfs://:/esimage/MickeyMouse.jpg, and our ES will have:

{
  "name": "MickeyMouse.jpg",
  "path": "/esimage/MickeyMouse.jpg",
  "image": ""
}


All the queries from the UI will be directed to our ES; in the result we get back the name and path. When we want to display the actual image, we fetch it from HDFS using the path present in the result.
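
A minimal sketch of that last step (the namenode address is an assumption, and IOUtils here is from commons-io), reading the image back from HDFS using the path returned by ES:

// Hadoop FileSystem API: open the image from HDFS using the "path" field from the ES result
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
try (InputStream in = fs.open(new Path("/esimage/MickeyMouse.jpg"))) {
	byte[] imageBytes = IOUtils.toByteArray(in); // raw image bytes, handed to the UI for display
}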

GitHub EsImageSearch