Writing an Elasticsearch Ingest plug-in

Elasticsearch is written in Java and many of its internal modules are wired together with Google Guice during startup of the Elasticsearch server. On top of that, a plug-in API is provided whereby the functionality of Elasticsearch can be extended further. In this article we will demonstrate how to write an ingest plug-in for Elasticsearch that filters a certain keyword from a document field before the document is indexed. The example plug-in is fairly simple but gives you enough knowledge to write a full-blown Elasticsearch ingest plug-in capable of processing documents before they are actually indexed.

The Elasticsearch plug-in API

Elasticsearch loads plug-ins from the plugins directory of the Elasticsearch installation. Each plug-in needs to provide a class that extends the org.elasticsearch.plugins.Plugin base class. In addition, that class may implement one or more interfaces from the same org.elasticsearch.plugins package that determine the type of plug-in, such as:

ActionPlugin: plug-ins that add custom actions and REST handlers (for example, the updateByQuery and deleteByQuery requests are implemented this way)
AnalysisPlugin: plug-ins that provide additional analyzers or otherwise extend the Elasticsearch analysis functionality
ClusterPlugin: plug-ins that add custom behavior to cluster management
DiscoveryPlugin: plug-ins that extend the Elasticsearch discovery functionality
IngestPlugin: plug-ins that provide additional ingest processors
MapperPlugin: plug-ins that provide custom mappers
NetworkPlugin: plug-ins that extend the network and transport capabilities of Elasticsearch
RepositoryPlugin: plug-ins that provide custom snapshot repositories
ScriptPlugin: plug-ins that extend the Elasticsearch scripting functionality, for example with a new scripting language
SearchPlugin: plug-ins that extend the search capabilities of Elasticsearch
ReloadablePlugin: plug-ins whose settings (for example secure settings) can be reloaded without restarting the node

In addition, each plug-in needs to supply plug-in metadata to Elasticsearch in a plugin-descriptor.properties file and, optionally, a plugin-security.policy file with the access control permissions required by the plug-in (as specified by the JDK security manager model).
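
A plugin-security.policy file uses the standard Java policy syntax. Our filter plug-in will not need any extra permissions, so the following is only an illustrative sketch of what such a file might look like for a plug-in that opens outbound network connections:

grant {
   // hypothetical example: allow outbound connections to any host;
   // a real policy should grant only the permissions the plug-in actually needs
   permission java.net.SocketPermission "*", "connect";
};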

Creating the filter ingest plug-in

We are going to create the Elasticsearch plug-in as a Maven project using Eclipse. Create a new Maven project from New -> Maven Project and, in the archetype selection window, type elasticsearch and select the elasticsearch-plugin-archetype from org.codelibs. The archetype provides a good starting point: it generates a pom.xml file with the necessary dependencies, an assemblies.xml file that bundles the plug-in as a zip archive, the plug-in metadata files and initial Java classes for the plug-in.

[Figure: Eclipse archetype selection showing the elasticsearch-plugin-archetype]

Specify a proper Maven configuration for the plug-in along with the properties specific to the Maven archetype. For elasticsearchVersion we are going to specify 7.6.0 as the version of Elasticsearch against which the plug-in is being developed.

[Figure: Maven configuration for the plug-in project]
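
If you prefer the command line over Eclipse, the same archetype can be invoked directly with Maven, which will prompt for the remaining properties such as elasticsearchVersion (the archetype version offered may differ):

mvn archetype:generate -DarchetypeGroupId=org.codelibs -DarchetypeArtifactId=elasticsearch-plugin-archetype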

We need to rename the generated package accordingly (in our case we are using com.martin_toshev.elasticsearch.plugin.filter) and also rename the generated plug-in class to FilterIngestPlugin. The rest of the generated classes are not needed and may be removed. The FilterIngestPlugin class has the following logic:

package com.martin_toshev.elasticsearch.plugin.filter;

import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;

public class FilterIngestPlugin extends Plugin implements IngestPlugin {

   @Override
   public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
      Map<String, Processor.Factory> processors = new HashMap<>();
      processors.put(FilterWordProcessor.TYPE, new FilterWordProcessor.Factory());
      return processors;
   }
}

The getProcessors method returns one or more ingest processors that can be used by Elasticsearch once the plug-in is installed. In this case we are registering a single processor provided by the FilterWordProcessor class, which has the following implementation:

package com.martin_toshev.elasticsearch.plugin.filter;

import java.util.Map;

import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

public class FilterWordProcessor extends AbstractProcessor {

   public static final String TYPE = "filter_word";

   private String filterWord;

   private String field;

   public FilterWordProcessor(String tag, String filterWord, String field) {
      super(tag);
      this.filterWord = filterWord;
      this.field = field;
   }

   @Override
   public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
      IngestDocument document = ingestDocument;
      String value = document.getFieldValue(field, String.class);
      String clearedValue = value.replace(filterWord, "");
      document.setFieldValue(field, clearedValue);
      return document;
   }

   @Override
   public String getType() {
      return TYPE;
   }

   public static final class Factory implements Processor.Factory {

      @Override
      public Processor create(Map<String, Processor.Factory> registry, String processorTag,
            Map<String, Object> config) throws Exception {

         String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
         String filterWord = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "filterWord");

         return new FilterWordProcessor(processorTag, filterWord, field);
      }
   }

}

The ingest processor has a type named filter_word, which Elasticsearch uses to reference it in pipeline definitions. The processor takes two properties that are specified when it is created: field, which indicates the document field to filter, and filterWord, which indicates the word to remove from that field before the document is indexed. The execute method provides the logic of the processor, which is fairly straightforward.
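
The processor can also be exercised in isolation with a plain unit test. The following is a minimal sketch, assuming JUnit 4 is on the test classpath and using the two-argument IngestDocument constructor (the source/metadata map plus the ingest metadata map):

package com.martin_toshev.elasticsearch.plugin.filter;

import static org.junit.Assert.assertEquals;

import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.ingest.IngestDocument;
import org.junit.Test;

public class FilterWordProcessorTest {

   @Test
   public void removesFilterWordFromField() throws Exception {
      Map<String, Object> source = new HashMap<>();
      source.put("description", "crap ! Don't buy this.");

      IngestDocument document = new IngestDocument(source, new HashMap<>());
      FilterWordProcessor processor = new FilterWordProcessor("test-tag", "crap", "description");

      processor.execute(document);

      // every occurrence of the filter word is replaced with an empty string
      assertEquals(" ! Don't buy this.", document.getFieldValue("description", String.class));
   }
}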

We also need to modify the generated plugin-descriptor.properties file as follows:

description=Filter ingest plug-in
version=${project.version}
name=filter-ingest-plugin
classname=${elasticsearch.plugin.classname}
elasticsearch.version=${elasticsearch.version}
java.version=${maven.compiler.target}

In the pom.xml file, set the elasticsearch.plugin.classname property to com.martin_toshev.elasticsearch.plugin.filter.FilterIngestPlugin (or whatever class name you have chosen); Maven resource filtering substitutes the ${...} placeholders in the descriptor with these values at build time.
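
Depending on how the archetype generated your pom.xml, the property typically sits in the <properties> section; a minimal sketch of the relevant fragment (the rest of the generated pom.xml stays as-is):

<properties>
   <elasticsearch.plugin.classname>com.martin_toshev.elasticsearch.plugin.filter.FilterIngestPlugin</elasticsearch.plugin.classname>
</properties>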

Building the plug-in

To build the plug-in navigate to the project directory from the command line and execute the following:

mvn clean install

After the build finishes successfully, the plug-in archive is generated under target/releases as filter-ingest-plugin-1.0.0-SNAPSHOT.zip.

Deploying the plug-in

To install the plug-in, use the elasticsearch-plugin utility from the Elasticsearch installation (adjust the path to the filter plug-in archive accordingly):

elasticsearch-plugin.bat install file:///D:/project/filter-ingest-plugin/target/releases/filter-ingest-plugin-1.0.0-SNAPSHOT.zip

Once the plug-in is installed, Elasticsearch needs to be restarted. Make sure that the plug-in archive has been properly expanded under the plugins directory of Elasticsearch.
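
The installation can also be verified with the same utility, which prints the names of all installed plug-ins (on Linux or macOS use elasticsearch-plugin instead of elasticsearch-plugin.bat):

elasticsearch-plugin.bat list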

Testing the plug-in

To test the plug-in, first open a Kibana developer console and create an ingest pipeline that uses the filter_word processor registered by the plug-in to filter the word crap from the description field of indexed documents:

PUT /_ingest/pipeline/filter_crap
{
   "processors": [
      {
          "filter_word" :
               { "field" : "description", "filterWord" : "crap" }
      }
   ]
}
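
Before indexing any real documents, the pipeline can optionally be verified with the simulate API, which runs the pipeline's processors against the supplied documents without indexing anything:

POST /_ingest/pipeline/filter_crap/_simulate
{
   "docs": [
      {
         "_source": {
            "description": "crap ! Don't buy this."
         }
      }
   ]
}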

Then index a document in the order_data index using the created filter_crap ingest pipeline (we are not going to create an explicit mapping for the index):

PUT /order_data/_doc/1?pipeline=filter_crap
{
   "description": "crap ! Don't buy this."
}

Now inspect the documents in the index and verify that the description field of the document indexed above has been properly filtered:

GET /order_data/_search
{
   "query": {
      "match_all": {}
   }
}
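
Given the processor's replace logic, the returned hit should contain a _source similar to the following (response abbreviated):

{
   "_source": {
      "description": " ! Don't buy this."
   }
}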
