
Handling a dangling Elasticsearch watcher index

A few weeks back, our Elasticsearch cluster stopped executing any watchers. Initial analysis suggested some problem with the AWS SMTP service, as we use AWS SMTP for sending mail alerts to our LDAP accounts. After going through more logs and spending some time understanding the sent-mail statistics on AWS (thanks to AWS for providing an intuitive UI to get insight into which emails are getting rejected), we were sure there was no problem with sending email; something was wrong on the current master. Analyzing the log line below, it was clear there was some issue with the .watches index.

[2018-05-10T07:05:16,969][WARN ][o.e.g.DanglingIndicesState] [es-master-1] [[.watches/23nm9NSrSkeZaK4Dtyughg]] cannot be imported as a dangling index, as index with same name already exists in cluster metadata

Resolution

  • Delete the local directory: The log line tells us the name of the node holding a stale copy of the index, along with the directory name. In our case it was the es-master-1 node, with the directory 23nm9NSrSkeZaK4Dtyughg under the master's data folder.
  • Restart the watcher service: Once the stale index directory is deleted, restart the watcher service:

    POST _xpack/watcher/_restart
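
Once the restart completes, you can check that execution has resumed via the watcher stats API (this is the X-Pack endpoint from the 6.x line we were running; adjust the path for your version):

    GET _xpack/watcher/stats

The response should report watcher_state as started on the current master.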

Demystifying ElasticSearch refresh!

By default, ElasticSearch is configured to refresh the index every 1 second. This means it will take at least 1 second for changes made to a document to become visible in search. But what if we have a requirement to trigger a process only once the search results have been made available?

We want an index, update, or delete request to wait until the changes made to the documents are available for search before it returns.

The refresh parameter is available on these APIs (index, update, delete, and bulk) to control when we want our index to refresh and the changes to be made visible to users:

  • Setting it to true (refresh=true) will cause the relevant primary and replica shards, not the complete index, to be refreshed immediately.
  • Setting it to wait_for (refresh=wait_for) will cause the request to wait until the index is refreshed by ElasticSearch based on index.refresh_interval, i.e. 1 second by default. Once the index is refreshed, the request returns.
  • Setting it to false (refresh=false) has no impact on refresh, and the request returns immediately. It simply means the data will become available in the near future.

Note: ElasticSearch will refresh only those shards that have changed, not the entire index
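
For example, to make an indexing request block until its change is visible to search (the index name my-index, the document ID, and the field are assumptions for illustration):

    PUT /my-index/_doc/1?refresh=wait_for
    {
      "title": "searchable once this request returns"
    }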

But there is a catch with these simple parameters: certain cases will cause a refresh to happen irrespective of the value of the refresh parameter you have set.

  • If index.max_refresh_listeners (which defaults to 1000 listeners per shard) is reached, refresh=wait_for will cause the relevant shard to be refreshed immediately, as if refresh=true had been set.
  • By default, GET (by document ID) is realtime: if the requested document has changes that are not yet searchable, the GET issues a refresh on the relevant shard, causing all of its pending changes to be made available.
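
If that realtime behavior is undesirable for your workload, it can be turned off per request (the index, type, and document ID here are assumptions):

    GET /my-index/_doc/1?realtime=false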

Nifi + Twitter + Kafka + ElasticSearch + HBase = Data Pipeline

Apache Nifi is one of the best tools to go with if you want to create data pipelines that are reliable and very easy to share. It provides around 135 out-of-the-box processors ready to use, be it for Kafka, AMQ, HBase, HDFS, Solr, or Elasticsearch, while providing a single interface to implement your own custom processor.

Nifi was designed and built to automate the flow of data between systems. A lot is happening in the IoT space, and one of the major challenges is capturing and routing data, i.e. creating a reliable pipeline that ensures data is not lost and is delivered on time and efficiently. Nifi is the answer.

In this post we will see how to use Nifi to capture tweets containing a particular word or phrase, store them on a queue, index them into ElasticSearch, and store them in HBase.
The requirement is to capture all tweets on Twitter containing the keyword "Hadoop", index the tweets into ElasticSearch, and store them in HBase. (It is not a good idea to index everything, but this post is more about sharing the potential Nifi promises to provide in the Big Data space.)
Step 1) Capture Tweet
Nifi ships with a GetTwitter processor for capturing tweets containing a particular word. To use this processor you need to register as a Twitter developer at https://dev.twitter.com/ to obtain consumer and access keys. Once obtained, configure the following properties of the GetTwitter processor:

  • Twitter EndPoint -> Filter Endpoint
  • Consumer Key -> Your consumer key
  • Consumer Secret -> Your Consumer Secret
  • Access Token -> Your Access Token
  • Access Token Secret -> Your Access Token Secret
  • Terms to Filter on -> “Hadoop”

Step 2) Land Tweet To Kafka
If a tweet is captured successfully, route it to Kafka using the PutKafka processor, which again ships with Nifi. It provides an easy and convenient way to route all messages onto a Kafka topic, and also lets you configure the number of partitions and the key for Kafka messages. Properties to configure:

  • KafkaBrokers -> host:port
  • Topic Name -> Topic on which to send messages

Step 3) Index and Store
On successful receipt of a message we need to index it into ElasticSearch and store it in HBase, in parallel. Nifi again ships with two processors to achieve this task: PutElasticsearch and PutHBaseJSON.
Configure PutElasticsearch to point to your ElasticSearch cluster, along with the index name and the type of document to index with. Properties to configure:

  • Cluster Name -> ES Cluster Name
  • ElasticSearch Hosts -> host:9300 (the processor uses the transport port, 9300 by default, not the 9200 HTTP port)
  • Index -> Index name on which to index
  • Type -> Document Type

PutHBaseJSON converts every attribute/property in the JSON into a column qualifier, with its value as the value of that specific cell (see the sketch after this list). Point the processor at your HBase and specify the table name and column family to use via these properties:

  • TableName -> Table Name on which to insert
  • Column Family -> ColumnFamily in that table
  • HBaseClientService -> You need to enable this controller service by providing the Zookeeper quorum and the HBase znode (/hbase, or /hbase-unsecure with Hortonworks); this service is responsible for the integration with HBase
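
For example, assuming a flowfile carrying the hypothetical JSON content below, with the processor's Row Identifier Field Name property set to id, PutHBaseJSON would write one row with row key 998877 and column qualifiers user and text under the configured column family:

    {
      "id": "998877",
      "user": "amit",
      "text": "Learning #Hadoop with Nifi"
    }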

With every processor you have the option of specifying the batch size to insert/index with. And every relationship you create is backed by a queue that holds messages, for as long as it is configured to, until they are consumed by the service at the receiving end of the relationship.

Figure: the complete Nifi tweet pipeline (GetTwitter → PutKafka → PutElasticsearch / PutHBaseJSON).

ElasticSearch AutoComplete

Elasticsearch provides a great feature called search-as-you-type, giving us a way to implement a search experience that looks like Google's. In a recent project we had the same requirement: we had any number of name/state/city/company/person records in our database and were required to support ad-hoc queries, so we chose ES as the tool to realize this use case.

Elasticsearch provides many features out of the box, and one of the features I like most is autocomplete. I remember the days when we used to implement this feature with AJAX calls running queries like LIKE '%ELASTIC%' to get suggestions from the DB. ES takes a different approach to this problem: by applying appropriate analyzers while indexing data into ES, we can shape the indexed data in such a way that we don't have to perform wildcard or phrase queries; exact-match term queries alone give us this functionality.

For example, say we have a movie database with the entries below:
Reservoir Dogs
Airplane
Doctor Zhivago
The Deer Hunter
The Lord of the Rings
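
Each title could be indexed as a document with a single title field, for example (the index and type names movies/movie are assumptions used throughout this post):

    PUT /movies/movie/4
    {
      "title": "The Deer Hunter"
    }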

Using the standard analyzer, our inverted index contains the terms reservoir, dogs, airplane, doctor, zhivago, the, deer, hunter, lord, of, and rings, each mapped to the documents that contain it.

Now, if we implement everything using the standard analyzer, then when we type Th we get no suggestions (provided we are not running a prefix query for T*, which would be inefficient), because there is no token Th in our inverted index. When we type The, a match query gives us two suggestions: The Deer Hunter and The Lord of the Rings. But we want these suggestions to pop up as soon as we type Th.

To support this, Elasticsearch provides n-gram analyzers; for search-as-you-type we use a specialized form of n-grams called edge n-grams, which are anchored to the beginning of the word. Edge n-gramming the word quick would result in this (taken from the ES Guide):
q
qu
qui
quic
quick

Using this analyzer, when we index the movie document The Deer Hunter we get the following n-grams:
T
Th
The
D
De
Dee
Deer
H
Hu
Hun
Hunt
Hunte
Hunter

These n-grams are the actual tokens present in our inverted index, so when we type T we make an exact-match term query for T, and both document IDs (and their contents, if requested) are returned.
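
A minimal sketch of such a mapping and the matching query (the index name movies, type movie, and the analyzer/filter names are assumptions; the type in the mapping follows the pre-7.x syntax used elsewhere in these posts):

    PUT /movies
    {
      "settings": {
        "analysis": {
          "filter": {
            "autocomplete_filter": {
              "type": "edge_ngram",
              "min_gram": 1,
              "max_gram": 10
            }
          },
          "analyzer": {
            "autocomplete": {
              "type": "custom",
              "tokenizer": "standard",
              "filter": ["lowercase", "autocomplete_filter"]
            }
          }
        }
      },
      "mappings": {
        "movie": {
          "properties": {
            "title": {
              "type": "text",
              "analyzer": "autocomplete",
              "search_analyzer": "standard"
            }
          }
        }
      }
    }

    GET /movies/_search
    {
      "query": {
        "match": { "title": "Th" }
      }
    }

Note the search_analyzer: the query text itself should not be edge n-grammed, otherwise a search for "the" would match every title containing any word that starts with t.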

For the full implementation of the above behavior and the mapping to use, see the link below:

ESAutoComplete

ElasticSearch store field explained!

If you are storing a document in Elasticsearch that has a field (name: "word press") and you do not mark this field as stored, you can still retrieve the field as long as you have not disabled _source (which is enabled by default).

Elasticsearch by default saves every document that you send to it (as the _source field) and is therefore able to give it back when requested. Underneath, Lucene has its own storage where you can store the fields that you want retrieved when a document ID is provided.

For example, suppose the following document is indexed into ES, with I1 as the index name and _source enabled (store is disabled by default):

    {
      "_id": 1,
      "name": "amit hora"
    }

When you run a query that matches all documents having a name like a*, you will get the above document back. The reason is that ES has the _source field enabled by default and returns it with each hit; in this case it parses the stored _source and returns the name field of the matching doc.

If, on the other hand, you had _source disabled, you would have to explicitly store the name field for it to be searched and retrieved when requested, as in the sketch below.
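
A minimal sketch of that mapping and a query against it (the index name i1 and type doc mirror the example above, lowercased as ES requires; syntax follows the pre-7.x versions used in these posts):

    PUT /i1
    {
      "mappings": {
        "doc": {
          "_source": { "enabled": false },
          "properties": {
            "name": { "type": "text", "store": true }
          }
        }
      }
    }

    GET /i1/_search
    {
      "stored_fields": ["name"],
      "query": { "match": { "name": "amit" } }
    }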

Keep in mind, though, that retrieving many stored fields from Lucene might require one disk seek per field, while retrieving only the _source from Lucene and parsing it to extract the needed fields is just a single disk seek.