luigi.scheduler

The system for scheduling tasks and executing them in order. Deals with dependencies, priorities, resources, etc. The Worker pulls tasks from the scheduler (usually over the REST interface) and executes them. See Using the Central Scheduler for more info.

Functions

rpc_method(**request_args)

Classes

OrderedSet([iterable])

Standard Python OrderedSet recipe found at http://code.activestate.com/recipes/576694/

RetryPolicy(retry_count, ...)

Create new instance of RetryPolicy(retry_count, disable_hard_timeout, disable_window)

Scheduler([config, resources, task_history_impl])

Async scheduler that can handle multiple workers, etc.

SimpleTaskState(state_path)

Keep track of the current state and handle persistence.

Task(task_id, status, deps[, resources, ...])

Worker(worker_id[, last_active])

Structure for tracking worker activity and keeping their references.

scheduler(*args, **kwargs)

luigi.scheduler.UPSTREAM_SEVERITY_KEY(value, start=0, stop=9223372036854775807, /)

Return first index of value.

Raises ValueError if the value is not present.
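The signature and docstring above belong to a sequence's built-in index method. That suggests UPSTREAM_SEVERITY_KEY is the bound index method of an ordered tuple of upstream statuses, used as a sort key. The sketch below works under that assumption. The tuple contents are a hypothetical local copy and are not imported from luigi, and most_severe is a hypothetical helper.

```python
from typing import Iterable

# Hypothetical local copy of an upstream-severity ordering, least to most severe.
UPSTREAM_SEVERITY_ORDER = (
    "",
    "UPSTREAM_RUNNING",
    "UPSTREAM_MISSING_INPUT",
    "UPSTREAM_FAILED",
    "UPSTREAM_DISABLED",
)

# A bound tuple.index works as a key function: a higher index means more severe.
UPSTREAM_SEVERITY_KEY = UPSTREAM_SEVERITY_ORDER.index


def most_severe(statuses: Iterable[str]) -> str:
    """Hypothetical helper: return the most severe status, or '' when there are none."""
    return max(statuses, key=UPSTREAM_SEVERITY_KEY, default="")


print(most_severe(["UPSTREAM_RUNNING", "UPSTREAM_FAILED", ""]))  # UPSTREAM_FAILED
```

A status that is not in the tuple raises ValueError, which matches the docstring above.
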

class luigi.scheduler.RetryPolicy(retry_count, disable_hard_timeout, disable_window)

Create new instance of RetryPolicy(retry_count, disable_hard_timeout, disable_window)

disable_hard_timeout

Alias for field number 1

disable_window

Alias for field number 2

retry_count

Alias for field number 0
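The "Alias for field number N" entries indicate that RetryPolicy behaves like a namedtuple, where each attribute names a positional field. The sketch below re-creates an equivalent type locally with collections.namedtuple so that it runs without luigi. Building it this way is an assumption for illustration, and the numbers are arbitrary.

```python
from collections import namedtuple

# Local stand-in with the same field order as luigi.scheduler.RetryPolicy (assumption).
RetryPolicy = namedtuple("RetryPolicy", ["retry_count", "disable_hard_timeout", "disable_window"])

policy = RetryPolicy(retry_count=3, disable_hard_timeout=3600, disable_window=600)

# Field names are aliases for tuple positions 0, 1 and 2.
assert policy.retry_count == policy[0] == 3
assert policy.disable_hard_timeout == policy[1]
assert policy.disable_window == policy[2]

# namedtuples are immutable; _replace returns a modified copy.
stricter = policy._replace(retry_count=1)
print(stricter)  # RetryPolicy(retry_count=1, disable_hard_timeout=3600, disable_window=600)
```
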

luigi.scheduler.rpc_method(**request_args)

class luigi.scheduler.scheduler(*args, **kwargs)

retry_delay

Parameter whose value is a float.

remove_delay

Parameter whose value is a float.

worker_disconnect_delay

Parameter whose value is a float.

state_path

Parameter whose value is a str, and a base class for other parameter types.

Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:

import luigi

class MyTask(luigi.Task):
    foo = luigi.Parameter()

class RequiringTask(luigi.Task):
    def requires(self):
        return MyTask(foo="hello")

    def run(self):
        print(self.requires().foo)  # prints "hello"

This makes it possible to instantiate multiple tasks, e.g. MyTask(foo='bar') and MyTask(foo='baz'). Each task then has its foo attribute set accordingly.

When a task is instantiated, it first uses any argument passed to it as the value of the parameter. For example, if you instantiate a = TaskA(x=44), then a.x == 44. When no value is provided, the value is resolved in the following order of decreasing priority:

  • Any value provided on the command line:

    • To the root task (e.g. --param xyz)

    • Then to the class, using the qualified task name syntax (e.g. --TaskA-param xyz).

  • Any value set in the configuration file, using a [TASK_NAME] section with a PARAM_NAME: <serialized value> entry. See Parameters from config Ingestion.

  • Any default value set using the default flag.
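The resolution order above comes down to "the first source that has a value wins." The helper below is a hypothetical illustration of that rule and is not luigi's implementation. Each source is modeled as a plain dict keyed by parameter name, and resolve_param is a made-up name.

```python
from typing import Any, Mapping, Sequence

_MISSING = object()  # sentinel so that falsy values such as 0 or "" still count


def resolve_param(name: str, sources: Sequence[Mapping[str, Any]], default: Any = _MISSING) -> Any:
    """Hypothetical helper: value from the first source defining `name`, else the default."""
    for source in sources:
        if name in source:
            return source[name]
    if default is _MISSING:
        raise KeyError(f"no value for parameter {name!r}")
    return default


# Highest priority first: explicit constructor argument, command line for the
# root task, class-qualified command-line flag, then the config file.
explicit: dict = {}
cli_root = {"param": "from-root-flag"}
cli_class = {"param": "from-qualified-flag"}
config = {"param": "from-config"}

print(resolve_param("param", [explicit, cli_root, cli_class, config], default="fallback"))
# from-root-flag
```
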

Parameter objects may be reused, but you must then set the positional=False flag.

batch_emails

A Parameter whose value is a bool. This parameter has an implicit default value of False. For the command line interface this means that the value is False unless you add "--the-bool-parameter" to your command without giving a parameter value. This is considered implicit parsing (the default). However, in some situations one might want to give the explicit bool value ("--the-bool-parameter true|false"), e.g. when you configure the default value to be True. This is called explicit parsing. When omitting the parameter value, it is still considered True but to avoid ambiguities during argument parsing, make sure to always place bool parameters behind the task family on the command line when using explicit parsing.

You can toggle between the two parsing modes on a per-parameter basis via

import luigi

class MyTask(luigi.Task):
    implicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.IMPLICIT_PARSING)
    explicit_bool = luigi.BoolParameter(parsing=luigi.BoolParameter.EXPLICIT_PARSING)

or globally by

luigi.BoolParameter.parsing = luigi.BoolParameter.EXPLICIT_PARSING

for all bool parameters instantiated after this line.
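The difference between the two parsing modes can be illustrated with the standard library's argparse. This is an analogy and not how luigi builds its parser. Implicit parsing behaves like a store_true flag, while explicit parsing accepts an optional true/false value. parse_bool is a hypothetical converter.

```python
import argparse


def parse_bool(text: str) -> bool:
    """Hypothetical converter for explicit values such as 'true' or 'False'."""
    lowered = text.lower()
    if lowered in ("true", "1", "yes"):
        return True
    if lowered in ("false", "0", "no"):
        return False
    raise argparse.ArgumentTypeError(f"not a bool: {text!r}")


parser = argparse.ArgumentParser()
# Implicit parsing: the flag takes no value; being present means True.
parser.add_argument("--implicit-bool", action="store_true")
# Explicit parsing: an optional value; a bare flag still means True (const).
parser.add_argument("--explicit-bool", nargs="?", const=True, default=False, type=parse_bool)

print(parser.parse_args(["--implicit-bool"]).implicit_bool)          # True
print(parser.parse_args(["--explicit-bool", "false"]).explicit_bool)  # False
print(parser.parse_args(["--explicit-bool"]).explicit_bool)           # True
```

Because the explicit flag takes an optional value, argparse would try to consume a following non-option token (such as a task name) as that value. This is why the placement advice above matters.
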

disable_window

Parameter whose value is an int.

retry_count

Parameter whose value is an int.

disable_hard_timeout

Parameter whose value is an int.

disable_persist

Parameter whose value is an int.

max_shown_tasks

Parameter whose value is an int.

max_graph_nodes

Parameter whose value is an int.

record_task_history

A Parameter whose value is a bool, parsed in the same way as batch_emails above.

prune_on_get_work

A Parameter whose value is a bool, parsed in the same way as batch_emails above.

pause_enabled

A Parameter whose value is a bool, parsed in the same way as batch_emails above.

send_messages

A Parameter whose value is a bool, parsed in the same way as batch_emails above.

metrics_collector

A parameter whose value is an Enum.

In the task definition, use

import enum

import luigi

class Model(enum.Enum):
    Honda = 1
    Volvo = 2

class MyTask(luigi.Task):
    my_param = luigi.EnumParameter(enum=Model)

At the command line, use:

$ luigi --module my_tasks MyTask --my-param Honda

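On the command line, the value arrives as a string such as Honda. The sketch below turns that string back into an enum member and assumes the lookup is by member name. Whether EnumParameter works exactly this way internally is an assumption here, and parse_model/serialize_model are hypothetical helpers.

```python
import enum


class Model(enum.Enum):
    Honda = 1
    Volvo = 2


def parse_model(text: str) -> Model:
    """Hypothetical helper: map a command-line string to a Model member by name."""
    try:
        return Model[text]
    except KeyError:
        valid = ", ".join(member.name for member in Model)
        raise ValueError(f"{text!r} is not one of: {valid}") from None


def serialize_model(member: Model) -> str:
    """Hypothetical inverse of parse_model: use the member name as the string form."""
    return member.name


print(parse_model("Honda"))  # Model.Honda
```
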
metrics_custom_import

Class to parse optional str parameters.

stable_done_cooldown_secs

Sets a cooldown period, in seconds, after a task is completed. During this period, the scheduler will not accept the same task again.
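Below is a minimal sketch of the cooldown rule described above. It assumes the scheduler records when each task id was marked done. DoneCooldown and its methods are hypothetical, and the exact comparison at the boundary (>= versus >) is also an assumption.

```python
import time
from typing import Dict, Optional


class DoneCooldown:
    """Hypothetical tracker: refuse to re-accept a task shortly after it was done."""

    def __init__(self, cooldown_secs: float) -> None:
        self.cooldown_secs = cooldown_secs
        self._done_at: Dict[str, float] = {}

    def mark_done(self, task_id: str, now: Optional[float] = None) -> None:
        self._done_at[task_id] = time.time() if now is None else now

    def accepts(self, task_id: str, now: Optional[float] = None) -> bool:
        # Accept tasks that were never done, or whose cooldown has elapsed (assumed rule).
        now = time.time() if now is None else now
        done_at = self._done_at.get(task_id)
        return done_at is None or now - done_at >= self.cooldown_secs


cooldown = DoneCooldown(cooldown_secs=10)
cooldown.mark_done("TaskA", now=100.0)
print(cooldown.accepts("TaskA", now=105.0))  # False
print(cooldown.accepts("TaskA", now=110.0))  # True
```
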

class luigi.scheduler.OrderedSet(iterable=None)

Standard Python OrderedSet recipe found at http://code.activestate.com/recipes/576694/

Modified to include a peek function to get the last element.

add(key)

Add an element.

discard(key)

Remove an element. Do not raise an exception if absent.

peek(last=True)

Return the last element (the first if last=False) without removing it.

pop(last=True)

Return the popped value. Raise KeyError if empty.
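The recipe linked above keeps insertion order with a doubly linked list. The sketch below gets the same observable behavior more briefly from collections.OrderedDict. It is a swapped-in technique, not the recipe and not luigi's code, and OrderedSetSketch is a hypothetical name.

```python
from collections import OrderedDict
from typing import Generic, Hashable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T", bound=Hashable)


class OrderedSetSketch(Generic[T]):
    """Hypothetical insertion-ordered set backed by an OrderedDict."""

    def __init__(self, iterable: Optional[Iterable[T]] = None) -> None:
        self._items: "OrderedDict[T, None]" = OrderedDict()
        if iterable is not None:
            for key in iterable:
                self.add(key)

    def __len__(self) -> int:
        return len(self._items)

    def __contains__(self, key: object) -> bool:
        return key in self._items

    def __iter__(self) -> Iterator[T]:
        return iter(self._items)

    def add(self, key: T) -> None:
        # Re-adding an existing key keeps its original position.
        if key not in self._items:
            self._items[key] = None

    def discard(self, key: T) -> None:
        # Silently ignore keys that are not present.
        self._items.pop(key, None)

    def peek(self, last: bool = True) -> T:
        # Look at the last (or first) element without removing it.
        if not self._items:
            raise KeyError("set is empty")
        return next(reversed(self._items)) if last else next(iter(self._items))

    def pop(self, last: bool = True) -> T:
        # Remove and return the last (or first) element.
        if not self._items:
            raise KeyError("set is empty")
        key, _ = self._items.popitem(last=last)
        return key


s = OrderedSetSketch([3, 1, 2, 1])
print(list(s), s.peek(), s.peek(last=False))  # [3, 1, 2] 2 3
```
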

class luigi.scheduler.Task(task_id, status, deps, resources=None, priority=0, family='', module=None, params=None, param_visibilities=None, accepts_messages=False, tracking_url=None, status_message=None, progress_percentage=None, retry_policy='notoptional')
DEFAULT_PRIORITY = 0
set_params(params)
is_batchable()
add_failure()

Add a failure event with the current timestamp.

num_failures()

Return the number of failures in the window.

has_excessive_failures()
clear_failures()

Clear the failure history.
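The failure-tracking methods above can be sketched as a sliding time window of failure timestamps. This is a hypothetical stand-in, not luigi's code. The window length plays the role of disable_window. The has_excessive_failures rule below ("failing for longer than disable_hard_timeout, or more than retry_count failures in the window") is an assumption based on the RetryPolicy field names, since that method is undocumented here.

```python
import collections
import time
from typing import Callable, Deque, Optional


class FailureWindow:
    """Hypothetical sketch: failures counted inside a sliding time window."""

    def __init__(self, window_secs: float, clock: Callable[[], float] = time.time) -> None:
        self.window_secs = window_secs
        self._clock = clock
        self._timestamps: Deque[float] = collections.deque()
        self.first_failure_time: Optional[float] = None

    def add_failure(self) -> None:
        # Record a failure event with the current timestamp.
        now = self._clock()
        if self.first_failure_time is None:
            self.first_failure_time = now
        self._timestamps.append(now)

    def num_failures(self) -> int:
        # Forget failures older than the window, then count the rest.
        cutoff = self._clock() - self.window_secs
        while self._timestamps and self._timestamps[0] < cutoff:
            self._timestamps.popleft()
        return len(self._timestamps)

    def has_excessive_failures(self, retry_count: int, disable_hard_timeout: float) -> bool:
        # Assumed rule: failing for too long overall, or too often within the window.
        if self.first_failure_time is not None:
            if self._clock() >= self.first_failure_time + disable_hard_timeout:
                return True
        return self.num_failures() > retry_count

    def clear_failures(self) -> None:
        self._timestamps.clear()
        self.first_failure_time = None


class FakeClock:
    """Manually advanced clock so the example is deterministic."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


clock = FakeClock()
failures = FailureWindow(window_secs=60, clock=clock)
failures.add_failure()          # t = 0
clock.now = 30
failures.add_failure()          # t = 30
print(failures.num_failures())  # 2
clock.now = 70
print(failures.num_failures())  # 1: the t = 0 failure has left the window
```
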

property pretty_id

class luigi.scheduler.Worker(worker_id, last_active=None)

Structure for tracking worker activity and keeping their references.

add_info(info)
update(worker_reference, get_work=False)
prune(config)
get_tasks(state, *statuses)
is_trivial_worker(state)

Return whether this worker is trivial, i.e. it is not an assistant and has only tasks without requirements.

The state parameter has to be passed in for optimization reasons.

property assistant
property enabled
property state
add_rpc_message(name, **kwargs)
fetch_rpc_messages()

class luigi.scheduler.SimpleTaskState(state_path)

Keep track of the current state and handle persistence.

The point of this class is to enable other ways to keep state, e.g. by using a database. These would be implemented by creating an abstract base class that this and other classes inherit from.
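The persistence role (dump/load against a state_path) can be sketched with pickle. The sketch assumes the whole state is a picklable object written to a single file and read back later. PickleState and the temporary path are hypothetical, and this is not SimpleTaskState's code.

```python
import os
import pickle
import tempfile
from typing import Any, Dict


class PickleState:
    """Hypothetical state holder that persists itself to one file."""

    def __init__(self, state_path: str) -> None:
        self.state_path = state_path
        self.tasks: Dict[str, Any] = {}

    def dump(self) -> None:
        # Write to a temporary file first, then atomically replace the target.
        tmp_path = self.state_path + ".tmp"
        with open(tmp_path, "wb") as fobj:
            pickle.dump(self.tasks, fobj)
        os.replace(tmp_path, self.state_path)

    def load(self) -> None:
        # A missing file simply means "start with empty state".
        if not os.path.exists(self.state_path):
            return
        with open(self.state_path, "rb") as fobj:
            self.tasks = pickle.load(fobj)


path = os.path.join(tempfile.mkdtemp(), "state.pickle")
saved = PickleState(path)
saved.tasks["TaskA"] = {"status": "DONE"}
saved.dump()

restored = PickleState(path)
restored.load()
print(restored.tasks)  # {'TaskA': {'status': 'DONE'}}
```

The write-then-replace step is a design choice of this sketch, so that a crash mid-write does not leave a truncated state file behind. pickle should only ever load files from a trusted location.
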

get_state()
set_state(state)
dump()
load()
get_active_tasks()
get_active_tasks_by_status(*statuses)
get_active_task_count_for_status(status)
get_batch_running_tasks(batch_id)
set_batcher(worker_id, family, batcher_args, max_batch_size)
get_batcher(worker_id, family)
num_pending_tasks()

Return how many tasks are PENDING + RUNNING. O(1).
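An O(1) count is possible if tasks are also indexed by status, because the count then reduces to the sizes of two buckets. The sketch below works under that assumption. StatusIndex is hypothetical and does not reproduce how SimpleTaskState stores its tasks.

```python
import collections
from typing import DefaultDict, Dict

PENDING, RUNNING, DONE = "PENDING", "RUNNING", "DONE"


class StatusIndex:
    """Hypothetical index from status to the task ids currently in that status."""

    def __init__(self) -> None:
        self._status_of: Dict[str, str] = {}
        self._by_status: DefaultDict[str, Dict[str, None]] = collections.defaultdict(dict)

    def set_status(self, task_id: str, new_status: str) -> None:
        # Move the task between buckets so that every bucket stays up to date.
        old_status = self._status_of.get(task_id)
        if old_status is not None:
            self._by_status[old_status].pop(task_id, None)
        self._status_of[task_id] = new_status
        self._by_status[new_status][task_id] = None

    def num_pending_tasks(self) -> int:
        # len() of a dict is O(1), so this never scans all tasks.
        return len(self._by_status[PENDING]) + len(self._by_status[RUNNING])


index = StatusIndex()
index.set_status("a", PENDING)
index.set_status("b", RUNNING)
index.set_status("c", DONE)
index.set_status("a", DONE)       # "a" leaves the PENDING bucket
print(index.num_pending_tasks())  # 1
```

The cost of this design is a little extra bookkeeping on every status change in exchange for constant-time counts.
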

get_task(task_id, default=None, setdefault=None)
has_task(task_id)
re_enable(task, config=None)
set_batch_running(task, batch_id, worker_id)
set_status(task, new_status, config=None)
fail_dead_worker_task(task, config, assistants)
update_status(task, config)
may_prune(task)
inactivate_tasks(delete_tasks)
get_active_workers(last_active_lt=None, last_get_work_gt=None)
get_assistants(last_active_lt=None)
get_worker_ids()
get_worker(worker_id)
inactivate_workers(delete_workers)
disable_workers(worker_ids)
update_metrics(task, config)

class luigi.scheduler.Scheduler(config=None, resources=None, task_history_impl=None, **kwargs)

Async scheduler that can handle multiple workers, etc.

Can be run locally or on a server (using RemoteScheduler + server.Server).

Keyword Arguments:

  • config: an object of class “scheduler”, or None (in which case the global instance is used)

  • resources: a dict of str->int constraints

  • task_history_impl: ignore config and use this object as the task history
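The resources argument is described only as a dict of str->int constraints. One plausible reading, sketched below, is that a task may start only if, for every resource it needs, the amount already in use plus its own need stays within the configured total. has_resources is a hypothetical helper. Treating a resource missing from the dict as having a capacity of 1 is an assumption of this sketch.

```python
from typing import Mapping


def has_resources(
    needed: Mapping[str, int],
    used: Mapping[str, int],
    available: Mapping[str, int],
    default_capacity: int = 1,  # assumption for resources missing from `available`
) -> bool:
    """Hypothetical check: True if granting `needed` keeps every resource within its limit."""
    for resource, amount in needed.items():
        limit = available.get(resource, default_capacity)
        if used.get(resource, 0) + amount > limit:
            return False
    return True


limits = {"db_connections": 4}
print(has_resources({"db_connections": 2}, {"db_connections": 2}, limits))  # True
print(has_resources({"db_connections": 3}, {"db_connections": 2}, limits))  # False
```
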

load()
dump()
prune()
add_task_batcher(worker, task_family, batched_args, max_batch_size=inf)
forgive_failures(task_id=None)
mark_as_done(task_id=None)
add_task(task_id=None, status='PENDING', runnable=True, deps=None, new_deps=None, expl=None, resources=None, priority=0, family='', module=None, params=None, param_visibilities=None, accepts_messages=False, assistant=False, tracking_url=None, worker=None, batchable=None, batch_id=None, retry_policy_dict=None, owners=None, **kwargs)
  • add task identified by task_id if it doesn’t exist

  • if deps is not None, update dependency list

  • update status of task

  • add additional workers/stakeholders

  • update priority when needed
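The steps listed for add_task amount to an upsert. The sketch below illustrates those steps on a plain dict and is not the Scheduler's code. TaskRecord and upsert_task are hypothetical names. The rule that priority only ever increases is an assumption used to illustrate "update priority when needed."

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Optional, Set


@dataclass
class TaskRecord:
    """Hypothetical task record holding only the fields the steps touch."""
    task_id: str
    status: str = "PENDING"
    deps: Set[str] = field(default_factory=set)
    stakeholders: Set[str] = field(default_factory=set)
    priority: int = 0


def upsert_task(
    tasks: Dict[str, TaskRecord],
    task_id: str,
    worker: str,
    status: str = "PENDING",
    deps: Optional[Iterable[str]] = None,
    priority: int = 0,
) -> TaskRecord:
    # Add the task if it does not exist yet.
    if task_id not in tasks:
        tasks[task_id] = TaskRecord(task_id)
    task = tasks[task_id]
    # Replace the dependency list only when one is given.
    if deps is not None:
        task.deps = set(deps)
    # Update the status.
    task.status = status
    # Record the calling worker as a stakeholder.
    task.stakeholders.add(worker)
    # Assumed rule: keep the highest priority any caller has asked for.
    task.priority = max(task.priority, priority)
    return task


tasks: Dict[str, TaskRecord] = {}
upsert_task(tasks, "A", worker="w1", deps=["B"], priority=5)
upsert_task(tasks, "A", worker="w2", status="RUNNING", priority=1)
print(tasks["A"].priority, sorted(tasks["A"].stakeholders))  # 5 ['w1', 'w2']
```
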

announce_scheduling_failure(task_name, family, params, expl, owners, **kwargs)
add_worker(worker, info, **kwargs)
disable_worker(worker)
set_worker_processes(worker, n)
send_scheduler_message(worker, task, content)
add_scheduler_message_response(task_id, message_id, response)
get_scheduler_message_response(task_id, message_id)
has_task_history()
is_pause_enabled()
is_paused()
pause()
unpause()
update_resources(**resources)
update_resource(resource, amount)
count_pending(worker)
get_work(host=None, assistant=False, current_tasks=None, worker=None, **kwargs)
ping(**kwargs)
graph(**kwargs)
dep_graph(task_id, include_done=True, **kwargs)
inverse_dep_graph(task_id, include_done=True, **kwargs)
task_list(status='', upstream_status='', limit=True, search=None, max_shown_tasks=None, **kwargs)

Query for a subset of tasks by status.

worker_list(include_running=True, **kwargs)
resource_list()

Resource usage info and their consumers (tasks).

resources()

Get total resources and available ones.

Query for a subset of tasks by task_id.

Parameters:

task_str

Returns:

re_enable_task(task_id)
fetch_error(task_id, **kwargs)
set_task_status_message(task_id, status_message)
get_task_status_message(task_id)
set_task_progress_percentage(task_id, progress_percentage)
get_task_progress_percentage(task_id)
decrease_running_task_resources(task_id, decrease_resources)
get_running_task_resources(task_id)
property task_history
update_metrics_task_started(task)
report_task_statistics(task_id, statistics)