# -*- coding: utf-8 -*-
# Copyright 2012-2015 Spotify AB
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import re
import subprocess
import warnings

from luigi import six

import luigi.configuration
import luigi.contrib.hadoop
import luigi.contrib.hadoop_jar
import luigi.contrib.hdfs
from luigi import LocalTarget
from luigi.task import flatten

logger = logging.getLogger('luigi-interface')

Scalding support for Luigi.

Example configuration section in luigi.cfg::

    # scala home directory, which should include a lib subdir with scala jars.
    scala-home: /usr/share/scala

    # scalding home directory, which should include a lib subdir with
    # scalding-*-assembly-* jars as built from the official Twitter build script.
    scalding-home: /usr/share/scalding

    # provided dependencies, e.g. jars required for compiling but not executing
    # scalding jobs. Currently required jars:
    # org.apache.hadoop/hadoop-core/0.20.2
    # org.slf4j/slf4j-log4j12/1.6.6
    # log4j/log4j/1.2.15
    # commons-httpclient/commons-httpclient/3.1
    # commons-cli/commons-cli/1.2
    # org.apache.zookeeper/zookeeper/3.3.4
    scalding-provided: /usr/share/scalding/provided

    # additional jars required.
    scalding-libjars: /usr/share/scalding/libjars

[docs]class ScaldingJobRunner(luigi.contrib.hadoop.JobRunner): """ JobRunner for `pyscald` commands. Used to run a ScaldingJobTask. """ def __init__(self): conf = luigi.configuration.get_config() default = os.environ.get('SCALA_HOME', '/usr/share/scala') self.scala_home = conf.get('scalding', 'scala-home', default) default = os.environ.get('SCALDING_HOME', '/usr/share/scalding') self.scalding_home = conf.get('scalding', 'scalding-home', default) self.provided_dir = conf.get( 'scalding', 'scalding-provided', os.path.join(default, 'provided')) self.libjars_dir = conf.get( 'scalding', 'scalding-libjars', os.path.join(default, 'libjars')) self.tmp_dir = LocalTarget(is_tmp=True) def _get_jars(self, path): return [os.path.join(path, j) for j in os.listdir(path) if j.endswith('.jar')]
[docs] def get_scala_jars(self, include_compiler=False): lib_dir = os.path.join(self.scala_home, 'lib') jars = [os.path.join(lib_dir, 'scala-library.jar')] # additional jar for scala 2.10 only reflect = os.path.join(lib_dir, 'scala-reflect.jar') if os.path.exists(reflect): jars.append(reflect) if include_compiler: jars.append(os.path.join(lib_dir, 'scala-compiler.jar')) return jars
[docs] def get_scalding_jars(self): lib_dir = os.path.join(self.scalding_home, 'lib') return self._get_jars(lib_dir)
[docs] def get_scalding_core(self): lib_dir = os.path.join(self.scalding_home, 'lib') for j in os.listdir(lib_dir): if j.startswith('scalding-core-'): p = os.path.join(lib_dir, j) logger.debug('Found scalding-core: %s', p) return p raise luigi.contrib.hadoop.HadoopJobError('Could not find scalding-core.')
[docs] def get_provided_jars(self): return self._get_jars(self.provided_dir)
[docs] def get_libjars(self): return self._get_jars(self.libjars_dir)
[docs] def get_tmp_job_jar(self, source): job_name = os.path.basename(os.path.splitext(source)[0]) return os.path.join(self.tmp_dir.path, job_name + '.jar')
[docs] def get_build_dir(self, source): build_dir = os.path.join(self.tmp_dir.path, 'build') return build_dir
[docs] def get_job_class(self, source): # find name of the job class # usually the one that matches file name or last class that extends Job job_name = os.path.splitext(os.path.basename(source))[0] package = None job_class = None for l in open(source).readlines(): p ='package\s+([^\s\(]+)', l) if p: package = p.groups()[0] p ='class\s+([^\s\(]+).*extends\s+.*Job', l) if p: job_class = p.groups()[0] if job_class == job_name: break if job_class: if package: job_class = package + '.' + job_class logger.debug('Found scalding job class: %s', job_class) return job_class else: raise luigi.contrib.hadoop.HadoopJobError('Coudl not find scalding job class.')
[docs] def build_job_jar(self, job): job_jar = job.jar() if job_jar: if not os.path.exists(job_jar): logger.error("Can't find jar: %s, full path %s", job_jar, os.path.abspath(job_jar)) raise Exception("job jar does not exist") if not job.job_class(): logger.error("Undefined job_class()") raise Exception("Undefined job_class()") return job_jar job_src = job.source() if not job_src: logger.error("Both source() and jar() undefined") raise Exception("Both source() and jar() undefined") if not os.path.exists(job_src): logger.error("Can't find source: %s, full path %s", job_src, os.path.abspath(job_src)) raise Exception("job source does not exist") job_src = job.source() job_jar = self.get_tmp_job_jar(job_src) build_dir = self.get_build_dir(job_src) if not os.path.exists(build_dir): os.makedirs(build_dir) classpath = ':'.join(filter(None, self.get_scalding_jars() + self.get_provided_jars() + self.get_libjars() + job.extra_jars())) scala_cp = ':'.join(self.get_scala_jars(include_compiler=True)) # compile scala source arglist = ['java', '-cp', scala_cp, '', '-classpath', classpath, '-d', build_dir, job_src]'Compiling scala source: %s', subprocess.list2cmdline(arglist)) subprocess.check_call(arglist) # build job jar file arglist = ['jar', 'cf', job_jar, '-C', build_dir, '.']'Building job jar: %s', subprocess.list2cmdline(arglist)) subprocess.check_call(arglist) return job_jar
[docs] def run_job(self, job, tracking_url_callback=None): if tracking_url_callback is not None: warnings.warn("tracking_url_callback argument is deprecated, task.set_tracking_url is " "used instead.", DeprecationWarning) job_jar = self.build_job_jar(job) jars = [job_jar] + self.get_libjars() + job.extra_jars() scalding_core = self.get_scalding_core() libjars = ','.join(filter(None, jars)) arglist = luigi.contrib.hdfs.load_hadoop_cmd() + ['jar', scalding_core, '-libjars', libjars] arglist += ['-D%s' % c for c in job.jobconfs()] job_class = job.job_class() or self.get_job_class(job.source()) arglist += [job_class, '--hdfs'] # scalding does not parse argument with '=' properly arglist += ['--name', job.task_id.replace('=', ':')] (tmp_files, job_args) = luigi.contrib.hadoop_jar.fix_paths(job) arglist += job_args env = os.environ.copy() jars.append(scalding_core) hadoop_cp = ':'.join(filter(None, jars)) env['HADOOP_CLASSPATH'] = hadoop_cp"Submitting Hadoop job: HADOOP_CLASSPATH=%s %s", hadoop_cp, subprocess.list2cmdline(arglist)) luigi.contrib.hadoop.run_and_track_hadoop_job(arglist, job.set_tracking_url, env=env) for a, b in tmp_files: a.move(b)
[docs]class ScaldingJobTask(luigi.contrib.hadoop.BaseHadoopJobTask): """ A job task for Scalding that define a scala source and (optional) main method. requires() should return a dictionary where the keys are Scalding argument names and values are sub tasks or lists of subtasks. For example: .. code-block:: python {'input1': A, 'input2': C} => --input1 <Aoutput> --input2 <Coutput> {'input1': [A, B], 'input2': [C]} => --input1 <Aoutput> <Boutput> --input2 <Coutput> """
[docs] def relpath(self, current_file, rel_path): """ Compute path given current file and relative path. """ script_dir = os.path.dirname(os.path.abspath(current_file)) rel_path = os.path.abspath(os.path.join(script_dir, rel_path)) return rel_path
[docs] def source(self): """ Path to the scala source for this Scalding Job Either one of source() or jar() must be specified. """ return None
[docs] def jar(self): """ Path to the jar file for this Scalding Job Either one of source() or jar() must be specified. """ return None
[docs] def extra_jars(self): """ Extra jars for building and running this Scalding Job. """ return []
[docs] def job_class(self): """ optional main job class for this Scalding Job. """ return None
[docs] def job_runner(self): return ScaldingJobRunner()
[docs] def atomic_output(self): """ If True, then rewrite output arguments to be temp locations and atomically move them into place after the job finishes. """ return True
[docs] def requires(self): return {}
[docs] def job_args(self): """ Extra arguments to pass to the Scalding job. """ return []
[docs] def args(self): """ Returns an array of args to pass to the job. """ arglist = [] for k, v in six.iteritems(self.requires_hadoop()): arglist.append('--' + k) arglist.extend([t.output().path for t in flatten(v)]) arglist.extend(['--output', self.output()]) arglist.extend(self.job_args()) return arglist