luigi.scheduler module

This module implements the system for scheduling tasks and executing them in order. It handles dependencies, priorities, resources, and related concerns. The Worker pulls tasks from the scheduler (usually over the REST interface) and executes them. See Using the Central Scheduler for more info.

luigi.scheduler.UPSTREAM_SEVERITY_KEY()

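T.index(value, [start, [stop]]) -> integer – return first index of value. Raises ValueError if the value is not present.

This text is the built-in docstring of the tuple index method, which indicates that the name is bound to that method of a tuple rather than to a function defined in this module.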
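A bound tuple index method can serve as a sort key, because a value's position in the tuple acts as its rank. The following is a minimal sketch of that pattern only. The name SEVERITY_ORDER, the name severity_key, and the tuple contents are illustrative assumptions and are not taken from this page.

```python
# Illustrative ordering from least to most severe (assumed values).
SEVERITY_ORDER = (
    "",
    "UPSTREAM_RUNNING",
    "UPSTREAM_MISSING_INPUT",
    "UPSTREAM_FAILED",
    "UPSTREAM_DISABLED",
)

# Binding the tuple's index method yields a callable that maps a value to its rank.
severity_key = SEVERITY_ORDER.index

observed = ["UPSTREAM_FAILED", "", "UPSTREAM_RUNNING"]

# The most severe status is the one with the highest position in the tuple.
worst = max(observed, key=severity_key)

# Sorting with the same key orders statuses from least to most severe.
ranked = sorted(observed, key=severity_key)
```

As the docstring states, a value that is not in the tuple raises ValueError, so callers of such a key need to pass known statuses only.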

class luigi.scheduler.RetryPolicy(retry_count, disable_hard_timeout, disable_window)

Bases: tuple

Create new instance of RetryPolicy(retry_count, disable_hard_timeout, disable_window)

disable_hard_timeout

Alias for field number 1

disable_window

Alias for field number 2

retry_count

Alias for field number 0
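Since RetryPolicy is a tuple subclass with named fields, each field is reachable both by name and by position. The sketch below rebuilds an equivalent type with collections.namedtuple so that it runs without Luigi installed. It is a stand-in for illustration, and the field values are arbitrary.

```python
from collections import namedtuple

# Stand-in with the same field order as documented above.
RetryPolicy = namedtuple(
    "RetryPolicy", ["retry_count", "disable_hard_timeout", "disable_window"]
)

policy = RetryPolicy(retry_count=3, disable_hard_timeout=7200, disable_window=3600)

# Fields can be read by name or by the position given in "Alias for field number N".
by_name = policy.retry_count
by_index = policy[0]

# Tuples are immutable; _replace returns a modified copy instead.
stricter = policy._replace(retry_count=1)
```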

luigi.scheduler.rpc_method(**request_args)[source]
class luigi.scheduler.scheduler(*args, **kwargs)[source]

Bases: luigi.task.Config

retry_delay = FloatParameter (defaults to 900.0)
remove_delay = FloatParameter (defaults to 600.0)
worker_disconnect_delay = FloatParameter (defaults to 60.0)
state_path = Parameter (defaults to /var/lib/luigi-server/state.pickle)
batch_emails = BoolParameter (defaults to False): Send e-mails in batches rather than immediately
disable_window = IntParameter (defaults to 3600)
retry_count = IntParameter (defaults to 999999999)
disable_hard_timeout = IntParameter (defaults to 999999999)
disable_persist = IntParameter (defaults to 86400)
max_shown_tasks = IntParameter (defaults to 100000)
max_graph_nodes = IntParameter (defaults to 100000)
record_task_history = BoolParameter (defaults to False)
prune_on_get_work = BoolParameter (defaults to False)
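
Because this class derives from luigi.task.Config, its parameters are normally supplied through a [scheduler] section of the Luigi configuration file. The sketch below parses such a section with the standard library configparser, only to show the shape of the section and the parameter types. It does not use Luigi's own configuration loader, and the values are illustrative.

```python
import configparser

# Illustrative configuration text; option names match the parameters listed above.
CONFIG_TEXT = """
[scheduler]
retry_delay = 300.0
remove_delay = 600.0
disable_window = 3600
retry_count = 5
record_task_history = false
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)
section = parser["scheduler"]

# Typed getters mirror FloatParameter, IntParameter and BoolParameter.
retry_delay = section.getfloat("retry_delay")
retry_count = section.getint("retry_count")
record_task_history = section.getboolean("record_task_history")

# An option missing from the file falls back to a caller-supplied default.
max_shown_tasks = section.getint("max_shown_tasks", fallback=100000)
```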
class luigi.scheduler.Failures(window)[source]

Bases: object

This class tracks the number of failures in a given time window.

Failures added are marked with the current timestamp, and this class counts the number of failures in a sliding time window ending at the present.

Initialize with the given window.

Parameters: window – how long to track failures for, as a float (number of seconds).
add_failure()[source]

Add a failure event with the current timestamp.

num_failures()[source]

Return the number of failures in the window.

clear()[source]

Clear the failure queue.
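The sliding-window counting described for this class can be sketched with a deque of timestamps. This is a minimal stand-in under stated assumptions, not the library's implementation: the class name SlidingFailures and the clock argument are hypothetical, the latter added so the behaviour can be shown without waiting on real time.

```python
import collections
import time


class SlidingFailures:
    """Stand-in with the documented interface; `clock` is a hypothetical test hook."""

    def __init__(self, window, clock=time.time):
        self.window = window  # length of the window in seconds
        self._clock = clock
        self._timestamps = collections.deque()

    def add_failure(self):
        # Record a failure stamped with the current time.
        self._timestamps.append(self._clock())

    def num_failures(self):
        # Drop timestamps older than the window, then count what remains.
        cutoff = self._clock() - self.window
        while self._timestamps and self._timestamps[0] < cutoff:
            self._timestamps.popleft()
        return len(self._timestamps)

    def clear(self):
        self._timestamps.clear()


# Drive the counter with a fake clock held in a one-element list.
now = [0.0]
failures = SlidingFailures(window=10.0, clock=lambda: now[0])
failures.add_failure()  # recorded at t=0
now[0] = 5.0
failures.add_failure()  # recorded at t=5
now[0] = 12.0
count_at_12 = failures.num_failures()  # only the t=5 failure is within [2, 12]
```

Timestamps are appended in increasing order, so expired entries are always at the left end of the deque and pruning only inspects the entries it removes.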

class luigi.scheduler.OrderedSet(iterable=None)[source]

Bases: _abcoll.MutableSet

Standard Python OrderedSet recipe found at http://code.activestate.com/recipes/576694/

Modified to include a peek function that returns the last element without removing it.

add(key)[source]
discard(key)[source]
peek(last=True)[source]
pop(last=True)[source]
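
The relationship between peek and pop can be sketched as follows. This stand-in is not the linked recipe: it keeps insertion order in a plain dict (ordered in Python 3.7+, reversible in 3.8+), and the class name PeekableOrderedSet is hypothetical.

```python
from collections.abc import MutableSet


class PeekableOrderedSet(MutableSet):
    """Insertion-ordered set with peek; a stand-in, not the recipe itself."""

    def __init__(self, iterable=None):
        self._items = {}  # dict keys keep insertion order
        if iterable is not None:
            for key in iterable:
                self.add(key)

    def __contains__(self, key):
        return key in self._items

    def __iter__(self):
        return iter(self._items)

    def __len__(self):
        return len(self._items)

    def add(self, key):
        # Re-adding an existing key leaves its position unchanged.
        self._items.setdefault(key, None)

    def discard(self, key):
        self._items.pop(key, None)

    def peek(self, last=True):
        # Return the last (or first) element without removing it.
        if not self._items:
            raise KeyError("set is empty")
        return next(reversed(self._items)) if last else next(iter(self._items))

    def pop(self, last=True):
        # Remove and return the element that peek would report.
        key = self.peek(last)
        self.discard(key)
        return key


s = PeekableOrderedSet(["a", "b", "c"])
s.add("a")  # duplicate, ignored
last_seen = s.peek()
first_seen = s.peek(last=False)
size_after_peek = len(s)
popped = s.pop()
remaining = list(s)
```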
class luigi.scheduler.Task(task_id, status, deps, resources=None, priority=0, family='', module=None, params=None, tracking_url=None, status_message=None, progress_percentage=None, retry_policy='notoptional')[source]

Bases: object

is_batchable()[source]
add_failure()[source]
has_excessive_failures()[source]
pretty_id
class luigi.scheduler.Worker(worker_id, last_active=None)[source]

Bases: object

Structure for tracking worker activity and keeping their references.

add_info(info)[source]
update(worker_reference, get_work=False)[source]
prune(config)[source]
get_tasks(state, *statuses)[source]
is_trivial_worker(state)[source]

Return whether this worker is trivial, meaning it is not an assistant and has only tasks without requirements.

We have to pass the state parameter for optimization reasons.

assistant
enabled
state
add_rpc_message(name, **kwargs)[source]
fetch_rpc_messages()[source]
class luigi.scheduler.SimpleTaskState(state_path)[source]

Bases: object

Keep track of the current state and handle persistence.

The point of this class is to enable other ways to keep state, e.g. by using a database. These will be implemented by creating an abstract base class that this and other classes inherit from.

get_state()[source]
set_state(state)[source]
dump()[source]
load()[source]
get_active_tasks()[source]
get_active_tasks_by_status(*statuses)[source]
get_batch_running_tasks(batch_id)[source]
set_batcher(worker_id, family, batcher_args, max_batch_size)[source]
get_batcher(worker_id, family)[source]
num_pending_tasks()[source]

Return how many tasks are PENDING + RUNNING. O(1).

get_task(task_id, default=None, setdefault=None)[source]
has_task(task_id)[source]
re_enable(task, config=None)[source]
set_batch_running(task, batch_id, worker_id)[source]
set_status(task, new_status, config=None)[source]
fail_dead_worker_task(task, config, assistants)[source]
update_status(task, config)[source]
may_prune(task)[source]
inactivate_tasks(delete_tasks)[source]
get_active_workers(last_active_lt=None, last_get_work_gt=None)[source]
get_assistants(last_active_lt=None)[source]
get_worker_ids()[source]
get_worker(worker_id)[source]
inactivate_workers(delete_workers)[source]
disable_workers(worker_ids)[source]
class luigi.scheduler.Scheduler(config=None, resources=None, task_history_impl=None, **kwargs)[source]

Bases: object

Async scheduler that can handle multiple workers, etc.

Can be run locally or on a server (using RemoteScheduler + server.Server).

Keyword arguments:
  • config – an object of class “scheduler”, or None (in which case the global instance will be used)
  • resources – a dict of str->int constraints
  • task_history_impl – ignore config and use this object as the task history

load()[source]
dump()[source]
prune()[source]
add_task_batcher(worker, task_family, batched_args, max_batch_size=inf)[source]
forgive_failures(task_id=None)[source]
add_task(task_id=None, status='PENDING', runnable=True, deps=None, new_deps=None, expl=None, resources=None, priority=0, family='', module=None, params=None, assistant=False, tracking_url=None, worker=None, batchable=None, batch_id=None, retry_policy_dict={}, owners=None, **kwargs)[source]
  • add task identified by task_id if it doesn’t exist
  • if deps is not None, update dependency list
  • update status of task
  • add additional workers/stakeholders
  • update priority when needed
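
The steps listed above amount to an insert-or-update operation. The sketch below shows that shape on a plain dict. It is a toy model, not the scheduler's code: the function name add_task_sketch, the dict layout, and the rule that priority only ever increases are assumptions made for illustration.

```python
def add_task_sketch(tasks, task_id, status="PENDING", deps=None, worker=None, priority=0):
    """Toy insert-or-update over a dict of task records (hypothetical layout)."""
    # Add the task identified by task_id if it does not exist yet.
    task = tasks.setdefault(
        task_id,
        {"status": None, "deps": set(), "stakeholders": set(), "priority": priority},
    )
    # Update the dependency list only when deps is given.
    if deps is not None:
        task["deps"] = set(deps)
    # Update the status of the task.
    task["status"] = status
    # Record the calling worker as an additional stakeholder.
    if worker is not None:
        task["stakeholders"].add(worker)
    # Assumption: "when needed" means a higher priority replaces a lower one.
    task["priority"] = max(task["priority"], priority)
    return task


tasks = {}
add_task_sketch(tasks, "A", deps=["B"], worker="w1", priority=1)
# A second call without deps keeps the dependency list and merges the rest.
add_task_sketch(tasks, "A", status="RUNNING", worker="w2", priority=5)
record = tasks["A"]
```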
announce_scheduling_failure(task_name, family, params, expl, owners, **kwargs)[source]
add_worker(worker, info, **kwargs)[source]
disable_worker(worker)[source]
set_worker_processes(worker, n)[source]
is_paused()[source]
pause()[source]
unpause()[source]
update_resources(**resources)[source]
update_resource(resource, amount)[source]
count_pending(worker)[source]
get_work(host=None, assistant=False, current_tasks=None, worker=None, **kwargs)[source]
ping(**kwargs)[source]
graph(**kwargs)[source]
dep_graph(task_id, include_done=True, **kwargs)[source]
inverse_dep_graph(task_id, include_done=True, **kwargs)[source]
task_list(status='', upstream_status='', limit=True, search=None, max_shown_tasks=None, **kwargs)[source]

Query for a subset of tasks by status.

worker_list(include_running=True, **kwargs)[source]
resource_list()[source]

Resource usage information and the consumers (tasks) of each resource.

resources()[source]

Get the total resources and the available ones.

Query for a subset of tasks by task_id.

Parameters: task_str
Returns:
re_enable_task(task_id)[source]
fetch_error(task_id, **kwargs)[source]
set_task_status_message(task_id, status_message)[source]
get_task_status_message(task_id)[source]
set_task_progress_percentage(task_id, progress_percentage)[source]
get_task_progress_percentage(task_id)[source]
task_history