Source code for luigi.retcodes

# -*- coding: utf-8 -*-
#
# Copyright 2015-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Module containing the logic for exit codes for the luigi binary. It's useful
when you in a programmatic way need to know if luigi actually finished the
given task, and if not why.
"""

import luigi
import sys
import logging
from luigi import IntParameter
from luigi.setup_logging import InterfaceLogging


[docs] class retcode(luigi.Config): """ See the :ref:`return codes configuration section <retcode-config>`. """ # default value inconsistent with doc/configuration.rst for backwards compatibility reasons unhandled_exception = IntParameter(default=4, description='For internal luigi errors.', ) # default value inconsistent with doc/configuration.rst for backwards compatibility reasons missing_data = IntParameter(default=0, description="For when there are incomplete ExternalTask dependencies.", ) # default value inconsistent with doc/configuration.rst for backwards compatibility reasons task_failed = IntParameter(default=0, description='''For when a task's run() method fails.''', ) # default value inconsistent with doc/configuration.rst for backwards compatibility reasons already_running = IntParameter(default=0, description='For both local --lock and luigid "lock"', ) # default value inconsistent with doc/configuration.rst for backwards compatibility reasons scheduling_error = IntParameter(default=0, description='''For when a task's complete() or requires() fails, or task-limit reached''' ) # default value inconsistent with doc/configuration.rst for backwards compatibility reasons not_run = IntParameter(default=0, description="For when a task is not granted run permission by the scheduler." )
[docs] def run_with_retcodes(argv): """ Run luigi with command line parsing, but raise ``SystemExit`` with the configured exit code. Note: Usually you use the luigi binary directly and don't call this function yourself. :param argv: Should (conceptually) be ``sys.argv[1:]`` """ logger = logging.getLogger('luigi-interface') with luigi.cmdline_parser.CmdlineParser.global_instance(argv): retcodes = retcode() worker = None try: worker = luigi.interface._run(argv).worker except luigi.interface.PidLockAlreadyTakenExit: sys.exit(retcodes.already_running) except Exception: # Some errors occur before logging is set up, we set it up now env_params = luigi.interface.core() InterfaceLogging.setup(env_params) logger.exception("Uncaught exception in luigi") sys.exit(retcodes.unhandled_exception) with luigi.cmdline_parser.CmdlineParser.global_instance(argv): task_sets = luigi.execution_summary._summary_dict(worker) root_task = luigi.execution_summary._root_task(worker) non_empty_categories = {k: v for k, v in task_sets.items() if v}.keys() def has(status): assert status in luigi.execution_summary._ORDERED_STATUSES return status in non_empty_categories codes_and_conds = ( (retcodes.missing_data, has('still_pending_ext')), (retcodes.task_failed, has('failed')), (retcodes.already_running, has('run_by_other_worker')), (retcodes.scheduling_error, has('scheduling_error')), (retcodes.not_run, has('not_run')), ) expected_ret_code = max(code * (1 if cond else 0) for code, cond in codes_and_conds) if expected_ret_code == 0 and \ root_task not in task_sets["completed"] and \ root_task not in task_sets["already_done"]: sys.exit(retcodes.not_run) else: sys.exit(expected_ret_code)