Source code for luigi.metrics

import abc
import importlib

from enum import Enum


[docs] class MetricsCollectors(Enum): custom = -1 default = 1 none = 1 datadog = 2 prometheus = 3
[docs] @classmethod def get(cls, which, custom_import=None): if which == MetricsCollectors.none: return NoMetricsCollector() elif which == MetricsCollectors.datadog: from luigi.contrib.datadog_metric import DatadogMetricsCollector return DatadogMetricsCollector() elif which == MetricsCollectors.prometheus: from luigi.contrib.prometheus_metric import PrometheusMetricsCollector return PrometheusMetricsCollector() elif which == MetricsCollectors.custom: if custom_import is None: raise ValueError(f"MetricsCollectors value ' {which} ' is -1 and custom_import is None") split_import_string = custom_import.split(".") import_path = ".".join(split_import_string[:-1]) import_class_string = split_import_string[-1] mod = importlib.import_module(import_path) metrics_class = getattr(mod, import_class_string) if issubclass(metrics_class, MetricsCollector): return metrics_class() else: raise ValueError(f"Custom Import: {custom_import} is not a subclass of MetricsCollector") else: raise ValueError("MetricsCollectors value ' {0} ' isn't supported", which)
[docs] class MetricsCollector(metaclass=abc.ABCMeta): """Abstractable MetricsCollector base class that can be replace by tool specific implementation. """ @abc.abstractmethod def __init__(self): pass
[docs] @abc.abstractmethod def handle_task_started(self, task): pass
[docs] @abc.abstractmethod def handle_task_failed(self, task): pass
[docs] @abc.abstractmethod def handle_task_disabled(self, task, config): pass
[docs] @abc.abstractmethod def handle_task_done(self, task): pass
[docs] def generate_latest(self): return
[docs] def configure_http_handler(self, http_handler): pass
[docs] class NoMetricsCollector(MetricsCollector): """Empty MetricsCollector when no collector is being used """ def __init__(self): pass
[docs] def handle_task_started(self, task): pass
[docs] def handle_task_failed(self, task): pass
[docs] def handle_task_disabled(self, task, config): pass
[docs] def handle_task_done(self, task): pass