Tag Archives: JAVA

Understanding Predicates with JAVA8

In mathematics Predicates are functions that can be either True or False. In JAVA8 Predicates are functional interfaces with only functional method test.
As Predicate is defined as a functional interface in JAVA8 it can be used as the assignment target for a lambda expression or method reference.
we can do boolean operations such as and, or, not(negate) with different instances of Predicate. These default methods are –

Default Method Name Explanation
and() It does logical AND of the predicate on which it is called with another predicate. Example: predicate1.and(predicate2)
or() It does logical OR of the predicate on which it is called with another predicate. Example: predicate1.or(predicate2)
negate() It does boolean negation of the predicate on which it is invoked. Example: predicate1.negate()

Following code uses JAVA8 predicate and replaceIf method, now available in collections, to check from the list of transaction to get only those transaction that has a value more than 2lakh and are done online.

public class UnderstandingRemoveIf {

	static Predicate<Transaction> checkTransactionAmount = new Predicate<Transaction>() {

		@Override
		public boolean test(Transaction transaction) {
			return Math.round(transaction.amount) < 200000;
		}
		
	};
	
	static Predicate<Transaction> isTransactionOnline = new Predicate<Transaction>() {

		@Override
		public boolean test(Transaction transaction) {
			return transaction.transactionMethod!='O';
		}
	};

	public static void main(String[] args) {

		List<Transaction> transactions = new ArrayList<>();
		Transaction transaction1=new Transaction(10000.8d, 'C');
		Transaction transaction2=new Transaction(150000d, 'O');
		Transaction transaction3=new Transaction(300000d, 'O');
		transactions.add(transaction1);
		transactions.add(transaction2);
		transactions.add(transaction3);
		
		// Get all transactions that are done online of more then 2 lakh
		transactions.removeIf(checkTransactionAmount.or(isTransactionOnline));
		System.out.println(transactions);
	}

}

class Transaction{
	
	double amount;
	char   transactionMethod; // O for online 
	public Transaction(double amount,char transactionMethod) {
		this.amount=amount;
		this.transactionMethod=transactionMethod;
	}
	
	@Override
	public String toString() {
		return amount+","+transactionMethod;
	}
}

Microservice Registration and Discovery with Spring Cloud

Microservices are at center while designing any application with multiple parts which are required to be scaled horizontally to serve growing demand. It helps breaking system into small independent components making it easier to manage/develop and scale. But before deploying the architecture we need to answer few questions
How these microservices will discover each other?
How we will ensure High Availability of these microservices?
How our end service dealing with user will consume these microservices?

How these microservices will discover each other?
We need a central location where these services running on different machines can register themselves and declare the ip address and port at which they are running for example we have an addition service running on ip address 192.2.3.4 and port 8080 this service will register itself to some central location with name additionservice and ip, port

Name IP/Port
additionservice 192.2.3.4:8080

Any other service who wants to perform addition will discover details of addition service by looking up to this central repository using service name additionservice

Spring Cloud Netflix provides many different components to perform service discovery , load balancing and handling failures of these services
Eureka is the Netflix Service Discovery Server and Client and act as central location for service registry every service in our ecosystem will register it self with Eureka along with some meta data information such as ip and port
If we scale our micorservice by launching one more instance to run on some different ip say 192.2.3.5 and port 8080. We will have entry in our Eureka Server as

Name IP/Port No.Ofinstances
additionservice 192.2.3.4:8080,192.2.3.5:8080 2

We can deploy some smart load balancing at client side so when it discovers the service endpoints from our service discovery Eureka Server gets two ip addresses and smartly distributes the load across these endpoints

Having multiple instances of service makes our service Highly Available and at same point by introducing the concept of central service registry it decouples client from having know how of where the services are deployed and running even if we change the location of these services at run time it won’t impact client consuming our services via Eureka.

Eureka Server does not have its own back-end and the service instances have to send heartbeats to server to keep their registrations up to date. Clients also have an in-memory cache of eureka, thus avoiding trip to server for every request.

