Category Archives: ElasticSearch

Handling dangling Elasticsearch watcher index.

A few weeks back, our Elasticsearch cluster stopped executing any watchers. Doing initial analysis it looked like there is some problem with AWS SMTP service. As we use AWS SMTP for sending mail alerts to our LDAP accounts. After going through more logs and spending some time in understanding the sent mail statistics on AWS, thanks to AWS for providing intuitive UI to get insights of emails that are getting rejected. We were sure there is no problem with sending of email but something is wrong on the current master. Analyzing below log line it was clear that there is some issue with .watcher index.

Resolution

  • Delete the local directory: The log line tells the node name that is holding a stale copy of index along with the directory name. In our case it was es-master-1 node name with the directory 23nm9NSrSkeZaK4Dtyughg under data folder for the master.
  • Restart Watcher Service: Once the stale index directory is deleted, restart the watcher service

    POST _xpack/watcher/_restart

Lessons learned hard way with Elastic Upgrade

In this blog post i want to share my experience and lessons learned while upgrading our ElasticSearch cluster from 5.4 to 5.6 with TB’s of data without any downtime.
To start, i would first like to introduce the play ground and rules that we were required to take care while performing cluster wide upgrade.

  • Indexing and Searching should not get impacted, at time of upgrade we were handling indexing @=~ 20k/sec and search rate @=~ 2500/sec
  • Watcher/alerts should always be running as they provide us valuable actionable insight to TB’s of data we host
  • Security of cluster should not be compramised
  • Plugins/visualizations and dashboards are all expected to function as they were before upgrade

As it was a minor/ rolling upgrade we started with one node at a time things were simple when we moved from one DataNode to other DataNode keeping check of below points

  • Stop cluster wide allocation of shards. As when we stop one of the nodes for upgrading the ES version, Master will mark some primary and replica shards as missing this will lead to assignment of shards, but we don’t want this to happen. Node is going to join back the cluster in some time.

    PUT _cluster/settings
    {
    "persistent": {
    "cluster.routing.allocation.enable": "none"
    }
    }
  • Stopping a node will bring the cluster in Yellow state. Wait for the cluster to change color to green before jumping to next node. Each time you bring the node back after upgrading ES version, set the allocation to true.

    PUT _cluster/settings
    {
    "persistent": {
    "cluster.routing.allocation.enable": "all"
    }
    }
  • There is always a possibility that even after setting the allocation true . ES will take lot of time to allocate shards and turn the status to green. If that is the case look for the setting


    . You may require to alter this setting to enable ES assign shards
  • One of the pain points while doing rolling upgrade with ES is , for some time till the upgrade is over you will have nodes running multiple version of x-pack plugin, this will make it impossible to track the progress from Kibana. Your best bet is to rely on Rest API end points. For instance let’s assume a cluster with 2 client nodes on which kibana is running and rest other 5 nodes acting as datanode and masternode. If we upgrade our datanode1. It’s X-pack plugin version will differ from the client node, as client node is not yet upgraded and even kibana is not yet upgraded. This will prevent the datanode1 to send monitoring data points to kibana and thus making it impossible to visualize progress on kibana UI 
  • One natural instinct is to upgrade kibana first and rest of the nodes to avoid any problem of monitoring, but it’s is a vicious circle, which is obvious. Kibana 5.6 will not start until all the nodes are at 5.6. Kibana 5.4 can work with some nodes at 5.4 and some at 5.6 but it won’t be able to display monitoring for all the nodes that are at 5.6
  • Another pain point is watcher. Watcher runs on Master node. Always,i would repeat always remember to upgrade the master nodes at end. If you upgrade the master nodes first while your some of the data nodes are running on 5.4 your watchers will fail. Even they will keep failing until you have upgraded kibana.
  • As the watchers uses .monitoring indexes ,all the watchers will fail even if you have upgraded all your client,data and master nodes until and unless kibana is also not upgraded.

Demystifying ElasticSearch refresh !

By default ElasticSearch is configured to refresh the index every 1 second. This means it will take atleast 1 second to propagate the changes that are made to a document to be made visible during search.But what if we have a requirement to trigger a process only when the search results are made available.

We want index/update or insert request to wait , until the changes made to the documents are available for search before it returns.

refresh parameter is available for these API’s to control when we want our index to refresh and changes made available to user.

  • Setting it to true , refresh = true will cause relevant Primary and Secondary shard , not complete index to be refreshed immediately.
  • Setting it to wait_for, refresh=wait_for will cause the request to wait until the index is refreshed by ElasticSearch based on index.refresh-interval i.e 1 sec by default. Once the index is refreshed the request returns.
  • Setting it to false, refresh=false has no impact on refresh and request returns immediately. It simply means the data will be available in near future.

Note: ElasticSearch will refresh only those shards that have changed, not the entire index

But there is catch in these simple parameters, there are cases that will cause refresh to happen irrespective of the value of refresh parameter you have set

  • if index.max_refresh_listeners which defaults to 1000 is reached. refresh=wait_for will cause the relevant shard to be refreshed immediately.
  • By default GET is realtime i.e each time a GET request is made, it issues index refresh for the appropriate segment. Causing all changes to be made available.

