luigi.worker module

The worker communicates with the scheduler and does two things:

  1. Sends all tasks that have to be run

  2. Gets tasks from the scheduler that should be run

When running in local mode, the worker talks directly to a Scheduler instance. When you run a central server, the worker will talk to the scheduler using a RemoteScheduler instance.

Everything in this module is private to luigi and may change in incompatible ways between versions. The only exceptions are the exception types and the worker config class.

exception luigi.worker.TaskException[source]

Bases: Exception

class luigi.worker.GetWorkResponse(task_id, running_tasks, n_pending_tasks, n_unique_pending, n_pending_last_scheduled, worker_state)

Bases: tuple

Create new instance of GetWorkResponse(task_id, running_tasks, n_pending_tasks, n_unique_pending, n_pending_last_scheduled, worker_state)

n_pending_last_scheduled

Alias for field number 4

n_pending_tasks

Alias for field number 2

n_unique_pending

Alias for field number 3

running_tasks

Alias for field number 1

task_id

Alias for field number 0

worker_state

Alias for field number 5
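The field aliases above behave like those of any named tuple. The following is a minimal sketch that rebuilds an equivalent tuple type with the standard library rather than importing it from luigi; the field values are hypothetical and chosen only for illustration.

```python
from collections import namedtuple

# Illustrative stand-in with the same field names as documented above;
# it is not imported from luigi.
GetWorkResponse = namedtuple(
    "GetWorkResponse",
    [
        "task_id",
        "running_tasks",
        "n_pending_tasks",
        "n_unique_pending",
        "n_pending_last_scheduled",
        "worker_state",
    ],
)

# Hypothetical values, chosen only for the example.
response = GetWorkResponse(
    task_id="ExampleTask__1",
    running_tasks=[],
    n_pending_tasks=3,
    n_unique_pending=1,
    n_pending_last_scheduled=2,
    worker_state="active",
)

# Each named field is an alias for a tuple position.
assert response.task_id == response[0]
assert response.n_pending_tasks == response[2]
assert response.worker_state == response[5]

# Tuple unpacking follows the documented field order.
task_id, running, n_pending, n_unique, n_last, state = response
```

Accessing fields by name keeps calling code readable and independent of the positional order.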

class luigi.worker.TaskProcess(task, worker_id, result_queue, status_reporter, use_multiprocessing=False, worker_timeout=0, check_unfulfilled_deps=True, check_complete_on_run=False, task_completion_cache=None)[source]

Bases: Process

Wrap all task execution in this class.

Mainly for convenience since this is run in a separate process.

forward_reporter_attributes = {'decrease_running_resources': 'decrease_running_resources', 'scheduler_messages': 'scheduler_messages', 'update_progress_percentage': 'set_progress_percentage', 'update_status_message': 'set_status_message', 'update_tracking_url': 'set_tracking_url'}

run()[source]

Method to be run in sub-process; can be overridden in sub-class

terminate()[source]

Terminate this process and its subprocesses.
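The forward_reporter_attributes mapping listed for TaskProcess pairs a reporter attribute name with the attribute name exposed on the task. The sketch below shows one way such a mapping could be applied. It assumes the forwarding is a plain attribute copy; FakeReporter, FakeTask and forward_attributes are hypothetical names, not part of luigi.

```python
# Mapping copied from the documented class attribute:
# reporter attribute name -> attribute name exposed on the task.
forward_reporter_attributes = {
    "update_tracking_url": "set_tracking_url",
    "update_status_message": "set_status_message",
    "update_progress_percentage": "set_progress_percentage",
    "decrease_running_resources": "decrease_running_resources",
    "scheduler_messages": "scheduler_messages",
}


class FakeReporter:
    """Hypothetical stand-in for a status reporter; it only records calls."""

    def __init__(self):
        self.scheduler_messages = []
        self.calls = []

    def update_tracking_url(self, tracking_url):
        self.calls.append(("tracking_url", tracking_url))

    def update_status_message(self, message):
        self.calls.append(("status_message", message))

    def update_progress_percentage(self, percentage):
        self.calls.append(("progress_percentage", percentage))

    def decrease_running_resources(self, decrease_resources):
        self.calls.append(("decrease_resources", decrease_resources))


class FakeTask:
    """Hypothetical task object that receives the forwarded attributes."""


def forward_attributes(task, reporter, mapping):
    # Assumption: forwarding is a plain attribute copy from reporter to task.
    for reporter_attr, task_attr in mapping.items():
        setattr(task, task_attr, getattr(reporter, reporter_attr))


reporter = FakeReporter()
task = FakeTask()
forward_attributes(task, reporter, forward_reporter_attributes)

# The task can now report status without knowing about the reporter.
task.set_status_message("half way")
task.set_progress_percentage(50)
```

With this arrangement the task code calls set_status_message or set_progress_percentage, and the calls end up on the reporter object.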

class luigi.worker.ContextManagedTaskProcess(context, *args, **kwargs)[source]

Bases: TaskProcess

run()[source]

Method to be run in sub-process; can be overridden in sub-class
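The task_process_context option documented in this module describes a class that is instantiated with the process object and applied as a context manager around its run() call. The sketch below illustrates that pattern under those assumptions; RecordingContext, FakeProcess and run_with_context are hypothetical names used only here.

```python
class RecordingContext:
    """Hypothetical context class: receives the process and wraps its run()."""

    def __init__(self, task_process):
        self.task_process = task_process
        self.events = []

    def __enter__(self):
        self.events.append("enter")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.events.append("exit")
        return False  # do not swallow exceptions raised by run()


class FakeProcess:
    """Hypothetical stand-in for a task process."""

    def __init__(self):
        self.ran = False

    def run(self):
        self.ran = True


def run_with_context(context_class, process):
    # Instantiate the context with the process, then run inside it.
    context = context_class(process)
    with context:
        process.run()
    return context


process = FakeProcess()
context = run_with_context(RecordingContext, process)
```

A context class of this shape is a natural place for per-task timing, logging or monitoring hooks.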

class luigi.worker.TaskStatusReporter(scheduler, task_id, worker_id, scheduler_messages)[source]

Bases: object

Reports task status information to the scheduler.

This object must be picklable so that it can be passed to TaskProcess on systems where the process start method needs to pickle the process object (e.g. Windows).

update_tracking_url(tracking_url)[source]

update_status_message(message)[source]

update_progress_percentage(percentage)[source]

decrease_running_resources(decrease_resources)[source]

class luigi.worker.SchedulerMessage(scheduler, task_id, message_id, content, **payload)[source]

Bases: object

Message object that is built by the Worker when a message from the scheduler is received, and that is passed to the message queue of a Task.

respond(response)[source]

class luigi.worker.SingleProcessPool[source]

Bases: object

Dummy process pool for using a single processor.

Imitates the api of multiprocessing.Pool using single-processor equivalents.

apply_async(function, args)[source]
close()[source]
join()[source]
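The idea of imitating the multiprocessing.Pool API with single-process equivalents can be sketched as follows. InlinePool is a hypothetical name, and the choice to run the function immediately and return its result directly (rather than an AsyncResult object) is an assumption made for this sketch.

```python
class InlinePool:
    """Hypothetical pool look-alike that does all work in the calling process."""

    def apply_async(self, function, args):
        # Assumption: run synchronously and hand back the plain result.
        return function(*args)

    def close(self):
        # Nothing to shut down: no worker processes were started.
        pass

    def join(self):
        # Nothing to wait for: every call has already finished.
        pass


pool = InlinePool()
result = pool.apply_async(pow, (2, 5))
pool.close()
pool.join()
```

Calling code can then use the same apply_async, close, join sequence whether or not real subprocesses are involved.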
class luigi.worker.DequeQueue[source]

Bases: deque

deque wrapper implementing the Queue interface.

put(obj, block=None, timeout=None)[source]
get(block=None, timeout=None)[source]
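A deque can be given queue-style put and get methods with very little code. The sketch below is one possible shape, not the module's implementation: SimpleDequeQueue is a hypothetical name, the block and timeout arguments are accepted but ignored, and get() pops from the right, which makes this sketch last-in, first-out.

```python
import queue
from collections import deque


class SimpleDequeQueue(deque):
    """Hypothetical deque subclass exposing put() and get()."""

    def put(self, obj, block=None, timeout=None):
        # block and timeout are accepted for interface compatibility only.
        self.append(obj)

    def get(self, block=None, timeout=None):
        try:
            # Assumption: pop from the right, so the newest item comes first.
            return self.pop()
        except IndexError:
            # Mirror the standard queue behaviour for an empty queue.
            raise queue.Empty


q = SimpleDequeQueue()
q.put("first")
q.put("second")
latest = q.get()
remaining = len(q)
```

Because nothing blocks, a wrapper like this only suits single-process use, where producer and consumer run in the same thread.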
exception luigi.worker.AsyncCompletionException(trace)[source]

Bases: Exception

Exception indicating that something went wrong with checking complete.

class luigi.worker.TracebackWrapper(trace)[source]

Bases: object

Class to wrap tracebacks so we can know they’re not just strings.

luigi.worker.check_complete_cached(task, completion_cache=None)[source]

luigi.worker.check_complete(task, out_queue, completion_cache=None)[source]

Checks if task is complete, puts the result to out_queue, optionally using the completion cache.
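The interplay between a completion check, an optional cache and an output queue can be sketched as follows. This is an illustration rather than the module's code: the function and class names are hypothetical, the cache is assumed to be a dict keyed by task_id that stores only successful checks, and the tuple put on the queue is an assumed shape.

```python
import queue


def is_complete_cached(task, completion_cache=None):
    # Assumption: only positive results are cached, keyed by task_id.
    if completion_cache is not None and completion_cache.get(task.task_id):
        return True
    result = task.complete()
    if completion_cache is not None and result:
        completion_cache[task.task_id] = True
    return result


def report_complete(task, out_queue, completion_cache=None):
    # Assumption: the result is delivered as a (task, is_complete) pair.
    out_queue.put((task, is_complete_cached(task, completion_cache)))


class ExampleTask:
    """Hypothetical task that counts how often complete() is called."""

    def __init__(self, task_id, done):
        self.task_id = task_id
        self.done = done
        self.complete_calls = 0

    def complete(self):
        self.complete_calls += 1
        return self.done


cache = {}
finished = ExampleTask("finished", done=True)
pending = ExampleTask("pending", done=False)
results = queue.Queue()
for _ in range(2):
    report_complete(finished, results, cache)
    report_complete(pending, results, cache)
```

In this sketch the finished task is checked once and then served from the cache, while the pending task is re-checked on every call.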

class luigi.worker.worker(*args, **kwargs)[source]

Bases: Config

id = Parameter (defaults to an empty string): Override the auto-generated worker_id
ping_interval = FloatParameter (defaults to 1.0)
keep_alive = BoolParameter (defaults to False)
count_uniques = BoolParameter (defaults to False): Keep a worker alive only if it has a unique pending task and keep_alive is also true
count_last_scheduled = BoolParameter (defaults to False): Keep a worker alive only if there are pending tasks which it was the last to schedule.
wait_interval = FloatParameter (defaults to 1.0)
wait_jitter = FloatParameter (defaults to 5.0)
max_keep_alive_idle_duration = TimeDeltaParameter (defaults to 0:00:00)
max_reschedules = IntParameter (defaults to 1)
timeout = IntParameter (defaults to 0)
task_limit = IntParameter (defaults to None)
retry_external_tasks = BoolParameter (defaults to False): If true, incomplete external tasks will be retested for completion while Luigi is running.
send_failure_email = BoolParameter (defaults to True): If true, send e-mails directly from the worker on failure
no_install_shutdown_handler = BoolParameter (defaults to False): If true, the SIGUSR1 shutdown handler will NOT be installed on the worker
check_unfulfilled_deps = BoolParameter (defaults to True): If true, check for completeness of dependencies before running a task
check_complete_on_run = BoolParameter (defaults to False): If true, only mark tasks as done after running if they are complete. Regardless of this setting, the worker will always check if external tasks are complete before marking them as done.
force_multiprocessing = BoolParameter (defaults to False): If true, use multiprocessing also when running with 1 worker
task_process_context = OptionalParameter (defaults to None): If set to a fully qualified class name, the class will be instantiated with a TaskProcess as its constructor parameter and applied as a context manager around its run() call, so this can be used for obtaining high level customizable monitoring or logging of each individual Task run.
cache_task_completion = BoolParameter (defaults to False): If true, cache the response of successful completion checks of tasks assigned to a worker. This can especially speed up tasks with dynamic dependencies but assumes that the completion status does not change after it was true the first time.
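The parameters above are typed settings with defaults. As a rough illustration of how such values might be supplied and read, the sketch below parses a hypothetical [worker] section with the standard-library configparser. The section name, the file contents and the idea that the keys match the parameter names are assumptions for this example; luigi's own configuration loading is not shown.

```python
import configparser

# Hypothetical configuration text; the keys mirror parameter names listed above.
EXAMPLE_CONFIG = """
[worker]
keep_alive = true
ping_interval = 2.0
max_reschedules = 3
"""

parser = configparser.ConfigParser()
parser.read_string(EXAMPLE_CONFIG)
section = parser["worker"]

# Typed reads, falling back to the documented defaults when a key is absent.
keep_alive = section.getboolean("keep_alive", fallback=False)
ping_interval = section.getfloat("ping_interval", fallback=1.0)
max_reschedules = section.getint("max_reschedules", fallback=1)
wait_jitter = section.getfloat("wait_jitter", fallback=5.0)
```

Keys that are left out, such as wait_jitter here, keep the default listed for the corresponding parameter.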
class luigi.worker.KeepAliveThread(scheduler, worker_id, ping_interval, rpc_message_callback)[source]

Bases: Thread

Periodically tell the scheduler that the worker still lives.

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

stop()[source]

run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
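A thread that periodically tells a scheduler the worker is still alive can be sketched with threading.Event. This is a minimal illustration, not the module's code: PingThread and StubScheduler are hypothetical names, the scheduler is assumed to expose a ping(worker=...) method, and ping failures are simply ignored here.

```python
import threading


class PingThread(threading.Thread):
    """Hypothetical keep-alive thread: pings until stop() is called."""

    def __init__(self, scheduler, worker_id, ping_interval):
        super().__init__(daemon=True)
        self._scheduler = scheduler
        self._worker_id = worker_id
        self._ping_interval = ping_interval
        self._should_stop = threading.Event()

    def stop(self):
        self._should_stop.set()

    def run(self):
        # wait() returns True once stop() has been called, ending the loop.
        while not self._should_stop.wait(self._ping_interval):
            try:
                self._scheduler.ping(worker=self._worker_id)
            except Exception:
                # Assumption: a failed ping should not end the thread.
                pass


class StubScheduler:
    """Hypothetical scheduler that records which workers pinged it."""

    def __init__(self):
        self.pings = []
        self.first_ping = threading.Event()

    def ping(self, worker):
        self.pings.append(worker)
        self.first_ping.set()


scheduler = StubScheduler()
thread = PingThread(scheduler, worker_id="worker-1", ping_interval=0.01)
thread.start()
got_ping = scheduler.first_ping.wait(timeout=5)
thread.stop()
thread.join(timeout=5)
```

Waiting on the event instead of sleeping lets stop() interrupt the interval immediately rather than after the next ping is due.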

luigi.worker.rpc_message_callback(fn)[source]

class luigi.worker.Worker(scheduler=None, worker_id=None, worker_processes=1, assistant=False, **kwargs)[source]

Bases: object

Worker object communicates with a scheduler.

Simple class that talks to a scheduler and:

  • tells the scheduler what it has to do + its dependencies

  • asks for stuff to do (pulls it in a loop and runs it)
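The two responsibilities listed above can be illustrated with a toy model that uses no luigi code. ToyScheduler and ToyWorker are hypothetical classes written only for this sketch; a real scheduler tracks far more state (workers, priorities, failures, resources).

```python
class ToyScheduler:
    """Hypothetical in-memory scheduler tracking dependencies and completion."""

    def __init__(self):
        self.deps = {}   # task_id -> list of dependency task_ids
        self.done = set()

    def add_task(self, task_id, deps):
        self.deps[task_id] = list(deps)

    def get_work(self):
        # Hand out the first unfinished task whose dependencies are all done.
        for task_id, deps in self.deps.items():
            if task_id not in self.done and all(d in self.done for d in deps):
                return task_id
        return None

    def mark_done(self, task_id):
        self.done.add(task_id)


class ToyWorker:
    """Hypothetical worker: registers tasks, then pulls and runs them."""

    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.actions = {}

    def add(self, task_id, action, deps=()):
        # Tell the scheduler what has to be done and what it depends on.
        self.actions[task_id] = action
        self.scheduler.add_task(task_id, deps)

    def run(self):
        # Ask for work in a loop until the scheduler has nothing runnable.
        while True:
            task_id = self.scheduler.get_work()
            if task_id is None:
                break
            self.actions[task_id]()
            self.scheduler.mark_done(task_id)
        return set(self.actions) <= self.scheduler.done


order = []
worker = ToyWorker(ToyScheduler())
worker.add("report", lambda: order.append("report"), deps=["extract"])
worker.add("extract", lambda: order.append("extract"))
all_ran = worker.run()
```

Although "report" was added first, the toy scheduler hands out "extract" first because "report" depends on it.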

add(task, multiprocess=False, processes=0)[source]

Add a Task for the worker to check and possibly schedule and run.

Returns True if task and its dependencies were successfully scheduled or completed before.

handle_interrupt(signum, _)[source]

Stops the assistant from asking for more work on SIGUSR1

run()[source]

Returns True if all scheduled tasks were executed successfully.

set_worker_processes(n)[source]

dispatch_scheduler_message(task_id, message_id, content, **kwargs)[source]