"""
Template tasks for running external programs as luigi tasks.

This module is primarily intended for when you need to call a single external
program or shell script, and it's enough to specify program arguments and
environment variables.

If you need to run multiple commands, chain them together or pipe output
from one command to the next, you're probably better off using something like
`plumbum`_, and wrapping plumbum commands in normal luigi
:py:class:`~luigi.task.Task` s.

.. _plumbum: https://plumbum.readthedocs.io/
"""
import logging
import os
import re
import signal
import subprocess
import sys
import tempfile
from contextlib import contextmanager
from multiprocessing import Process
from time import sleep
import luigi
from luigi.parameter import ParameterVisibility
# Module-level logger shared by the classes below; records go to the
# logger registered under the name 'luigi-interface'.
logger = logging.getLogger('luigi-interface')
class ExternalProgramTask(luigi.Task):
    """
    Template task for running an external program in a subprocess

    The program is run using :py:class:`subprocess.Popen`, with ``args`` passed
    as a list, generated by :py:meth:`program_args` (where the first element should
    be the executable). See :py:class:`subprocess.Popen` for details.

    You must override :py:meth:`program_args` to specify the arguments you want,
    and you can optionally override :py:meth:`program_environment` if you want to
    control the environment variables (see :py:class:`ExternalPythonProgramTask`
    for an example).

    By default, the output (stdout and stderr) of the run external program
    is being captured and displayed after the execution has ended. This
    behaviour can be overridden by passing ``--capture-output False``
    """

    capture_output = luigi.BoolParameter(default=True, significant=False, positional=False)

    #: Used for defining which stream should be tracked for URL, may be set to
    #: 'stdout', 'stderr' or 'none'.
    #:
    #: Default value is 'none', so URL tracking is not performed.
    stream_for_searching_tracking_url = luigi.parameter.ChoiceParameter(
        var_type=str, choices=['none', 'stdout', 'stderr'], default='none',
        significant=False, positional=False, visibility=ParameterVisibility.HIDDEN,
        description="Stream for searching tracking URL")

    #: Regex pattern used for searching URL in the logs of the external program.
    #:
    #: If a log line matches the regex, the first group in the matching is set as
    #: the tracking URL for the job in the web UI.
    #: Example: 'Job UI is here: (https?://.*)'.
    #:
    #: Default value is None, so URL tracking is not performed.
    tracking_url_pattern = luigi.OptionalParameter(
        default=None, significant=False, positional=False, visibility=ParameterVisibility.HIDDEN,
        description="Regex pattern used for searching URL in the logs of the external program")

    def program_args(self):
        """
        Override this method to map your task parameters to the program arguments

        :return: list to pass as ``args`` to :py:class:`subprocess.Popen`
        """
        raise NotImplementedError

    def program_environment(self):
        """
        Override this method to control environment variables for the program

        :return: dict mapping environment variable names to values
        """
        return os.environ.copy()

    @property
    def always_log_stderr(self):
        """
        When True, stderr will be logged even if program execution succeeded

        Override to False to log stderr only when program execution fails.
        """
        # Exposed as a property because ``run`` reads it as a plain attribute;
        # a bare method object would always be truthy there.
        return True

    def _clean_output_file(self, file_object):
        """
        Return the whole content of a captured-output file as text.

        :param file_object: binary file object the program output was written to
        :return: the UTF-8 decoded content of the file
        """
        # The child process shares the file offset and leaves it at the end of
        # what it wrote, so rewind before reading.
        file_object.seek(0)
        return file_object.read().decode('utf-8')

    def build_tracking_url(self, logs_output):
        """
        This method is intended for transforming pattern match in logs to an URL

        :param logs_output: Found match of `self.tracking_url_pattern`
        :return: a tracking URL for the task
        """
        return logs_output

    def run(self):
        """
        Run the external program and wait for it to finish.

        When ``capture_output`` is set, stdout and stderr are collected in
        temporary files and logged after the program has ended.

        :raises ExternalProgramRunError: if the program exits with a non-zero
            return code
        """
        args = list(map(str, self.program_args()))

        logger.info('Running command: %s', ' '.join(args))
        env = self.program_environment()
        kwargs = {'env': env}
        tmp_stdout, tmp_stderr = None, None
        if self.capture_output:
            tmp_stdout, tmp_stderr = tempfile.TemporaryFile(), tempfile.TemporaryFile()
            kwargs.update({'stdout': tmp_stdout, 'stderr': tmp_stderr})

        try:
            if self.stream_for_searching_tracking_url != 'none' and self.tracking_url_pattern is not None:
                with self._proc_with_tracking_url_context(proc_args=args, proc_kwargs=kwargs) as proc:
                    proc.wait()
            else:
                proc = subprocess.Popen(args, **kwargs)
                with ExternalProgramRunContext(proc):
                    proc.wait()
            success = proc.returncode == 0

            stdout, stderr = None, None
            if self.capture_output:
                stdout = self._clean_output_file(tmp_stdout)
                stderr = self._clean_output_file(tmp_stderr)

                if stdout:
                    logger.info('Program stdout:\n%s', stdout)
                if stderr and (self.always_log_stderr or not success):
                    logger.info('Program stderr:\n%s', stderr)

            if not success:
                raise ExternalProgramRunError(
                    'Program failed with return code={}:'.format(proc.returncode),
                    args, env=env, stdout=stdout, stderr=stderr)
        finally:
            # Release the temporary files whether or not the program succeeded.
            for tmp_file in (tmp_stderr, tmp_stdout):
                if tmp_file is not None:
                    tmp_file.close()

    @contextmanager
    def _proc_with_tracking_url_context(self, proc_args, proc_kwargs):
        """
        Start the program with the tracked stream piped through a helper process.

        The helper process scans the stream selected by
        ``stream_for_searching_tracking_url`` for ``tracking_url_pattern``.

        :param proc_args: argument list passed to :py:class:`subprocess.Popen`
        :param proc_kwargs: keyword arguments passed to :py:class:`subprocess.Popen`
        :return: context manager yielding the started :py:class:`subprocess.Popen`
        """
        time_to_sleep = 0.5
        tracked_stream = self.stream_for_searching_tracking_url
        file_to_write = proc_kwargs.get(tracked_stream)
        # Work on a copy so the caller's dict is left untouched.
        popen_kwargs = dict(proc_kwargs)
        popen_kwargs[tracked_stream] = subprocess.PIPE
        main_proc = subprocess.Popen(proc_args, **popen_kwargs)
        pipe_to_read = main_proc.stderr if tracked_stream == 'stderr' else main_proc.stdout

        def _track_url_by_pattern():
            """
            Scans the pipe looking for a passed pattern, if the pattern is found, `set_tracking_url` callback is sent.
            If tmp_stdout is passed, also appends lines to this file.
            """
            pattern = re.compile(self.tracking_url_pattern)
            # The pipe yields bytes, so end-of-stream is b''; a '' sentinel
            # would never match and the loop would never finish.
            for new_line in iter(pipe_to_read.readline, b''):
                if file_to_write is not None:
                    file_to_write.write(new_line)
                    # Flush per line so nothing buffered is lost if this
                    # helper has to be terminated.
                    file_to_write.flush()
                match = pattern.search(new_line.decode('utf-8'))
                if match:
                    self.set_tracking_url(self.build_tracking_url(match.group(1)))

        track_proc = Process(target=_track_url_by_pattern)
        try:
            track_proc.start()
            with ExternalProgramRunContext(main_proc):
                yield main_proc
        finally:
            # ``pid`` stays None until the helper has actually been started.
            if track_proc.pid is not None:
                # need to wait a bit to let the subprocess read the last lines
                track_proc.join(time_to_sleep * 2)
                if track_proc.is_alive():
                    track_proc.terminate()
            pipe_to_read.close()
