Source code for luigi.contrib.datadog_metric

import logging

from luigi import parameter
from luigi.metrics import MetricsCollector
from luigi.task import Config

logger = logging.getLogger('luigi-interface')

try:
    from datadog import initialize, api, statsd
except ImportError:
    logger.warning("Loading datadog module without datadog installed. Will crash at runtime if datadog functionality is used.")


class datadog(Config):
    """Settings read by ``DatadogMetricsCollector`` to reach and tag Datadog.

    The declaration order of the parameters below is kept as-is on purpose.
    """

    # Credentials handed to the Datadog client on initialization.
    api_key = parameter.Parameter(
        default='dummy_api_key',
        description='API key provided by Datadog',
    )
    app_key = parameter.Parameter(
        default='dummy_app_key',
        description='APP key provided by Datadog',
    )

    # Comma-separated tag list; the collector splits it on ','.
    default_tags = parameter.Parameter(
        default='application:luigi',
        description='Default tags for every events and metrics sent to Datadog',
    )

    # Emitted by the collector as an "environment:<value>" tag.
    environment = parameter.Parameter(
        default='development',
        description="Environment of which the pipeline is ran from (eg: 'production', 'staging', ...",
    )

    # Prefix joined to every metric name with a '.'.
    metric_namespace = parameter.Parameter(
        default='luigi',
        description="Default namespace for events and metrics (eg: 'luigi' for 'luigi.task.started')",
    )

    # Address of the StatsD endpoint handed to the Datadog client.
    statsd_host = parameter.Parameter(
        default='localhost',
        description='StatsD host implementing the Datadog service',
    )
    statsd_port = parameter.IntParameter(
        default=8125,
        description='StatsD port implementing the Datadog service',
    )
class DatadogMetricsCollector(MetricsCollector):
    """Metrics collector that reports task state changes to Datadog.

    Every ``handle_task_*`` hook increments a namespaced StatsD counter and
    then creates a Datadog event. Both carry a ``task_name`` tag, one
    ``key:value`` tag per entry of ``task.params`` and the default tags
    (see :attr:`default_tags`); events additionally carry a ``task_state``
    tag.
    """

    def __init__(self, *args, **kwargs):
        """Build the ``datadog`` config and initialize the Datadog client.

        :param args: accepted for signature compatibility; not used.
        :param kwargs: forwarded unchanged to the ``datadog`` config class.
        """
        self._config = datadog(**kwargs)

        initialize(api_key=self._config.api_key,
                   app_key=self._config.app_key,
                   statsd_host=self._config.statsd_host,
                   statsd_port=self._config.statsd_port)

    def handle_task_started(self, task):
        """Report that ``task`` started (``task.started`` counter + info event).

        :param task: object exposing ``family`` and ``params``.
        """
        title = "Luigi: A task has been started!"
        text = "A task has been started in the pipeline named: {name}".format(name=task.family)
        tags = self._task_tags(task)

        self._send_increment('task.started', tags=tags)

        event_tags = tags + ["task_state:STARTED"]
        self._send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low')

    def handle_task_failed(self, task):
        """Report that ``task`` failed (``task.failed`` counter + error event).

        :param task: object exposing ``family`` and ``params``.
        """
        title = "Luigi: A task has failed!"
        text = "A task has failed in the pipeline named: {name}".format(name=task.family)
        tags = self._task_tags(task)

        self._send_increment('task.failed', tags=tags)

        event_tags = tags + ["task_state:FAILED"]
        self._send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal')

    def handle_task_disabled(self, task, config):
        """Report that ``task`` was disabled (``task.disabled`` counter + error event).

        :param task: object exposing ``family`` and ``params``.
        :param config: object exposing ``disable_persist``, ``retry_count``
            and ``disable_window``; the three values are interpolated into
            the event text.
        """
        title = "Luigi: A task has been disabled!"
        template = ' '.join([
            'A task has been disabled in the pipeline named: {name}.',
            'The task has failed {failures} times in the last {window}',
            'seconds, so it is being disabled for {persist} seconds.',
        ])
        text = template.format(name=task.family,
                               persist=config.disable_persist,
                               failures=config.retry_count,
                               window=config.disable_window)
        tags = self._task_tags(task)

        self._send_increment('task.disabled', tags=tags)

        event_tags = tags + ["task_state:DISABLED"]
        self._send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal')

    def handle_task_done(self, task):
        """Report that ``task`` completed.

        Sends the ``task.done`` counter, a ``task.execution_time`` gauge set
        to ``task.updated - task.time_running`` and an info event. Nothing
        is sent when ``task.time_running`` is ``None``.

        :param task: object exposing ``family``, ``params``, ``updated`` and
            ``time_running``.
        """
        # The task is already done -- Let's not re-create an event
        if task.time_running is None:
            return

        title = "Luigi: A task has been completed!"
        text = "A task has completed in the pipeline named: {name}".format(name=task.family)
        tags = self._task_tags(task)

        # NOTE(review): unit of the difference depends on how the caller
        # populates `updated` / `time_running` -- presumably seconds; confirm.
        time_elapse = task.updated - task.time_running

        self._send_increment('task.done', tags=tags)
        self._send_gauge('task.execution_time', time_elapse, tags=tags)

        event_tags = tags + ["task_state:DONE"]
        self._send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low')

    def _send_event(self, **params):
        """Create a Datadog event from ``params`` with the default tags appended.

        A new tag list is built so the caller's list is never modified, and
        a missing or empty ``tags`` entry is treated as "no extra tags".
        """
        params['tags'] = list(params.get('tags') or []) + self.default_tags

        api.Event.create(**params)

    def _send_gauge(self, metric_name, value, tags=None):
        """Send ``value`` as a gauge for the namespaced ``metric_name``.

        :param tags: optional iterable of tags; default tags are appended.
        """
        all_tags = list(tags or []) + self.default_tags

        statsd.gauge(self._namespaced_metric(metric_name), value, tags=all_tags)

    def _send_increment(self, metric_name, value=1, tags=None):
        """Increment the namespaced ``metric_name`` counter by ``value``.

        :param tags: optional iterable of tags; default tags are appended.
        """
        all_tags = list(tags or []) + self.default_tags

        statsd.increment(self._namespaced_metric(metric_name), value, tags=all_tags)

    def _namespaced_metric(self, metric_name):
        """Return ``metric_name`` prefixed with the configured namespace."""
        return "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace,
                                                  metric_name=metric_name)

    def _task_tags(self, task):
        """Return the ``task_name`` tag followed by the task's parameter tags."""
        return ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)

    def _format_task_params_to_tags(self, task):
        """Return one ``"key:value"`` tag per item of ``task.params``."""
        return ["{key}:{value}".format(key=key, value=value)
                for key, value in task.params.items()]

    @property
    def default_tags(self):
        """Tags added to every metric and event.

        Always a fresh list: the ``environment:<name>`` tag first, followed
        by the configured ``default_tags`` string split on ``,`` when that
        string is non-empty.
        """
        tags = ["environment:{environment}".format(environment=self._config.environment)]

        if self._config.default_tags:
            tags.extend(self._config.default_tags.split(','))

        return tags