luigi.worker module

The worker communicates with the scheduler and does two things:

  1. Sends all tasks that have to be run
  2. Gets tasks from the scheduler that should be run

When running in local mode, the worker talks directly to a Scheduler instance. When you run a central server, the worker will talk to the scheduler using a RemoteScheduler instance.
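The two responsibilities listed above can be pictured with a toy in-process scheduler, which is roughly the local-mode situation. Everything in it (`ToyScheduler`, `add_task`, `get_work`, `mark_done`, `run_worker`) is hypothetical and does not reproduce the method names or signatures of luigi's `Scheduler` or `RemoteScheduler`. It only shows the order of operations: the worker first registers tasks with their dependencies, then repeatedly asks for something runnable.

```python
from typing import Dict, List, Optional, Set


class ToyScheduler:
    """Hypothetical in-process scheduler; not luigi's Scheduler API."""

    def __init__(self) -> None:
        self.deps: Dict[str, Set[str]] = {}
        self.done: Set[str] = set()

    def add_task(self, task_id: str, deps: List[str]) -> None:
        # Step 1: the worker reports a task and what it depends on.
        self.deps.setdefault(task_id, set()).update(deps)
        for dep in deps:
            self.deps.setdefault(dep, set())

    def get_work(self) -> Optional[str]:
        # Step 2: hand out a pending task whose dependencies are all done.
        for task_id in sorted(self.deps):
            if task_id not in self.done and self.deps[task_id] <= self.done:
                return task_id
        return None

    def mark_done(self, task_id: str) -> None:
        self.done.add(task_id)


def run_worker(scheduler: ToyScheduler) -> List[str]:
    """Pull work until the scheduler has nothing runnable left."""
    order = []
    while (task_id := scheduler.get_work()) is not None:
        order.append(task_id)  # a real worker would run the task here
        scheduler.mark_done(task_id)
    return order


sched = ToyScheduler()
sched.add_task("report", ["clean"])
sched.add_task("clean", ["fetch"])
print(run_worker(sched))  # ['fetch', 'clean', 'report']
```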

Everything in this module is private to luigi and may change in incompatible ways between versions. The exceptions are the exception types and the worker config class.

exception luigi.worker.TaskException

Bases: exceptions.Exception

class luigi.worker.GetWorkResponse(task_id, running_tasks, n_pending_tasks, n_unique_pending, n_pending_last_scheduled, worker_state)

Bases: tuple

Create new instance of GetWorkResponse(task_id, running_tasks, n_pending_tasks, n_unique_pending, n_pending_last_scheduled, worker_state)

n_pending_last_scheduled

Alias for field number 4

n_pending_tasks

Alias for field number 2

n_unique_pending

Alias for field number 3

running_tasks

Alias for field number 1

task_id

Alias for field number 0

worker_state

Alias for field number 5
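Because GetWorkResponse is a plain named tuple, every field can be read either by name or by its position. The sketch below recreates the type with the standard library (rather than importing it from luigi) using the documented field order; the field values are made up for illustration.

```python
from collections import namedtuple

# Same field names and order as documented for luigi.worker.GetWorkResponse.
GetWorkResponse = namedtuple(
    "GetWorkResponse",
    ["task_id", "running_tasks", "n_pending_tasks", "n_unique_pending",
     "n_pending_last_scheduled", "worker_state"],
)

# Illustrative values only.
resp = GetWorkResponse(
    task_id="MyTask",
    running_tasks=[],
    n_pending_tasks=3,
    n_unique_pending=1,
    n_pending_last_scheduled=0,
    worker_state="active",
)

# "Alias for field number N" means name access and index access agree.
print(resp.task_id == resp[0], resp.n_pending_last_scheduled == resp[4])  # True True
```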

class luigi.worker.TaskProcess(task, worker_id, result_queue, status_reporter, use_multiprocessing=False, worker_timeout=0, check_unfulfilled_deps=True)

Bases: multiprocessing.process.Process

Wrap all task execution in this class.

Mainly for convenience since this is run in a separate process.

forward_reporter_attributes = {'decrease_running_resources': 'decrease_running_resources', 'scheduler_messages': 'scheduler_messages', 'update_progress_percentage': 'set_progress_percentage', 'update_status_message': 'set_status_message', 'update_tracking_url': 'set_tracking_url'}
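The mapping reads as "reporter attribute → task attribute": while a task runs, methods of the status reporter become callable on the task under the task-side names (for example `set_tracking_url`). The sketch below shows that forwarding pattern with a subset of the mapping; `FakeReporter`, `FakeTask` and `forward_attributes` are hypothetical stand-ins, and clearing the attributes afterwards is an assumption about cleanup, not a description of luigi's exact code.

```python
from contextlib import contextmanager

# A subset of TaskProcess.forward_reporter_attributes (reporter attr -> task attr).
FORWARD = {
    "update_tracking_url": "set_tracking_url",
    "update_status_message": "set_status_message",
    "update_progress_percentage": "set_progress_percentage",
}


class FakeReporter:
    """Hypothetical stand-in for TaskStatusReporter that records calls."""

    def __init__(self):
        self.calls = []

    def update_tracking_url(self, url):
        self.calls.append(("tracking_url", url))

    def update_status_message(self, message):
        self.calls.append(("status_message", message))

    def update_progress_percentage(self, percentage):
        self.calls.append(("progress", percentage))


class FakeTask:
    def run(self):
        # Inside run(), the task calls the forwarded methods as if they were its own.
        self.set_status_message("halfway")
        self.set_progress_percentage(50)


@contextmanager
def forward_attributes(task, reporter, mapping=FORWARD):
    # Bind each reporter method onto the task under its task-side name...
    for reporter_attr, task_attr in mapping.items():
        setattr(task, task_attr, getattr(reporter, reporter_attr))
    try:
        yield task
    finally:
        # ...and clear them again once the run is over.
        for task_attr in mapping.values():
            setattr(task, task_attr, None)


reporter = FakeReporter()
task = FakeTask()
with forward_attributes(task, reporter):
    task.run()
print(reporter.calls)  # [('status_message', 'halfway'), ('progress', 50)]
```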

run()

Method to be run in a subprocess; can be overridden in a subclass.

terminate()

Terminate this process and its subprocesses.

class luigi.worker.ContextManagedTaskProcess(context, *args, **kwargs)

Bases: luigi.worker.TaskProcess


run()

Method to be run in a subprocess; can be overridden in a subclass.

class luigi.worker.TaskStatusReporter(scheduler, task_id, worker_id, scheduler_messages)

Bases: object

Reports task status information to the scheduler.

This object must be picklable so it can be passed to TaskProcess on systems where the process start method has to pickle the process object (e.g. Windows, where the default start method is spawn rather than fork).
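A quick way to see what "must be picklable" rules out is to round-trip candidate state through `pickle`, which is what a spawn-based start method does when it ships the process object to the child. The helper `is_picklable` is hypothetical, and `SimpleNamespace` merely stands in for a reporter-like object; the point is that plain identifiers survive while live OS-level handles such as a lock do not.

```python
import pickle
import threading
from types import SimpleNamespace


def is_picklable(obj):
    """Round-trip obj through pickle, as a spawn-based start method must."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


# Plain identifiers survive the trip to a child process...
good = SimpleNamespace(task_id="MyTask", worker_id="worker-1")
# ...but a live handle such as a lock does not.
bad = SimpleNamespace(task_id="MyTask", worker_id="worker-1", lock=threading.Lock())

print(is_picklable(good), is_picklable(bad))  # True False
```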

class luigi.worker.SchedulerMessage(scheduler, task_id, message_id, content, **payload)

Bases: object

Message object that is built by the Worker when a message from the scheduler is received, and then passed to the message queue of a Task.
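The flow described above (worker wraps an incoming scheduler message, task reads it from its queue and answers) can be sketched as follows. `ToyMessage`, `RecordingScheduler`, `respond()` and `add_response()` are hypothetical names chosen for the sketch; only the constructor shape mirrors the documented `SchedulerMessage(scheduler, task_id, message_id, content, **payload)`.

```python
import queue


class RecordingScheduler:
    """Hypothetical scheduler stub that records responses sent back to it."""

    def __init__(self):
        self.responses = []

    def add_response(self, task_id, message_id, response):
        self.responses.append((task_id, message_id, response))


class ToyMessage:
    """Toy counterpart of SchedulerMessage: content plus a way to reply."""

    def __init__(self, scheduler, task_id, message_id, content, **payload):
        self._scheduler = scheduler
        self._task_id = task_id
        self._message_id = message_id
        self.content = content
        self.payload = payload

    def respond(self, response):
        # Route the answer back to the scheduler under the original ids.
        self._scheduler.add_response(self._task_id, self._message_id, response)


sched = RecordingScheduler()
task_queue = queue.Queue()  # plays the role of the task's message queue

# Worker side: wrap the incoming scheduler message and hand it to the task.
task_queue.put(ToyMessage(sched, "MyTask", "msg-1", "ping", sender="ui"))

# Task side: read the message and answer it.
msg = task_queue.get_nowait()
msg.respond("pong to " + msg.content)
print(sched.responses)  # [('MyTask', 'msg-1', 'pong to ping')]
```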

class luigi.worker.SingleProcessPool

Bases: object

Dummy process pool for using a single processor.

Imitates the API of multiprocessing.Pool using single-processor equivalents.

apply_async(function, args)
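A minimal sketch of such a single-process stand-in, assuming the simplest equivalent of each Pool method: `apply_async` calls the function immediately, and `close`/`join` (added here to mirror `multiprocessing.Pool`, not taken from the listing above) do nothing. Note one deliberate difference from `multiprocessing.Pool.apply_async`, which returns an `AsyncResult`: this sketch returns the plain value.

```python
class SingleProcessPoolSketch:
    """Runs 'async' work immediately in the calling process."""

    def apply_async(self, function, args):
        # No worker processes: just call the function right away.
        return function(*args)

    def close(self):
        pass  # nothing to shut down

    def join(self):
        pass  # nothing to wait for


pool = SingleProcessPoolSketch()
result = pool.apply_async(pow, (2, 10))
pool.close()
pool.join()
print(result)  # 1024
```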

class luigi.worker.DequeQueue

Bases: collections.deque

deque wrapper implementing the Queue interface.

put(obj, block=None, timeout=None)
get(block=None, timeout=None)
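A sketch of a deque exposing the `put`/`get` names of the `queue.Queue` interface, assuming first-in, first-out order. The `block` and `timeout` arguments are accepted only for signature compatibility; unlike `queue.Queue`, an empty `get()` here raises `IndexError` immediately instead of blocking or raising `queue.Empty`.

```python
import collections


class DequeQueueSketch(collections.deque):
    """A deque that answers to the put/get names used by queue.Queue."""

    def put(self, obj, block=None, timeout=None):
        # block/timeout are accepted for interface compatibility and ignored.
        return self.append(obj)

    def get(self, block=None, timeout=None):
        # FIFO: take from the left; raises IndexError if empty (no blocking).
        return self.popleft()


q = DequeQueueSketch()
q.put("a")
q.put("b")
print(q.get(), q.get())  # a b
```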

exception luigi.worker.AsyncCompletionException(trace)

Bases: exceptions.Exception

Exception indicating that something went wrong while checking whether a task is complete.

class luigi.worker.TracebackWrapper(trace)

Bases: object

Class to wrap tracebacks so we can know they’re not just strings.

luigi.worker.check_complete(task, out_queue)

Checks whether the task is complete and puts the result on out_queue.
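One plausible shape for this function, sketched under the assumption that a failing `complete()` should not raise in the checking process: the traceback is formatted and wrapped (the role TracebackWrapper plays) so that whoever reads `out_queue` can tell an error apart from an ordinary `True`/`False` result. `TracebackWrapperSketch`, `DoneTask` and `BrokenTask` are hypothetical.

```python
import queue
import traceback


class TracebackWrapperSketch:
    """Marks a value as a formatted traceback rather than an ordinary string."""

    def __init__(self, trace):
        self.trace = trace


def check_complete(task, out_queue):
    # Call task.complete(); on failure, ship the traceback instead of raising,
    # so the consumer of out_queue can distinguish errors from True/False.
    try:
        is_complete = task.complete()
    except Exception:
        is_complete = TracebackWrapperSketch(traceback.format_exc())
    out_queue.put((task, is_complete))


class DoneTask:
    def complete(self):
        return True


class BrokenTask:
    def complete(self):
        raise OSError("output location unreachable")


q = queue.Queue()
check_complete(DoneTask(), q)
check_complete(BrokenTask(), q)
_, first = q.get()
_, second = q.get()
print(first, isinstance(second, TracebackWrapperSketch))  # True True
```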

class luigi.worker.worker(*args, **kwargs)

Bases: luigi.task.Config

ping_interval = FloatParameter (defaults to 1.0)
keep_alive = BoolParameter (defaults to False)
count_uniques = BoolParameter (defaults to False): If true, keep a worker alive only if it has a unique pending task and keep_alive is also true.
count_last_scheduled = BoolParameter (defaults to False): Keep a worker alive only if there are pending tasks which it was the last to schedule.
wait_interval = FloatParameter (defaults to 1.0)
wait_jitter = FloatParameter (defaults to 5.0)
max_reschedules = IntParameter (defaults to 1)
timeout = IntParameter (defaults to 0)
task_limit = IntParameter (defaults to None)
retry_external_tasks = BoolParameter (defaults to False): If true, incomplete external tasks will be retested for completion while Luigi is running.
send_failure_email = BoolParameter (defaults to True): If true, send e-mails directly from the worker on failure
no_install_shutdown_handler = BoolParameter (defaults to False): If true, the SIGUSR1 shutdown handler will NOT be installed on the worker
check_unfulfilled_deps = BoolParameter (defaults to True): If true, check for completeness of dependencies before running a task
force_multiprocessing = BoolParameter (defaults to False): If true, use multiprocessing also when running with 1 worker
task_process_context = Parameter (defaults to None): If set to a fully qualified class name, the class will be instantiated with a TaskProcess as its constructor parameter and applied as a context manager around its run() call, so this can be used for obtaining high level customizable monitoring or logging of each individual Task run.
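As a sketch of what a `task_process_context` class could look like, the example below times each run. `TimingContext` and `FakeProcess` are hypothetical; the only behavior taken from the description above is the shape: the class is constructed with the TaskProcess and then used as a context manager around its `run()` call. Enabling a class like this would mean setting `task_process_context` in the `[worker]` configuration section to its fully qualified name (for example a hypothetical `myproject.monitoring.TimingContext`).

```python
import time


class TimingContext:
    """Hypothetical task_process_context class: built with the TaskProcess,
    then entered around that process's run() call."""

    def __init__(self, task_process):
        self.task_process = task_process
        self.elapsed = None

    def __enter__(self):
        self._start = time.monotonic()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.elapsed = time.monotonic() - self._start
        return False  # never swallow exceptions raised by the task


class FakeProcess:
    """Stand-in for TaskProcess; only run() matters here."""

    def run(self):
        time.sleep(0.01)


proc = FakeProcess()
ctx = TimingContext(proc)
with ctx:  # the wrapping ContextManagedTaskProcess is described as doing
    proc.run()
print(ctx.elapsed is not None)  # True
```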

class luigi.worker.KeepAliveThread(scheduler, worker_id, ping_interval, rpc_message_callback)

Bases: threading.Thread

Periodically tell the scheduler that the worker still lives.


run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with positional and keyword arguments taken from the args and kwargs arguments, respectively.
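The periodic "still alive" ping can be sketched as a thread that overrides `run()` and uses `threading.Event.wait` as an interruptible sleep. This is a simplified stand-in: the real class takes a scheduler, worker id and an RPC message callback, whereas `KeepAliveSketch` just calls a hypothetical `ping` callable every `ping_interval` seconds until `stop()` is called.

```python
import threading
import time


class KeepAliveSketch(threading.Thread):
    """Calls ping() every ping_interval seconds until stop() is called."""

    def __init__(self, ping, ping_interval):
        super().__init__(daemon=True)
        self._ping = ping
        self._interval = ping_interval
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def run(self):
        # wait() returns False on timeout (time to ping) and True once stopped.
        while not self._stop_event.wait(self._interval):
            self._ping()


pings = []
t = KeepAliveSketch(lambda: pings.append(time.monotonic()), ping_interval=0.01)
t.start()
time.sleep(0.1)
t.stop()
t.join()
print(len(pings) > 0)  # True
```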

class luigi.worker.Worker(scheduler=None, worker_id=None, worker_processes=1, assistant=False, **kwargs)

Bases: object

Worker object communicates with a scheduler.

Simple class that talks to a scheduler and:

  • tells the scheduler what it has to do, along with its dependencies
  • asks for work to do (pulls tasks in a loop and runs them)

add(task, multiprocess=False, processes=0)

Add a Task for the worker to check and possibly schedule and run.

Returns True if the task and its dependencies were successfully scheduled or had already been completed.
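The dependency walk behind this method can be sketched as a depth-first traversal: each incomplete task is recorded once, complete tasks are skipped together with their dependencies, and a completeness check that raises counts as a failure. `ToyTask` and this `add` function are hypothetical simplifications; luigi's own method also deals with multiprocessing, scheduler communication and other details not modeled here.

```python
class ToyTask:
    """Hypothetical task with the two methods the walk needs."""

    def __init__(self, name, deps=(), done=False):
        self.name = name
        self.deps = list(deps)
        self.done = done

    def complete(self):
        return self.done

    def requires(self):
        return self.deps


def add(task, scheduled, seen=None):
    """Depth-first walk over the task graph, in the spirit of Worker.add.

    Appends each incomplete task to `scheduled` once and returns False if
    any completeness check in the graph raised.
    """
    if seen is None:
        seen = set()
    if task.name in seen:
        return True
    seen.add(task.name)
    try:
        done = task.complete()
    except Exception:
        return False  # completeness unknown: count it as a failure
    if done:
        return True  # complete tasks are not scheduled; deps are not visited
    scheduled.append(task.name)
    # A list (not a generator) so every dependency is visited even after a failure.
    results = [add(dep, scheduled, seen) for dep in task.requires()]
    return all(results)


fetch = ToyTask("fetch", done=True)
clean = ToyTask("clean", deps=[fetch])
report = ToyTask("report", deps=[clean])
scheduled = []
print(add(report, scheduled), scheduled)  # True ['report', 'clean']
```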

handle_interrupt(signum, _)

Stops the assistant from asking for more work on SIGUSR1
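The handler signature `(signum, _)` is the standard `(signum, frame)` shape Python passes to any function registered with `signal.signal`. A minimal sketch of the stop-on-SIGUSR1 idea, with hypothetical names (`AssistantSketch`, `accepting_work`, `install`): the handler only flips a flag that the work loop would check before asking for more work. The `no_install_shutdown_handler` option listed earlier controls whether luigi installs its own handler.

```python
import signal


class AssistantSketch:
    """Hypothetical assistant loop that stops asking for work after SIGUSR1."""

    def __init__(self):
        self.accepting_work = True

    def handle_interrupt(self, signum, _frame):
        # Only flip a flag; the work loop checks it before requesting more work.
        self.accepting_work = False

    def install(self):
        # SIGUSR1 exists only on POSIX; signal.signal must run in the main thread.
        if hasattr(signal, "SIGUSR1"):
            signal.signal(signal.SIGUSR1, self.handle_interrupt)


a = AssistantSketch()
# Simulate delivery by calling the handler directly (the signum is unused).
a.handle_interrupt(None, None)
print(a.accepting_work)  # False
```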


run()

Returns True if all scheduled tasks were executed successfully.

dispatch_scheduler_message(task_id, message_id, content, **kwargs)