Source code for luigi.worker

# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
The worker communicates with the scheduler and does two things:

1. Sends all tasks that has to be run
2. Gets tasks from the scheduler that should be run

When running in local mode, the worker talks directly to a :py:class:`~luigi.scheduler.Scheduler` instance.
When you run a central server, the worker will talk to the scheduler using a :py:class:`~luigi.rpc.RemoteScheduler` instance.

Everything in this module is private to luigi and may change in incompatible
ways between versions. The exception is the exception types and the
:py:class:`worker` config class.
"""

import collections
import datetime
import getpass
import importlib
import logging
import multiprocessing
import os
import signal
import subprocess
import sys
import contextlib

try:
    import Queue
except ImportError:
    import queue as Queue
import random
import socket
import threading
import time
import traceback
import types

from luigi import six

from luigi import notifications
from luigi.event import Event
from luigi.task_register import load_task
from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, UNKNOWN, Scheduler, RetryPolicy
from luigi.scheduler import WORKER_STATE_ACTIVE, WORKER_STATE_DISABLED
from luigi.target import Target
from luigi.task import Task, flatten, getpaths, Config
from luigi.task_register import TaskClassException
from luigi.task_status import RUNNING
from luigi.parameter import BoolParameter, FloatParameter, IntParameter, OptionalParameter, Parameter, TimeDeltaParameter

try:
    import simplejson as json
except ImportError:
    import json

logger = logging.getLogger('luigi-interface')

# Prevent fork() from being called during a C-level getaddrinfo() which uses a process-global mutex,
# that may not be unlocked in child process, resulting in the process being locked indefinitely.
fork_lock = threading.Lock()

# Why we assert on _WAIT_INTERVAL_EPS:
# multiprocessing.Queue.get() is undefined for timeout=0 it seems:
# https://docs.python.org/3.4/library/multiprocessing.html#multiprocessing.Queue.get.
# I also tried with really low epsilon, but then ran into the same issue where
# the test case "test_external_dependency_worker_is_patient" got stuck. So I
# unscientifically just set the final value to a floating point number that
# "worked for me".
_WAIT_INTERVAL_EPS = 0.00001


def _is_external(task):
    return task.run is None or task.run == NotImplemented


def _get_retry_policy_dict(task):
    return RetryPolicy(task.retry_count, task.disable_hard_timeout, task.disable_window_seconds)._asdict()


[docs]class TaskException(Exception): pass
GetWorkResponse = collections.namedtuple('GetWorkResponse', ( 'task_id', 'running_tasks', 'n_pending_tasks', 'n_unique_pending', 'n_pending_last_scheduled', 'worker_state', ))
[docs]class TaskProcess(multiprocessing.Process): """ Wrap all task execution in this class. Mainly for convenience since this is run in a separate process. """ # mapping of status_reporter attributes to task attributes that are added to tasks # before they actually run, and removed afterwards forward_reporter_attributes = { "update_tracking_url": "set_tracking_url", "update_status_message": "set_status_message", "update_progress_percentage": "set_progress_percentage", "decrease_running_resources": "decrease_running_resources", "scheduler_messages": "scheduler_messages", } def __init__(self, task, worker_id, result_queue, status_reporter, use_multiprocessing=False, worker_timeout=0, check_unfulfilled_deps=True, check_complete_on_run=False): super(TaskProcess, self).__init__() self.task = task self.worker_id = worker_id self.result_queue = result_queue self.status_reporter = status_reporter self.worker_timeout = task.worker_timeout or worker_timeout self.timeout_time = time.time() + self.worker_timeout if self.worker_timeout else None self.use_multiprocessing = use_multiprocessing or self.timeout_time is not None self.check_unfulfilled_deps = check_unfulfilled_deps self.check_complete_on_run = check_complete_on_run def _run_get_new_deps(self): task_gen = self.task.run() if not isinstance(task_gen, types.GeneratorType): return None next_send = None while True: try: if next_send is None: requires = six.next(task_gen) else: requires = task_gen.send(next_send) except StopIteration: return None new_req = flatten(requires) if all(t.complete() for t in new_req): next_send = getpaths(requires) else: new_deps = [(t.task_module, t.task_family, t.to_str_params()) for t in new_req] return new_deps
[docs] def run(self): logger.info('[pid %s] Worker %s running %s', os.getpid(), self.worker_id, self.task) if self.use_multiprocessing: # Need to have different random seeds if running in separate processes random.seed((os.getpid(), time.time())) status = FAILED expl = '' missing = [] new_deps = [] try: # Verify that all the tasks are fulfilled! For external tasks we # don't care about unfulfilled dependencies, because we are just # checking completeness of self.task so outputs of dependencies are # irrelevant. if self.check_unfulfilled_deps and not _is_external(self.task): missing = [dep.task_id for dep in self.task.deps() if not dep.complete()] if missing: deps = 'dependency' if len(missing) == 1 else 'dependencies' raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing))) self.task.trigger_event(Event.START, self.task) t0 = time.time() status = None if _is_external(self.task): # External task if self.task.complete(): status = DONE else: status = FAILED expl = 'Task is an external data dependency ' \ 'and data does not exist (yet?).' else: with self._forward_attributes(): new_deps = self._run_get_new_deps() if not new_deps: if not self.check_complete_on_run or self.task.complete(): status = DONE else: raise TaskException("Task finished running, but complete() is still returning false.") else: status = PENDING if new_deps: logger.info( '[pid %s] Worker %s new requirements %s', os.getpid(), self.worker_id, self.task) elif status == DONE: self.task.trigger_event( Event.PROCESSING_TIME, self.task, time.time() - t0) expl = self.task.on_success() logger.info('[pid %s] Worker %s done %s', os.getpid(), self.worker_id, self.task) self.task.trigger_event(Event.SUCCESS, self.task) except KeyboardInterrupt: raise except BaseException as ex: status = FAILED expl = self._handle_run_exception(ex) finally: self.result_queue.put( (self.task.task_id, status, expl, missing, new_deps))
def _handle_run_exception(self, ex): logger.exception("[pid %s] Worker %s failed %s", os.getpid(), self.worker_id, self.task) self.task.trigger_event(Event.FAILURE, self.task, ex) return self.task.on_failure(ex) def _recursive_terminate(self): import psutil try: parent = psutil.Process(self.pid) children = parent.children(recursive=True) # terminate parent. Give it a chance to clean up super(TaskProcess, self).terminate() parent.wait() # terminate children for child in children: try: child.terminate() except psutil.NoSuchProcess: continue except psutil.NoSuchProcess: return
[docs] def terminate(self): """Terminate this process and its subprocesses.""" # default terminate() doesn't cleanup child processes, it orphans them. try: return self._recursive_terminate() except ImportError: return super(TaskProcess, self).terminate()
@contextlib.contextmanager def _forward_attributes(self): # forward configured attributes to the task for reporter_attr, task_attr in six.iteritems(self.forward_reporter_attributes): setattr(self.task, task_attr, getattr(self.status_reporter, reporter_attr)) try: yield self finally: # reset attributes again for reporter_attr, task_attr in six.iteritems(self.forward_reporter_attributes): setattr(self.task, task_attr, None)
# This code and the task_process_context config key currently feels a bit ad-hoc. # Discussion on generalizing it into a plugin system: https://github.com/spotify/luigi/issues/1897
[docs]class ContextManagedTaskProcess(TaskProcess): def __init__(self, context, *args, **kwargs): super(ContextManagedTaskProcess, self).__init__(*args, **kwargs) self.context = context
[docs] def run(self): if self.context: logger.debug('Importing module and instantiating ' + self.context) module_path, class_name = self.context.rsplit('.', 1) module = importlib.import_module(module_path) cls = getattr(module, class_name) with cls(self): super(ContextManagedTaskProcess, self).run() else: super(ContextManagedTaskProcess, self).run()
[docs]class TaskStatusReporter(object): """ Reports task status information to the scheduler. This object must be pickle-able for passing to `TaskProcess` on systems where fork method needs to pickle the process object (e.g. Windows). """ def __init__(self, scheduler, task_id, worker_id, scheduler_messages): self._task_id = task_id self._worker_id = worker_id self._scheduler = scheduler self.scheduler_messages = scheduler_messages
[docs] def update_tracking_url(self, tracking_url): self._scheduler.add_task( task_id=self._task_id, worker=self._worker_id, status=RUNNING, tracking_url=tracking_url )
[docs] def update_status_message(self, message): self._scheduler.set_task_status_message(self._task_id, message)
[docs] def update_progress_percentage(self, percentage): self._scheduler.set_task_progress_percentage(self._task_id, percentage)
[docs] def decrease_running_resources(self, decrease_resources): self._scheduler.decrease_running_task_resources(self._task_id, decrease_resources)
[docs]class SchedulerMessage(object): """ Message object that is build by the the :py:class:`Worker` when a message from the scheduler is received and passed to the message queue of a :py:class:`Task`. """ def __init__(self, scheduler, task_id, message_id, content, **payload): super(SchedulerMessage, self).__init__() self._scheduler = scheduler self._task_id = task_id self._message_id = message_id self.content = content self.payload = payload def __str__(self): return str(self.content) def __eq__(self, other): return self.content == other
[docs] def respond(self, response): self._scheduler.add_scheduler_message_response(self._task_id, self._message_id, response)
[docs]class SingleProcessPool(object): """ Dummy process pool for using a single processor. Imitates the api of multiprocessing.Pool using single-processor equivalents. """
[docs] def apply_async(self, function, args): return function(*args)
[docs] def close(self): pass
[docs] def join(self): pass
[docs]class DequeQueue(collections.deque): """ deque wrapper implementing the Queue interface. """
[docs] def put(self, obj, block=None, timeout=None): return self.append(obj)
[docs] def get(self, block=None, timeout=None): try: return self.pop() except IndexError: raise Queue.Empty
[docs]class AsyncCompletionException(Exception): """ Exception indicating that something went wrong with checking complete. """ def __init__(self, trace): self.trace = trace
[docs]class TracebackWrapper(object): """ Class to wrap tracebacks so we can know they're not just strings. """ def __init__(self, trace): self.trace = trace
[docs]def check_complete(task, out_queue): """ Checks if task is complete, puts the result to out_queue. """ logger.debug("Checking if %s is complete", task) try: is_complete = task.complete() except Exception: is_complete = TracebackWrapper(traceback.format_exc()) out_queue.put((task, is_complete))
[docs]class worker(Config): # NOTE: `section.config-variable` in the config_path argument is deprecated in favor of `worker.config_variable` id = Parameter(default='', description='Override the auto-generated worker_id') ping_interval = FloatParameter(default=1.0, config_path=dict(section='core', name='worker-ping-interval')) keep_alive = BoolParameter(default=False, config_path=dict(section='core', name='worker-keep-alive')) count_uniques = BoolParameter(default=False, config_path=dict(section='core', name='worker-count-uniques'), description='worker-count-uniques means that we will keep a ' 'worker alive only if it has a unique pending task, as ' 'well as having keep-alive true') count_last_scheduled = BoolParameter(default=False, description='Keep a worker alive only if there are ' 'pending tasks which it was the last to ' 'schedule.') wait_interval = FloatParameter(default=1.0, config_path=dict(section='core', name='worker-wait-interval')) wait_jitter = FloatParameter(default=5.0) max_keep_alive_idle_duration = TimeDeltaParameter(default=datetime.timedelta(0)) max_reschedules = IntParameter(default=1, config_path=dict(section='core', name='worker-max-reschedules')) timeout = IntParameter(default=0, config_path=dict(section='core', name='worker-timeout')) task_limit = IntParameter(default=None, config_path=dict(section='core', name='worker-task-limit')) retry_external_tasks = BoolParameter(default=False, config_path=dict(section='core', name='retry-external-tasks'), description='If true, incomplete external tasks will be ' 'retested for completion while Luigi is running.') send_failure_email = BoolParameter(default=True, description='If true, send e-mails directly from the worker' 'on failure') no_install_shutdown_handler = BoolParameter(default=False, description='If true, the SIGUSR1 shutdown handler will' 'NOT be install on the worker') check_unfulfilled_deps = BoolParameter(default=True, description='If true, check for completeness of ' 'dependencies before running a task') check_complete_on_run = BoolParameter(default=False, description='If true, only mark tasks as done after running if they are complete. ' 'Regardless of this setting, the worker will always check if external ' 'tasks are complete before marking them as done.') force_multiprocessing = BoolParameter(default=False, description='If true, use multiprocessing also when ' 'running with 1 worker') task_process_context = OptionalParameter(default=None, description='If set to a fully qualified class name, the class will ' 'be instantiated with a TaskProcess as its constructor parameter and ' 'applied as a context manager around its run() call, so this can be ' 'used for obtaining high level customizable monitoring or logging of ' 'each individual Task run.')
[docs]class KeepAliveThread(threading.Thread): """ Periodically tell the scheduler that the worker still lives. """ def __init__(self, scheduler, worker_id, ping_interval, rpc_message_callback): super(KeepAliveThread, self).__init__() self._should_stop = threading.Event() self._scheduler = scheduler self._worker_id = worker_id self._ping_interval = ping_interval self._rpc_message_callback = rpc_message_callback
[docs] def stop(self): self._should_stop.set()
[docs] def run(self): while True: self._should_stop.wait(self._ping_interval) if self._should_stop.is_set(): logger.info("Worker %s was stopped. Shutting down Keep-Alive thread" % self._worker_id) break with fork_lock: response = None try: response = self._scheduler.ping(worker=self._worker_id) except BaseException: # httplib.BadStatusLine: logger.warning('Failed pinging scheduler') # handle rpc messages if response: for message in response["rpc_messages"]: self._rpc_message_callback(message)
[docs]def rpc_message_callback(fn): fn.is_rpc_message_callback = True return fn
[docs]class Worker(object): """ Worker object communicates with a scheduler. Simple class that talks to a scheduler and: * tells the scheduler what it has to do + its dependencies * asks for stuff to do (pulls it in a loop and runs it) """ def __init__(self, scheduler=None, worker_id=None, worker_processes=1, assistant=False, **kwargs): if scheduler is None: scheduler = Scheduler() self.worker_processes = int(worker_processes) self._worker_info = self._generate_worker_info() self._config = worker(**kwargs) worker_id = worker_id or self._config.id or self._generate_worker_id(self._worker_info) assert self._config.wait_interval >= _WAIT_INTERVAL_EPS, "[worker] wait_interval must be positive" assert self._config.wait_jitter >= 0.0, "[worker] wait_jitter must be equal or greater than zero" self._id = worker_id self._scheduler = scheduler self._assistant = assistant self._stop_requesting_work = False self.host = socket.gethostname() self._scheduled_tasks = {} self._suspended_tasks = {} self._batch_running_tasks = {} self._batch_families_sent = set() self._first_task = None self.add_succeeded = True self.run_succeeded = True self.unfulfilled_counts = collections.defaultdict(int) # note that ``signal.signal(signal.SIGUSR1, fn)`` only works inside the main execution thread, which is why we # provide the ability to conditionally install the hook. if not self._config.no_install_shutdown_handler: try: signal.signal(signal.SIGUSR1, self.handle_interrupt) signal.siginterrupt(signal.SIGUSR1, False) except AttributeError: pass # Keep info about what tasks are running (could be in other processes) self._task_result_queue = multiprocessing.Queue() self._running_tasks = {} self._idle_since = None # Stuff for execution_summary self._add_task_history = [] self._get_work_response_history = [] def _add_task(self, *args, **kwargs): """ Call ``self._scheduler.add_task``, but store the values too so we can implement :py:func:`luigi.execution_summary.summary`. """ task_id = kwargs['task_id'] status = kwargs['status'] runnable = kwargs['runnable'] task = self._scheduled_tasks.get(task_id) if task: self._add_task_history.append((task, status, runnable)) kwargs['owners'] = task._owner_list() if task_id in self._batch_running_tasks: for batch_task in self._batch_running_tasks.pop(task_id): self._add_task_history.append((batch_task, status, True)) if task and kwargs.get('params'): kwargs['param_visibilities'] = task._get_param_visibilities() self._scheduler.add_task(*args, **kwargs) logger.info('Informed scheduler that task %s has status %s', task_id, status) def __enter__(self): """ Start the KeepAliveThread. """ self._keep_alive_thread = KeepAliveThread(self._scheduler, self._id, self._config.ping_interval, self._handle_rpc_message) self._keep_alive_thread.daemon = True self._keep_alive_thread.start() return self def __exit__(self, type, value, traceback): """ Stop the KeepAliveThread and kill still running tasks. """ self._keep_alive_thread.stop() self._keep_alive_thread.join() for task in self._running_tasks.values(): if task.is_alive(): task.terminate() return False # Don't suppress exception def _generate_worker_info(self): # Generate as much info as possible about the worker # Some of these calls might not be available on all OS's args = [('salt', '%09d' % random.randrange(0, 999999999)), ('workers', self.worker_processes)] try: args += [('host', socket.gethostname())] except BaseException: pass try: args += [('username', getpass.getuser())] except BaseException: pass try: args += [('pid', os.getpid())] except BaseException: pass try: sudo_user = os.getenv("SUDO_USER") if sudo_user: args.append(('sudo_user', sudo_user)) except BaseException: pass return args def _generate_worker_id(self, worker_info): worker_info_str = ', '.join(['{}={}'.format(k, v) for k, v in worker_info]) return 'Worker({})'.format(worker_info_str) def _validate_task(self, task): if not isinstance(task, Task): raise TaskException('Can not schedule non-task %s' % task) if not task.initialized(): # we can't get the repr of it since it's not initialized... raise TaskException('Task of class %s not initialized. Did you override __init__ and forget to call super(...).__init__?' % task.__class__.__name__) def _log_complete_error(self, task, tb): log_msg = "Will not run {task} or any dependencies due to error in complete() method:\n{tb}".format(task=task, tb=tb) logger.warning(log_msg) def _log_dependency_error(self, task, tb): log_msg = "Will not run {task} or any dependencies due to error in deps() method:\n{tb}".format(task=task, tb=tb) logger.warning(log_msg) def _log_unexpected_error(self, task): logger.exception("Luigi unexpected framework error while scheduling %s", task) # needs to be called from within except clause def _announce_scheduling_failure(self, task, expl): try: self._scheduler.announce_scheduling_failure( worker=self._id, task_name=str(task), family=task.task_family, params=task.to_str_params(only_significant=True), expl=expl, owners=task._owner_list(), ) except Exception: formatted_traceback = traceback.format_exc() self._email_unexpected_error(task, formatted_traceback) raise def _email_complete_error(self, task, formatted_traceback): self._announce_scheduling_failure(task, formatted_traceback) if self._config.send_failure_email: self._email_error(task, formatted_traceback, subject="Luigi: {task} failed scheduling. Host: {host}", headline="Will not run {task} or any dependencies due to error in complete() method", ) def _email_dependency_error(self, task, formatted_traceback): self._announce_scheduling_failure(task, formatted_traceback) if self._config.send_failure_email: self._email_error(task, formatted_traceback, subject="Luigi: {task} failed scheduling. Host: {host}", headline="Will not run {task} or any dependencies due to error in deps() method", ) def _email_unexpected_error(self, task, formatted_traceback): # this sends even if failure e-mails are disabled, as they may indicate # a more severe failure that may not reach other alerting methods such # as scheduler batch notification self._email_error(task, formatted_traceback, subject="Luigi: Framework error while scheduling {task}. Host: {host}", headline="Luigi framework error", ) def _email_task_failure(self, task, formatted_traceback): if self._config.send_failure_email: self._email_error(task, formatted_traceback, subject="Luigi: {task} FAILED. Host: {host}", headline="A task failed when running. Most likely run() raised an exception.", ) def _email_error(self, task, formatted_traceback, subject, headline): formatted_subject = subject.format(task=task, host=self.host) formatted_headline = headline.format(task=task, host=self.host) command = subprocess.list2cmdline(sys.argv) message = notifications.format_task_error( formatted_headline, task, command, formatted_traceback) notifications.send_error_email(formatted_subject, message, task.owner_email) def _handle_task_load_error(self, exception, task_ids): msg = 'Cannot find task(s) sent by scheduler: {}'.format(','.join(task_ids)) logger.exception(msg) subject = 'Luigi: {}'.format(msg) error_message = notifications.wrap_traceback(exception) for task_id in task_ids: self._add_task( worker=self._id, task_id=task_id, status=FAILED, runnable=False, expl=error_message, ) notifications.send_error_email(subject, error_message)
[docs] def add(self, task, multiprocess=False, processes=0): """ Add a Task for the worker to check and possibly schedule and run. Returns True if task and its dependencies were successfully scheduled or completed before. """ if self._first_task is None and hasattr(task, 'task_id'): self._first_task = task.task_id self.add_succeeded = True if multiprocess: queue = multiprocessing.Manager().Queue() pool = multiprocessing.Pool(processes=processes if processes > 0 else None) else: queue = DequeQueue() pool = SingleProcessPool() self._validate_task(task) pool.apply_async(check_complete, [task, queue]) # we track queue size ourselves because len(queue) won't work for multiprocessing queue_size = 1 try: seen = {task.task_id} while queue_size: current = queue.get() queue_size -= 1 item, is_complete = current for next in self._add(item, is_complete): if next.task_id not in seen: self._validate_task(next) seen.add(next.task_id) pool.apply_async(check_complete, [next, queue]) queue_size += 1 except (KeyboardInterrupt, TaskException): raise except Exception as ex: self.add_succeeded = False formatted_traceback = traceback.format_exc() self._log_unexpected_error(task) task.trigger_event(Event.BROKEN_TASK, task, ex) self._email_unexpected_error(task, formatted_traceback) raise finally: pool.close() pool.join() return self.add_succeeded
def _add_task_batcher(self, task): family = task.task_family if family not in self._batch_families_sent: task_class = type(task) batch_param_names = task_class.batch_param_names() if batch_param_names: self._scheduler.add_task_batcher( worker=self._id, task_family=family, batched_args=batch_param_names, max_batch_size=task.max_batch_size, ) self._batch_families_sent.add(family) def _add(self, task, is_complete): if self._config.task_limit is not None and len(self._scheduled_tasks) >= self._config.task_limit: logger.warning('Will not run %s or any dependencies due to exceeded task-limit of %d', task, self._config.task_limit) deps = None status = UNKNOWN runnable = False else: formatted_traceback = None try: self._check_complete_value(is_complete) except KeyboardInterrupt: raise except AsyncCompletionException as ex: formatted_traceback = ex.trace except BaseException: formatted_traceback = traceback.format_exc() if formatted_traceback is not None: self.add_succeeded = False self._log_complete_error(task, formatted_traceback) task.trigger_event(Event.DEPENDENCY_MISSING, task) self._email_complete_error(task, formatted_traceback) deps = None status = UNKNOWN runnable = False elif is_complete: deps = None status = DONE runnable = False task.trigger_event(Event.DEPENDENCY_PRESENT, task) elif _is_external(task): deps = None status = PENDING runnable = self._config.retry_external_tasks task.trigger_event(Event.DEPENDENCY_MISSING, task) logger.warning('Data for %s does not exist (yet?). The task is an ' 'external data dependency, so it cannot be run from' ' this luigi process.', task) else: try: deps = task.deps() self._add_task_batcher(task) except Exception as ex: formatted_traceback = traceback.format_exc() self.add_succeeded = False self._log_dependency_error(task, formatted_traceback) task.trigger_event(Event.BROKEN_TASK, task, ex) self._email_dependency_error(task, formatted_traceback) deps = None status = UNKNOWN runnable = False else: status = PENDING runnable = True if task.disabled: status = DISABLED if deps: for d in deps: self._validate_dependency(d) task.trigger_event(Event.DEPENDENCY_DISCOVERED, task, d) yield d # return additional tasks to add deps = [d.task_id for d in deps] self._scheduled_tasks[task.task_id] = task self._add_task( worker=self._id, task_id=task.task_id, status=status, deps=deps, runnable=runnable, priority=task.priority, resources=task.process_resources(), params=task.to_str_params(), family=task.task_family, module=task.task_module, batchable=task.batchable, retry_policy_dict=_get_retry_policy_dict(task), accepts_messages=task.accepts_messages, ) def _validate_dependency(self, dependency): if isinstance(dependency, Target): raise Exception('requires() can not return Target objects. Wrap it in an ExternalTask class') elif not isinstance(dependency, Task): raise Exception('requires() must return Task objects but {} is a {}'.format(dependency, type(dependency))) def _check_complete_value(self, is_complete): if is_complete not in (True, False): if isinstance(is_complete, TracebackWrapper): raise AsyncCompletionException(is_complete.trace) raise Exception("Return value of Task.complete() must be boolean (was %r)" % is_complete) def _add_worker(self): self._worker_info.append(('first_task', self._first_task)) self._scheduler.add_worker(self._id, self._worker_info) def _log_remote_tasks(self, get_work_response): logger.debug("Done") logger.debug("There are no more tasks to run at this time") if get_work_response.running_tasks: for r in get_work_response.running_tasks: logger.debug('%s is currently run by worker %s', r['task_id'], r['worker']) elif get_work_response.n_pending_tasks: logger.debug( "There are %s pending tasks possibly being run by other workers", get_work_response.n_pending_tasks) if get_work_response.n_unique_pending: logger.debug( "There are %i pending tasks unique to this worker", get_work_response.n_unique_pending) if get_work_response.n_pending_last_scheduled: logger.debug( "There are %i pending tasks last scheduled by this worker", get_work_response.n_pending_last_scheduled) def _get_work_task_id(self, get_work_response): if get_work_response.get('task_id') is not None: return get_work_response['task_id'] elif 'batch_id' in get_work_response: try: task = load_task( module=get_work_response.get('task_module'), task_name=get_work_response['task_family'], params_str=get_work_response['task_params'], ) except Exception as ex: self._handle_task_load_error(ex, get_work_response['batch_task_ids']) self.run_succeeded = False return None self._scheduler.add_task( worker=self._id, task_id=task.task_id, module=get_work_response.get('task_module'), family=get_work_response['task_family'], params=task.to_str_params(), status=RUNNING, batch_id=get_work_response['batch_id'], ) return task.task_id else: return None def _get_work(self): if self._stop_requesting_work: return GetWorkResponse(None, 0, 0, 0, 0, WORKER_STATE_DISABLED) if self.worker_processes > 0: logger.debug("Asking scheduler for work...") r = self._scheduler.get_work( worker=self._id, host=self.host, assistant=self._assistant, current_tasks=list(self._running_tasks.keys()), ) else: logger.debug("Checking if tasks are still pending") r = self._scheduler.count_pending(worker=self._id) running_tasks = r['running_tasks'] task_id = self._get_work_task_id(r) self._get_work_response_history.append({ 'task_id': task_id, 'running_tasks': running_tasks, }) if task_id is not None and task_id not in self._scheduled_tasks: logger.info('Did not schedule %s, will load it dynamically', task_id) try: # TODO: we should obtain the module name from the server! self._scheduled_tasks[task_id] = \ load_task(module=r.get('task_module'), task_name=r['task_family'], params_str=r['task_params']) except TaskClassException as ex: self._handle_task_load_error(ex, [task_id]) task_id = None self.run_succeeded = False if task_id is not None and 'batch_task_ids' in r: batch_tasks = filter(None, [ self._scheduled_tasks.get(batch_id) for batch_id in r['batch_task_ids']]) self._batch_running_tasks[task_id] = batch_tasks return GetWorkResponse( task_id=task_id, running_tasks=running_tasks, n_pending_tasks=r['n_pending_tasks'], n_unique_pending=r['n_unique_pending'], # TODO: For a tiny amount of time (a month?) we'll keep forwards compatibility # That is you can user a newer client than server (Sep 2016) n_pending_last_scheduled=r.get('n_pending_last_scheduled', 0), worker_state=r.get('worker_state', WORKER_STATE_ACTIVE), ) def _run_task(self, task_id): if task_id in self._running_tasks: logger.debug('Got already running task id {} from scheduler, taking a break'.format(task_id)) next(self._sleeper()) return task = self._scheduled_tasks[task_id] task_process = self._create_task_process(task) self._running_tasks[task_id] = task_process if task_process.use_multiprocessing: with fork_lock: task_process.start() else: # Run in the same process task_process.run() def _create_task_process(self, task): message_queue = multiprocessing.Queue() if task.accepts_messages else None reporter = TaskStatusReporter(self._scheduler, task.task_id, self._id, message_queue) use_multiprocessing = self._config.force_multiprocessing or bool(self.worker_processes > 1) return ContextManagedTaskProcess( self._config.task_process_context, task, self._id, self._task_result_queue, reporter, use_multiprocessing=use_multiprocessing, worker_timeout=self._config.timeout, check_unfulfilled_deps=self._config.check_unfulfilled_deps, check_complete_on_run=self._config.check_complete_on_run, ) def _purge_children(self): """ Find dead children and put a response on the result queue. :return: """ for task_id, p in six.iteritems(self._running_tasks): if not p.is_alive() and p.exitcode: error_msg = 'Task {} died unexpectedly with exit code {}'.format(task_id, p.exitcode) p.task.trigger_event(Event.PROCESS_FAILURE, p.task, error_msg) elif p.timeout_time is not None and time.time() > float(p.timeout_time) and p.is_alive(): p.terminate() error_msg = 'Task {} timed out after {} seconds and was terminated.'.format(task_id, p.worker_timeout) p.task.trigger_event(Event.TIMEOUT, p.task, error_msg) else: continue logger.info(error_msg) self._task_result_queue.put((task_id, FAILED, error_msg, [], [])) def _handle_next_task(self): """ We have to catch three ways a task can be "done": 1. normal execution: the task runs/fails and puts a result back on the queue, 2. new dependencies: the task yielded new deps that were not complete and will be rescheduled and dependencies added, 3. child process dies: we need to catch this separately. """ self._idle_since = None while True: self._purge_children() # Deal with subprocess failures try: task_id, status, expl, missing, new_requirements = ( self._task_result_queue.get( timeout=self._config.wait_interval)) except Queue.Empty: return task = self._scheduled_tasks[task_id] if not task or task_id not in self._running_tasks: continue # Not a running task. Probably already removed. # Maybe it yielded something? # external task if run not implemented, retry-able if config option is enabled. external_task_retryable = _is_external(task) and self._config.retry_external_tasks if status == FAILED and not external_task_retryable: self._email_task_failure(task, expl) new_deps = [] if new_requirements: new_req = [load_task(module, name, params) for module, name, params in new_requirements] for t in new_req: self.add(t) new_deps = [t.task_id for t in new_req] self._add_task(worker=self._id, task_id=task_id, status=status, expl=json.dumps(expl), resources=task.process_resources(), runnable=None, params=task.to_str_params(), family=task.task_family, module=task.task_module, new_deps=new_deps, assistant=self._assistant, retry_policy_dict=_get_retry_policy_dict(task)) self._running_tasks.pop(task_id) # re-add task to reschedule missing dependencies if missing: reschedule = True # keep out of infinite loops by not rescheduling too many times for task_id in missing: self.unfulfilled_counts[task_id] += 1 if (self.unfulfilled_counts[task_id] > self._config.max_reschedules): reschedule = False if reschedule: self.add(task) self.run_succeeded &= (status == DONE) or (len(new_deps) > 0) return def _sleeper(self): # TODO is exponential backoff necessary? while True: jitter = self._config.wait_jitter wait_interval = self._config.wait_interval + random.uniform(0, jitter) logger.debug('Sleeping for %f seconds', wait_interval) time.sleep(wait_interval) yield def _keep_alive(self, get_work_response): """ Returns true if a worker should stay alive given. If worker-keep-alive is not set, this will always return false. For an assistant, it will always return the value of worker-keep-alive. Otherwise, it will return true for nonzero n_pending_tasks. If worker-count-uniques is true, it will also require that one of the tasks is unique to this worker. """ if not self._config.keep_alive: return False elif self._assistant: return True elif self._config.count_last_scheduled: return get_work_response.n_pending_last_scheduled > 0 elif self._config.count_uniques: return get_work_response.n_unique_pending > 0 elif get_work_response.n_pending_tasks == 0: return False elif not self._config.max_keep_alive_idle_duration: return True elif not self._idle_since: return True else: time_to_shutdown = self._idle_since + self._config.max_keep_alive_idle_duration - datetime.datetime.now() logger.debug("[%s] %s until shutdown", self._id, time_to_shutdown) return time_to_shutdown > datetime.timedelta(0)
[docs] def handle_interrupt(self, signum, _): """ Stops the assistant from asking for more work on SIGUSR1 """ if signum == signal.SIGUSR1: self._start_phasing_out()
def _start_phasing_out(self): """ Go into a mode where we dont ask for more work and quit once existing tasks are done. """ self._config.keep_alive = False self._stop_requesting_work = True
[docs] def run(self): """ Returns True if all scheduled tasks were executed successfully. """ logger.info('Running Worker with %d processes', self.worker_processes) sleeper = self._sleeper() self.run_succeeded = True self._add_worker() while True: while len(self._running_tasks) >= self.worker_processes > 0: logger.debug('%d running tasks, waiting for next task to finish', len(self._running_tasks)) self._handle_next_task() get_work_response = self._get_work() if get_work_response.worker_state == WORKER_STATE_DISABLED: self._start_phasing_out() if get_work_response.task_id is None: if not self._stop_requesting_work: self._log_remote_tasks(get_work_response) if len(self._running_tasks) == 0: self._idle_since = self._idle_since or datetime.datetime.now() if self._keep_alive(get_work_response): six.next(sleeper) continue else: break else: self._handle_next_task() continue # task_id is not None: logger.debug("Pending tasks: %s", get_work_response.n_pending_tasks) self._run_task(get_work_response.task_id) while len(self._running_tasks): logger.debug('Shut down Worker, %d more tasks to go', len(self._running_tasks)) self._handle_next_task() return self.run_succeeded
def _handle_rpc_message(self, message): logger.info("Worker %s got message %s" % (self._id, message)) # the message is a dict {'name': <function_name>, 'kwargs': <function_kwargs>} name = message['name'] kwargs = message['kwargs'] # find the function and check if it's callable and configured to work # as a message callback func = getattr(self, name, None) tpl = (self._id, name) if not callable(func): logger.error("Worker %s has no function '%s'" % tpl) elif not getattr(func, "is_rpc_message_callback", False): logger.error("Worker %s function '%s' is not available as rpc message callback" % tpl) else: logger.info("Worker %s successfully dispatched rpc message to function '%s'" % tpl) func(**kwargs)
[docs] @rpc_message_callback def set_worker_processes(self, n): # set the new value self.worker_processes = max(1, n) # tell the scheduler self._scheduler.add_worker(self._id, {'workers': self.worker_processes})
[docs] @rpc_message_callback def dispatch_scheduler_message(self, task_id, message_id, content, **kwargs): task_id = str(task_id) if task_id in self._running_tasks: task_process = self._running_tasks[task_id] if task_process.status_reporter.scheduler_messages: message = SchedulerMessage(self._scheduler, task_id, message_id, content, **kwargs) task_process.status_reporter.scheduler_messages.put(message)