luigi.scheduler
The system for scheduling tasks and executing them in order.
Deals with dependencies, priorities, resources, etc.
The Worker pulls tasks from the scheduler (usually over the REST interface) and executes them.
See Using the Central Scheduler for more info.
Classes
- OrderedSet: Standard Python OrderedSet recipe found at http://code.activestate.com/recipes/576694/
- RetryPolicy: Create new instance of RetryPolicy(retry_count, disable_hard_timeout, disable_window)
- Scheduler: Async scheduler that can handle multiple workers, etc.
- SimpleTaskState: Keep track of the current state and handle persistence.
- Worker: Structure for tracking worker activity and keeping their references.
- luigi.scheduler.UPSTREAM_SEVERITY_KEY(value, start=0, stop=9223372036854775807, /)
Return first index of value.
Raises ValueError if the value is not present.
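The signature and docstring above are those of a tuple's bound `index` method, so UPSTREAM_SEVERITY_KEY maps an upstream status string to its rank in a severity ordering. The sketch below assumes a specific ordering (empty string, then UPSTREAM_RUNNING, UPSTREAM_MISSING_INPUT, UPSTREAM_FAILED, UPSTREAM_DISABLED) and a tuple named UPSTREAM_SEVERITY_ORDER. Neither appears on this page; both are stated assumptions. The helper worst_upstream_status is hypothetical.

```python
# Assumed ordering, least to most severe (not confirmed by this page).
UPSTREAM_SEVERITY_ORDER = (
    "",
    "UPSTREAM_RUNNING",
    "UPSTREAM_MISSING_INPUT",
    "UPSTREAM_FAILED",
    "UPSTREAM_DISABLED",
)

# A bound tuple.index: status string -> severity rank; unknown values raise ValueError.
UPSTREAM_SEVERITY_KEY = UPSTREAM_SEVERITY_ORDER.index


def worst_upstream_status(statuses):
    """Hypothetical helper: return the most severe status, or '' if there are none."""
    return max(statuses, key=UPSTREAM_SEVERITY_KEY, default="")


print(worst_upstream_status(["UPSTREAM_RUNNING", "UPSTREAM_FAILED", ""]))  # UPSTREAM_FAILED
```

Using the index as a sort key keeps the severity order in one place: comparing two statuses becomes an integer comparison on their positions.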
- class luigi.scheduler.RetryPolicy(retry_count, disable_hard_timeout, disable_window)
Create new instance of RetryPolicy(retry_count, disable_hard_timeout, disable_window)
- disable_hard_timeout
Alias for field number 1
- disable_window
Alias for field number 2
- retry_count
Alias for field number 0
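The "Alias for field number N" entries are what collections.namedtuple generates, so RetryPolicy behaves as an immutable tuple whose fields can be read by name or by position. A minimal sketch that recreates a namedtuple with the documented field order. The numeric values are illustrative only and are not luigi defaults.

```python
from collections import namedtuple

# Field order as documented: 0 retry_count, 1 disable_hard_timeout, 2 disable_window.
RetryPolicy = namedtuple("RetryPolicy", ["retry_count", "disable_hard_timeout", "disable_window"])

policy = RetryPolicy(retry_count=3, disable_hard_timeout=3600, disable_window=600)
print(policy.retry_count, policy[0])  # 3 3

# namedtuples are immutable; _replace returns a modified copy.
stricter = policy._replace(retry_count=1)
print(stricter)  # RetryPolicy(retry_count=1, disable_hard_timeout=3600, disable_window=600)
```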
- class luigi.scheduler.scheduler(*args, **kwargs)
- retry_delay
Parameter whose value is a float.
- remove_delay
Parameter whose value is a float.
- worker_disconnect_delay
Parameter whose value is a float.
- state_path
Parameter whose value is a str, and a base class for other parameter types. Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:

    import luigi

    class MyTask(luigi.Task):
        foo = luigi.Parameter()

    class RequiringTask(luigi.Task):
        def requires(self):
            return MyTask(foo="hello")

        def run(self):
            print(self.requires().foo)  # prints "hello"

This makes it possible to instantiate multiple tasks, e.g. MyTask(foo='bar') and MyTask(foo='baz'). The task will then have the foo attribute set appropriately.
When a task is instantiated, it will first use any argument as the value of the parameter, e.g. if you instantiate a = TaskA(x=44) then a.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:
  - Any value provided on the command line:
    - To the root task (e.g. --param xyz).
    - Then to the class, using the qualified task name syntax (e.g. --TaskA-param xyz).
  - With [TASK_NAME]>PARAM_NAME: <serialized value> syntax. See Parameters from config Ingestion.
  - Any default value set using the default flag.
Parameter objects may be reused, but you must then set the positional=False flag.
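The order of falling priority above can be sketched as a plain function. Everything here is hypothetical: resolve_param is not a luigi function, the command line is assumed to be pre-parsed into a dict of flag to raw string, and the configuration is assumed to be a dict of section name to {parameter name: value}.

```python
_MISSING = object()


def resolve_param(name, task_family, *, kwargs, is_root_task, cli_args, config, default=_MISSING):
    """Hypothetical resolver following the documented order of falling priority."""
    # An explicit constructor argument, e.g. TaskA(x=44), always wins.
    if name in kwargs:
        return kwargs[name]
    # A bare "--x" on the command line applies to the root task only.
    if is_root_task and f"--{name}" in cli_args:
        return cli_args[f"--{name}"]
    # A class-qualified "--TaskA-x" applies to any TaskA instance.
    qualified = f"--{task_family}-{name}"
    if qualified in cli_args:
        return cli_args[qualified]
    # A value from the [TaskA] section of the configuration.
    section = config.get(task_family, {})
    if name in section:
        return section[name]
    # Finally, the default declared on the parameter.
    if default is not _MISSING:
        return default
    raise ValueError(f"no value for parameter {name!r} of {task_family}")


cli = {"--x": "1", "--TaskA-x": "2"}
cfg = {"TaskA": {"x": "3"}}
print(resolve_param("x", "TaskA", kwargs={}, is_root_task=False, cli_args=cli, config=cfg))  # 2
```

Each level is consulted only if every higher-priority source is silent, which is why a non-root task ignores the bare --x flag but still picks up --TaskA-x.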
- batch_emails
A Parameter whose value is a bool. This parameter has an implicit default value of False. For the command line interface this means that the value is False unless you add "--the-bool-parameter" to your command without giving a parameter value. This is considered implicit parsing (the default). However, in some situations one might want to give the explicit bool value ("--the-bool-parameter true|false"), e.g. when you configure the default value to be True. This is called explicit parsing. When omitting the parameter value, it is still considered True, but to avoid ambiguities during argument parsing, make sure to always place bool parameters behind the task family on the command line when using explicit parsing.
You can toggle between the two parsing modes on a per-parameter basis via

    import luigi

    class MyTask(luigi.Task):
        implicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.IMPLICIT_PARSING)
        explicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.EXPLICIT_PARSING)

or globally by

    luigi.BoolParameter.parsing = luigi.BoolParameter.EXPLICIT_PARSING

for all bool parameters instantiated after this line.
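The difference between implicit and explicit parsing can be illustrated with the standard library's argparse. This is an analogy, not luigi's own parser: the str_to_bool helper is hypothetical, and the explicit parser's default of True stands in for "when you configure the default value to be True".

```python
import argparse

# Implicit parsing: the flag takes no value; present means True, absent means False.
implicit = argparse.ArgumentParser()
implicit.add_argument("--the-bool-parameter", action="store_true")


def str_to_bool(text):
    """Hypothetical converter accepting only 'true' or 'false' (case-insensitive)."""
    lowered = text.lower()
    if lowered in ("true", "false"):
        return lowered == "true"
    raise argparse.ArgumentTypeError(f"expected true or false, got {text!r}")


# Explicit parsing: the flag accepts an optional true/false value.
explicit = argparse.ArgumentParser()
explicit.add_argument(
    "--the-bool-parameter",
    nargs="?",        # the value may be omitted...
    const=True,       # ...in which case it is still considered True
    default=True,     # e.g. a configured default of True
    type=str_to_bool,
)

print(implicit.parse_args([]).the_bool_parameter)                                 # False
print(implicit.parse_args(["--the-bool-parameter"]).the_bool_parameter)           # True
print(explicit.parse_args(["--the-bool-parameter", "false"]).the_bool_parameter)  # False
print(explicit.parse_args(["--the-bool-parameter"]).the_bool_parameter)           # True
```

The nargs="?" form also shows why placement matters with explicit parsing: whatever token follows the flag is a candidate for its value, so putting the flag where another argument follows it can be ambiguous.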
- disable_window
Parameter whose value is an int.
- retry_count
Parameter whose value is an int.
- disable_hard_timeout
Parameter whose value is an int.
- disable_persist
Parameter whose value is an int.
- max_shown_tasks
Parameter whose value is an int.
- max_graph_nodes
Parameter whose value is an int.
- record_task_history
A Parameter whose value is a bool.
- prune_on_get_work
A Parameter whose value is a bool.
- pause_enabled
A Parameter whose value is a bool.
- send_messages
A Parameter whose value is a bool.
- metrics_collector
A parameter whose value is an Enum. In the task definition, use

    import enum

    import luigi

    class Model(enum.Enum):
        Honda = 1
        Volvo = 2

    class MyTask(luigi.Task):
        my_param = luigi.EnumParameter(enum=Model)

At the command line, use:
$ luigi --module my_tasks MyTask --my-param Honda
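Judging by the command-line example, the Enum member is selected by its name (Honda rather than 1). A sketch of that name-based round trip using only the enum module. parse_enum and serialize_enum are hypothetical helpers, not EnumParameter's actual methods.

```python
import enum


class Model(enum.Enum):
    Honda = 1
    Volvo = 2


def parse_enum(enum_cls, text):
    """Hypothetical: look up a member by the name typed on the command line."""
    try:
        return enum_cls[text]
    except KeyError:
        choices = ", ".join(member.name for member in enum_cls)
        raise ValueError(f"{text!r} is not one of: {choices}") from None


def serialize_enum(member):
    """Hypothetical: the inverse of parse_enum."""
    return member.name


print(parse_enum(Model, "Honda"))   # Model.Honda
print(serialize_enum(Model.Volvo))  # Volvo
```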
- metrics_custom_import
Class to parse optional str parameters.
- stable_done_cooldown_secs
Sets a cooldown period, in seconds, after a task has completed. During this period, the same task will not be accepted by the scheduler.
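A minimal sketch of such a cooldown check. The function name accepts_task, the use of epoch seconds, and the choice that a task becomes acceptable again exactly when the cooldown has elapsed are all assumptions for illustration; this page does not specify the boundary behavior.

```python
def accepts_task(last_completed_at, now, cooldown_secs):
    """Hypothetical: may a task completed at `last_completed_at` be accepted at `now`?

    Times are seconds since the epoch; None means the task never completed.
    """
    if last_completed_at is None:
        return True
    # Assumption: the cooldown window is half-open, so t == completion + cooldown is accepted.
    return now - last_completed_at >= cooldown_secs


print(accepts_task(100.0, 105.0, 10))  # False (still cooling down)
print(accepts_task(100.0, 111.0, 10))  # True
```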
- class luigi.scheduler.OrderedSet(iterable=None)
Standard Python OrderedSet recipe found at http://code.activestate.com/recipes/576694/
Modified to include a peek function to get the last element.
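The linked recipe keeps insertion order with a dict plus a doubly linked list. The sketch below swaps in a plain dict, which preserves insertion order in Python 3.7+ (reversed() on a dict needs 3.8+), to show what peek adds. The class is a simplified stand-in, and the peek(last=True) signature is an assumption rather than luigi's confirmed API.

```python
class OrderedSet:
    """Simplified stand-in for the recipe: dict keys give set semantics and order."""

    def __init__(self, iterable=None):
        self._items = dict.fromkeys(iterable or ())

    def add(self, key):
        # Re-adding an existing key keeps its original position.
        self._items[key] = None

    def discard(self, key):
        self._items.pop(key, None)

    def __contains__(self, key):
        return key in self._items

    def __iter__(self):
        return iter(self._items)

    def __len__(self):
        return len(self._items)

    def peek(self, last=True):
        """Return, without removing, the last element (or the first if last=False)."""
        if not self._items:
            raise KeyError("set is empty")
        if last:
            return next(reversed(self._items))
        return next(iter(self._items))


s = OrderedSet(["a", "b", "c"])
s.add("a")                 # already present: order is unchanged
print(s.peek())            # c
print(s.peek(last=False))  # a
```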
- class luigi.scheduler.Task(task_id, status, deps, resources=None, priority=0, family='', module=None, params=None, param_visibilities=None, accepts_messages=False, tracking_url=None, status_message=None, progress_percentage=None, retry_policy='notoptional')
- DEFAULT_PRIORITY = 0
- property pretty_id
- class luigi.scheduler.Worker(worker_id, last_active=None)
Structure for tracking worker activity and keeping their references.
- is_trivial_worker(state)
Return whether this worker is trivial, i.e. it is not an assistant and has only tasks without requirements.
The state parameter has to be passed in for optimization reasons.
- property assistant
- property enabled
- property state
- class luigi.scheduler.SimpleTaskState(state_path)
Keep track of the current state and handle persistence.
The point of this class is to enable other ways to keep state, e.g. by using a database. These will be implemented by creating an abstract base class that this and other classes inherit from.
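SimpleTaskState is constructed with a state_path, which likely corresponds to the scheduler's state_path parameter. A minimal sketch of file-based persistence, assuming pickle as the serialization format. The class PickledState and its methods are hypothetical and are not luigi's API; the write-then-rename step is a common safeguard added here, not something this page describes.

```python
import os
import pickle
import tempfile


class PickledState:
    """Hypothetical state holder persisting a dict of task records to disk."""

    def __init__(self, state_path):
        self._state_path = state_path
        self._tasks = {}

    def set_task(self, task_id, record):
        self._tasks[task_id] = record

    def get_task(self, task_id, default=None):
        return self._tasks.get(task_id, default)

    def dump(self):
        # Write to a temporary file first, then atomically replace the target.
        tmp_path = self._state_path + ".tmp"
        with open(tmp_path, "wb") as fobj:
            pickle.dump(self._tasks, fobj)
        os.replace(tmp_path, self._state_path)

    def load(self):
        # Keep the empty state when no file exists yet (e.g. on first start).
        if not os.path.exists(self._state_path):
            return
        with open(self._state_path, "rb") as fobj:
            self._tasks = pickle.load(fobj)


state_file = os.path.join(tempfile.mkdtemp(), "state.pickle")
first = PickledState(state_file)
first.set_task("TaskA", {"status": "DONE"})
first.dump()

second = PickledState(state_file)
second.load()
print(second.get_task("TaskA"))  # {'status': 'DONE'}
```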
- class luigi.scheduler.Scheduler(config=None, resources=None, task_history_impl=None, **kwargs)
Async scheduler that can handle multiple workers, etc.
Can be run locally or on a server (using RemoteScheduler + server.Server).
Keyword Arguments:
  - config: an object of class “scheduler” or None (in which case the global instance will be used)
  - resources: a dict of str->int constraints
  - task_history_impl: ignore config and use this object as the task history
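The resources argument is a dict of str->int constraints. A minimal sketch of how such a constraint could be checked before handing a task to a worker. The function name has_resources and the rule that a resource absent from the dict is unlimited are assumptions for illustration, not taken from this page.

```python
def has_resources(needed, used, available):
    """Hypothetical check: does `needed` fit within `available`, given `used`?

    All arguments map resource name -> int units. A resource missing from
    `available` is treated as unlimited (an assumption of this sketch).
    """
    for name, amount in (needed or {}).items():
        limit = available.get(name, float("inf"))
        if used.get(name, 0) + amount > limit:
            return False
    return True


available = {"db_connections": 2}
print(has_resources({"db_connections": 1}, {"db_connections": 1}, available))  # True
print(has_resources({"db_connections": 1}, {"db_connections": 2}, available))  # False
print(has_resources({"gpu": 4}, {}, available))  # True (gpu is not limited here)
```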
- add_task(task_id=None, status='PENDING', runnable=True, deps=None, new_deps=None, expl=None, resources=None, priority=0, family='', module=None, params=None, param_visibilities=None, accepts_messages=False, assistant=False, tracking_url=None, worker=None, batchable=None, batch_id=None, retry_policy_dict=None, owners=None, **kwargs)
Add the task identified by task_id if it doesn’t exist.
If deps is not None, update the dependency list.
Update the status of the task.
Add additional workers/stakeholders.
Update the priority when needed.
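The steps listed for add_task amount to an upsert. A simplified sketch over a plain dict, with a hypothetical TaskRecord type that is not luigi's Task class. It assumes "update priority when needed" means the priority is only ever raised; that reading is an assumption of this sketch.

```python
from dataclasses import dataclass, field


@dataclass
class TaskRecord:
    """Hypothetical, trimmed-down task record (not luigi's Task class)."""
    task_id: str
    status: str = "PENDING"
    deps: set = field(default_factory=set)
    stakeholders: set = field(default_factory=set)
    priority: int = 0


def add_task(tasks, task_id, status="PENDING", deps=None, worker=None, priority=0):
    """Upsert `task_id` into the `tasks` dict following the documented steps."""
    # Add the task only if it doesn't exist yet.
    if task_id not in tasks:
        tasks[task_id] = TaskRecord(task_id)
    task = tasks[task_id]
    # deps=None means "leave the dependency list alone".
    if deps is not None:
        task.deps = set(deps)
    task.status = status
    # Record the calling worker as a stakeholder.
    if worker is not None:
        task.stakeholders.add(worker)
    # Assumption: "when needed" is read here as "only ever raise the priority".
    task.priority = max(task.priority, priority)
    return task


tasks = {}
add_task(tasks, "B", deps=["A"], worker="w1", priority=5)
add_task(tasks, "B", status="RUNNING", worker="w2", priority=1)
b = tasks["B"]
print(b.status, b.priority, sorted(b.deps), sorted(b.stakeholders))
# RUNNING 5 ['A'] ['w1', 'w2']
```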
- task_list(status='', upstream_status='', limit=True, search=None, max_shown_tasks=None, **kwargs)
Query for a subset of tasks by status.
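A simplified sketch of filtering by status with a size cap, over a hypothetical dict of task_id to status string. Two assumptions here are not stated on this page: that an empty status (the signature's default) matches every task, and that when limit is set and the matches exceed max_shown_tasks, only their count is reported.

```python
def task_list(tasks, status="", limit=True, max_shown_tasks=100):
    """Hypothetical filter over a dict mapping task_id -> status string."""
    # Assumption: an empty status matches every task.
    result = {task_id: st for task_id, st in tasks.items() if not status or st == status}
    # Assumption: past the cap, only the number of matches is reported.
    if limit and len(result) > max_shown_tasks:
        return {"num_tasks": len(result)}
    return result


tasks = {"A": "DONE", "B": "PENDING", "C": "PENDING"}
print(task_list(tasks, status="PENDING"))                     # {'B': 'PENDING', 'C': 'PENDING'}
print(task_list(tasks, status="PENDING", max_shown_tasks=1))  # {'num_tasks': 2}
```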
- task_search(task_str, **kwargs)
Query for a subset of tasks by task_id.
- Parameters:
task_str
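A sketch of querying by task_id over a hypothetical dict of task_id to status. It assumes a case-sensitive substring match on the id; the exact matching rule is not stated on this page.

```python
def task_search(tasks, task_str):
    """Hypothetical: keep tasks whose id contains task_str (case-sensitive)."""
    return {task_id: st for task_id, st in tasks.items() if task_str in task_id}


tasks = {
    "Ingest(date=2024-01-01)": "DONE",
    "Ingest(date=2024-01-02)": "PENDING",
    "Report()": "PENDING",
}
print(task_search(tasks, "Ingest"))
# {'Ingest(date=2024-01-01)': 'DONE', 'Ingest(date=2024-01-02)': 'PENDING'}
```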
- property task_history