# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Scalding support for Luigi.

Example configuration section in luigi.cfg::

    [scalding]
    # scala home directory, which should include a lib subdir with scala jars.
    scala-home: /usr/share/scala

    # scalding home directory, which should include a lib subdir with
    # scalding-*-assembly-* jars as built from the official Twitter build script.
    scalding-home: /usr/share/scalding

    # provided dependencies, e.g. jars required for compiling but not executing
    # scalding jobs. Currently required jars:
    # org.apache.hadoop/hadoop-core/0.20.2
    # org.slf4j/slf4j-log4j12/1.6.6
    # log4j/log4j/1.2.15
    # commons-httpclient/commons-httpclient/3.1
    # commons-cli/commons-cli/1.2
    # org.apache.zookeeper/zookeeper/3.3.4
    scalding-provided: /usr/share/scalding/provided

    # additional jars required.
    scalding-libjars: /usr/share/scalding/libjars
"""

import logging
import os
import re
import subprocess
import warnings

from luigi import six

import luigi.configuration
import luigi.contrib.hadoop
import luigi.contrib.hadoop_jar
import luigi.contrib.hdfs
from luigi import LocalTarget
from luigi.task import flatten

logger = logging.getLogger('luigi-interface')


class ScaldingJobRunner(luigi.contrib.hadoop.JobRunner):
    """
    JobRunner for `pyscald` commands. Used to run a ScaldingJobTask.
    """

    def __init__(self):
        conf = luigi.configuration.get_config()

        default = os.environ.get('SCALA_HOME', '/usr/share/scala')
        self.scala_home = conf.get('scalding', 'scala-home', default)

        default = os.environ.get('SCALDING_HOME', '/usr/share/scalding')
        self.scalding_home = conf.get('scalding', 'scalding-home', default)
        self.provided_dir = conf.get(
            'scalding', 'scalding-provided', os.path.join(default, 'provided'))
        self.libjars_dir = conf.get(
            'scalding', 'scalding-libjars', os.path.join(default, 'libjars'))

        self.tmp_dir = LocalTarget(is_tmp=True)

    def _get_jars(self, path):
        return [os.path.join(path, j) for j in os.listdir(path)
                if j.endswith('.jar')]

    def get_scala_jars(self, include_compiler=False):
        lib_dir = os.path.join(self.scala_home, 'lib')
        jars = [os.path.join(lib_dir, 'scala-library.jar')]

        # additional jar for scala 2.10 only
        reflect = os.path.join(lib_dir, 'scala-reflect.jar')
        if os.path.exists(reflect):
            jars.append(reflect)

        if include_compiler:
            jars.append(os.path.join(lib_dir, 'scala-compiler.jar'))

        return jars

    def get_scalding_jars(self):
        lib_dir = os.path.join(self.scalding_home, 'lib')
        return self._get_jars(lib_dir)

    def get_scalding_core(self):
        lib_dir = os.path.join(self.scalding_home, 'lib')
        for j in os.listdir(lib_dir):
            if j.startswith('scalding-core-'):
                p = os.path.join(lib_dir, j)
                logger.debug('Found scalding-core: %s', p)
                return p
        raise luigi.contrib.hadoop.HadoopJobError('Could not find scalding-core.')

    def get_provided_jars(self):
        return self._get_jars(self.provided_dir)

    def get_libjars(self):
        return self._get_jars(self.libjars_dir)

    def get_tmp_job_jar(self, source):
        job_name = os.path.basename(os.path.splitext(source)[0])
        return os.path.join(self.tmp_dir.path, job_name + '.jar')

    def get_build_dir(self, source):
        build_dir = os.path.join(self.tmp_dir.path, 'build')
        return build_dir

    def get_job_class(self, source):
        # Find the name of the job class: usually the one that matches the
        # file name, otherwise the last class that extends Job.
        job_name = os.path.splitext(os.path.basename(source))[0]
        package = None
        job_class = None
        with open(source) as f:
            for line in f:
                p = re.search(r'package\s+([^\s\(]+)', line)
                if p:
                    package = p.groups()[0]
                p = re.search(r'class\s+([^\s\(]+).*extends\s+.*Job', line)
                if p:
                    job_class = p.groups()[0]
                    if job_class == job_name:
                        break
        if job_class:
            if package:
                job_class = package + '.' + job_class
            logger.debug('Found scalding job class: %s', job_class)
            return job_class
        else:
            raise luigi.contrib.hadoop.HadoopJobError('Could not find scalding job class.')

    def build_job_jar(self, job):
        job_jar = job.jar()
        if job_jar:
            if not os.path.exists(job_jar):
                logger.error("Can't find jar: %s, full path %s",
                             job_jar, os.path.abspath(job_jar))
                raise Exception("job jar does not exist")
            if not job.job_class():
                logger.error("Undefined job_class()")
                raise Exception("Undefined job_class()")
            return job_jar

        job_src = job.source()
        if not job_src:
            logger.error("Both source() and jar() undefined")
            raise Exception("Both source() and jar() undefined")
        if not os.path.exists(job_src):
            logger.error("Can't find source: %s, full path %s",
                         job_src, os.path.abspath(job_src))
            raise Exception("job source does not exist")

        job_jar = self.get_tmp_job_jar(job_src)

        build_dir = self.get_build_dir(job_src)
        if not os.path.exists(build_dir):
            os.makedirs(build_dir)

        classpath = ':'.join(filter(None,
                                    self.get_scalding_jars() +
                                    self.get_provided_jars() +
                                    self.get_libjars() +
                                    job.extra_jars()))
        scala_cp = ':'.join(self.get_scala_jars(include_compiler=True))

        # compile scala source
        arglist = ['java', '-cp', scala_cp, 'scala.tools.nsc.Main',
                   '-classpath', classpath,
                   '-d', build_dir, job_src]
        logger.info('Compiling scala source: %s',
                    subprocess.list2cmdline(arglist))
        subprocess.check_call(arglist)

        # build job jar file
        arglist = ['jar', 'cf', job_jar, '-C', build_dir, '.']
        logger.info('Building job jar: %s', subprocess.list2cmdline(arglist))
        subprocess.check_call(arglist)
        return job_jar
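
    # For reference, build_job_jar() above shells out to roughly the following
    # two commands (the MyJob.scala source and all paths are hypothetical):
    #
    #   java -cp <scala-library.jar>:<scala-compiler.jar> scala.tools.nsc.Main \
    #       -classpath <scalding/provided/libjars/extra jars> \
    #       -d <tmp build dir> MyJob.scala
    #   jar cf <tmp dir>/MyJob.jar -C <tmp build dir> .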

    def run_job(self, job, tracking_url_callback=None):
        if tracking_url_callback is not None:
            warnings.warn("tracking_url_callback argument is deprecated, task.set_tracking_url is "
                          "used instead.", DeprecationWarning)

        job_jar = self.build_job_jar(job)
        jars = [job_jar] + self.get_libjars() + job.extra_jars()
        scalding_core = self.get_scalding_core()
        libjars = ','.join(filter(None, jars))
        arglist = luigi.contrib.hdfs.load_hadoop_cmd() + ['jar', scalding_core, '-libjars', libjars]
        arglist += ['-D%s' % c for c in job.jobconfs()]

        job_class = job.job_class() or self.get_job_class(job.source())
        arglist += [job_class, '--hdfs']

        # scalding does not parse arguments containing '=' properly
        arglist += ['--name', job.task_id.replace('=', ':')]

        (tmp_files, job_args) = luigi.contrib.hadoop_jar.fix_paths(job)
        arglist += job_args

        env = os.environ.copy()
        jars.append(scalding_core)
        hadoop_cp = ':'.join(filter(None, jars))
        env['HADOOP_CLASSPATH'] = hadoop_cp
        logger.info("Submitting Hadoop job: HADOOP_CLASSPATH=%s %s",
                    hadoop_cp, subprocess.list2cmdline(arglist))
        luigi.contrib.hadoop.run_and_track_hadoop_job(arglist, job.set_tracking_url, env=env)

        for a, b in tmp_files:
            a.move(b)
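

# For reference, ScaldingJobRunner.run_job() above ends up submitting a command
# roughly of the form below; the job class com.example.MyJob and the jar names
# are hypothetical placeholders:
#
#   HADOOP_CLASSPATH=<job jar>:<libjars>:<scalding-core jar> \
#   hadoop jar <scalding-core jar> -libjars <job jar>,<libjars> \
#       -D<jobconf>=<value> ... com.example.MyJob --hdfs --name <task_id> <job args>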


class ScaldingJobTask(luigi.contrib.hadoop.BaseHadoopJobTask):
    """
    A job task for Scalding that defines a scala source and (optional) main method.

    requires() should return a dictionary where the keys are Scalding argument
    names and the values are sub tasks or lists of subtasks.

    For example:

    .. code-block:: python

        {'input1': A, 'input2': C} => --input1 <Aoutput> --input2 <Coutput>
        {'input1': [A, B], 'input2': [C]} => --input1 <Aoutput> <Boutput> --input2 <Coutput>
    """

    def relpath(self, current_file, rel_path):
        """
        Compute the absolute path of ``rel_path``, relative to the directory of ``current_file``.
        """
        script_dir = os.path.dirname(os.path.abspath(current_file))
        rel_path = os.path.abspath(os.path.join(script_dir, rel_path))
        return rel_path

    def source(self):
        """
        Path to the scala source for this Scalding Job.

        Either one of source() or jar() must be specified.
        """
        return None

    def jar(self):
        """
        Path to the jar file for this Scalding Job.

        Either one of source() or jar() must be specified.
        """
        return None

    def job_class(self):
        """
        Optional main job class for this Scalding Job.
        """
        return None

    def job_runner(self):
        return ScaldingJobRunner()

    def atomic_output(self):
        """
        If True, then rewrite output arguments to be temp locations and
        atomically move them into place after the job finishes.
        """
        return True

    def requires(self):
        return {}

    def job_args(self):
        """
        Extra arguments to pass to the Scalding job.
        """
        return []

    def args(self):
        """
        Returns an array of args to pass to the job.
        """
        arglist = []
        for k, v in six.iteritems(self.requires_hadoop()):
            arglist.append('--' + k)
            arglist.extend([t.output().path for t in flatten(v)])
        arglist.extend(['--output', self.output()])
        arglist.extend(self.job_args())
        return arglist
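

# ---------------------------------------------------------------------------
# Illustrative sketch only: a minimal ScaldingJobTask subclass showing how the
# pieces above fit together. The scala source file, the input task and the
# HDFS paths are hypothetical and not part of this module.
# ---------------------------------------------------------------------------
class _ExampleInput(luigi.ExternalTask):
    """Pre-existing HDFS data used as the job input (hypothetical path)."""

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/example/input/')


class _ExampleWordCount(ScaldingJobTask):
    """Compiles WordCountJob.scala on the fly and submits it to Hadoop."""

    def source(self):
        # Compiled into a temporary jar by ScaldingJobRunner.build_job_jar().
        return self.relpath(__file__, 'WordCountJob.scala')

    def requires(self):
        # Rendered by args() as: --input <output path of _ExampleInput>
        return {'input': _ExampleInput()}

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/example/word_count/')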