luigi.scheduler module

The system for scheduling tasks and executing them in order. Deals with dependencies, priorities, resources, etc. The Worker pulls tasks from the scheduler (usually over the REST interface) and executes them. See Using the Central Scheduler for more info.
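The pull model described above can be pictured with a small in-memory toy. This is only a sketch: `ToyScheduler`, `mark_done` and `run_worker` are hypothetical names invented for illustration, and the real scheduler also handles resources, retries, batching and multiple workers.

```python
class ToyScheduler:
    """Hypothetical in-memory stand-in; not luigi's Scheduler."""

    def __init__(self):
        self.tasks = {}  # task_id -> {"status", "deps", "priority"}

    def add_task(self, task_id, deps=(), priority=0):
        self.tasks[task_id] = {
            "status": "PENDING",
            "deps": list(deps),
            "priority": priority,
        }

    def get_work(self):
        # A task is runnable once it is PENDING and all its deps are DONE.
        runnable = [
            (task_id, task)
            for task_id, task in self.tasks.items()
            if task["status"] == "PENDING"
            and all(self.tasks[dep]["status"] == "DONE" for dep in task["deps"])
        ]
        if not runnable:
            return None
        # Hand out the highest-priority runnable task.
        task_id, task = max(runnable, key=lambda item: item[1]["priority"])
        task["status"] = "RUNNING"
        return task_id

    def mark_done(self, task_id):
        self.tasks[task_id]["status"] = "DONE"


def run_worker(scheduler):
    """Pull tasks until the scheduler has nothing left to hand out."""
    order = []
    while True:
        task_id = scheduler.get_work()
        if task_id is None:
            break
        order.append(task_id)  # a real worker would execute the task here
        scheduler.mark_done(task_id)
    return order


scheduler = ToyScheduler()
scheduler.add_task("extract")
scheduler.add_task("transform", deps=["extract"])
scheduler.add_task("report", deps=["transform"], priority=10)
scheduler.add_task("cleanup", priority=5)
execution_order = run_worker(scheduler)
```

In this toy, "report" has the highest priority but still runs last, because its dependency chain has to finish first.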

luigi.scheduler.UPSTREAM_SEVERITY_KEY(value, start=0, stop=9223372036854775807, /)

Return first index of value.

Raises ValueError if the value is not present.
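The signature and docstring above are those of `tuple.index`, which suggests that this name is the bound `index` method of an ordered tuple of upstream statuses, usable as a sort key. The sketch below illustrates that pattern; the tuple contents and the names `SEVERITY_ORDER` and `severity_key` are assumptions made for the example.

```python
# Assumed ordering, from least to most severe (illustrative values only).
SEVERITY_ORDER = (
    "",
    "UPSTREAM_RUNNING",
    "UPSTREAM_MISSING_INPUT",
    "UPSTREAM_FAILED",
    "UPSTREAM_DISABLED",
)

# The bound tuple.index method maps a status to its position in the ordering.
severity_key = SEVERITY_ORDER.index

statuses = ["UPSTREAM_RUNNING", "", "UPSTREAM_FAILED"]
worst = max(statuses, key=severity_key)  # most severe status in the list

try:
    severity_key("NOT_A_STATUS")
    unknown_raises = False
except ValueError:
    # tuple.index raises ValueError for values that are not present.
    unknown_raises = True
```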

class luigi.scheduler.RetryPolicy(retry_count, disable_hard_timeout, disable_window)

Bases: tuple

Create new instance of RetryPolicy(retry_count, disable_hard_timeout, disable_window)

disable_hard_timeout

Alias for field number 1

disable_window

Alias for field number 2

retry_count

Alias for field number 0
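Because RetryPolicy is a named tuple, its fields can be read by name or by position. The sketch below builds a stand-in with `collections.namedtuple` using the field order documented above, so it runs without luigi installed; the values are arbitrary.

```python
from collections import namedtuple

# Stand-in with the same field order as the documented class.
RetryPolicy = namedtuple(
    "RetryPolicy", ["retry_count", "disable_hard_timeout", "disable_window"]
)

policy = RetryPolicy(retry_count=3, disable_hard_timeout=3600, disable_window=600)

# Field access by name and by position refer to the same value.
same_value = policy.retry_count == policy[0]

# Named tuples are immutable; _replace returns a modified copy.
stricter = policy._replace(retry_count=1)
```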

luigi.scheduler.rpc_method(**request_args)[source]
class luigi.scheduler.scheduler(*args, **kwargs)[source]

Bases: Config

retry_delay = FloatParameter (defaults to 900.0)
remove_delay = FloatParameter (defaults to 600.0)
worker_disconnect_delay = FloatParameter (defaults to 60.0)
state_path = Parameter (defaults to /var/lib/luigi-server/state.pickle)
batch_emails = BoolParameter (defaults to False): Send e-mails in batches rather than immediately
disable_window = IntParameter (defaults to 3600)
retry_count = IntParameter (defaults to 999999999)
disable_hard_timeout = IntParameter (defaults to 999999999)
disable_persist = IntParameter (defaults to 86400)
max_shown_tasks = IntParameter (defaults to 100000)
max_graph_nodes = IntParameter (defaults to 100000)
record_task_history = BoolParameter (defaults to False)
prune_on_get_work = BoolParameter (defaults to False)
pause_enabled = BoolParameter (defaults to True)
send_messages = BoolParameter (defaults to True)
metrics_collector = EnumParameter (defaults to MetricsCollectors.default)
metrics_custom_import = OptionalStrParameter (defaults to None)
stable_done_cooldown_secs = IntParameter (defaults to 10): Sets cooldown period to avoid running the same task twice

Sets a cooldown period, in seconds, after a task is completed. During this period the scheduler will not accept the same task again.
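These parameters are normally read from luigi's configuration file. The sketch below assumes they live in a section named after the class, `[scheduler]`, and parses an example fragment with the standard-library `configparser` purely to show the value types; the values are illustrative, and luigi uses its own configuration loader.

```python
import configparser

# Example fragment; the section name and values are illustrative assumptions.
CONFIG_TEXT = """
[scheduler]
retry_delay = 300.0
remove_delay = 600.0
record_task_history = false
stable_done_cooldown_secs = 10
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)
section = parser["scheduler"]

retry_delay = section.getfloat("retry_delay")              # FloatParameter
record_history = section.getboolean("record_task_history")  # BoolParameter
cooldown = section.getint("stable_done_cooldown_secs")       # IntParameter

# An option that is not set falls back to the documented default.
worker_disconnect_delay = section.getfloat("worker_disconnect_delay", fallback=60.0)
```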

class luigi.scheduler.OrderedSet(iterable=None)[source]

Bases: MutableSet

Standard Python OrderedSet recipe found at http://code.activestate.com/recipes/576694/

Modified to include a peek function to get the last element

add(key)[source]

Add an element.

discard(key)[source]

Remove an element. Do not raise an exception if absent.

peek(last=True)[source]
pop(last=True)[source]

Return the popped value. Raise KeyError if empty.
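The behaviour of `peek` and `pop` can be illustrated with a simplified stand-in. The linked recipe uses a doubly linked list; the sketch below instead relies on the insertion ordering of `dict` (with `reversed` on a dict, available from Python 3.8). `DictOrderedSet` is a hypothetical name and not luigi's implementation.

```python
from collections.abc import MutableSet


class DictOrderedSet(MutableSet):
    """Simplified ordered set backed by an insertion-ordered dict."""

    def __init__(self, iterable=None):
        self._items = {}
        if iterable is not None:
            for key in iterable:
                self.add(key)

    def __len__(self):
        return len(self._items)

    def __contains__(self, key):
        return key in self._items

    def __iter__(self):
        return iter(self._items)

    def add(self, key):
        # Re-adding an existing key keeps its original position.
        self._items.setdefault(key, None)

    def discard(self, key):
        # No exception if the key is absent.
        self._items.pop(key, None)

    def peek(self, last=True):
        # Return the last (or first) element without removing it.
        if not self._items:
            raise KeyError("set is empty")
        source = reversed(self._items) if last else iter(self._items)
        return next(source)

    def pop(self, last=True):
        # Remove and return the last (or first) element.
        key = self.peek(last)
        self.discard(key)
        return key


ordered = DictOrderedSet("abca")
snapshot = list(ordered)
newest = ordered.peek()
oldest = ordered.peek(last=False)
popped = ordered.pop()
```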

class luigi.scheduler.Task(task_id, status, deps, resources=None, priority=0, family='', module=None, params=None, param_visibilities=None, accepts_messages=False, tracking_url=None, status_message=None, progress_percentage=None, retry_policy='notoptional')[source]

Bases: object

set_params(params)[source]
is_batchable()[source]
add_failure()[source]

Add a failure event with the current timestamp.

num_failures()[source]

Return the number of failures in the window.

has_excessive_failures()[source]
clear_failures()[source]

Clear the failures history
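One way to picture the failure bookkeeping is a queue of timestamps that is pruned to a sliding window. The sketch below is a simplified model: `FailureTracker` is a hypothetical class, the current time is passed in explicitly to keep the example deterministic, and the hard timeout is left out.

```python
from collections import deque


class FailureTracker:
    """Hypothetical model of windowed failure counting."""

    def __init__(self, retry_count, disable_window):
        self.retry_count = retry_count
        self.disable_window = disable_window
        self.failures = deque()  # failure timestamps, oldest first

    def add_failure(self, now):
        self.failures.append(now)

    def num_failures(self, now):
        # Drop failures that have fallen out of the window, then count.
        min_time = now - self.disable_window
        while self.failures and self.failures[0] < min_time:
            self.failures.popleft()
        return len(self.failures)

    def has_excessive_failures(self, now):
        return self.num_failures(now) >= self.retry_count

    def clear_failures(self):
        self.failures.clear()


tracker = FailureTracker(retry_count=2, disable_window=100)
tracker.add_failure(now=0)
tracker.add_failure(now=50)
excessive_at_60 = tracker.has_excessive_failures(now=60)    # both failures in window
remaining_at_120 = tracker.num_failures(now=120)            # failure at t=0 expired
excessive_at_120 = tracker.has_excessive_failures(now=120)
```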

property pretty_id
class luigi.scheduler.Worker(worker_id, last_active=None)[source]

Bases: object

Structure for tracking worker activity and keeping their references.

add_info(info)[source]
update(worker_reference, get_work=False)[source]
prune(config)[source]
get_tasks(state, *statuses)[source]
is_trivial_worker(state)[source]

Return whether the worker is trivial: it is not an assistant and has only tasks that are without requirements.

We have to pass the state parameter for optimization reasons.

property assistant
property enabled
property state
add_rpc_message(name, **kwargs)[source]
fetch_rpc_messages()[source]
class luigi.scheduler.SimpleTaskState(state_path)[source]

Bases: object

Keep track of the current state and handle persistence.

The point of this class is to enable other ways to keep state, e.g. by using a database. These will be implemented by creating an abstract base class that this and other classes inherit from.

get_state()[source]
set_state(state)[source]
dump()[source]
load()[source]
get_active_tasks()[source]
get_active_tasks_by_status(*statuses)[source]
get_active_task_count_for_status(status)[source]
get_batch_running_tasks(batch_id)[source]
set_batcher(worker_id, family, batcher_args, max_batch_size)[source]
get_batcher(worker_id, family)[source]
num_pending_tasks()[source]

Return how many tasks are PENDING + RUNNING. O(1).
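A constant-time count like this is possible when tasks are indexed by status, so that the count is a sum of two container lengths rather than a scan over all tasks. The sketch below shows one such layout; `StatusIndex` and its attributes are hypothetical names, not the class's actual internals.

```python
from collections import defaultdict


class StatusIndex:
    """Hypothetical status index that keeps counting O(1)."""

    def __init__(self):
        self._by_status = defaultdict(dict)  # status -> {task_id: True}
        self._status_of = {}                 # task_id -> current status

    def set_status(self, task_id, new_status):
        # Move the task from its old status bucket to the new one.
        old_status = self._status_of.get(task_id)
        if old_status is not None:
            del self._by_status[old_status][task_id]
        self._by_status[new_status][task_id] = True
        self._status_of[task_id] = new_status

    def num_pending_tasks(self):
        # Two len() calls, independent of the total number of tasks.
        return len(self._by_status["PENDING"]) + len(self._by_status["RUNNING"])


index = StatusIndex()
index.set_status("a", "PENDING")
index.set_status("b", "PENDING")
count_before = index.num_pending_tasks()
index.set_status("a", "RUNNING")
index.set_status("b", "DONE")
count_after = index.num_pending_tasks()
```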

get_task(task_id, default=None, setdefault=None)[source]
has_task(task_id)[source]
re_enable(task, config=None)[source]
set_batch_running(task, batch_id, worker_id)[source]
set_status(task, new_status, config=None)[source]
fail_dead_worker_task(task, config, assistants)[source]
update_status(task, config)[source]
may_prune(task)[source]
inactivate_tasks(delete_tasks)[source]
get_active_workers(last_active_lt=None, last_get_work_gt=None)[source]
get_assistants(last_active_lt=None)[source]
get_worker_ids()[source]
get_worker(worker_id)[source]
inactivate_workers(delete_workers)[source]
disable_workers(worker_ids)[source]
update_metrics(task, config)[source]
class luigi.scheduler.Scheduler(config=None, resources=None, task_history_impl=None, **kwargs)[source]

Bases: object

Async scheduler that can handle multiple workers, etc.

Can be run locally or on a server (using RemoteScheduler + server.Server).

Keyword Arguments:

  • config: an object of class “scheduler”, or None (in which case the global instance will be used)

  • resources: a dict of str->int constraints

  • task_history_impl: ignore config and use this object as the task history

load()[source]
dump()[source]
prune()[source]
add_task_batcher(worker, task_family, batched_args, max_batch_size=inf)[source]
forgive_failures(task_id=None)[source]
mark_as_done(task_id=None)[source]
add_task(task_id=None, status='PENDING', runnable=True, deps=None, new_deps=None, expl=None, resources=None, priority=0, family='', module=None, params=None, param_visibilities=None, accepts_messages=False, assistant=False, tracking_url=None, worker=None, batchable=None, batch_id=None, retry_policy_dict=None, owners=None, **kwargs)[source]
  • add task identified by task_id if it doesn’t exist

  • if deps is not None, update dependency list

  • update status of task

  • add additional workers/stakeholders

  • update priority when needed
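The steps listed above amount to an "upsert" on the task table. The sketch below models them on a plain dict. `upsert_task` is a hypothetical helper, and raising the priority to the maximum seen so far is an assumption about what "when needed" means.

```python
def upsert_task(tasks, task_id, status="PENDING", deps=None, priority=0, worker=None):
    """Hypothetical model of the add_task steps on a dict of tasks."""
    # Add the task if it does not exist yet.
    task = tasks.setdefault(
        task_id,
        {"status": status, "deps": set(), "priority": priority, "stakeholders": set()},
    )
    # Only touch the dependency list when deps is given.
    if deps is not None:
        task["deps"] = set(deps)
    # Update the status.
    task["status"] = status
    # Record the reporting worker as an additional stakeholder.
    if worker is not None:
        task["stakeholders"].add(worker)
    # Assumption: priority is only ever raised, never lowered.
    task["priority"] = max(task["priority"], priority)
    return task


tasks = {}
upsert_task(tasks, "a", deps=["b"], priority=1, worker="w1")
upsert_task(tasks, "a", status="DONE", priority=0, worker="w2")
final = tasks["a"]
```

The second call omits `deps`, so the dependency list recorded by the first call is kept.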

announce_scheduling_failure(task_name, family, params, expl, owners, **kwargs)[source]
add_worker(worker, info, **kwargs)[source]
disable_worker(worker)[source]
set_worker_processes(worker, n)[source]
send_scheduler_message(worker, task, content)[source]
add_scheduler_message_response(task_id, message_id, response)[source]
get_scheduler_message_response(task_id, message_id)[source]
has_task_history()[source]
is_pause_enabled()[source]
is_paused()[source]
pause()[source]
unpause()[source]
update_resources(**resources)[source]
update_resource(resource, amount)[source]
count_pending(worker)[source]
get_work(host=None, assistant=False, current_tasks=None, worker=None, **kwargs)[source]
ping(**kwargs)[source]
graph(**kwargs)[source]
dep_graph(task_id, include_done=True, **kwargs)[source]
inverse_dep_graph(task_id, include_done=True, **kwargs)[source]
task_list(status='', upstream_status='', limit=True, search=None, max_shown_tasks=None, **kwargs)[source]

Query for a subset of tasks by status.

worker_list(include_running=True, **kwargs)[source]
resource_list()[source]

Resources usage info and their consumers (tasks).

resources()[source]

Get total resources and available ones.

task_search(task_str, **kwargs)[source]

Query for a subset of tasks by task_id.

Parameters:

task_str

re_enable_task(task_id)[source]
fetch_error(task_id, **kwargs)[source]
set_task_status_message(task_id, status_message)[source]
get_task_status_message(task_id)[source]
set_task_progress_percentage(task_id, progress_percentage)[source]
get_task_progress_percentage(task_id)[source]
decrease_running_task_resources(task_id, decrease_resources)[source]
get_running_task_resources(task_id)[source]
property task_history
update_metrics_task_started(task)[source]