luigi.scheduler module
The system for scheduling tasks and executing them in order. It deals with dependencies, priorities, resources, and similar constraints.
The Worker pulls tasks from the scheduler (usually over the REST interface) and executes them.
See Using the Central Scheduler for more information.
-
luigi.scheduler.UPSTREAM_SEVERITY_KEY()
T.index(value, [start, [stop]]) -> integer – return the first index of value. Raises ValueError if the value is not present.
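The docstring above is the one inherited from `tuple.index`, which suggests that UPSTREAM_SEVERITY_KEY is the `index` method of a tuple listing upstream statuses from least to most severe. The following is a minimal sketch of that pattern under this reading. The status strings and the helper `worst_upstream_status` are hypothetical placeholders, not necessarily what luigi defines.

```python
# Hypothetical severity ordering, least severe first (illustrative names only).
SEVERITY_ORDER = ('', 'UPSTREAM_RUNNING', 'UPSTREAM_MISSING_INPUT',
                  'UPSTREAM_FAILED', 'UPSTREAM_DISABLED')

# A bound tuple.index method maps each status to its rank in the ordering,
# so it can be passed directly as a key function to max() or sorted().
SEVERITY_KEY = SEVERITY_ORDER.index


def worst_upstream_status(statuses):
    """Return the most severe status among `statuses` ('' if there are none)."""
    return max(statuses, key=SEVERITY_KEY, default='')


print(worst_upstream_status(['UPSTREAM_RUNNING', 'UPSTREAM_FAILED']))
# prints UPSTREAM_FAILED
```

A status that is missing from the ordering makes the key raise ValueError, which matches the inherited docstring.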
-
class luigi.scheduler.RetryPolicy(retry_count, disable_hard_timeout, disable_window)
Bases: tuple
Create new instance of RetryPolicy(retry_count, disable_hard_timeout, disable_window)
-
disable_hard_timeout
Alias for field number 1
-
disable_window
Alias for field number 2
-
retry_count
Alias for field number 0
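RetryPolicy behaves like a named tuple, and each documented attribute is an alias for a tuple position. The following minimal sketch reproduces that shape with `collections.namedtuple`, using the field order implied by the aliases above (retry_count is field 0, disable_hard_timeout is field 1, disable_window is field 2). It illustrates the structure and is not luigi's own definition. The values come from the scheduler defaults documented in this module.

```python
from collections import namedtuple

# Field order matches the documented aliases: positions 0, 1, 2.
RetryPolicy = namedtuple('RetryPolicy',
                         ['retry_count', 'disable_hard_timeout', 'disable_window'])

# Illustrative values (the scheduler config defaults).
policy = RetryPolicy(retry_count=999999999,
                     disable_hard_timeout=999999999,
                     disable_window=3600)

# Each field name is an alias for a tuple position.
assert policy.retry_count == policy[0]
assert policy.disable_window == policy[2]

# Named tuples are immutable; _replace returns a modified copy.
stricter = policy._replace(retry_count=3)
print(stricter)
# RetryPolicy(retry_count=3, disable_hard_timeout=999999999, disable_window=3600)
```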
-
class luigi.scheduler.scheduler(*args, **kwargs)
Bases: luigi.task.Config
-
retry_delay = FloatParameter (defaults to 900.0)
-
remove_delay = FloatParameter (defaults to 600.0)
-
worker_disconnect_delay = FloatParameter (defaults to 60.0)
-
state_path = Parameter (defaults to /var/lib/luigi-server/state.pickle)
-
batch_emails = BoolParameter (defaults to False): Send e-mails in batches rather than immediately.
-
disable_window = IntParameter (defaults to 3600)
-
retry_count = IntParameter (defaults to 999999999)
-
disable_hard_timeout = IntParameter (defaults to 999999999)
-
disable_persist = IntParameter (defaults to 86400)
-
max_shown_tasks = IntParameter (defaults to 100000)
-
max_graph_nodes = IntParameter (defaults to 100000)
-
record_task_history = BoolParameter (defaults to False)
-
prune_on_get_work = BoolParameter (defaults to False)
-
pause_enabled = BoolParameter (defaults to True)
-
send_messages = BoolParameter (defaults to True)
-
metrics_collector = EnumParameter (defaults to MetricsCollectors.default)
-
stable_done_cooldown_secs = IntParameter (defaults to 10): Sets a cooldown period to avoid running the same task twice.
Sets a cooldown period, in seconds, that starts when a task completes. During this period the scheduler will not accept the same task again.
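The stable_done_cooldown_secs behaviour can be illustrated with a small hypothetical helper. The helper records when each task id was last marked done and rejects that id until the cooldown has elapsed. The class and method names are invented for this sketch, and luigi's scheduler does not necessarily structure the check this way.

```python
import time


class DoneCooldown:
    """Hypothetical tracker for recently completed task ids."""

    def __init__(self, cooldown_secs=10, clock=time.time):
        self.cooldown_secs = cooldown_secs
        self.clock = clock          # injectable so tests can fake time
        self._done_at = {}          # task_id -> completion timestamp

    def mark_done(self, task_id):
        self._done_at[task_id] = self.clock()

    def accepts(self, task_id):
        """Return False while task_id is still inside its cooldown period."""
        done_at = self._done_at.get(task_id)
        if done_at is None:
            return True
        return self.clock() - done_at >= self.cooldown_secs


# Usage with a fake clock so the example is deterministic.
now = [1000.0]
cooldown = DoneCooldown(cooldown_secs=10, clock=lambda: now[0])
cooldown.mark_done('MyTask_2024')
print(cooldown.accepts('MyTask_2024'))  # False: completed 0 s ago
now[0] += 10
print(cooldown.accepts('MyTask_2024'))  # True: the cooldown has elapsed
```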
-
class luigi.scheduler.Failures(window)
Bases: object
This class tracks the number of failures in a given time window.
Failures added are marked with the current timestamp, and this class counts the number of failures in a sliding time window ending at the present.
Initialize with the given window.
Parameters: window – how long to track failures for, as a float (number of seconds).
-
class luigi.scheduler.OrderedSet(iterable=None)
Bases: _abcoll.MutableSet
Standard Python OrderedSet recipe found at http://code.activestate.com/recipes/576694/, modified to include a peek function that returns the last element.
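The following is a compact sketch of an insertion-ordered set with a peek method. The ActiveState recipe uses a linked list, but this version keeps the items as keys of an `OrderedDict`. The optional `last` flag on `peek` is an assumption of the sketch. The class subclasses `collections.abc.MutableSet`, which is the Python 3 location of the base class listed above as `_abcoll.MutableSet`.

```python
from collections import OrderedDict
from collections.abc import MutableSet


class OrderedSet(MutableSet):
    """Insertion-ordered set; items are stored as OrderedDict keys."""

    def __init__(self, iterable=None):
        self._items = OrderedDict()
        if iterable is not None:
            for item in iterable:
                self.add(item)

    def __contains__(self, item):
        return item in self._items

    def __iter__(self):
        return iter(self._items)

    def __len__(self):
        return len(self._items)

    def add(self, item):
        # Re-adding an existing item keeps its original position.
        self._items[item] = None

    def discard(self, item):
        self._items.pop(item, None)

    def peek(self, last=True):
        """Return the last (or, with last=False, the first) item without removing it."""
        if not self._items:
            raise KeyError('set is empty')
        return next(reversed(self._items)) if last else next(iter(self._items))

    def __repr__(self):
        return '{}({!r})'.format(type(self).__name__, list(self))


s = OrderedSet([3, 1, 2, 1])
print(s)                   # OrderedSet([3, 1, 2])
print(s.peek())            # 2
print(s.peek(last=False))  # 3
```

The other set operations (`remove`, `pop`, `|=` and so on) come from the `MutableSet` mixin methods. The class only has to provide `__contains__`, `__iter__`, `__len__`, `add` and `discard`.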
-
class luigi.scheduler.Task(task_id, status, deps, resources=None, priority=0, family='', module=None, params=None, param_visibilities=None, accepts_messages=False, tracking_url=None, status_message=None, progress_percentage=None, retry_policy='notoptional')
Bases: object
-
pretty_id
-
class luigi.scheduler.Worker(worker_id, last_active=None)
Bases: object
Structure for tracking worker activity and keeping their references.
-
is_trivial_worker(state)
Returns True if this worker is not an assistant and has only tasks that are without requirements.
The state parameter has to be passed in for optimization reasons.
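The following sketch shows one way to read the is_trivial_worker rule. An assistant is never trivial, and any other worker is trivial when none of its tasks carries requirements. The sketch makes two assumptions: "requirements" means a task's resources dict, and the worker's tasks are passed in directly rather than looked up through a state object. The record types `SketchTask` and `SketchWorker` are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class SketchTask:
    task_id: str
    resources: dict = field(default_factory=dict)   # e.g. {'db': 1}


@dataclass
class SketchWorker:
    worker_id: str
    assistant: bool = False


def is_trivial_worker(worker, tasks):
    """True if the worker is not an assistant and none of its tasks has requirements."""
    if worker.assistant:
        return False
    return all(not task.resources for task in tasks)


plain = SketchWorker('w1')
print(is_trivial_worker(plain, [SketchTask('A'), SketchTask('B')]))  # True
print(is_trivial_worker(plain, [SketchTask('C', {'db': 1})]))        # False
print(is_trivial_worker(SketchWorker('w2', assistant=True), []))     # False
```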
-
assistant
-
enabled
-
state
-
class luigi.scheduler.SimpleTaskState(state_path)
Bases: object
Keep track of the current state and handle persistence.
The point of this class is to enable other ways to keep state, e.g. by using a database. These will be implemented by creating an abstract base class that this and other classes inherit from.
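SimpleTaskState persists its state to the file given by state_path, and the scheduler's state_path setting defaults to a `.pickle` file. The following is a minimal sketch of a pickle-based dump/load cycle. It assumes the state is a plain dict and writes to a temporary file before replacing the target, so that an interrupted write does not clobber the previous snapshot. The class name `PickleStateStore` and the atomic-replace step are choices made for this sketch and are not necessarily luigi's behaviour.

```python
import os
import pickle
import tempfile


class PickleStateStore:
    """Hypothetical store that saves and restores a state dict with pickle."""

    def __init__(self, state_path):
        self.state_path = state_path

    def dump(self, state):
        directory = os.path.dirname(os.path.abspath(self.state_path))
        # Write to a temp file in the same directory, then atomically replace.
        fd, tmp_path = tempfile.mkstemp(dir=directory)
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(state, f)
        os.replace(tmp_path, self.state_path)

    def load(self):
        """Return the saved state, or an empty dict if nothing was saved yet."""
        if not os.path.exists(self.state_path):
            return {}
        with open(self.state_path, 'rb') as f:
            return pickle.load(f)


with tempfile.TemporaryDirectory() as tmp:
    store = PickleStateStore(os.path.join(tmp, 'state.pickle'))
    print(store.load())                       # {}
    store.dump({'TaskA': 'DONE', 'TaskB': 'PENDING'})
    print(store.load()['TaskA'])              # DONE
```

Loading a pickle executes code from the file, so a store like this should only read state files that the scheduler itself wrote.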
-
class luigi.scheduler.Scheduler(config=None, resources=None, task_history_impl=None, **kwargs)
Bases: object
Async scheduler that can handle multiple workers, etc.
Can be run locally or on a server (using RemoteScheduler + server.Server).
Keyword Arguments:
config – an object of class “scheduler”, or None (in which case the global instance is used)
resources – a dict of str -> int constraints
task_history_impl – ignore config and use this object as the task history
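The resources argument is a dict of str -> int constraints. The following minimal sketch shows how such caps might be checked. It assumes that each running task declares how many units of each named resource it uses, and that a new task may start only if the totals stay within the caps. The helper names are hypothetical. The capacity assumed for resources missing from the dict (`default_cap=1`) is an assumption of this sketch, so check luigi's documentation for its actual default.

```python
from collections import Counter


def has_resources(needed, used, caps, default_cap=1):
    """Return True if adding `needed` to `used` stays within `caps`.

    needed, used: dicts mapping resource name -> units.
    caps: the scheduler-wide str -> int constraints.
    default_cap: capacity assumed for resources missing from `caps`
    (an assumption of this sketch).
    """
    for name, amount in (needed or {}).items():
        if used.get(name, 0) + amount > caps.get(name, default_cap):
            return False
    return True


def used_resources(running_tasks):
    """Sum the resource declarations of the currently running tasks."""
    total = Counter()
    for resources in running_tasks:
        total.update(resources)
    return total


caps = {'db_connections': 2}
used = used_resources([{'db_connections': 1}])
print(has_resources({'db_connections': 1}, used, caps))  # True  (1 + 1 <= 2)
print(has_resources({'db_connections': 2}, used, caps))  # False (1 + 2 > 2)
```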
-
add_task(task_id=None, status='PENDING', runnable=True, deps=None, new_deps=None, expl=None, resources=None, priority=0, family='', module=None, params=None, param_visibilities=None, accepts_messages=False, assistant=False, tracking_url=None, worker=None, batchable=None, batch_id=None, retry_policy_dict=None, owners=None, **kwargs)
- add the task identified by task_id if it doesn't exist
- if deps is not None, update the dependency list
- update the status of the task
- add additional workers/stakeholders
- update the priority when needed
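The steps listed for add_task can be sketched with a toy in-memory registry. Everything in the sketch is an assumption for illustration: the `ToyTask` and `ToyScheduler` names, the small subset of arguments, and the reading of "update priority when needed" as "keep the higher of the old and new priorities". luigi's real add_task handles many more arguments.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    task_id: str
    status: str = 'PENDING'
    deps: set = field(default_factory=set)
    stakeholders: set = field(default_factory=set)
    priority: int = 0


class ToyScheduler:
    def __init__(self):
        self.tasks = {}

    def add_task(self, task_id, status='PENDING', deps=None, priority=0, worker=None):
        # Add the task identified by task_id if it does not exist yet.
        task = self.tasks.setdefault(task_id, ToyTask(task_id))
        # If deps is not None, update the dependency list.
        if deps is not None:
            task.deps = set(deps)
        # Update the status of the task.
        task.status = status
        # Record the calling worker as a stakeholder.
        if worker is not None:
            task.stakeholders.add(worker)
        # Keep the highest priority requested so far (assumed rule).
        task.priority = max(task.priority, priority)
        return task


sched = ToyScheduler()
sched.add_task('A', deps=['B'], priority=5, worker='w1')
t = sched.add_task('A', status='RUNNING', priority=1, worker='w2')
print(t.status, sorted(t.deps), sorted(t.stakeholders), t.priority)
# RUNNING ['B'] ['w1', 'w2'] 5
```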
-
task_list(status='', upstream_status='', limit=True, search=None, max_shown_tasks=None, **kwargs)
Query for a subset of tasks by status.
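The following hypothetical filter works in the spirit of task_list. It keeps tasks whose status matches, with an empty string matching every status. It can also narrow the matches by a `search` substring of the task id and cap the result size. The sketch assumes a simplified shape for the result: a `{'num_tasks': n}` count is returned in place of the tasks when the cap is exceeded. This is modelled loosely on the max_shown_tasks setting. upstream_status filtering is omitted.

```python
def task_list(tasks, status='', search=None, limit=True, max_shown_tasks=100000):
    """Filter a {task_id: status} mapping.

    Returns {task_id: status} for the matches, or {'num_tasks': n}
    when limit is set and the match count n exceeds max_shown_tasks.
    """
    result = {
        task_id: task_status
        for task_id, task_status in tasks.items()
        if (not status or task_status == status)
        and (search is None or search in task_id)
    }
    if limit and len(result) > max_shown_tasks:
        return {'num_tasks': len(result)}
    return result


tasks = {'Load(date=2024-01-01)': 'DONE',
         'Load(date=2024-01-02)': 'PENDING',
         'Report()': 'PENDING'}
print(task_list(tasks, status='PENDING'))
# {'Load(date=2024-01-02)': 'PENDING', 'Report()': 'PENDING'}
print(task_list(tasks, search='Load', max_shown_tasks=1))
# {'num_tasks': 2}
```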
-
task_search(task_str, **kwargs)
Query for a subset of tasks by task_id.
Parameters: task_str –
Returns:
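task_search is described only as querying tasks by task_id. The following minimal sketch assumes a case-sensitive substring match, with the results grouped by status. Both choices are assumptions of this sketch and are not documented behaviour.

```python
from collections import defaultdict


def task_search(tasks, task_str):
    """Group the ids in a {task_id: status} mapping that contain task_str by status."""
    found = defaultdict(list)
    for task_id, status in tasks.items():
        if task_str in task_id:
            found[status].append(task_id)
    return dict(found)


tasks = {'Load(date=2024-01-01)': 'DONE',
         'Load(date=2024-01-02)': 'PENDING',
         'Report()': 'PENDING'}
print(task_search(tasks, '2024-01'))
# {'DONE': ['Load(date=2024-01-01)'], 'PENDING': ['Load(date=2024-01-02)']}
```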
-
task_history