class ExternalProgramRunContext:
    """
    Context manager that kills a running program on SIGTERM or Ctrl-C.

    While the context is active, SIGTERM is routed to :py:meth:`kill_job`.
    The handler that was installed before is put back on exit.
    """

    def __init__(self, proc):
        """
        :param proc: the running process; only its ``kill()`` method is used
        """
        self.proc = proc

    def __enter__(self):
        """Install :py:meth:`kill_job` as the SIGTERM handler."""
        self.__old_signal = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.kill_job)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Kill the program on ``KeyboardInterrupt`` and restore the old handler."""
        if exc_type is KeyboardInterrupt:
            self.kill_job()
        # Always restore, otherwise this handler outlives the context.
        signal.signal(signal.SIGTERM, self.__old_signal)

    def kill_job(self, captured_signal=None, stack_frame=None):
        """
        Kill the wrapped process.

        The signature matches a signal handler so it can be registered with
        :py:func:`signal.signal`.

        :param captured_signal: signal number when invoked as a signal handler
        :param stack_frame: current stack frame when invoked as a signal handler (unused)
        :raises SystemExit: with code ``128 + captured_signal`` when invoked
            as a signal handler
        """
        self.proc.kill()
        if captured_signal is not None:
            # adding 128 gives the exit code corresponding to a signal
            sys.exit(128 + captured_signal)
class ExternalProgramRunError(RuntimeError):
    """
    Raised when an external program fails.

    Carries the command, environment and captured output so that ``str()``
    of the error gives a full report.
    """

    def __init__(self, message, args, env=None, stdout=None, stderr=None):
        """
        :param message: short description of the failure
        :param args: the command that was run, as a sequence of arguments
        :param env: mapping of environment variables the program ran with
        :param stdout: captured standard output, if any
        :param stderr: captured standard error, if any
        """
        super(ExternalProgramRunError, self).__init__(message, args, env, stdout, stderr)
        self.message = message
        self.args = args
        self.env = env
        self.out = stdout
        self.err = stderr

    def __str__(self):
        """Return the message followed by command, output and environment."""
        env_string = None
        if self.env:
            env_string = ' '.join(
                "{}='{}'".format(name, value) for name, value in self.env.items())
        report = [
            '{}'.format(self.message),
            # str() each argument so a non-string argument cannot make the
            # report itself raise and hide the real failure.
            '\nCOMMAND: {}'.format(' '.join(map(str, self.args))),
            '\nSTDOUT: {}'.format(self.out or '[empty]'),
            '\nSTDERR: {}'.format(self.err or '[empty]'),
            '\nENVIRONMENT: {}'.format(env_string or '[empty]'),
            # reset terminal color in case the ENVIRONMENT changes colors
            '\033[m',
        ]
        return ''.join(report)
class ExternalPythonProgramTask(ExternalProgramTask):
    """
    Template task for running an external Python program in a subprocess

    Simple extension of :py:class:`ExternalProgramTask`, adding two
    :py:class:`luigi.parameter.Parameter` s for setting a virtualenv and for
    extending the ``PYTHONPATH``.
    """

    # Both parameters default to None; program_environment() only applies
    # them when they are set.
    virtualenv = luigi.OptionalParameter(
        default=None,
        description='path to the virtualenv directory to use. It should point to '
                    'the directory containing the ``bin/activate`` file used for '
                    'enabling the virtualenv.')

    extra_pythonpath = luigi.OptionalParameter(
        default=None,
        description='extend the search path for modules by prepending this '
                    'value to the ``PYTHONPATH`` environment variable.')

    def program_environment(self):
        """
        Build the environment for the external Python program.

        Starts from the parent class environment, prepends ``extra_pythonpath``
        to ``PYTHONPATH`` and, when ``virtualenv`` is set, points ``PATH`` and
        ``VIRTUAL_ENV`` at it.

        :return: dict mapping environment variable names to values
        """
        env = super(ExternalPythonProgramTask, self).program_environment()

        if self.extra_pythonpath:
            # NOTE(review): when PYTHONPATH is unset this yields a trailing ':'
            # (an empty entry) -- confirm that is intended.
            pythonpath = ':'.join([self.extra_pythonpath, env.get('PYTHONPATH', '')])
            env.update({'PYTHONPATH': pythonpath})

        if self.virtualenv:
            # Make the same changes to the env that a normal venv/bin/activate script would
            path = ':'.join(['{}/bin'.format(self.virtualenv), env.get('PATH', '')])
            env.update({
                'PATH': path,
                'VIRTUAL_ENV': self.virtualenv,
            })
            # remove PYTHONHOME env variable, if it exists
            env.pop('PYTHONHOME', None)

        return env