Nifi+Twitter+Kafka+ElasticSearch+HBase=DataPipeline

Apache Nifi is one of the best tool to go with if you want to create Data Pipeline’s that are reliable and very easy to share.It provides around 135 out of the box Processors ready to use be it Kafka,AMQ,HBase,HDFs,Solr,Elasticsearch ,while providing single interface to implement your own custom processor.

Nifi was designed and built to automate the flow of data between the systems.Lot of things are happening in IOT space and one of the major challenge is to capture and route the data i.e to create reliable Pipeline to ensure data is not lost ,delivered on time with efficiency ,Nifi is the answer

In this post we will see how to Integrate Nifi to help us capture Tweet containing particular word/phrase store it on a queue , index it onto ElasticSearch and store it on HBase
Requirement is to capture all the tweets containing keyword “Hadoop” on tweeter ,index the tweets on ElasticSearch and store it in HBase(It is not a good idea to index all the things but this post is more towards sharing the potential Nifi promises to provide in Big Data Space)
Step 1) Capture Tweet
Nifi ships with GetTwitter Processor for capturing a tweet containing particular word . To use this processor you need to register yourself as twitter developer at https://dev.twitter.com/ for obtaining Consumer/Access keys .Once obtained configure following properties of GetTwitterProcessor

  • Twitter EndPoint -> Filter Endpoint
  • Consumer Key -> Your consumer key
  • Consumer Secret -> Your Consumer Secret
  • Access Token -> Your Access Token
  • Access Token Secret -> Your Access Token Secret
  • Terms to Filter on -> “Hadoop”

Step2) Land Tweet To Kafka
If Twitter is captured successfully route it to kafka using PutKafka Processor ,which again ships with Nifi providing an easy and convenient way to route all the messages on to a kafka topic and also providing facility to configure the number of partitions and key for kafka messages.Properties to Configure

  • KafkaBrokers -> host:port
  • Topic Name -> Topic on which to send messages

Step3)Index and Store
On successful receipt of message we need to index it on ElasticSearch and Store it on HBase in parallel .Nifi again ships with two processors to achieve this task .PutElasticSearch and PutHbaseJSon
Configure PutElasticSearch to point to your ElasticSearch cluster and with index name and type of document to be indexed with.Properties to configure

  • Cluster Name -> ES Cluster Name
  • ElasticSearch Hosts -> host:9093
  • Index -> Index name on which to index
  • Type -> Document Type

PutHbaseJson converts all the attributes/properties in json to column qualifier with their value as value of that specific cell .Point it to your Hbase and specify the table name and column family to use using properties

  • TableName -> Table Name on which to insert
  • Column Family -> ColumnFamily in that table
  • HBaseClientService -> You need to enable this service by providing Zookeeper Quorum and Node of hbase(/hbase or /hbase-unsecure with Hortonworks) ,this service is responsible for integrating with HBase

With every Processor you have an option of specifying the batch size to insert/index .With every relation you create a queue that hold the message for the time it is configured and not entertained by end service of relationship

Data Pipeline

Nifi + Tweet Pipeline

ImageSearch with ElasticSearch

ElasticSearch+LIRE (Lucene Image REtrieval) is all that you need to perform image searching and suggesting best matched image.It is a very interesting use case and I am very excited to get my Hands dirty with this feature

LIRE (Lucene Image REtrieval) is a plugin for Lucene to index and search images.It does content based ,give image as input and get all the matching images as output.LIRE supports many different features LIRE FEATURES

Indexing Images
Indexing Images is no different then indexing document in ElasticSearch ,we need to set the image object data type as (image) and value to be the Base64 of image, where Base64 encoding is a transformation that encodes 3 bytes of data into four printable charachters.

You will find a Github link at the end of this article hosting actual code used in this project with ElasticSearch Mapping

To use this feature we need to define our image object as image(data type) in ES mapping


"image": {
"properties": {
"name": {
"type": "string",
"index":"not_analyzed"
},
"path":{
"type": "string",
"index":"not_analyzed"
}
"image": {
"type": "image",
"feature": {
"CEDD": {
"hash": "BIT_SAMPLING"
},
"JCD": {
"hash": ["BIT_SAMPLING", "LSH"]
},
"FCTH": {}
}
}
}
}

We define 3 fields here

  • name -> Name of Image
  • path-> Path where actual image is stored
  • image-> Actual Image (Base64)

Application Flow
We will insert our images in HDFS and will index them on ES while converting them in Base64
For example Say we have Image MickeyMouse.jpg
We will store this MickeyMouse.jpg at location hdfs://:/esimage/MickeyMouse.jpg,and our ES will have

{
"name":"MickeyMouse.jpg",
"path":"/esimage/MickeyMouse.jpg",
"image":""
}

 

All the queries from UI will be directed to our ES ,in result we will have name and path to be returned .When we want to display actual image we will fetch it from HDFS using path present in result

GitHub EsImageSearch