Lessons learned the hard way with an Elasticsearch upgrade

In this blog post I want to share my experience and the lessons learned while upgrading our Elasticsearch cluster from 5.4 to 5.6, with terabytes of data and without any downtime.
To start, let me introduce the playground and the rules we had to respect while performing the cluster-wide upgrade.

  • Indexing and searching should not be impacted. At the time of the upgrade we were handling an indexing rate of about 20k/sec and a search rate of about 2,500/sec
  • Watcher/alerts should keep running at all times, as they give us valuable, actionable insight into the terabytes of data we host
  • Security of the cluster should not be compromised
  • Plugins, visualizations and dashboards are all expected to work as they did before the upgrade

As it was a minor version change, we did a rolling upgrade, one node at a time. Things were simple while we moved from one data node to the next, as long as we kept the points below in check.

  • Stop cluster-wide shard allocation. When we stop a node to upgrade its ES version, the master marks the primary and replica shards on that node as missing, which triggers reassignment of those shards to other nodes. We don't want that to happen, because the node is going to rejoin the cluster shortly.

    PUT _cluster/settings
    {
      "persistent": {
        "cluster.routing.allocation.enable": "none"
      }
    }
  • Stopping a node puts the cluster in the yellow state. Each time you bring a node back after upgrading its ES version, set allocation back to all, then wait for the cluster to turn green before moving on to the next node.

    PUT _cluster/settings
    {
      "persistent": {
        "cluster.routing.allocation.enable": "all"
      }
    }
  • Even after re-enabling allocation, ES may take a long time to allocate shards and turn the status green. If that is the case, look at the setting

     "index.unassigned.node_left.delayed_timeout"

    This setting delays the allocation of replica shards that are left unassigned when a node leaves the cluster. You may need to change it so that ES assigns the shards sooner.

  • One of the pain points of a rolling upgrade with ES is that, until the upgrade is over, you will have nodes running different versions of the X-Pack plugin, which makes it impossible to track progress from Kibana. Your best bet is to rely on the REST API endpoints. For instance, assume a cluster with 2 client nodes on which Kibana is running, and 5 other nodes acting as data and master nodes. Once we upgrade datanode1, its X-Pack version differs from that of the client nodes, because neither the client nodes nor Kibana have been upgraded yet. This prevents datanode1 from sending monitoring data to Kibana, so its progress cannot be visualized in the Kibana UI.
  • A natural instinct is to upgrade Kibana first and the rest of the nodes afterwards to avoid the monitoring problem, but that is a vicious circle: Kibana 5.6 will not start until all the nodes are at 5.6. Kibana 5.4 can work with some nodes at 5.4 and some at 5.6, but it won't be able to display monitoring for the nodes that are at 5.6.
  • Another pain point is Watcher, which runs on the master node. Always, and I repeat always, upgrade the master nodes last. If you upgrade the master nodes first while some of your data nodes are still running 5.4, your watches will fail, and they will keep failing until you have upgraded Kibana.
  • As the watches use the .monitoring indices, they will all keep failing, even after you have upgraded all your client, data and master nodes, until Kibana is upgraded as well.
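
The per-node cycle described in the first bullets (disable allocation, upgrade the node, re-enable allocation, wait for green) can be sketched roughly as below. This is only an illustration: the class and method names are made up, the health source is a fake stand-in for reading the "status" field of GET _cluster/health, and no HTTP call is actually made.

```java
import java.util.function.Supplier;

// Hypothetical helper for one step of a rolling upgrade; it only builds request
// bodies and polls a caller-supplied health source.
class RollingUpgradeSketch {

    /** Body for PUT _cluster/settings: "none" before stopping a node, "all" after it rejoins. */
    static String allocationBody(String mode) {
        return "{\"persistent\":{\"cluster.routing.allocation.enable\":\"" + mode + "\"}}";
    }

    /**
     * Polls the health source until it reports "green" or the attempts run out.
     * healthStatus is an assumed hook; in practice it would call GET _cluster/health.
     */
    static boolean waitForGreen(Supplier<String> healthStatus, int maxAttempts, long sleepMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if ("green".equals(healthStatus.get())) {
                return true;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Fake health source for the demo: yellow twice, then green.
        String[] statuses = {"yellow", "yellow", "green"};
        int[] calls = {0};
        Supplier<String> fakeHealth = () -> statuses[Math.min(calls[0]++, statuses.length - 1)];

        System.out.println("1. PUT _cluster/settings " + allocationBody("none"));
        System.out.println("2. stop the node, upgrade it, start it again");
        System.out.println("3. PUT _cluster/settings " + allocationBody("all"));
        System.out.println("4. green reached: " + waitForGreen(fakeHealth, 10, 1));
    }
}
```

Following the advice above, such a loop would run over the data nodes first and the master nodes last.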

Demystifying Elasticsearch refresh!

By default, Elasticsearch refreshes an index every 1 second. This means it can take up to 1 second for changes made to a document to become visible to search. But what if we need to trigger a process only once the changes are available in search results?

We want the index/update or insert request to wait until the changes made to the documents are available for search before it returns.

The refresh parameter is available on these APIs to control when the index is refreshed and the changes are made available to the user.

  • Setting it to true (refresh=true) causes the relevant primary and replica shards, not the complete index, to be refreshed immediately.
  • Setting it to wait_for (refresh=wait_for) causes the request to wait until Elasticsearch refreshes the index on its own, based on index.refresh_interval, i.e. 1 second by default. Once the index is refreshed, the request returns.
  • Setting it to false (refresh=false) has no effect on refresh, and the request returns immediately. It simply means the data will become available in the near future.

Note: ElasticSearch will refresh only those shards that have changed, not the entire index

But there is a catch in these simple parameters: some cases cause a refresh to happen irrespective of the value of the refresh parameter you have set.

  • If index.max_refresh_listeners (which defaults to 1000) is reached on a shard, refresh=wait_for causes the relevant shard to be refreshed immediately.
  • By default GET is realtime: if the requested document has been updated but not yet refreshed, the GET request issues a refresh of the relevant shard, making all pending changes on it visible.
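
To make the three values and the listener limit easier to picture, here is a toy model of a single shard. It is not Elasticsearch code: the class, its fields and the way "waiting" is represented (the method returns false where a real wait_for request would block until the next refresh) are all assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one shard. Every name here is hypothetical.
class ToyShard {
    enum Refresh { FALSE, TRUE, WAIT_FOR }

    private final List<String> buffer = new ArrayList<>();      // indexed but not yet searchable
    private final List<String> searchable = new ArrayList<>();  // visible to search
    private final int maxRefreshListeners;                      // stands in for index.max_refresh_listeners
    private int waitingListeners = 0;                           // wait_for requests pending on this shard

    ToyShard(int maxRefreshListeners) {
        this.maxRefreshListeners = maxRefreshListeners;
    }

    /** Indexes a document; returns true if it is already searchable when the call returns. */
    boolean index(String doc, Refresh policy) {
        buffer.add(doc);
        if (policy == Refresh.TRUE) {
            refresh();                       // refresh=true: refresh right away
            return true;
        }
        if (policy == Refresh.WAIT_FOR) {
            if (waitingListeners >= maxRefreshListeners) {
                refresh();                   // listener limit reached: forced refresh
                return true;
            }
            waitingListeners++;              // a real request would block here until the next refresh
        }
        return false;                        // refresh=false: visible only after a later refresh
    }

    /** Stands in for the periodic refresh driven by index.refresh_interval. */
    void refresh() {
        searchable.addAll(buffer);
        buffer.clear();
        waitingListeners = 0;
    }

    boolean isSearchable(String doc) {
        return searchable.contains(doc);
    }

    public static void main(String[] args) {
        ToyShard shard = new ToyShard(1);
        System.out.println("false    -> searchable now? " + shard.index("doc-1", Refresh.FALSE));
        System.out.println("true     -> searchable now? " + shard.index("doc-2", Refresh.TRUE));
        System.out.println("wait_for -> searchable now? " + shard.index("doc-3", Refresh.WAIT_FOR));
        System.out.println("wait_for at limit -> searchable now? " + shard.index("doc-4", Refresh.WAIT_FOR));
    }
}
```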

Understanding Co-partitions and Co-Grouping In Spark

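RDDs in Spark are partitioned. Key-based operations such as join and reduceByKey use a HashPartitioner by default, while an RDD created with parallelize has no partitioner until one is assigned, for example with partitionBy. Co-partitioned RDDs use the same partitioner and thus have their data distributed across partitions in the same manner.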

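import org.apache.spark.HashPartitioner

// Pair RDDs (key, value) that share one partitioner with 10 partitions
val partitioner = new HashPartitioner(10)
val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5)).map(x => (x, x)).partitionBy(partitioner)
val rdd2 = sc.parallelize(Array(5, 8, 9, 10, 2)).map(x => (x, x)).partitionBy(partitioner)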

Both RDDs defined above use the same partitioner, i.e. HashPartitioner, with the same number of partitions. HashPartitioner therefore places the data in the same way for both RDDs: the same key in two different RDDs produces the same hash value, and thus the same partition index. Co-partitioned RDDs reduce network shuffling to a great extent, because all the records required for a key-based transformation such as join sit in the corresponding partitions of the two RDDs.
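
The rule behind this can be shown without Spark at all. The plain Java sketch below mirrors, to my understanding, what HashPartitioner does (hashCode modulo the number of partitions, kept non-negative); the class and method names are hypothetical.

```java
// Plain Java illustration of hash partitioning; not Spark code.
class HashPartitionSketch {

    /** Partition index for a key: non-negative (hashCode mod numPartitions), null goes to 0. */
    static int partitionFor(Object key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    public static void main(String[] args) {
        // Keys 2 and 5 appear in both example RDDs.
        int[] sharedKeys = {2, 5};
        for (int key : sharedKeys) {
            System.out.println("key " + key
                    + ": partition " + partitionFor(key, 10) + " of 10"
                    + ", partition " + partitionFor(key, 4) + " of 4");
        }
    }
}
```

With 10 partitions on both sides, key 5 lands in partition 5 in either RDD, so the matching records are already side by side. With 10 partitions on one side and 4 on the other, the same key lands in partitions 5 and 1, and one side has to be moved.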

Co-grouping builds on co-partitioning to give better performance than repeated joins when multiple RDDs are to be joined. With every join operation, the resulting RDD has either the supplied or the default number of partitions, and the join may or may not shuffle the two input RDDs, depending on whether they were co-partitioned with the same number of partitions.

val rdd3 = rdd1.join(rdd2)

Since rdd1 and rdd2 use the same partitioner and have the same number of partitions, the join that produces rdd3 does not require any shuffle. But if rdd1 and rdd2 had different numbers of partitions, the contents of the RDD with fewer partitions would have been reshuffled. Since the number of partitions for the join is not specified, it depends on the default configuration.

Performing another join between rdd3 and rdd4 to create rdd5 brings the chance of more shuffling. All this shuffling and the associated expensive operations can be avoided by using cogroup when we have multiple RDDs to join.

val rdd5 = rdd1.cogroup(rdd2, rdd3)

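This works because cogroup groups all its input RDDs by key under a single partitioner in one operation, so inputs that are already co-partitioned do not need to be shuffled again.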
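
To see what cogroup hands back, here is a small plain Java sketch of its grouping semantics: for every key found in any input, one list of values per input. It says nothing about Spark's partitioning or performance, and the class, the pair helper and the String values are assumptions made for the illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain Java illustration of cogroup semantics; not Spark code.
class CogroupSketch {

    /** Hypothetical helper that builds one (key, value) record. */
    static Map.Entry<Integer, String> pair(int key, String value) {
        return new SimpleEntry<>(key, value);
    }

    /** For every key present in any input: one value list per input, in input order. */
    @SafeVarargs
    static Map<Integer, List<List<String>>> cogroup(List<Map.Entry<Integer, String>>... inputs) {
        Map<Integer, List<List<String>>> grouped = new TreeMap<>();
        for (int i = 0; i < inputs.length; i++) {
            for (Map.Entry<Integer, String> record : inputs[i]) {
                // First time a key is seen, create one empty slot per input.
                List<List<String>> slots = grouped.computeIfAbsent(record.getKey(), k -> {
                    List<List<String>> empty = new ArrayList<>();
                    for (int j = 0; j < inputs.length; j++) {
                        empty.add(new ArrayList<>());
                    }
                    return empty;
                });
                slots.get(i).add(record.getValue());
            }
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> a = Arrays.asList(pair(2, "a2"), pair(5, "a5"));
        List<Map.Entry<Integer, String>> b = Arrays.asList(pair(5, "b5"), pair(8, "b8"));
        List<Map.Entry<Integer, String>> c = Arrays.asList(pair(5, "c5"));
        System.out.println(cogroup(a, b, c));
    }
}
```

All three inputs are grouped in a single pass over the keys, which is the shape of work that chained joins would otherwise repeat once per join.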

Charts for Data Analysis

Visualizing data effectively is the first step in understanding the type of distribution (e.g. normal distribution) present in the available data set. It also helps in finding skewness, outliers and many other properties of the data, so that we can normalize or clean it before performing any analytics on top of it.

Below are a few of the charts most commonly used in data science.

Histogram
It shows the underlying frequency distribution of a set of continuous data, divided into intervals called bins. The x-axis represents the values present in the data, while the y-axis (and thus the height of each bar) represents the frequency.
Each bin counts the number of data values that fall within its interval. The size of the bins should be chosen wisely, so that the resulting graph depicts the underlying frequency distribution of the data.

[Figure: histogram]

Use a histogram when you have numerical data and want to understand its distribution, including its shape and central tendency.
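
The effect of bin size is easy to see in numbers. The sketch below counts values into equal-width bins; the class name, the sample data and the choice to put a value equal to the maximum into the last bin are assumptions made for the example.

```java
import java.util.Arrays;

// Minimal equal-width binning, as a histogram would do before drawing bars.
class HistogramSketch {

    /** Counts how many values fall into each of binCount equal-width bins over [min, max]. */
    static int[] binCounts(double[] values, double min, double max, int binCount) {
        int[] counts = new int[binCount];
        double width = (max - min) / binCount;
        for (double v : values) {
            if (v < min || v > max) {
                continue;                 // ignore values outside the plotted range
            }
            int bin = (int) ((v - min) / width);
            if (bin == binCount) {
                bin--;                    // a value equal to max goes into the last bin
            }
            counts[bin]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        double[] data = {1, 2, 2, 3, 3, 3, 4, 4, 5, 9};
        System.out.println("5 bins: " + Arrays.toString(binCounts(data, 0, 10, 5)));
        System.out.println("2 bins: " + Arrays.toString(binCounts(data, 0, 10, 2)));
    }
}
```

With 5 bins the counts are [1, 5, 3, 0, 1], which shows both the peak around 2 to 4 and the isolated value 9. With 2 bins they collapse to [8, 2] and that shape is lost.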

ScatterPlot
Typically used with large data sets when we want to find out whether there is any relationship between two variables, provided both are numeric. If there is a relationship between the variables plotted on the x and y axes, the points scatter as if along an invisible line. If the relationship is weaker, the dots are arranged more loosely but still show a tendency for the y variable to either increase or decrease as the x variable increases. If no relationship exists between the variables, the points are scattered randomly.
[Figure: scatter plot]
“Use this type of graph when you have two numerical variables and are interested in the relationship between them”
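
A common numeric companion to the scatter plot is the Pearson correlation coefficient, which puts a number on how closely the points follow a straight line. It is not part of the chart itself; the sketch below, with a hypothetical class name and made-up data, is only meant to connect the picture to a value.

```java
// Pearson correlation: close to +1 or -1 for points near a straight line, near 0 for no linear relationship.
class CorrelationSketch {

    static double pearson(double[] x, double[] y) {
        int n = x.length;
        double meanX = 0;
        double meanY = 0;
        for (int i = 0; i < n; i++) {
            meanX += x[i];
            meanY += y[i];
        }
        meanX /= n;
        meanY /= n;

        double covariance = 0;
        double varianceX = 0;
        double varianceY = 0;
        for (int i = 0; i < n; i++) {
            double dx = x[i] - meanX;
            double dy = y[i] - meanY;
            covariance += dx * dy;
            varianceX += dx * dx;
            varianceY += dy * dy;
        }
        return covariance / Math.sqrt(varianceX * varianceY);
    }

    public static void main(String[] args) {
        double[] x = {1, 2, 3, 4};
        System.out.println("rising line:  " + pearson(x, new double[] {2, 4, 6, 8}));
        System.out.println("falling line: " + pearson(x, new double[] {8, 6, 4, 2}));
    }
}
```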

Box-and-Whiskers Plot
These are useful when you are comparing numerical data across multiple groups or categories. With a boxplot you can quickly get information about the mean or median of the data, the overall distribution and degree of variation, and the existence of outliers.
[Figure: box-and-whiskers plot]

“It is especially useful for indicating whether a distribution is skewed and whether there are potential unusual observations (outliers) in the data set. Box and whisker plots are also very useful when large numbers of observations are involved and when two or more data sets are being compared”
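
The numbers behind a box plot can be computed in a few lines. The sketch below makes two assumptions that vary between tools: quartiles are taken by linear interpolation between neighbouring sorted values, and an outlier is any value more than 1.5 times the interquartile range beyond the quartiles. The class name and the sample data are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Quartiles and outliers as a box plot would show them, under the assumptions stated above.
class BoxPlotSketch {

    /** Quantile of already sorted data, interpolating linearly between neighbours. */
    static double quantile(double[] sorted, double p) {
        double position = p * (sorted.length - 1);
        int lower = (int) Math.floor(position);
        int upper = Math.min(lower + 1, sorted.length - 1);
        return sorted[lower] + (position - lower) * (sorted[upper] - sorted[lower]);
    }

    /** Values lying more than 1.5 * IQR below Q1 or above Q3. */
    static List<Double> outliers(double[] values) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        double q1 = quantile(sorted, 0.25);
        double q3 = quantile(sorted, 0.75);
        double reach = 1.5 * (q3 - q1);
        List<Double> result = new ArrayList<>();
        for (double v : sorted) {
            if (v < q1 - reach || v > q3 + reach) {
                result.add(v);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        double[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 40};
        double[] sorted = data.clone();
        Arrays.sort(sorted);
        System.out.println("Q1 = " + quantile(sorted, 0.25));
        System.out.println("median = " + quantile(sorted, 0.5));
        System.out.println("Q3 = " + quantile(sorted, 0.75));
        System.out.println("outliers = " + outliers(data));
    }
}
```

For this sample the box spans 3.25 to 7.75 with the median at 5.5, and 40 is the only value flagged as an outlier.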

Happy reading .. ☺

Understanding Predicates with JAVA8

In mathematics, predicates are functions that evaluate to either true or false. In Java 8, Predicate is a functional interface whose single abstract method is test.
As Predicate is defined as a functional interface in Java 8, it can be used as the assignment target for a lambda expression or method reference.
We can perform boolean operations such as and, or and not (negate) on different instances of Predicate. These default methods are:

Default method | Explanation
and()          | Logical AND of the predicate on which it is called with another predicate. Example: predicate1.and(predicate2)
or()           | Logical OR of the predicate on which it is called with another predicate. Example: predicate1.or(predicate2)
negate()       | Boolean negation of the predicate on which it is invoked. Example: predicate1.negate()
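
As a quick illustration of the three default methods, here is a minimal sketch using lambdas. The two predicates and the class name are made up for the example.

```java
import java.util.function.Predicate;

// Composing predicates with and(), or() and negate().
class PredicateCompositionSketch {

    static final Predicate<Integer> IS_EVEN = n -> n % 2 == 0;
    static final Predicate<Integer> IS_POSITIVE = n -> n > 0;

    public static void main(String[] args) {
        Predicate<Integer> evenAndPositive = IS_EVEN.and(IS_POSITIVE);
        Predicate<Integer> evenOrPositive = IS_EVEN.or(IS_POSITIVE);
        Predicate<Integer> isOdd = IS_EVEN.negate();

        System.out.println("4 is even and positive:  " + evenAndPositive.test(4));
        System.out.println("-4 is even and positive: " + evenAndPositive.test(-4));
        System.out.println("3 is even or positive:   " + evenOrPositive.test(3));
        System.out.println("3 is odd:                " + isOdd.test(3));
    }
}
```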

The following code uses Java 8 predicates and the removeIf method, now available on collections, to keep from a list of transactions only those that are worth more than 2 lakh (200,000) and were done online.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class UnderstandingRemoveIf {

	// True for transactions to discard because the amount is not above 2 lakh
	static Predicate<Transaction> checkTransactionAmount = new Predicate<Transaction>() {

		@Override
		public boolean test(Transaction transaction) {
			return transaction.amount <= 200000;
		}
		
	};
	
	// True for transactions to discard because they were not done online
	static Predicate<Transaction> isTransactionOnline = new Predicate<Transaction>() {

		@Override
		public boolean test(Transaction transaction) {
			return transaction.transactionMethod!='O';
		}
	};

	public static void main(String[] args) {

		List<Transaction> transactions = new ArrayList<>();
		Transaction transaction1=new Transaction(10000.8d, 'C');
		Transaction transaction2=new Transaction(150000d, 'O');
		Transaction transaction3=new Transaction(300000d, 'O');
		transactions.add(transaction1);
		transactions.add(transaction2);
		transactions.add(transaction3);
		
		// Keep only online transactions of more than 2 lakh by removing everything else
		transactions.removeIf(checkTransactionAmount.or(isTransactionOnline));
		System.out.println(transactions);
	}

}

class Transaction{
	
	double amount;
	char   transactionMethod; // O for online 
	public Transaction(double amount,char transactionMethod) {
		this.amount=amount;
		this.transactionMethod=transactionMethod;
	}
	
	@Override
	public String toString() {
		return amount+","+transactionMethod;
	}
}