Source code for luigi.contrib.external_program

# -*- coding: utf-8 -*-
# Copyright 2012-2016 Spotify AB
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
Template tasks for running external programs as luigi tasks.

This module is primarily intended for when you need to call a single external
program or shell script, and it's enough to specify program arguments and
environment variables.

If you need to run multiple commands, chain them together or pipe output
from one command to the next, you're probably better off using something like
`plumbum`_, and wrapping plumbum commands in normal luigi
:py:class:`~luigi.task.Task` s.

.. _plumbum:

import logging
import os
import re
import signal
import subprocess
import sys
import tempfile
from contextlib import contextmanager
from multiprocessing import Process
from time import sleep

import luigi
from luigi.parameter import ParameterVisibility

logger = logging.getLogger('luigi-interface')

[docs]class ExternalProgramTask(luigi.Task): """ Template task for running an external program in a subprocess The program is run using :py:class:`subprocess.Popen`, with ``args`` passed as a list, generated by :py:meth:`program_args` (where the first element should be the executable). See :py:class:`subprocess.Popen` for details. Your must override :py:meth:`program_args` to specify the arguments you want, and you can optionally override :py:meth:`program_environment` if you want to control the environment variables (see :py:class:`ExternalPythonProgramTask` for an example). By default, the output (stdout and stderr) of the run external program is being captured and displayed after the execution has ended. This behaviour can be overridden by passing ``--capture-output False`` """ capture_output = luigi.BoolParameter(default=True, significant=False, positional=False) stream_for_searching_tracking_url = luigi.parameter.ChoiceParameter( var_type=str, choices=['none', 'stdout', 'stderr'], default='none', significant=False, positional=False, visibility=ParameterVisibility.HIDDEN, description="Stream for searching tracking URL") """ Used for defining which stream should be tracked for URL, may be set to 'stdout', 'stderr' or 'none'. Default value is 'none', so URL tracking is not performed. """ tracking_url_pattern = luigi.OptionalParameter( default=None, significant=False, positional=False, visibility=ParameterVisibility.HIDDEN, description="Regex pattern used for searching URL in the logs of the external program") """ Regex pattern used for searching URL in the logs of the external program. If a log line matches the regex, the first group in the matching is set as the tracking URL for the job in the web UI. Example: 'Job UI is here: (https?://.*)'. Default value is None, so URL tracking is not performed. """
[docs] def program_args(self): """ Override this method to map your task parameters to the program arguments :return: list to pass as ``args`` to :py:class:`subprocess.Popen` """ raise NotImplementedError
[docs] def program_environment(self): """ Override this method to control environment variables for the program :return: dict mapping environment variable names to values """ env = os.environ.copy() return env
@property def always_log_stderr(self): """ When True, stderr will be logged even if program execution succeeded Override to False to log stderr only when program execution fails. """ return True def _clean_output_file(self, file_object): return ''.join(map(lambda s: s.decode('utf-8'), file_object.readlines()))
[docs] def build_tracking_url(self, logs_output): """ This method is intended for transforming pattern match in logs to an URL :param logs_output: Found match of `self.tracking_url_pattern` :return: a tracking URL for the task """ return logs_output
[docs] def run(self): args = list(map(str, self.program_args()))'Running command: %s', ' '.join(args)) env = self.program_environment() kwargs = {'env': env} tmp_stdout, tmp_stderr = None, None if self.capture_output: tmp_stdout, tmp_stderr = tempfile.TemporaryFile(), tempfile.TemporaryFile() kwargs.update({'stdout': tmp_stdout, 'stderr': tmp_stderr}) try: if self.stream_for_searching_tracking_url != 'none' and self.tracking_url_pattern is not None: with self._proc_with_tracking_url_context(proc_args=args, proc_kwargs=kwargs) as proc: proc.wait() else: proc = subprocess.Popen(args, **kwargs) with ExternalProgramRunContext(proc): proc.wait() success = proc.returncode == 0 if self.capture_output: stdout = self._clean_output_file(tmp_stdout) stderr = self._clean_output_file(tmp_stderr) if stdout:'Program stdout:\n{}'.format(stdout)) if stderr: if self.always_log_stderr or not success:'Program stderr:\n{}'.format(stderr)) else: stdout, stderr = None, None if not success: raise ExternalProgramRunError( 'Program failed with return code={}:'.format(proc.returncode), args, env=env, stdout=stdout, stderr=stderr) finally: if self.capture_output: tmp_stderr.close() tmp_stdout.close()
@contextmanager def _proc_with_tracking_url_context(self, proc_args, proc_kwargs): time_to_sleep = 0.5 file_to_write = proc_kwargs.get(self.stream_for_searching_tracking_url) proc_kwargs.update({self.stream_for_searching_tracking_url: subprocess.PIPE}) main_proc = subprocess.Popen(proc_args, **proc_kwargs) pipe_to_read = main_proc.stderr if self.stream_for_searching_tracking_url == 'stderr' else main_proc.stdout def _track_url_by_pattern(): """ Scans the pipe looking for a passed pattern, if the pattern is found, `set_tracking_url` callback is sent. If tmp_stdout is passed, also appends lines to this file. """ pattern = re.compile(self.tracking_url_pattern) for new_line in iter(pipe_to_read.readline, ''): if new_line: if file_to_write: file_to_write.write(new_line) match =, new_line.decode('utf-8')) if match: self.set_tracking_url( self.build_tracking_url( ) else: sleep(time_to_sleep) track_proc = Process(target=_track_url_by_pattern) try: track_proc.start() with ExternalProgramRunContext(main_proc): yield main_proc finally: # need to wait a bit to let the subprocess read the last lines track_proc.join(time_to_sleep * 2) if track_proc.is_alive(): track_proc.terminate() pipe_to_read.close()
[docs]class ExternalProgramRunContext(object): def __init__(self, proc): self.proc = proc def __enter__(self): self.__old_signal = signal.getsignal(signal.SIGTERM) signal.signal(signal.SIGTERM, self.kill_job) return self def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is KeyboardInterrupt: self.kill_job() signal.signal(signal.SIGTERM, self.__old_signal)
[docs] def kill_job(self, captured_signal=None, stack_frame=None): self.proc.kill() if captured_signal is not None: # adding 128 gives the exit code corresponding to a signal sys.exit(128 + captured_signal)
[docs]class ExternalProgramRunError(RuntimeError): def __init__(self, message, args, env=None, stdout=None, stderr=None): super(ExternalProgramRunError, self).__init__(message, args, env, stdout, stderr) self.message = message self.args = args self.env = env self.out = stdout self.err = stderr def __str__(self): info = self.message info += '\nCOMMAND: {}'.format(' '.join(self.args)) info += '\nSTDOUT: {}'.format(self.out or '[empty]') info += '\nSTDERR: {}'.format(self.err or '[empty]') env_string = None if self.env: env_string = ' '.join(['='.join([k, '\'{}\''.format(v)]) for k, v in self.env.items()]) info += '\nENVIRONMENT: {}'.format(env_string or '[empty]') # reset terminal color in case the ENVIRONMENT changes colors info += '\033[m' return info
[docs]class ExternalPythonProgramTask(ExternalProgramTask): """ Template task for running an external Python program in a subprocess Simple extension of :py:class:`ExternalProgramTask`, adding two :py:class:`luigi.parameter.Parameter` s for setting a virtualenv and for extending the ``PYTHONPATH``. """ virtualenv = luigi.Parameter( default=None, positional=False, description='path to the virtualenv directory to use. It should point to ' 'the directory containing the ``bin/activate`` file used for ' 'enabling the virtualenv.') extra_pythonpath = luigi.Parameter( default=None, positional=False, description='extend the search path for modules by prepending this ' 'value to the ``PYTHONPATH`` environment variable.')
[docs] def program_environment(self): env = super(ExternalPythonProgramTask, self).program_environment() if self.extra_pythonpath: pythonpath = ':'.join([self.extra_pythonpath, env.get('PYTHONPATH', '')]) env.update({'PYTHONPATH': pythonpath}) if self.virtualenv: # Make the same changes to the env that a normal venv/bin/activate script would path = ':'.join(['{}/bin'.format(self.virtualenv), env.get('PATH', '')]) env.update({ 'PATH': path, 'VIRTUAL_ENV': self.virtualenv }) # remove PYTHONHOME env variable, if it exists env.pop('PYTHONHOME', None) return env