# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Apache Pig support.
Example configuration section in luigi.cfg::
[pig]
# pig home directory
home: /usr/share/pig
"""
from contextlib import contextmanager
import logging
import os
import select
import signal
import subprocess
import sys
import tempfile
from luigi import six
import luigi
from luigi import configuration
logger = logging.getLogger('luigi-interface')
[docs]class PigJobTask(luigi.Task):
[docs] def pig_home(self):
return configuration.get_config().get('pig', 'home', '/usr/share/pig')
[docs] def pig_command_path(self):
return os.path.join(self.pig_home(), "bin/pig")
[docs] def pig_env_vars(self):
"""
Dictionary of environment variables that should be set when running Pig.
Ex::
return { 'PIG_CLASSPATH': '/your/path' }
"""
return {}
[docs] def pig_properties(self):
"""
Dictionary of properties that should be set when running Pig.
Example::
return { 'pig.additional.jars':'/path/to/your/jar' }
"""
return {}
[docs] def pig_parameters(self):
"""
Dictionary of parameters that should be set for the Pig job.
Example::
return { 'YOUR_PARAM_NAME':'Your param value' }
"""
return {}
[docs] def pig_options(self):
"""
List of options that will be appended to the Pig command.
Example::
return ['-x', 'local']
"""
return []
[docs] def output(self):
raise NotImplementedError("subclass should define output path")
[docs] def pig_script_path(self):
"""
Return the path to the Pig script to be run.
"""
raise NotImplementedError("subclass should define pig_script_path")
@contextmanager
def _build_pig_cmd(self):
opts = self.pig_options()
def line(k, v):
return ('%s=%s%s' % (k, v, os.linesep)).encode('utf-8')
with tempfile.NamedTemporaryFile() as param_file, tempfile.NamedTemporaryFile() as prop_file:
if self.pig_parameters():
items = six.iteritems(self.pig_parameters())
param_file.writelines(line(k, v) for (k, v) in items)
param_file.flush()
opts.append('-param_file')
opts.append(param_file.name)
if self.pig_properties():
items = six.iteritems(self.pig_properties())
prop_file.writelines(line(k, v) for (k, v) in items)
prop_file.flush()
opts.append('-propertyFile')
opts.append(prop_file.name)
cmd = [self.pig_command_path()] + opts + ["-f", self.pig_script_path()]
logger.info(subprocess.list2cmdline(cmd))
yield cmd
[docs] def run(self):
with self._build_pig_cmd() as cmd:
self.track_and_progress(cmd)
[docs] def track_and_progress(self, cmd):
temp_stdout = tempfile.TemporaryFile('wb')
env = os.environ.copy()
env['PIG_HOME'] = self.pig_home()
for k, v in six.iteritems(self.pig_env_vars()):
env[k] = v
proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
reads = [proc.stderr.fileno(), proc.stdout.fileno()]
# tracking the possible problems with this job
err_lines = []
with PigRunContext():
while proc.poll() is None:
ret = select.select(reads, [], [])
for fd in ret[0]:
if fd == proc.stderr.fileno():
line = proc.stderr.readline().decode('utf8')
err_lines.append(line)
if fd == proc.stdout.fileno():
line_bytes = proc.stdout.readline()
temp_stdout.write(line_bytes)
line = line_bytes.decode('utf8')
err_line = line.lower()
if err_line.find('More information at:') != -1:
logger.info(err_line.split('more information at: ')[-1].strip())
if err_line.find(' - '):
t = err_line.split(' - ')[-1].strip()
if t != "":
logger.info(t)
# Read the rest + stdout
err = ''.join(err_lines + [an_err_line.decode('utf8') for an_err_line in proc.stderr])
if proc.returncode == 0:
logger.info("Job completed successfully!")
else:
logger.error("Error when running script:\n%s", self.pig_script_path())
logger.error(err)
raise PigJobError("Pig script failed with return value: %s" % (proc.returncode,), err=err)
[docs]class PigRunContext(object):
def __init__(self):
self.job_id = None
def __enter__(self):
self.__old_signal = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.kill_job)
return self
[docs] def kill_job(self, captured_signal=None, stack_frame=None):
if self.job_id:
logger.info('Job interrupted, killing job %s', self.job_id)
subprocess.call(['pig', '-e', '"kill %s"' % self.job_id])
if captured_signal is not None:
# adding 128 gives the exit code corresponding to a signal
sys.exit(128 + captured_signal)
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is KeyboardInterrupt:
self.kill_job()
signal.signal(signal.SIGTERM, self.__old_signal)
[docs]class PigJobError(RuntimeError):
def __init__(self, message, out=None, err=None):
super(PigJobError, self).__init__(message, out, err)
self.message = message
self.out = out
self.err = err
def __str__(self):
info = self.message
if self.out:
info += "\nSTDOUT: " + str(self.out)
if self.err:
info += "\nSTDERR: " + str(self.err)
return info