luigi.contrib.esindex
Support for Elasticsearch (1.0.0 or newer).
Provides an ElasticsearchTarget and a CopyToIndex template task.
Modeled after luigi.contrib.rdbms.CopyToTable.
A minimal example (assuming elasticsearch is running on localhost:9200):
    import luigi
    from luigi.contrib.esindex import CopyToIndex


    class ExampleIndex(CopyToIndex):
        index = 'example'

        def docs(self):
            # Each document is a dict; _id is optional.
            return [{'_id': 1, 'title': 'An example document.'}]


    if __name__ == '__main__':
        task = ExampleIndex()
        luigi.build([task], local_scheduler=True)
All options:
    import luigi
    from luigi.contrib.esindex import CopyToIndex


    class ExampleIndex(CopyToIndex):
        host = 'localhost'
        port = 9200
        index = 'example'
        doc_type = 'default'
        purge_existing_index = True
        marker_index_hist_size = 1

        def docs(self):
            return [{'_id': 1, 'title': 'An example document.'}]


    if __name__ == '__main__':
        task = ExampleIndex()
        luigi.build([task], local_scheduler=True)
The host, port, index and doc_type parameters have their standard Elasticsearch meanings.
purge_existing_index deletes the index whenever an update is required. This is useful when dealing with “dumps” that represent the whole data set, not just updates.
marker_index_hist_size sets the maximum number of entries kept in the ‘marker’ index:
0 (default) keeps all updates,
1 remembers only the most recent update to the index.
This can be useful if an index needs to be recreated even though the corresponding indexing task has already run at some point in the past, because a later indexing task might have altered the index in the meantime.
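The effect of marker_index_hist_size can be illustrated with a small sketch. This is not the module's implementation; trim_history and the update ids are hypothetical, and the entries are assumed to be ordered from oldest to newest.

```python
def trim_history(entries, hist_size):
    """Return the marker entries that survive for a given history size.

    Assumes `entries` is ordered oldest to newest. A size of 0 means
    "keep everything", mirroring the default described above.
    """
    if hist_size < 0:
        raise ValueError("hist_size must be non-negative")
    entries = list(entries)
    if hist_size == 0:
        return entries
    # Keep only the most recent `hist_size` entries.
    return entries[-hist_size:]


updates = ["update-1", "update-2", "update-3"]  # hypothetical update ids
kept_all = trim_history(updates, 0)
kept_latest = trim_history(updates, 1)
```

With a size of 1, only the latest update is remembered, so a task whose update was recorded earlier would no longer be considered complete and could run again.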
There are two luigi.cfg configuration options:

    [elasticsearch]
    marker-index = update_log
    marker-doc-type = entry
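To show how these two options map to values, here is a minimal sketch that parses the same section with the standard library's configparser. Luigi reads luigi.cfg through its own configuration layer, so this only illustrates the file format; the fallback values are taken from the marker_index and marker_doc_type class attributes documented for ElasticsearchTarget.

```python
import configparser

# The same section as above, inlined so the sketch needs no file on disk.
CFG_TEXT = """
[elasticsearch]
marker-index = update_log
marker-doc-type = entry
"""

parser = configparser.ConfigParser()
parser.read_string(CFG_TEXT)

# Fall back to the documented defaults when an option is missing.
marker_index = parser.get("elasticsearch", "marker-index", fallback="update_log")
marker_doc_type = parser.get("elasticsearch", "marker-doc-type", fallback="entry")
```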
Classes
- CopyToIndex: Template task for inserting a data set into Elasticsearch.
- ElasticsearchTarget: Target for a resource in Elasticsearch.
- class luigi.contrib.esindex.ElasticsearchTarget(host, port, index, doc_type, update_id, marker_index_hist_size=0, http_auth=None, timeout=10, extra_elasticsearch_args=None)
Target for a resource in Elasticsearch.
- Parameters:
host (str) – Elasticsearch server host
port (int) – Elasticsearch server port
index (str) – index name
doc_type (str) – doctype name
update_id (str) – an identifier for this data set
marker_index_hist_size (int) – number of changes to the index to remember in the marker index (0 keeps all)
timeout (int) – Elasticsearch connection timeout
extra_elasticsearch_args – extra args for Elasticsearch
- marker_index = 'update_log'
- marker_doc_type = 'entry'
- class luigi.contrib.esindex.CopyToIndex(*args, **kwargs)
Template task for inserting a data set into Elasticsearch.
Usage:
Subclass and override the required index attribute.
Implement a custom docs method that returns an iterable over the documents. A document can be a JSON string, e.g. a line from a newline-delimited JSON (ldj) file (the default implementation), or a dictionary.
Optional attributes (defaults in parentheses):
doc_type ('default'),
host ('localhost'),
port (9200),
settings ({'settings': {}}),
mapping (None),
chunk_size (2000),
raise_on_error (True),
purge_existing_index (False),
marker_index_hist_size (0).
If settings are defined, they are only applied at index creation time.
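Since docs may yield either JSON strings or dictionaries, a consumer has to accept both. The following is a minimal sketch of that contract, not the task's own code; parse_docs is a hypothetical helper name.

```python
import json


def parse_docs(docs):
    """Yield each document as a dict, decoding JSON strings on the way.

    Accepts the two document forms described above: a JSON string
    (e.g. one line of a newline-delimited JSON file) or a dictionary.
    """
    for doc in docs:
        if isinstance(doc, str):
            yield json.loads(doc)
        elif isinstance(doc, dict):
            yield doc
        else:
            raise TypeError("document must be a JSON string or a dict")


mixed = ['{"_id": 1, "title": "From a JSON line."}',
         {"_id": 2, "title": "From a dict."}]
parsed = list(parse_docs(mixed))
```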
- property host
ES hostname.
- property port
ES port.
- property http_auth
Optional HTTP auth information for ES, given either as a ':'-separated string or as a tuple, e.g. ('user', 'pass') or 'user:pass'.
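The two accepted http_auth forms carry the same information. As an illustration only, the hypothetical helper below normalizes both into a (user, password) tuple; it is not part of luigi, and the value can normally be supplied in either form without such a conversion.

```python
def split_http_auth(http_auth):
    """Normalize http_auth into a (user, password) tuple, or None.

    Hypothetical helper for illustration: accepts 'user:pass' strings
    and 2-item tuples or lists.
    """
    if http_auth is None:
        return None
    if isinstance(http_auth, str):
        # Split on the first colon only, so passwords may contain ':'.
        user, sep, password = http_auth.partition(":")
        if not sep:
            raise ValueError("expected a 'user:pass' string")
        return (user, password)
    user, password = http_auth
    return (user, password)


from_string = split_http_auth("user:pass")
from_tuple = split_http_auth(("user", "pass"))
```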
- abstract property index
The target index.
It may or may not already exist.
- property doc_type
The target doc_type.
- property mapping
Dictionary with custom mapping or None.
- property settings
Settings to be used at index creation time.
- property chunk_size
Number of documents sent per bulk API call.
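Batching by chunk_size can be sketched as follows. The chunked helper is hypothetical and only shows how an iterable of documents splits into batches; how the task actually issues its bulk requests is not shown here.

```python
from itertools import islice


def chunked(iterable, chunk_size):
    """Yield lists of at most `chunk_size` items from `iterable`."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, chunk_size))
        if not batch:
            return
        yield batch


# Five documents with a chunk size of 2 give three batches.
batches = list(chunked(range(5), 2))
```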
- property raise_on_error
Whether to fail fast.
- property purge_existing_index
Whether to delete the index completely before any indexing.
- property marker_index_hist_size
Number of event log entries in the marker index. 0: unlimited.
- property timeout
Elasticsearch connection timeout.
- property extra_elasticsearch_args
Extra arguments to pass to the Elasticsearch constructor.
- docs()
Return the documents to be indexed.
Besides the user-defined fields, a document may contain _index, _type and _id fields.
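One way to read the optional _index and _type fields is that a document may name its own destination, with the task's index and doc_type used otherwise. The sketch below assumes that behaviour; with_defaults is a hypothetical helper, not a luigi function.

```python
def with_defaults(doc, index, doc_type):
    """Return a copy of `doc` with _index and _type filled in if absent.

    Assumption: fields already present in the document take precedence
    over the task-level index and doc_type.
    """
    action = dict(doc)  # copy, so the caller's dict is left untouched
    action.setdefault("_index", index)
    action.setdefault("_type", doc_type)
    return action


plain = with_defaults({"_id": 1, "title": "An example document."},
                      index="example", doc_type="default")
routed = with_defaults({"_id": 2, "_index": "other"},
                       index="example", doc_type="default")
```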
- create_index()
Override to provide code for creating the target index.
By default it will be created without any special settings or mappings.
- output()
Returns an ElasticsearchTarget representing the inserted dataset.
Normally you don’t override this.
- run()
Run task, namely:
purge existing index, if requested (purge_existing_index),
create the index, if missing,
apply mappings, if given,
set refresh interval to -1 (disable) for performance reasons,
bulk index in batches of size chunk_size (2000),
set refresh interval to 1s,
refresh Elasticsearch,
create entry in marker index.
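The order of the steps above can be sketched with a stand-in client that records calls instead of talking to a server. Everything here is hypothetical: RecordingClient, run_sketch and the step names are illustrative and do not correspond to the Elasticsearch client API or to luigi's implementation.

```python
from itertools import islice


class RecordingClient:
    """Stand-in client that only records which steps were requested."""

    def __init__(self, existing_indices=()):
        self.existing = set(existing_indices)
        self.calls = []

    def record(self, step, detail=None):
        self.calls.append((step, detail))


def run_sketch(client, index, docs, chunk_size=2000, purge=False, mapping=None):
    # 1. Purge the existing index, if requested.
    if purge and index in client.existing:
        client.existing.discard(index)
        client.record("delete_index", index)
    # 2. Create the index, if missing.
    if index not in client.existing:
        client.existing.add(index)
        client.record("create_index", index)
    # 3. Apply mappings, if given.
    if mapping is not None:
        client.record("put_mapping", index)
    # 4. Disable refresh while bulk indexing.
    client.record("set_refresh_interval", "-1")
    # 5. Bulk index in batches of chunk_size.
    iterator = iter(docs)
    while True:
        batch = list(islice(iterator, chunk_size))
        if not batch:
            break
        client.record("bulk", len(batch))
    # 6-8. Restore refresh, refresh once, then write the marker entry.
    client.record("set_refresh_interval", "1s")
    client.record("refresh", index)
    client.record("touch_marker", index)


client = RecordingClient(existing_indices={"example"})
run_sketch(client, "example", [{"_id": i} for i in range(5)],
           chunk_size=2, purge=True)
steps = [step for step, _ in client.calls]
```

Writing the marker entry last means that a run which fails part-way leaves no marker behind, so the task would not be reported as complete.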