Source code for luigi.contrib.spark

# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import collections
import logging
import os
import re
import sys
import tempfile
import shutil
import importlib
import tarfile
import inspect

import pickle

from luigi.contrib.external_program import ExternalProgramTask
from luigi import configuration

logger = logging.getLogger('luigi-interface')


class SparkSubmitTask(ExternalProgramTask):
    """
    Template task for running a Spark job

    Supports running jobs on Spark local, standalone, Mesos or Yarn

    See http://spark.apache.org/docs/latest/submitting-applications.html
    for more information
    """
    # Application (.jar or .py file)
    name = None
    entry_class = None
    app = None

    # Only log stderr if spark fails (since stderr is normally quite verbose)
    always_log_stderr = False

    # Spark applications write their logs to stderr
    stream_for_searching_tracking_url = 'stderr'

    @property
    def tracking_url_pattern(self):
        if self.deploy_mode == "cluster":
            # In cluster mode the client only receives the application status periodically
            return r"tracking URL: (https?://.*)\s"
        else:
            return r"Bound (?:.*) to (?:.*), and started at (https?://.*)\s"
    def app_options(self):
        """
        Subclass this method to map your task parameters to the app's arguments

        """
        return []
    @property
    def pyspark_python(self):
        return None

    @property
    def pyspark_driver_python(self):
        return None

    @property
    def hadoop_user_name(self):
        return None

    @property
    def spark_version(self):
        return "spark"

    @property
    def spark_submit(self):
        return configuration.get_config().get(self.spark_version, 'spark-submit', 'spark-submit')

    @property
    def master(self):
        return configuration.get_config().get(self.spark_version, "master", None)

    @property
    def deploy_mode(self):
        return configuration.get_config().get(self.spark_version, "deploy-mode", None)

    @property
    def jars(self):
        return self._list_config(configuration.get_config().get(self.spark_version, "jars", None))

    @property
    def packages(self):
        return self._list_config(configuration.get_config().get(
            self.spark_version, "packages", None))

    @property
    def py_files(self):
        return self._list_config(configuration.get_config().get(
            self.spark_version, "py-files", None))

    @property
    def files(self):
        return self._list_config(configuration.get_config().get(self.spark_version, "files", None))

    @property
    def _conf(self):
        conf = collections.OrderedDict(self.conf or {})
        if self.pyspark_python:
            conf['spark.pyspark.python'] = self.pyspark_python
        if self.pyspark_driver_python:
            conf['spark.pyspark.driver.python'] = self.pyspark_driver_python
        return conf

    @property
    def conf(self):
        return self._dict_config(configuration.get_config().get(self.spark_version, "conf", None))

    @property
    def properties_file(self):
        return configuration.get_config().get(self.spark_version, "properties-file", None)

    @property
    def driver_memory(self):
        return configuration.get_config().get(self.spark_version, "driver-memory", None)

    @property
    def driver_java_options(self):
        return configuration.get_config().get(self.spark_version, "driver-java-options", None)

    @property
    def driver_library_path(self):
        return configuration.get_config().get(self.spark_version, "driver-library-path", None)

    @property
    def driver_class_path(self):
        return configuration.get_config().get(self.spark_version, "driver-class-path", None)

    @property
    def executor_memory(self):
        return configuration.get_config().get(self.spark_version, "executor-memory", None)

    @property
    def driver_cores(self):
        return configuration.get_config().get(self.spark_version, "driver-cores", None)

    @property
    def supervise(self):
        return bool(configuration.get_config().get(self.spark_version, "supervise", False))

    @property
    def total_executor_cores(self):
        return configuration.get_config().get(self.spark_version, "total-executor-cores", None)

    @property
    def executor_cores(self):
        return configuration.get_config().get(self.spark_version, "executor-cores", None)

    @property
    def queue(self):
        return configuration.get_config().get(self.spark_version, "queue", None)

    @property
    def num_executors(self):
        return configuration.get_config().get(self.spark_version, "num-executors", None)

    @property
    def archives(self):
        return self._list_config(configuration.get_config().get(
            self.spark_version, "archives", None))

    @property
    def hadoop_conf_dir(self):
        return configuration.get_config().get(self.spark_version, "hadoop-conf-dir", None)
    def get_environment(self):
        env = os.environ.copy()
        for prop in ('HADOOP_CONF_DIR', 'HADOOP_USER_NAME'):
            var = getattr(self, prop.lower(), None)
            if var:
                env[prop] = var
        return env
    def program_environment(self):
        return self.get_environment()
    def program_args(self):
        return self.spark_command() + self.app_command()
    def spark_command(self):
        command = [self.spark_submit]
        command += self._text_arg('--master', self.master)
        command += self._text_arg('--deploy-mode', self.deploy_mode)
        command += self._text_arg('--name', self.name)
        command += self._text_arg('--class', self.entry_class)
        command += self._list_arg('--jars', self.jars)
        command += self._list_arg('--packages', self.packages)
        command += self._list_arg('--py-files', self.py_files)
        command += self._list_arg('--files', self.files)
        command += self._list_arg('--archives', self.archives)
        command += self._dict_arg('--conf', self._conf)
        command += self._text_arg('--properties-file', self.properties_file)
        command += self._text_arg('--driver-memory', self.driver_memory)
        command += self._text_arg('--driver-java-options', self.driver_java_options)
        command += self._text_arg('--driver-library-path', self.driver_library_path)
        command += self._text_arg('--driver-class-path', self.driver_class_path)
        command += self._text_arg('--executor-memory', self.executor_memory)
        command += self._text_arg('--driver-cores', self.driver_cores)
        command += self._flag_arg('--supervise', self.supervise)
        command += self._text_arg('--total-executor-cores', self.total_executor_cores)
        command += self._text_arg('--executor-cores', self.executor_cores)
        command += self._text_arg('--queue', self.queue)
        command += self._text_arg('--num-executors', self.num_executors)
        return command
    def app_command(self):
        if not self.app:
            raise NotImplementedError("subclass should define an app (.jar or .py file)")
        return [self.app] + self.app_options()
    def _list_config(self, config):
        if config and isinstance(config, str):
            return list(map(lambda x: x.strip(), config.split(',')))

    def _dict_config(self, config):
        if config and isinstance(config, str):
            return dict(map(lambda i: i.split('=', 1), config.split('|')))

    def _text_arg(self, name, value):
        if value:
            return [name, value]
        return []

    def _list_arg(self, name, value):
        if value and isinstance(value, (list, tuple)):
            return [name, ','.join(value)]
        return []

    def _dict_arg(self, name, value):
        command = []
        if value and isinstance(value, dict):
            for prop, val in value.items():
                command += [name, '{0}={1}'.format(prop, val)]
        return command

    def _flag_arg(self, name, value):
        if value:
            return [name]
        return []
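A minimal usage sketch for ``SparkSubmitTask``: point ``app`` at the application artifact, optionally set ``entry_class``, and map task parameters to application arguments in ``app_options()``. The jar path, class name and parameter below are hypothetical, chosen only for illustration:

import luigi
from luigi.contrib.spark import SparkSubmitTask


class WordCountJob(SparkSubmitTask):
    # Hypothetical application jar and main class
    app = 'wordcount-assembly.jar'
    entry_class = 'com.example.WordCount'
    input_path = luigi.Parameter()

    def app_options(self):
        # Appended after the spark-submit options as application arguments
        return [self.input_path]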
class PySparkTask(SparkSubmitTask):
    """
    Template task for running an inline PySpark job

    Simply implement the ``main`` method in your subclass

    You can optionally define package names to be distributed to the cluster
    with ``py_packages`` (uses luigi's global py-packages configuration by default)
    """

    # Path to the pyspark program passed to spark-submit
    app = os.path.join(os.path.dirname(__file__), 'pyspark_runner.py')

    @property
    def name(self):
        return self.__class__.__name__

    @property
    def py_packages(self):
        packages = configuration.get_config().get('spark', 'py-packages', None)
        if packages:
            return map(lambda s: s.strip(), packages.split(','))

    @property
    def files(self):
        if self.deploy_mode == "cluster":
            return [self.run_pickle]

    @property
    def pickle_protocol(self):
        return configuration.get_config().getint('spark', 'pickle-protocol', pickle.DEFAULT_PROTOCOL)
    def setup(self, conf):
        """
        Called by the pyspark_runner with a SparkConf instance that will be used to instantiate the SparkContext

        :param conf: SparkConf
        """
    def setup_remote(self, sc):
        self._setup_packages(sc)
    def main(self, sc, *args):
        """
        Called by the pyspark_runner with a SparkContext and any arguments returned by ``app_options()``

        :param sc: SparkContext
        :param args: arguments list
        """
        raise NotImplementedError("subclass should define a main method")
    def app_command(self):
        if self.deploy_mode == "cluster":
            pickle_loc = os.path.basename(self.run_pickle)
        else:
            pickle_loc = self.run_pickle
        return [self.app, pickle_loc] + self.app_options()
    def run(self):
        path_name_fragment = re.sub(r'[^\w]', '_', self.name)
        self.run_path = tempfile.mkdtemp(prefix=path_name_fragment)
        self.run_pickle = os.path.join(self.run_path, '.'.join([path_name_fragment, 'pickle']))
        with open(self.run_pickle, 'wb') as fd:
            # Copy module file to run path.
            module_path = os.path.abspath(inspect.getfile(self.__class__))
            shutil.copy(module_path, os.path.join(self.run_path, '.'))
            self._dump(fd)
        try:
            super(PySparkTask, self).run()
        finally:
            shutil.rmtree(self.run_path)
    def _dump(self, fd):
        with self.no_unpicklable_properties():
            if self.__module__ == '__main__':
                d = pickle.dumps(self, protocol=self.pickle_protocol)
                module_name = os.path.basename(sys.argv[0]).rsplit('.', 1)[0]
                d = d.replace(b'c__main__', b'c' + module_name.encode('ascii'))
                fd.write(d)
            else:
                pickle.dump(self, fd, protocol=self.pickle_protocol)

    def _setup_packages(self, sc):
        """
        This method compresses and uploads packages to the cluster
        """
        packages = self.py_packages
        if not packages:
            return
        for package in packages:
            mod = importlib.import_module(package)
            try:
                mod_path = mod.__path__[0]
            except AttributeError:
                mod_path = mod.__file__
            os.makedirs(self.run_path, exist_ok=True)
            tar_path = os.path.join(self.run_path, package + '.tar.gz')
            tar = tarfile.open(tar_path, "w:gz")
            tar.add(mod_path, os.path.basename(mod_path))
            tar.close()
            sc.addPyFile(tar_path)
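A minimal usage sketch for ``PySparkTask``: implement ``main``, which the pyspark_runner calls with the SparkContext and the arguments returned by ``app_options()``. The task name, parameter and paths below are hypothetical:

import luigi
from luigi.contrib.spark import PySparkTask


class InlineWordCount(PySparkTask):
    # Hypothetical parameter naming the input text file
    src = luigi.Parameter()

    def app_options(self):
        return [self.src]

    def main(self, sc, *args):
        # args[0] is the value returned by app_options() above
        counts = (sc.textFile(args[0])
                  .flatMap(lambda line: line.split())
                  .map(lambda word: (word, 1))
                  .reduceByKey(lambda a, b: a + b))
        counts.saveAsTextFile(args[0] + '.counts')  # hypothetical output path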