# Source code for luigi.contrib.pig

# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Apache Pig support.
Example configuration section in luigi.cfg::

    [pig]
    # pig home directory
    home: /usr/share/pig
"""
from contextlib import contextmanager
import logging
import os
import re
import select
import signal
import subprocess
import sys
import tempfile

from luigi import six

import luigi
from luigi import configuration

logger = logging.getLogger('luigi-interface')


class PigJobTask(luigi.Task):
    """
    Base class for Luigi tasks that run an Apache Pig script.

    Subclasses must implement :meth:`output` and :meth:`pig_script_path`; the
    other ``pig_*`` hooks may be overridden to customise the command line and
    the environment Pig runs in.
    """

    # Matches the "More information at:" marker Pig prints before a URL,
    # regardless of case, so the original (case-preserved) text can be logged.
    _MORE_INFO_RE = re.compile(r'more information at:', re.IGNORECASE)

    def pig_home(self):
        """
        Return the Pig installation directory.

        Read from ``home`` in the ``[pig]`` configuration section, falling back
        to ``/usr/share/pig``.
        """
        return configuration.get_config().get('pig', 'home', '/usr/share/pig')

    def pig_command_path(self):
        """Return the path of the ``pig`` executable under :meth:`pig_home`."""
        return os.path.join(self.pig_home(), "bin/pig")

    def pig_env_vars(self):
        """
        Dictionary of environment variables that should be set when running Pig.

        Ex::

            return { 'PIG_CLASSPATH': '/your/path' }
        """
        return {}

    def pig_properties(self):
        """
        Dictionary of properties that should be set when running Pig.

        Example::

            return { 'pig.additional.jars':'/path/to/your/jar' }
        """
        return {}

    def pig_parameters(self):
        """
        Dictionary of parameters that should be set for the Pig job.

        Example::

            return { 'YOUR_PARAM_NAME':'Your param value' }
        """
        return {}

    def pig_options(self):
        """
        List of options that will be appended to the Pig command.

        Example::

            return ['-x', 'local']
        """
        return []

    def output(self):
        raise NotImplementedError("subclass should define output path")

    def pig_script_path(self):
        """
        Return the path to the Pig script to be run.
        """
        raise NotImplementedError("subclass should define pig_script_path")

    @contextmanager
    def _build_pig_cmd(self):
        """
        Build the Pig command line and yield it.

        Parameters and properties are written to temporary ``key=value`` files
        that are passed via ``-param_file`` / ``-propertyFile``. The files only
        exist while the context is open, so the command must be run inside it.
        """
        # Copy so that appending our own flags never mutates a list owned by
        # the subclass (e.g. a class attribute returned from pig_options()).
        opts = list(self.pig_options())
        parameters = self.pig_parameters()
        properties = self.pig_properties()

        def _format_line(k, v):
            return ('%s=%s%s' % (k, v, os.linesep)).encode('utf-8')

        with tempfile.NamedTemporaryFile() as param_file, tempfile.NamedTemporaryFile() as prop_file:
            if parameters:
                param_file.writelines(_format_line(k, v) for (k, v) in six.iteritems(parameters))
                param_file.flush()
                opts.append('-param_file')
                opts.append(param_file.name)

            if properties:
                prop_file.writelines(_format_line(k, v) for (k, v) in six.iteritems(properties))
                prop_file.flush()
                opts.append('-propertyFile')
                opts.append(prop_file.name)

            cmd = [self.pig_command_path()] + opts + ["-f", self.pig_script_path()]

            logger.info(subprocess.list2cmdline(cmd))
            yield cmd

    def run(self):
        """Build the Pig command and run it, tracking its progress."""
        with self._build_pig_cmd() as cmd:
            self.track_and_progress(cmd)

    @staticmethod
    def _decode_output(line_bytes):
        """Decode one line of Pig output, replacing bytes that are not valid UTF-8."""
        return line_bytes.decode('utf8', 'replace')

    @classmethod
    def _log_output_line(cls, line):
        """
        Log the interesting part of one line of Pig's stdout.

        If the line contains "More information at:" (any case), the text after
        it is logged. Independently, the text after the last " - " separator is
        logged (the whole line when there is no separator); blank results are
        skipped. The original case is preserved so URLs and paths stay intact.
        """
        parts = cls._MORE_INFO_RE.split(line)
        if len(parts) > 1:
            logger.info(parts[-1].strip())
        message = line.split(' - ')[-1].strip()
        if message:
            logger.info(message)

    def track_and_progress(self, cmd):
        """
        Run ``cmd`` and log Pig's progress while it executes.

        Stdout lines are logged as they arrive. Stderr is collected and, when
        Pig exits with a non-zero status, logged and attached to the error.

        :param cmd: the command line, as a list of arguments.
        :raises PigJobError: if Pig exits with a non-zero return code.
        """
        env = os.environ.copy()
        env['PIG_HOME'] = self.pig_home()
        for k, v in six.iteritems(self.pig_env_vars()):
            env[k] = v

        proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
        stderr_fd = proc.stderr.fileno()
        stdout_fd = proc.stdout.fileno()
        # A descriptor is dropped once it reaches EOF. Otherwise select() would
        # keep reporting it readable and the loop would spin.
        open_fds = [stderr_fd, stdout_fd]
        # tracking the possible problems with this job
        err_lines = []
        try:
            with PigRunContext():
                while open_fds and proc.poll() is None:
                    readable = select.select(open_fds, [], [])[0]
                    for fd in readable:
                        stream = proc.stderr if fd == stderr_fd else proc.stdout
                        line_bytes = stream.readline()
                        if not line_bytes:
                            open_fds.remove(fd)
                        elif fd == stderr_fd:
                            err_lines.append(self._decode_output(line_bytes))
                        else:
                            self._log_output_line(self._decode_output(line_bytes))
                # Either Pig already exited or both pipes are at EOF, so this
                # cannot block on a full pipe; it just collects the exit status.
                proc.wait()

            # Read whatever was still buffered in the pipes when Pig exited.
            for line_bytes in proc.stdout:
                self._log_output_line(self._decode_output(line_bytes))
            err_lines.extend(self._decode_output(line_bytes) for line_bytes in proc.stderr)
        finally:
            proc.stdout.close()
            proc.stderr.close()

        err = ''.join(err_lines)
        if proc.returncode == 0:
            logger.info("Job completed successfully!")
        else:
            logger.error("Error when running script:\n%s", self.pig_script_path())
            logger.error(err)
            raise PigJobError("Pig script failed with return value: %s" % (proc.returncode,), err=err)
class PigRunContext(object):
    """
    Context manager that is active while a Pig job runs.

    On entry it installs :meth:`kill_job` as the SIGTERM handler. On exit it
    calls :meth:`kill_job` if the block was left because of a
    ``KeyboardInterrupt``, then restores the previous SIGTERM handler.
    """

    def __init__(self):
        # NOTE(review): no visible code assigns job_id. Until a caller sets it,
        # kill_job has no Hadoop job to kill.
        self.job_id = None

    def __enter__(self):
        self.__old_signal = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.kill_job)
        return self

    @staticmethod
    def _kill_command(job_id):
        """
        Return the argv that asks Pig to kill ``job_id``.

        The arguments go to the process directly (no shell), so the ``kill``
        statement must not be wrapped in literal quote characters.
        """
        return ['pig', '-e', 'kill %s' % job_id]

    def kill_job(self, captured_signal=None, stack_frame=None):
        """
        Kill the tracked job, if any. Exit when invoked as a signal handler.

        :param captured_signal: the signal number when called as a signal
            handler, otherwise ``None``.
        :param stack_frame: current stack frame (unused; part of the
            signal-handler signature).
        :raises SystemExit: with code ``128 + captured_signal`` when a signal
            was captured.
        """
        if self.job_id:
            logger.info('Job interrupted, killing job %s', self.job_id)
            subprocess.call(self._kill_command(self.job_id))
        if captured_signal is not None:
            # adding 128 gives the exit code corresponding to a signal
            sys.exit(128 + captured_signal)

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is KeyboardInterrupt:
                self.kill_job()
        finally:
            # Restore the handler even if killing the job failed (e.g. `pig`
            # is not on PATH). Otherwise our handler would stay installed.
            signal.signal(signal.SIGTERM, self.__old_signal)
class PigJobError(RuntimeError):
    """
    Raised when a Pig job exits unsuccessfully.

    :param message: human-readable description of the failure.
    :param out: captured standard output, if any.
    :param err: captured standard error, if any.
    """

    def __init__(self, message, out=None, err=None):
        super(PigJobError, self).__init__(message, out, err)
        self.message, self.out, self.err = message, out, err

    def __str__(self):
        # Append each non-empty captured stream under its own label.
        text = self.message
        for label, stream in (("\nSTDOUT: ", self.out), ("\nSTDERR: ", self.err)):
            if stream:
                text += label + str(stream)
        return text