luigi.contrib.esindex module
Support for Elasticsearch (1.0.0 or newer).
Provides an ElasticsearchTarget and a CopyToIndex template task, modeled after luigi.contrib.rdbms.CopyToTable.
A minimal example (assuming Elasticsearch is running on localhost:9200):

    class ExampleIndex(CopyToIndex):
        index = 'example'

        def docs(self):
            return [{'_id': 1, 'title': 'An example document.'}]

    if __name__ == '__main__':
        task = ExampleIndex()
        luigi.build([task], local_scheduler=True)
All options:

    class ExampleIndex(CopyToIndex):
        host = 'localhost'
        port = 9200
        index = 'example'
        doc_type = 'default'
        purge_existing_index = True
        marker_index_hist_size = 1

        def docs(self):
            return [{'_id': 1, 'title': 'An example document.'}]

    if __name__ == '__main__':
        task = ExampleIndex()
        luigi.build([task], local_scheduler=True)
The host, port, index and doc_type parameters are standard Elasticsearch connection and naming options.
purge_existing_index deletes the index whenever an update is required. This is useful when dealing with “dumps” that represent the complete data set, not just incremental updates.
marker_index_hist_size sets the maximum number of entries in the ‘marker’ index:
0 (default) keeps all updates,
1 remembers only the most recent update to the index.
This can be useful if an index needs to be recreated even though the corresponding indexing task was run at some point in the past, since a later indexing task might have altered the index in the meantime.
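The pruning behavior can be sketched as a keep-last-N rule; trim_marker_history below is a hypothetical helper written only to make the semantics concrete, not a function in this module:

```python
def trim_marker_history(entries, hist_size):
    """Illustrative sketch of marker_index_hist_size semantics.

    entries: update-log entries, oldest first.
    hist_size: 0 keeps everything; N > 0 keeps only the newest N.
    """
    if hist_size == 0:
        return list(entries)
    return list(entries)[-hist_size:]
```

With hist_size=1, only the most recent update survives, so re-running an older indexing task is not wrongly considered complete.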
There are two luigi.cfg configuration options:
    [elasticsearch]
    marker-index = update_log
    marker-doc-type = entry
- class luigi.contrib.esindex.ElasticsearchTarget(host, port, index, doc_type, update_id, marker_index_hist_size=0, http_auth=None, timeout=10, extra_elasticsearch_args=None)[source]
Bases: Target
Target for a resource in Elasticsearch.
- Parameters:
host (str) – Elasticsearch server host
port (int) – Elasticsearch server port
index (str) – index name
doc_type (str) – doctype name
update_id (str) – an identifier for this data set
marker_index_hist_size (int) – number of updates to the index to remember in the marker index (0 keeps all)
timeout (int) – Elasticsearch connection timeout
extra_elasticsearch_args – extra arguments passed to the Elasticsearch constructor
- marker_index = 'update_log'
- marker_doc_type = 'entry'
- class luigi.contrib.esindex.CopyToIndex(*args, **kwargs)[source]
Bases: Task
Template task for inserting a data set into Elasticsearch.
Usage:
Subclass and override the required index attribute.
Implement a custom docs method that returns an iterable over the documents. A document can be a JSON string, e.g. from a newline-delimited JSON (ldj) file (the default implementation), or a dictionary.
Optional attributes:
doc_type (default),
host (localhost),
port (9200),
settings ({‘settings’: {}}),
mapping (None),
chunk_size (2000),
raise_on_error (True),
purge_existing_index (False),
marker_index_hist_size (0)
If settings are defined, they are only applied at index creation time.
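As a sketch of the docs contract described above, a newline-delimited JSON source can be turned into the dictionaries a docs() override is expected to yield. The helper name and the in-memory source are illustrative, not part of the API:

```python
import json


def docs_from_ndjson(lines):
    # Illustrative helper: parse newline-delimited JSON into the
    # dictionaries a CopyToIndex.docs() override could return.
    # Each document may carry its own '_id' (and '_index'/'_type').
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)
```

A subclass's docs() could then simply return docs_from_ndjson(open(path)).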
- property host
ES hostname.
- property port
ES port.
- property http_auth
Optional ES HTTP auth credentials, given either as a ‘:’-separated string or as a tuple, e.g. (‘user’, ‘pass’) or “user:pass”.
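The two accepted forms can be made concrete with a small normalization sketch; normalize_http_auth is a hypothetical helper, shown only to illustrate the formats, not part of this module:

```python
def normalize_http_auth(http_auth):
    # Accept either a "user:pass" string or a ('user', 'pass') tuple
    # and return a uniform (user, password) tuple.
    if http_auth is None:
        return None
    if isinstance(http_auth, str):
        user, _, password = http_auth.partition(':')
        return (user, password)
    return tuple(http_auth)
```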
- abstract property index
The target index.
May or may not exist yet.
- property doc_type
The target doc_type.
- property mapping
Dictionary with a custom mapping, or None.
- property settings
Settings to be used at index creation time.
- property chunk_size
Number of docs to send in a single API call.
- property raise_on_error
Whether to fail fast.
- property purge_existing_index
Whether to delete the index completely before any indexing.
- property marker_index_hist_size
Number of event log entries to keep in the marker index; 0 means unlimited.
- property timeout
Elasticsearch connection timeout.
- property extra_elasticsearch_args
Extra arguments to pass to the Elasticsearch constructor.
- docs()[source]
Return the documents to be indexed.
Besides the user-defined fields, a document may contain an _index, _type and _id.
- create_index()[source]
Override to provide code for creating the target index.
By default it will be created without any special settings or mappings.
- output()[source]
Returns an ElasticsearchTarget representing the inserted dataset.
Normally you don’t override this.
- run()[source]
Run the task, namely:
purge the existing index, if requested (purge_existing_index),
create the index, if missing,
apply the mapping, if given,
set the refresh interval to -1 (disabled) for performance reasons,
bulk index in batches of size chunk_size (default 2000),
set the refresh interval back to 1s,
refresh Elasticsearch,
create an entry in the marker index.
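The batching step can be sketched in isolation; chunked is a hypothetical helper (the real bulk step additionally sends each batch to the Elasticsearch bulk API):

```python
def chunked(docs, chunk_size=2000):
    # Yield lists of at most chunk_size documents, so that each
    # bulk request stays bounded in size.
    batch = []
    for doc in docs:
        batch.append(doc)
        if len(batch) == chunk_size:
            yield batch
            batch = []
    if batch:  # trailing partial batch
        yield batch
```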