Source code for luigi.execution_summary

# -*- coding: utf-8 -*-
#
# Copyright 2015-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module provide the function :py:func:`summary` that is used for printing
an `execution summary
<https://github.com/spotify/luigi/blob/master/examples/execution_summary_example.py>`_
at the end of luigi invocations.
"""

import textwrap
import collections
import functools
import enum

import luigi


[docs] class execution_summary(luigi.Config): summary_length = luigi.IntParameter(default=5)
[docs] class LuigiStatusCode(enum.Enum): """ All possible status codes for the attribute ``status`` in :class:`~luigi.execution_summary.LuigiRunResult` when the argument ``detailed_summary=True`` in *luigi.run() / luigi.build*. Here are the codes and what they mean: ============================= ========================================================== Status Code Name Meaning ============================= ========================================================== SUCCESS There were no failed tasks or missing dependencies SUCCESS_WITH_RETRY There were failed tasks but they all succeeded in a retry FAILED There were failed tasks FAILED_AND_SCHEDULING_FAILED There were failed tasks and tasks whose scheduling failed SCHEDULING_FAILED There were tasks whose scheduling failed NOT_RUN There were tasks that were not granted run permission by the scheduler MISSING_EXT There were missing external dependencies ============================= ========================================================== """ SUCCESS = (":)", "there were no failed tasks or missing dependencies") SUCCESS_WITH_RETRY = (":)", "there were failed tasks but they all succeeded in a retry") FAILED = (":(", "there were failed tasks") FAILED_AND_SCHEDULING_FAILED = (":(", "there were failed tasks and tasks whose scheduling failed") SCHEDULING_FAILED = (":(", "there were tasks whose scheduling failed") NOT_RUN = (":|", "there were tasks that were not granted run permission by the scheduler") MISSING_EXT = (":|", "there were missing external dependencies")
[docs] class LuigiRunResult: """ The result of a call to build/run when passing the detailed_summary=True argument. Attributes: - one_line_summary (str): One line summary of the progress. - summary_text (str): Detailed summary of the progress. - status (LuigiStatusCode): Luigi Status Code. See :class:`~luigi.execution_summary.LuigiStatusCode` for what these codes mean. - worker (luigi.worker.worker): Worker object. See :class:`~luigi.worker.worker`. - scheduling_succeeded (bool): Boolean which is *True* if all the tasks were scheduled without errors. """ def __init__(self, worker, worker_add_run_status=True): self.worker = worker summary_dict = _summary_dict(worker) self.summary_text = _summary_wrap(_summary_format(summary_dict, worker)) self.status = _tasks_status(summary_dict) self.one_line_summary = _create_one_line_summary(self.status) self.scheduling_succeeded = worker_add_run_status def __str__(self): return "LuigiRunResult with status {0}".format(self.status) def __repr__(self): return "LuigiRunResult(status={0!r},worker={1!r},scheduling_succeeded={2!r})".format(self.status, self.worker, self.scheduling_succeeded)
def _partition_tasks(worker): """ Takes a worker and sorts out tasks based on their status. Still_pending_not_ext is only used to get upstream_failure, upstream_missing_dependency and run_by_other_worker """ task_history = worker._add_task_history pending_tasks = {task for (task, status, ext) in task_history if status == 'PENDING'} set_tasks = {} set_tasks["completed"] = {task for (task, status, ext) in task_history if status == 'DONE' and task in pending_tasks} set_tasks["already_done"] = {task for (task, status, ext) in task_history if status == 'DONE' and task not in pending_tasks and task not in set_tasks["completed"]} set_tasks["ever_failed"] = {task for (task, status, ext) in task_history if status == 'FAILED'} set_tasks["failed"] = set_tasks["ever_failed"] - set_tasks["completed"] set_tasks["scheduling_error"] = {task for (task, status, ext) in task_history if status == 'UNKNOWN'} set_tasks["still_pending_ext"] = {task for (task, status, ext) in task_history if status == 'PENDING' and task not in set_tasks["ever_failed"] and task not in set_tasks["completed"] and not ext} set_tasks["still_pending_not_ext"] = {task for (task, status, ext) in task_history if status == 'PENDING' and task not in set_tasks["ever_failed"] and task not in set_tasks["completed"] and ext} set_tasks["run_by_other_worker"] = set() set_tasks["upstream_failure"] = set() set_tasks["upstream_missing_dependency"] = set() set_tasks["upstream_run_by_other_worker"] = set() set_tasks["upstream_scheduling_error"] = set() set_tasks["not_run"] = set() return set_tasks def _root_task(worker): """ Return the first task scheduled by the worker, corresponding to the root task """ return worker._add_task_history[0][0] def _populate_unknown_statuses(set_tasks): """ Add the "upstream_*" and "not_run" statuses my mutating set_tasks. """ visited = set() for task in set_tasks["still_pending_not_ext"]: _depth_first_search(set_tasks, task, visited) def _depth_first_search(set_tasks, current_task, visited): """ This dfs checks why tasks are still pending. """ visited.add(current_task) if current_task in set_tasks["still_pending_not_ext"]: upstream_failure = False upstream_missing_dependency = False upstream_run_by_other_worker = False upstream_scheduling_error = False for task in current_task._requires(): if task not in visited: _depth_first_search(set_tasks, task, visited) if task in set_tasks["ever_failed"] or task in set_tasks["upstream_failure"]: set_tasks["upstream_failure"].add(current_task) upstream_failure = True if task in set_tasks["still_pending_ext"] or task in set_tasks["upstream_missing_dependency"]: set_tasks["upstream_missing_dependency"].add(current_task) upstream_missing_dependency = True if task in set_tasks["run_by_other_worker"] or task in set_tasks["upstream_run_by_other_worker"]: set_tasks["upstream_run_by_other_worker"].add(current_task) upstream_run_by_other_worker = True if task in set_tasks["scheduling_error"]: set_tasks["upstream_scheduling_error"].add(current_task) upstream_scheduling_error = True if not upstream_failure and not upstream_missing_dependency and \ not upstream_run_by_other_worker and not upstream_scheduling_error and \ current_task not in set_tasks["run_by_other_worker"]: set_tasks["not_run"].add(current_task) def _get_str(task_dict, extra_indent): """ This returns a string for each status """ summary_length = execution_summary().summary_length lines = [] task_names = sorted(task_dict.keys()) for task_family in task_names: tasks = task_dict[task_family] tasks = sorted(tasks, key=lambda x: str(x)) prefix_size = 8 if extra_indent else 4 prefix = ' ' * prefix_size line = None if summary_length > 0 and len(lines) >= summary_length: line = prefix + "..." lines.append(line) break if len(tasks[0].get_params()) == 0: line = prefix + '- {0} {1}()'.format(len(tasks), str(task_family)) elif _get_len_of_params(tasks[0]) > 60 or len(str(tasks[0])) > 200 or \ (len(tasks) == 2 and len(tasks[0].get_params()) > 1 and (_get_len_of_params(tasks[0]) > 40 or len(str(tasks[0])) > 100)): """ This is to make sure that there is no really long task in the output """ line = prefix + '- {0} {1}(...)'.format(len(tasks), task_family) elif len((tasks[0].get_params())) == 1: attributes = {getattr(task, tasks[0].get_params()[0][0]) for task in tasks} param_class = tasks[0].get_params()[0][1] first, last = _ranging_attributes(attributes, param_class) if first is not None and last is not None and len(attributes) > 3: param_str = '{0}...{1}'.format(param_class.serialize(first), param_class.serialize(last)) else: param_str = '{0}'.format(_get_str_one_parameter(tasks)) line = prefix + '- {0} {1}({2}={3})'.format(len(tasks), task_family, tasks[0].get_params()[0][0], param_str) else: ranging = False params = _get_set_of_params(tasks) unique_param_keys = list(_get_unique_param_keys(params)) if len(unique_param_keys) == 1: unique_param, = unique_param_keys attributes = params[unique_param] param_class = unique_param[1] first, last = _ranging_attributes(attributes, param_class) if first is not None and last is not None and len(attributes) > 2: ranging = True line = prefix + '- {0} {1}({2}'.format(len(tasks), task_family, _get_str_ranging_multiple_parameters(first, last, tasks, unique_param)) if not ranging: if len(tasks) == 1: line = prefix + '- {0} {1}'.format(len(tasks), tasks[0]) if len(tasks) == 2: line = prefix + '- {0} {1} and {2}'.format(len(tasks), tasks[0], tasks[1]) if len(tasks) > 2: line = prefix + '- {0} {1} ...'.format(len(tasks), tasks[0]) lines.append(line) return '\n'.join(lines) def _get_len_of_params(task): return sum(len(param[0]) for param in task.get_params()) def _get_str_ranging_multiple_parameters(first, last, tasks, unique_param): row = '' str_unique_param = '{0}...{1}'.format(unique_param[1].serialize(first), unique_param[1].serialize(last)) for param in tasks[0].get_params(): row += '{0}='.format(param[0]) if param[0] == unique_param[0]: row += '{0}'.format(str_unique_param) else: row += '{0}'.format(param[1].serialize(getattr(tasks[0], param[0]))) if param != tasks[0].get_params()[-1]: row += ", " row += ')' return row def _get_set_of_params(tasks): params = {} for param in tasks[0].get_params(): params[param] = {getattr(task, param[0]) for task in tasks} return params def _get_unique_param_keys(params): for param_key, param_values in params.items(): if len(param_values) > 1: yield param_key def _ranging_attributes(attributes, param_class): """ Checks if there is a continuous range """ next_attributes = {param_class.next_in_enumeration(attribute) for attribute in attributes} in_first = attributes.difference(next_attributes) in_second = next_attributes.difference(attributes) if len(in_first) == 1 and len(in_second) == 1: for x in attributes: if {param_class.next_in_enumeration(x)} == in_second: return next(iter(in_first)), x return None, None def _get_str_one_parameter(tasks): row = '' count = 0 for task in tasks: if (len(row) >= 30 and count > 2 and count != len(tasks) - 1) or len(row) > 200: row += '...' break param = task.get_params()[0] row += '{0}'.format(param[1].serialize(getattr(task, param[0]))) if count < len(tasks) - 1: row += ',' count += 1 return row def _serialize_first_param(task): return task.get_params()[0][1].serialize(getattr(task, task.get_params()[0][0])) def _get_number_of_tasks_for(status, group_tasks): if status == "still_pending": return (_get_number_of_tasks(group_tasks["still_pending_ext"]) + _get_number_of_tasks(group_tasks["still_pending_not_ext"])) return _get_number_of_tasks(group_tasks[status]) def _get_number_of_tasks(task_dict): return sum(len(tasks) for tasks in task_dict.values()) def _get_comments(group_tasks): """ Get the human readable comments and quantities for the task types. """ comments = {} for status, human in _COMMENTS: num_tasks = _get_number_of_tasks_for(status, group_tasks) if num_tasks: space = " " if status in _PENDING_SUB_STATUSES else "" comments[status] = '{space}* {num_tasks} {human}:\n'.format( space=space, num_tasks=num_tasks, human=human) return comments # Oredered in the sense that they'll be printed in this order _ORDERED_STATUSES = ( "already_done", "completed", "ever_failed", "failed", "scheduling_error", "still_pending", "still_pending_ext", "run_by_other_worker", "upstream_failure", "upstream_missing_dependency", "upstream_run_by_other_worker", "upstream_scheduling_error", "not_run", ) _PENDING_SUB_STATUSES = set(_ORDERED_STATUSES[_ORDERED_STATUSES.index("still_pending_ext"):]) _COMMENTS = { ("already_done", 'complete ones were encountered'), ("completed", 'ran successfully'), ("failed", 'failed'), ("scheduling_error", 'failed scheduling'), ("still_pending", 'were left pending, among these'), ("still_pending_ext", 'were missing external dependencies'), ("run_by_other_worker", 'were being run by another worker'), ("upstream_failure", 'had failed dependencies'), ("upstream_missing_dependency", 'had missing dependencies'), ("upstream_run_by_other_worker", 'had dependencies that were being run by other worker'), ("upstream_scheduling_error", 'had dependencies whose scheduling failed'), ("not_run", 'was not granted run permission by the scheduler'), } def _get_run_by_other_worker(worker): """ This returns a set of the tasks that are being run by other worker """ task_sets = _get_external_workers(worker).values() return functools.reduce(lambda a, b: a | b, task_sets, set()) def _get_external_workers(worker): """ This returns a dict with a set of tasks for all of the other workers """ worker_that_blocked_task = collections.defaultdict(set) get_work_response_history = worker._get_work_response_history for get_work_response in get_work_response_history: if get_work_response['task_id'] is None: for running_task in get_work_response['running_tasks']: other_worker_id = running_task['worker'] other_task_id = running_task['task_id'] other_task = worker._scheduled_tasks.get(other_task_id) if other_worker_id == worker._id or not other_task: continue worker_that_blocked_task[other_worker_id].add(other_task) return worker_that_blocked_task def _group_tasks_by_name_and_status(task_dict): """ Takes a dictionary with sets of tasks grouped by their status and returns a dictionary with dictionaries with an array of tasks grouped by their status and task name """ group_status = {} for task in task_dict: if task.task_family not in group_status: group_status[task.task_family] = [] group_status[task.task_family].append(task) return group_status def _summary_dict(worker): set_tasks = _partition_tasks(worker) set_tasks["run_by_other_worker"] = _get_run_by_other_worker(worker) _populate_unknown_statuses(set_tasks) return set_tasks def _summary_format(set_tasks, worker): group_tasks = {} for status, task_dict in set_tasks.items(): group_tasks[status] = _group_tasks_by_name_and_status(task_dict) comments = _get_comments(group_tasks) num_all_tasks = sum([len(set_tasks["already_done"]), len(set_tasks["completed"]), len(set_tasks["failed"]), len(set_tasks["scheduling_error"]), len(set_tasks["still_pending_ext"]), len(set_tasks["still_pending_not_ext"])]) str_output = '' str_output += 'Scheduled {0} tasks of which:\n'.format(num_all_tasks) for status in _ORDERED_STATUSES: if status not in comments: continue str_output += '{0}'.format(comments[status]) if status != 'still_pending': str_output += '{0}\n'.format(_get_str(group_tasks[status], status in _PENDING_SUB_STATUSES)) ext_workers = _get_external_workers(worker) group_tasks_ext_workers = {} for ext_worker, task_dict in ext_workers.items(): group_tasks_ext_workers[ext_worker] = _group_tasks_by_name_and_status(task_dict) if len(ext_workers) > 0: str_output += "\nThe other workers were:\n" count = 0 for ext_worker, task_dict in ext_workers.items(): if count > 3 and count < len(ext_workers) - 1: str_output += " and {0} other workers".format(len(ext_workers) - count) break str_output += " - {0} ran {1} tasks\n".format(ext_worker, len(task_dict)) count += 1 str_output += '\n' if num_all_tasks == sum([len(set_tasks["already_done"]), len(set_tasks["scheduling_error"]), len(set_tasks["still_pending_ext"]), len(set_tasks["still_pending_not_ext"])]): if len(ext_workers) == 0: str_output += '\n' str_output += 'Did not run any tasks' one_line_summary = _create_one_line_summary(_tasks_status(set_tasks)) str_output += "\n{0}".format(one_line_summary) if num_all_tasks == 0: str_output = 'Did not schedule any tasks' return str_output def _create_one_line_summary(status_code): """ Given a status_code of type LuigiStatusCode which has a tuple value, returns a one line summary """ return "This progress looks {0} because {1}".format(*status_code.value) def _tasks_status(set_tasks): """ Given a grouped set of tasks, returns a LuigiStatusCode """ if set_tasks["ever_failed"]: if not set_tasks["failed"]: return LuigiStatusCode.SUCCESS_WITH_RETRY else: if set_tasks["scheduling_error"]: return LuigiStatusCode.FAILED_AND_SCHEDULING_FAILED return LuigiStatusCode.FAILED elif set_tasks["scheduling_error"]: return LuigiStatusCode.SCHEDULING_FAILED elif set_tasks["not_run"]: return LuigiStatusCode.NOT_RUN elif set_tasks["still_pending_ext"]: return LuigiStatusCode.MISSING_EXT else: return LuigiStatusCode.SUCCESS def _summary_wrap(str_output): return textwrap.dedent(""" ===== Luigi Execution Summary ===== {str_output} ===== Luigi Execution Summary ===== """).format(str_output=str_output)
[docs] def summary(worker): """ Given a worker, return a human readable summary of what the worker have done. """ return _summary_wrap(_summary_format(_summary_dict(worker), worker))
# 5