There is concept of zones in Eureka Server stay tuned for updates on them !! ☺
Follow below Github repo to get code for EurekaServer, EurekaClient and EurkaConsumer
Eureka

Understanding Apache Spark’s Fold operation

As described on Apache Spark’s website the fold definition states

“Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value”.”

Thus fold operates by folding first each element of partition and then the result of partition
it is partition dependent

let’s examine the below code

JavaRDD jrdd=jsc.parallelize(Arrays.asList("FirstFile"));
String resultfold = jrdd.fold("0",new Function2() {

public String call(String arg0, String arg1) throws Exception {
// TODO Auto-generated method stub
System.out.println(" the arg0 .. >> "+arg0);
System.out.println(" the arg1 .. >> "+arg1);
return arg0+","+arg1;
}
});

gives output
0,0,0,FirstFile

arg0 is assigned with value 0 initially for a partition
arg1 with value FirstFile
resulting in 0,FirstFile –> as aggregation for one partition
In more than one core machine when number of threads given to spark are more than 1 the above RDD will have more than one partition since an empty partition ends up folding down to zero element we will get bepw values for empty partitions
arg0 is 0
arg1 is 0
result is 0,0

Once each partition is aggregated the result of all the partitions will be aggregated using the same function and supplied value resulting in
0,0,0,FirstFile

The above result may wary each time we execute it depending on which partition gets executed first

Let’s re-partition the above rdd and set the number of partition count to 1 explicitly

JavaRDD jdpar = jrdd.repartition(1);
String resultfold = jdpar.fold("0",new Function2() {

public String call(String arg0, String arg1) throws Exception {
// TODO Auto-generated method stub
System.out.println(" the arg0 .. >> "+arg0);
System.out.println(" the arg1 .. >> "+arg1);
return arg0+","+arg1;
}
});

Since there is a single partition with value FirstFile

1st iteration
arg0 0
arg1 FirstFile

2nd iteration
arg0 0
arg1 0,FirstFile
result 0,0,FirstFile

Lambda Expressions JAVA 8

To understand the concept of lambda expression we need to have a clear understanding of what is a Functional Interface
An interface which declares only one abstract method is called as Functional Interface and lambda expressions let us provide the implementation of the abstract method of a Functional Interface


interface Finterface{
public abstract void functionalInterfaceMethod();
}

Now we can have the implementation of this functional interface either by using anonymous inner classes or by using the Lambda expressions
Using Anonymous inner class

Finterface finterface=new Finterface(){
public void functionalInterfaceMethod(){
System.out.println("Example lambda!");
}
};

or using Lambda expression

Finterface finterface=()->System.out.println("Example Lambda!");

()->System.out.println(“Example Lambda!”);–> provide the implementation of Finterface.

The signature of the abstract method of a functional interface describes the signature of lambda expression called as Functional Descriptor as in the above example the functionalInterfaceMethod declares that is doesn’t accepts any parameter and returns void
the same is the description of its Lambda
()# Zero parameters
-># separates the parameter with expression
System.out.println()# Expression returning void

HBase and Sharding

The basic unit of Sharding(scalability and load balancing) is region,each region consist of a sequence of row keys, the regions are created and merged depending upon the configurable parameter

The table is initially created with a single region since the Hbase itself can not know in advance where to create the split points when the size of region grows more than what is configured it is auto-split  into different regions from middle of the row key.(Is is again Configurable)

Whenever data in the HBase table is updated it is first written in the WAL(Write Ahead Log)File and stored in memstore(memory store) once the data in memory exceeds the limit the data is flushed into the HFile on disk,As the store file increases the region server compacts them into combined larger files.As the store file accumulates or after the compaction finishes a Region-Split request is queued to decide if Splitting the Region is required ,Since the HFiles are immutable the newly created region if created will not rewrite the data Instead, they will create  small sym-link like files, named Reference files, which point to either top or bottom part of the parent store file according to the split point.

Thus Splitting is very fast as the newly created region will read from the original storage files until a compaction rewrites them all into separate one asynchronously

For further Reading I will suggest to go through the below link

http://hortonworks.com/blog/apache-hbase-region-splitting-and-merging/