Source code for luigi.contrib.datadog_metric

import logging

from luigi import parameter
from luigi.metrics import MetricsCollector
from luigi.task import Config

logger = logging.getLogger("luigi-interface")

try:
    from datadog import api, initialize, statsd
except ImportError:
    logger.warning("Loading datadog module without datadog installed. Will crash at runtime if datadog functionality is used.")


[docs] class datadog(Config): api_key = parameter.Parameter(default="dummy_api_key", description="API key provided by Datadog") app_key = parameter.Parameter(default="dummy_app_key", description="APP key provided by Datadog") default_tags = parameter.Parameter(default="application:luigi", description="Default tags for every events and metrics sent to Datadog") environment = parameter.Parameter(default="development", description="Environment of which the pipeline is ran from (eg: 'production', 'staging', ...") metric_namespace = parameter.Parameter(default="luigi", description="Default namespace for events and metrics (eg: 'luigi' for 'luigi.task.started')") statsd_host = parameter.Parameter(default="localhost", description="StatsD host implementing the Datadog service") statsd_port = parameter.IntParameter(default=8125, description="StatsD port implementing the Datadog service")
[docs] class DatadogMetricsCollector(MetricsCollector): def __init__(self, *args, **kwargs): self._config = datadog(**kwargs) initialize(api_key=self._config.api_key, app_key=self._config.app_key, statsd_host=self._config.statsd_host, statsd_port=self._config.statsd_port)
[docs] def handle_task_started(self, task): title = "Luigi: A task has been started!" text = "A task has been started in the pipeline named: {name}".format(name=task.family) tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task) self._send_increment("task.started", tags=tags) event_tags = tags + ["task_state:STARTED"] self._send_event(title=title, text=text, tags=event_tags, alert_type="info", priority="low")
[docs] def handle_task_failed(self, task): title = "Luigi: A task has failed!" text = "A task has failed in the pipeline named: {name}".format(name=task.family) tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task) self._send_increment("task.failed", tags=tags) event_tags = tags + ["task_state:FAILED"] self._send_event(title=title, text=text, tags=event_tags, alert_type="error", priority="normal")
[docs] def handle_task_disabled(self, task, config): title = "Luigi: A task has been disabled!" lines = ["A task has been disabled in the pipeline named: {name}."] lines.append("The task has failed {failures} times in the last {window}") lines.append("seconds, so it is being disabled for {persist} seconds.") preformated_text = " ".join(lines) text = preformated_text.format(name=task.family, persist=config.disable_persist, failures=config.retry_count, window=config.disable_window) tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task) self._send_increment("task.disabled", tags=tags) event_tags = tags + ["task_state:DISABLED"] self._send_event(title=title, text=text, tags=event_tags, alert_type="error", priority="normal")
[docs] def handle_task_done(self, task): # The task is already done -- Let's not re-create an event if task.time_running is None: return title = "Luigi: A task has been completed!" text = "A task has completed in the pipeline named: {name}".format(name=task.family) tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task) time_elapse = task.updated - task.time_running self._send_increment("task.done", tags=tags) self._send_gauge("task.execution_time", time_elapse, tags=tags) event_tags = tags + ["task_state:DONE"] self._send_event(title=title, text=text, tags=event_tags, alert_type="info", priority="low")
def _send_event(self, **params): params["tags"] += self.default_tags api.Event.create(**params) def _send_gauge(self, metric_name, value, tags=[]): all_tags = tags + self.default_tags namespaced_metric = "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace, metric_name=metric_name) statsd.gauge(namespaced_metric, value, tags=all_tags) def _send_increment(self, metric_name, value=1, tags=[]): all_tags = tags + self.default_tags namespaced_metric = "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace, metric_name=metric_name) statsd.increment(namespaced_metric, value, tags=all_tags) def _format_task_params_to_tags(self, task): params = [] for key, value in task.params.items(): params.append("{key}:{value}".format(key=key, value=value)) return params @property def default_tags(self): default_tags = [] env_tag = "environment:{environment}".format(environment=self._config.environment) default_tags.append(env_tag) if self._config.default_tags: default_tags = default_tags + str.split(self._config.default_tags, ",") return default_tags