Source code for luigi.contrib.spark

# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import collections
import importlib
import inspect
import logging
import os
import pickle
import re
import shutil
import sys
import tarfile
import tempfile

from luigi import configuration
from luigi.contrib.external_program import ExternalProgramTask

logger = logging.getLogger("luigi-interface")


[docs] class SparkSubmitTask(ExternalProgramTask): """ Template task for running a Spark job Supports running jobs on Spark local, standalone, Mesos or Yarn See http://spark.apache.org/docs/latest/submitting-applications.html for more information """ # Application (.jar or .py file) name = None entry_class = None app = None # Only log stderr if spark fails (since stderr is normally quite verbose) always_log_stderr = False # Spark applications write its logs into stderr stream_for_searching_tracking_url = "stderr" @property def tracking_url_pattern(self): if self.deploy_mode == "cluster": # in cluster mode client only receives application status once a period of time return r"tracking URL: (https?://.*)\s" else: return r"Bound (?:.*) to (?:.*), and started at (https?://.*)\s"
[docs] def app_options(self): """ Subclass this method to map your task parameters to the app's arguments """ return []
@property def pyspark_python(self): return None @property def pyspark_driver_python(self): return None @property def hadoop_user_name(self): return None @property def spark_version(self): return "spark" @property def spark_submit(self): return configuration.get_config().get(self.spark_version, "spark-submit", "spark-submit") @property def master(self): return configuration.get_config().get(self.spark_version, "master", None) @property def deploy_mode(self): return configuration.get_config().get(self.spark_version, "deploy-mode", None) @property def jars(self): return self._list_config(configuration.get_config().get(self.spark_version, "jars", None)) @property def packages(self): return self._list_config(configuration.get_config().get(self.spark_version, "packages", None)) @property def py_files(self): return self._list_config(configuration.get_config().get(self.spark_version, "py-files", None)) @property def files(self): return self._list_config(configuration.get_config().get(self.spark_version, "files", None)) @property def _conf(self): conf = collections.OrderedDict(self.conf or {}) if self.pyspark_python: conf["spark.pyspark.python"] = self.pyspark_python if self.pyspark_driver_python: conf["spark.pyspark.driver.python"] = self.pyspark_driver_python return conf @property def conf(self): return self._dict_config(configuration.get_config().get(self.spark_version, "conf", None)) @property def properties_file(self): return configuration.get_config().get(self.spark_version, "properties-file", None) @property def driver_memory(self): return configuration.get_config().get(self.spark_version, "driver-memory", None) @property def driver_java_options(self): return configuration.get_config().get(self.spark_version, "driver-java-options", None) @property def driver_library_path(self): return configuration.get_config().get(self.spark_version, "driver-library-path", None) @property def driver_class_path(self): return configuration.get_config().get(self.spark_version, "driver-class-path", None) @property def executor_memory(self): return configuration.get_config().get(self.spark_version, "executor-memory", None) @property def driver_cores(self): return configuration.get_config().get(self.spark_version, "driver-cores", None) @property def supervise(self): return bool(configuration.get_config().get(self.spark_version, "supervise", False)) @property def total_executor_cores(self): return configuration.get_config().get(self.spark_version, "total-executor-cores", None) @property def executor_cores(self): return configuration.get_config().get(self.spark_version, "executor-cores", None) @property def queue(self): return configuration.get_config().get(self.spark_version, "queue", None) @property def num_executors(self): return configuration.get_config().get(self.spark_version, "num-executors", None) @property def archives(self): return self._list_config(configuration.get_config().get(self.spark_version, "archives", None)) @property def hadoop_conf_dir(self): return configuration.get_config().get(self.spark_version, "hadoop-conf-dir", None)
[docs] def get_environment(self): env = os.environ.copy() for prop in ("HADOOP_CONF_DIR", "HADOOP_USER_NAME"): var = getattr(self, prop.lower(), None) if var: env[prop] = var return env
[docs] def program_environment(self): return self.get_environment()
[docs] def program_args(self): return self.spark_command() + self.app_command()
[docs] def spark_command(self): command = [self.spark_submit] command += self._text_arg("--master", self.master) command += self._text_arg("--deploy-mode", self.deploy_mode) command += self._text_arg("--name", self.name) command += self._text_arg("--class", self.entry_class) command += self._list_arg("--jars", self.jars) command += self._list_arg("--packages", self.packages) command += self._list_arg("--py-files", self.py_files) command += self._list_arg("--files", self.files) command += self._list_arg("--archives", self.archives) command += self._dict_arg("--conf", self._conf) command += self._text_arg("--properties-file", self.properties_file) command += self._text_arg("--driver-memory", self.driver_memory) command += self._text_arg("--driver-java-options", self.driver_java_options) command += self._text_arg("--driver-library-path", self.driver_library_path) command += self._text_arg("--driver-class-path", self.driver_class_path) command += self._text_arg("--executor-memory", self.executor_memory) command += self._text_arg("--driver-cores", self.driver_cores) command += self._flag_arg("--supervise", self.supervise) command += self._text_arg("--total-executor-cores", self.total_executor_cores) command += self._text_arg("--executor-cores", self.executor_cores) command += self._text_arg("--queue", self.queue) command += self._text_arg("--num-executors", self.num_executors) return command
[docs] def app_command(self): if not self.app: raise NotImplementedError("subclass should define an app (.jar or .py file)") return [self.app] + self.app_options()
def _list_config(self, config): if config and isinstance(config, str): return list(map(lambda x: x.strip(), config.split(","))) def _dict_config(self, config): if config and isinstance(config, str): return dict(map(lambda i: i.split("=", 1), config.split("|"))) def _text_arg(self, name, value): if value: return [name, value] return [] def _list_arg(self, name, value): if value and isinstance(value, (list, tuple)): return [name, ",".join(value)] return [] def _dict_arg(self, name, value): command = [] if value and isinstance(value, dict): for prop, value in value.items(): command += [name, "{0}={1}".format(prop, value)] return command def _flag_arg(self, name, value): if value: return [name] return []
[docs] class PySparkTask(SparkSubmitTask): """ Template task for running an inline PySpark job Simply implement the ``main`` method in your subclass You can optionally define package names to be distributed to the cluster with ``py_packages`` (uses luigi's global py-packages configuration by default) """ # Path to the pyspark program passed to spark-submit app = os.path.join(os.path.dirname(__file__), "pyspark_runner.py") @property def name(self): return self.__class__.__name__ @property def py_packages(self): packages = configuration.get_config().get("spark", "py-packages", None) if packages: return map(lambda s: s.strip(), packages.split(",")) @property def files(self): if self.deploy_mode == "cluster": return [self.run_pickle] @property def pickle_protocol(self): return configuration.get_config().getint("spark", "pickle-protocol", pickle.DEFAULT_PROTOCOL)
[docs] def setup(self, conf): """ Called by the pyspark_runner with a SparkConf instance that will be used to instantiate the SparkContext :param conf: SparkConf """
[docs] def setup_remote(self, sc): self._setup_packages(sc)
[docs] def main(self, sc, *args): """ Called by the pyspark_runner with a SparkContext and any arguments returned by ``app_options()`` :param sc: SparkContext :param args: arguments list """ raise NotImplementedError("subclass should define a main method")
[docs] def app_command(self): if self.deploy_mode == "cluster": pickle_loc = os.path.basename(self.run_pickle) else: pickle_loc = self.run_pickle return [self.app, pickle_loc] + self.app_options()
[docs] def run(self): path_name_fragment = re.sub(r"[^\w]", "_", self.name) self.run_path = tempfile.mkdtemp(prefix=path_name_fragment) self.run_pickle = os.path.join(self.run_path, ".".join([path_name_fragment, "pickle"])) with open(self.run_pickle, "wb") as fd: # Copy module file to run path. module_path = os.path.abspath(inspect.getfile(self.__class__)) shutil.copy(module_path, os.path.join(self.run_path, ".")) self._dump(fd) try: super(PySparkTask, self).run() finally: shutil.rmtree(self.run_path)
def _dump(self, fd): with self.no_unpicklable_properties(): if self.__module__ == "__main__": d = pickle.dumps(self, protocol=self.pickle_protocol) module_name = os.path.basename(sys.argv[0]).rsplit(".", 1)[0] d = d.replace(b"c__main__", b"c" + module_name.encode("ascii")) fd.write(d) else: pickle.dump(self, fd, protocol=self.pickle_protocol) def _setup_packages(self, sc): """ This method compresses and uploads packages to the cluster """ packages = self.py_packages if not packages: return for package in packages: mod = importlib.import_module(package) try: mod_path = mod.__path__[0] except AttributeError: mod_path = mod.__file__ os.makedirs(self.run_path, exist_ok=True) tar_path = os.path.join(self.run_path, package + ".tar.gz") tar = tarfile.open(tar_path, "w:gz") tar.add(mod_path, os.path.basename(mod_path)) tar.close() sc.addPyFile(tar_path)