Source code for luigi.contrib.hadoop_jar

# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Provides functionality to run a Hadoop job using a Jar
"""

import logging
import os
import random
import shlex
import warnings

import luigi.contrib.hadoop
import luigi.contrib.hdfs

logger = logging.getLogger('luigi-interface')


[docs] def fix_paths(job): """ Coerce input arguments to use temporary files when used for output. Return a list of temporary file pairs (tmpfile, destination path) and a list of arguments. Converts each HdfsTarget to a string for the path. """ tmp_files = [] args = [] for x in job.args(): if isinstance(x, luigi.contrib.hdfs.HdfsTarget): # input/output if x.exists() or not job.atomic_output(): # input args.append(x.path) else: # output x_path_no_slash = x.path[:-1] if x.path[-1] == '/' else x.path y = luigi.contrib.hdfs.HdfsTarget(x_path_no_slash + '-luigi-tmp-%09d' % random.randrange(0, 10_000_000_000)) tmp_files.append((y, x_path_no_slash)) logger.info('Using temp path: %s for path %s', y.path, x.path) args.append(y.path) else: try: # hopefully the target has a path to use args.append(x.path) except AttributeError: # if there's no path then hope converting it to a string will work args.append(str(x)) return (tmp_files, args)
[docs] class HadoopJarJobError(Exception): pass
[docs] class HadoopJarJobRunner(luigi.contrib.hadoop.JobRunner): """ JobRunner for `hadoop jar` commands. Used to run a HadoopJarJobTask. """ def __init__(self): pass
[docs] def run_job(self, job, tracking_url_callback=None): if tracking_url_callback is not None: warnings.warn("tracking_url_callback argument is deprecated, task.set_tracking_url is " "used instead.", DeprecationWarning) # TODO(jcrobak): libjars, files, etc. Can refactor out of # hadoop.HadoopJobRunner if not job.jar(): raise HadoopJarJobError("Jar not defined") hadoop_arglist = luigi.contrib.hdfs.load_hadoop_cmd() + ['jar', job.jar()] if job.main(): hadoop_arglist.append(job.main()) jobconfs = job.jobconfs() for jc in jobconfs: hadoop_arglist += ['-D' + jc] (tmp_files, job_args) = fix_paths(job) hadoop_arglist += job_args ssh_config = job.ssh() if ssh_config: host = ssh_config.get("host", None) key_file = ssh_config.get("key_file", None) username = ssh_config.get("username", None) if not host or not key_file or not username: raise HadoopJarJobError("missing some config for HadoopRemoteJarJobRunner") arglist = ['ssh', '-i', key_file, '-o', 'BatchMode=yes'] # no password prompts etc if ssh_config.get("no_host_key_check", False): arglist += ['-o', 'UserKnownHostsFile=/dev/null', '-o', 'StrictHostKeyChecking=no'] arglist.append('{}@{}'.format(username, host)) hadoop_arglist = [shlex.quote(arg) for arg in hadoop_arglist] arglist.append(' '.join(hadoop_arglist)) else: if not os.path.exists(job.jar()): logger.error("Can't find jar: %s, full path %s", job.jar(), os.path.abspath(job.jar())) raise HadoopJarJobError("job jar does not exist") arglist = hadoop_arglist luigi.contrib.hadoop.run_and_track_hadoop_job(arglist, job.set_tracking_url) for a, b in tmp_files: a.move(b)
[docs] class HadoopJarJobTask(luigi.contrib.hadoop.BaseHadoopJobTask): """ A job task for `hadoop jar` commands that define a jar and (optional) main method. """
[docs] def jar(self): """ Path to the jar for this Hadoop Job. """ return None
[docs] def main(self): """ optional main method for this Hadoop Job. """ return None
[docs] def job_runner(self): # We recommend that you define a subclass, override this method and set up your own config return HadoopJarJobRunner()
[docs] def atomic_output(self): """ If True, then rewrite output arguments to be temp locations and atomically move them into place after the job finishes. """ return True
[docs] def ssh(self): """ Set this to run hadoop command remotely via ssh. It needs to be a dict that looks like {"host": "myhost", "key_file": None, "username": None, ["no_host_key_check": False]} """ return None
[docs] def args(self): """ Returns an array of args to pass to the job (after hadoop jar <jar> <main>). """ return []