luigi.contrib.esindex module

Support for Elasticsearch (1.0.0 or newer).

Provides an ElasticsearchTarget and a CopyToIndex template task.

Modeled after luigi.contrib.rdbms.CopyToTable.

A minimal example (assuming elasticsearch is running on localhost:9200):

import luigi
from luigi.contrib.esindex import CopyToIndex

class ExampleIndex(CopyToIndex):
    index = 'example'

    def docs(self):
        return [{'_id': 1, 'title': 'An example document.'}]

if __name__ == '__main__':
    task = ExampleIndex()
    luigi.build([task], local_scheduler=True)

All options:

import luigi
from luigi.contrib.esindex import CopyToIndex

class ExampleIndex(CopyToIndex):
    host = 'localhost'
    port = 9200
    index = 'example'
    doc_type = 'default'
    purge_existing_index = True
    marker_index_hist_size = 1

    def docs(self):
        return [{'_id': 1, 'title': 'An example document.'}]

if __name__ == '__main__':
    task = ExampleIndex()
    luigi.build([task], local_scheduler=True)

The host, port, index and doc_type parameters carry their standard Elasticsearch meanings.

purge_existing_index deletes the index whenever an update is required. This is useful when dealing with “dumps” that represent the whole data set rather than incremental updates.

marker_index_hist_size sets the maximum number of entries in the ‘marker’ index:

  • 0 (default) keeps all updates,
  • 1 remembers only the most recent update to the index.

This can be useful if an index needs to be recreated even though the corresponding indexing task has already run at some point in the past, because a later indexing task might have altered the index in the meantime.

There are two luigi.cfg configuration options:

[elasticsearch]

marker-index = update_log
marker-doc-type = entry
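
As an illustration of how these two options relate to their defaults, the sketch below parses an equivalent section with the standard library's configparser. This is not how luigi itself loads luigi.cfg; the CFG_TEXT constant and the fallback handling are assumptions made for the example.

```python
import configparser

# Hypothetical file contents; only the [elasticsearch] section matters here.
CFG_TEXT = """
[elasticsearch]
marker-index = update_log
marker-doc-type = entry
"""

parser = configparser.ConfigParser()
parser.read_string(CFG_TEXT)

# Fall back to the documented defaults when an option is absent.
marker_index = parser.get("elasticsearch", "marker-index", fallback="update_log")
marker_doc_type = parser.get("elasticsearch", "marker-doc-type", fallback="entry")

print(marker_index, marker_doc_type)
```
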
class luigi.contrib.esindex.ElasticsearchTarget(host, port, index, doc_type, update_id, marker_index_hist_size=0, http_auth=None, timeout=10, extra_elasticsearch_args=None)[source]

Bases: luigi.target.Target

Target for a resource in Elasticsearch.

Parameters:
  • host (str) – Elasticsearch server host
  • port (int) – Elasticsearch server port
  • index (str) – index name
  • doc_type (str) – doctype name
  • update_id (str) – an identifier for this data set
  • marker_index_hist_size (int) – number of changes to the index to remember
  • timeout (int) – Elasticsearch connection timeout
  • extra_elasticsearch_args – extra args for Elasticsearch
marker_index = 'update_log'
marker_doc_type = 'entry'
marker_index_document_id()[source]

Generate an id for the indicator document.
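
One way such an id could be derived is by hashing the values that identify an update. The sketch below is an assumption about the approach, not a copy of the library code: the helper name, the ":" separator and the choice of SHA-1 are illustrative.

```python
import hashlib

def marker_document_id(index, doc_type, update_id):
    """Hypothetical helper: derive a stable id from the identifying values."""
    key = "{}:{}:{}".format(index, doc_type, update_id)
    return hashlib.sha1(key.encode("utf-8")).hexdigest()

first = marker_document_id("example", "default", "ExampleIndex()")
second = marker_document_id("example", "default", "ExampleIndex()")
other = marker_document_id("example", "default", "OtherIndex()")

# The same inputs always give the same id; different inputs give a different one.
print(first == second, first == other)
```

Because the id is deterministic, a later existence check can look the marker document up directly instead of searching for it.
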

touch()[source]

Mark this update as complete.

The document id would be sufficient, but for documentation we also index the parameters update_id, target_index, target_doc_type and date.
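
A marker document carrying those four fields might look like the sketch below. The field names come from the description above; the helper name and the ISO 8601 string used for date are assumptions.

```python
import datetime

def build_marker_document(update_id, target_index, target_doc_type, now=None):
    """Hypothetical helper: assemble the body of a marker document."""
    now = now or datetime.datetime.now()
    return {
        "update_id": update_id,
        "target_index": target_index,
        "target_doc_type": target_doc_type,
        # Assumption: the timestamp is stored as an ISO 8601 string.
        "date": now.isoformat(),
    }

marker = build_marker_document(
    "ExampleIndex()", "example", "default",
    now=datetime.datetime(2020, 1, 2, 3, 4, 5),
)
print(marker)
```
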

exists()[source]

Test whether this task has been run.

create_marker_index()[source]

Create the index that keeps track of the tasks, if it does not already exist.

ensure_hist_size()[source]

Shrink the history of updates for an index/doc_type combination down to self.marker_index_hist_size.
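
The trimming rule can be sketched on a plain list, without Elasticsearch. Here trim_history is a hypothetical helper and the entries are assumed to carry a sortable date field; a size of 0 means that nothing is dropped.

```python
def trim_history(entries, hist_size):
    """Return (kept, dropped) for a list of marker entries.

    hist_size == 0 keeps everything; otherwise only the newest
    hist_size entries (by 'date') are kept.
    """
    if hist_size == 0:
        return list(entries), []
    newest_first = sorted(entries, key=lambda e: e["date"], reverse=True)
    return newest_first[:hist_size], newest_first[hist_size:]

entries = [
    {"update_id": "a", "date": "2020-01-01"},
    {"update_id": "b", "date": "2020-03-01"},
    {"update_id": "c", "date": "2020-02-01"},
]
kept, dropped = trim_history(entries, 1)
print([e["update_id"] for e in kept], [e["update_id"] for e in dropped])
```
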

class luigi.contrib.esindex.CopyToIndex(*args, **kwargs)[source]

Bases: luigi.task.Task

Template task for inserting a data set into Elasticsearch.

Usage:

  1. Subclass and override the required index attribute.
  2. Implement a custom docs method that returns an iterable over the documents. A document can be a JSON string, e.g. from a newline-delimited JSON (ldj) file (the default implementation), or a dictionary.
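
To illustrate the kind of iterable a docs method could return for newline-delimited JSON, the sketch below yields one JSON string per non-empty line. The iter_ldj helper and the in-memory sample are assumptions; in a real task the lines would come from the task's input.

```python
import io

def iter_ldj(stream):
    """Hypothetical helper: yield one JSON string per non-empty line."""
    for line in stream:
        line = line.strip()
        if line:
            yield line

# In-memory stand-in for a newline-delimited JSON file.
sample = io.StringIO(
    '{"_id": 1, "title": "first"}\n'
    '\n'
    '{"_id": 2, "title": "second"}\n'
)
docs = list(iter_ldj(sample))
print(len(docs))
```

Yielding lazily keeps memory use flat for large dumps, since documents are consumed as they are read.
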

Optional attributes:

  • doc_type (default),
  • host (localhost),
  • port (9200),
  • settings ({‘settings’: {}}),
  • mapping (None),
  • chunk_size (2000),
  • raise_on_error (True),
  • purge_existing_index (False),
  • marker_index_hist_size (0)

If settings are defined, they are only applied at index creation time.

host

ES hostname.

port

ES port.

http_auth

ES optional http auth information as either ‘:’ separated string or a tuple, e.g. (‘user’, ‘pass’) or “user:pass”.
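
The two accepted forms describe the same credentials. The sketch below shows the correspondence with a hypothetical normalize_http_auth helper; it is not part of luigi or the Elasticsearch client.

```python
def normalize_http_auth(http_auth):
    """Hypothetical helper: turn 'user:pass' or a pair into a (user, password) tuple."""
    if http_auth is None:
        return None
    if isinstance(http_auth, str):
        # Split on the first ':' only, so a password may itself contain ':'.
        user, _, password = http_auth.partition(":")
        return (user, password)
    return tuple(http_auth)

print(normalize_http_auth("user:pass"))
print(normalize_http_auth(("user", "pass")))
```
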

index

The target index.

It may or may not exist already.

doc_type

The target doc_type.

mapping

Dictionary with custom mapping or None.

settings

Settings to be used at index creation time.

chunk_size

Number of documents sent in a single API call.
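
Batching by chunk_size can be sketched with itertools.islice. The chunks helper is hypothetical and shows only the grouping, not the bulk request itself.

```python
from itertools import islice

def chunks(iterable, chunk_size):
    """Hypothetical helper: yield lists of at most chunk_size items."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, chunk_size))
        if not batch:
            return
        yield batch

batches = list(chunks(range(5), 2))
print(batches)
```
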

raise_on_error

Whether to fail fast.

purge_existing_index

Whether to delete the index completely before any indexing.

marker_index_hist_size

Number of event log entries in the marker index. 0: unlimited.

timeout

Elasticsearch connection timeout.

extra_elasticsearch_args

Extra arguments to pass to the Elasticsearch constructor.

docs()[source]

Return the documents to be indexed.

Besides the user-defined fields, a document may contain _index, _type and _id.
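
A plausible way to handle documents that omit these fields is to fill in the task's own index and doc_type. The sketch below assumes that behaviour; with_defaults is a hypothetical helper, and values already present on a document are left untouched.

```python
import json

def with_defaults(docs, index, doc_type):
    """Hypothetical helper: parse JSON strings and fill in missing _index/_type."""
    for doc in docs:
        if isinstance(doc, str):
            doc = json.loads(doc)
        else:
            doc = dict(doc)  # copy so the caller's dictionary is not mutated
        doc.setdefault("_index", index)
        doc.setdefault("_type", doc_type)
        yield doc

raw = [
    {"_id": 1, "title": "a"},
    '{"_id": 2, "_index": "other"}',
]
result = list(with_defaults(raw, "example", "default"))
print(result)
```
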

create_index()[source]

Override to provide code for creating the target index.

By default it will be created without any special settings or mappings.

delete_index()[source]

Delete the index, if it exists.

update_id()[source]

This id will be a unique identifier for this indexing task.

output()[source]

Returns an ElasticsearchTarget representing the inserted dataset.

Normally you don’t override this.

run()[source]

Run task, namely:

  • purge existing index, if requested (purge_existing_index),
  • create the index, if missing,
  • apply mappings, if given,
  • set refresh interval to -1 (disable) for performance reasons,
  • bulk index in batches of size chunk_size (2000),
  • set refresh interval to 1s,
  • refresh Elasticsearch,
  • create entry in marker index.
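
The order of the steps above, including the conditional ones, can be sketched as a small planning function. This only lists step names for a given situation; plan_run and its arguments are hypothetical, and no Elasticsearch calls are made.

```python
def plan_run(purge_existing_index, index_exists, mapping):
    """Hypothetical helper: list the steps a run would take, in order."""
    steps = []
    if purge_existing_index and index_exists:
        steps.append("delete index")
        index_exists = False
    if not index_exists:
        steps.append("create index")
    if mapping:
        steps.append("put mapping")
    steps.append("set refresh_interval to -1")
    steps.append("bulk index in chunks")
    steps.append("set refresh_interval to 1s")
    steps.append("refresh")
    steps.append("touch marker")
    return steps

for step in plan_run(purge_existing_index=True, index_exists=True, mapping=None):
    print(step)
```

Writing the marker entry last means that an interrupted run leaves no marker, so the task is still considered incomplete.
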