# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Provides a database backend to the central scheduler. This lets you see historical runs.
See :ref:`TaskHistory` for information about how to turn on the task history feature.
"""
#
# Description: Added code for visualizing how long each task runs
# until it reaches its next status (failed or done).
# At "{base_url}/tasklist", all completed (failed or done) tasks are shown.
# At "{base_url}/tasklist", a user can select one specific task to see
# how its running time has changed over time.
# At "{base_url}/tasklist/{task_name}", a multi-bar graph is shown
# that represents the changes in the running time of the selected task
# up to its next status (failed or done).
# This visualization lets us see how the running time of a specific task
# has changed over time.
#
# Copyright 2015 Naver Corp.
# Author Yeseul Park (yeseul.park@navercorp.com)
#
import datetime
import logging
from contextlib import contextmanager
import sqlalchemy
import sqlalchemy.ext.declarative
import sqlalchemy.orm
import sqlalchemy.orm.collections
from sqlalchemy import text
from sqlalchemy.engine import reflection
from luigi import configuration, task_history
from luigi.task_status import DONE, FAILED, PENDING, RUNNING
# Declarative base class shared by every ORM model defined in this module.
# ``sqlalchemy.orm.declarative_base`` is the supported location of this factory;
# the ``sqlalchemy.ext.declarative`` spelling is a deprecated alias.
Base = sqlalchemy.orm.declarative_base()
# All messages from this module go to the "luigi-interface" logger.
logger = logging.getLogger("luigi-interface")
[docs]
class DbTaskHistory(task_history.TaskHistory):
    """
    Task History that writes to a database using sqlalchemy.

    Also has methods for useful db queries.
    """

    CURRENT_SOURCE_VERSION = 1

    @contextmanager
    def _session(self, session=None):
        """
        Provide a session scope for a group of database operations.

        :param session: an existing session to reuse. When given (and truthy) it is
            yielded untouched, and committing/rolling back is left to whoever owns it.
            When omitted, a new session is created from ``self.session_factory``; it is
            committed if the block finishes normally and rolled back (with the
            exception re-raised) otherwise.
        """
        if session:
            yield session
        else:
            session = self.session_factory()
            try:
                yield session
            except BaseException:
                session.rollback()
                raise
            else:
                session.commit()

    def __init__(self):
        """
        Connect to the database named by the ``[task_history] db_connection`` setting,
        create any missing tables and run the schema upgrades.
        """
        config = configuration.get_config()
        connection_string = config.get("task_history", "db_connection")
        self.engine = sqlalchemy.create_engine(connection_string)
        # expire_on_commit=False: attributes such as ``task_record.id`` are read after
        # the session has committed (see _find_or_create_task).
        self.session_factory = sqlalchemy.orm.sessionmaker(bind=self.engine, expire_on_commit=False)
        Base.metadata.create_all(self.engine)
        self.tasks = {}  # task.id -> task_history.StoredTask
        _upgrade_schema(self.engine)

    def task_scheduled(self, task):
        """
        Record a PENDING event for ``task``, timestamped with the current local time.
        """
        htask = self._get_task(task, status=PENDING)
        self._add_task_event(htask, TaskEvent(event_name=PENDING, ts=datetime.datetime.now()))

    def task_finished(self, task, successful):
        """
        Record a DONE event for ``task`` when ``successful`` is truthy, otherwise a
        FAILED event.
        """
        event_name = DONE if successful else FAILED
        htask = self._get_task(task, status=event_name)
        self._add_task_event(htask, TaskEvent(event_name=event_name, ts=datetime.datetime.now()))

    def task_started(self, task, worker_host):
        """
        Record a RUNNING event for ``task`` and remember ``worker_host`` as its host.
        """
        htask = self._get_task(task, status=RUNNING, host=worker_host)
        self._add_task_event(htask, TaskEvent(event_name=RUNNING, ts=datetime.datetime.now()))

    def _get_task(self, task, status, host=None):
        """
        Return the cached ``StoredTask`` for ``task.id``, creating it on first use.

        An existing entry has its status overwritten; its host is only overwritten
        when a truthy ``host`` is supplied.
        """
        if task.id in self.tasks:
            htask = self.tasks[task.id]
            htask.status = status
            if host:
                htask.host = host
        else:
            htask = self.tasks[task.id] = task_history.StoredTask(task, status, host)
        return htask

    def _add_task_event(self, task, event):
        """
        Append ``event`` to the database record belonging to the stored ``task``.
        """
        # Exhausting the generator is what closes its session scope (and commits).
        for task_record, _ in self._find_or_create_task(task):
            task_record.events.append(event)

    def _find_or_create_task(self, task):
        """
        Yield a single ``(task_record, session)`` pair for the stored ``task``.

        If ``task.record_id`` is set, the matching row is loaded; otherwise a new
        ``TaskRecord`` (with one ``TaskParameter`` per task parameter) is added to the
        session. After the consumer resumes the generator, the record's host is
        refreshed and ``task.record_id`` is updated.

        :raises Exception: if ``task.record_id`` is set but no such row exists.
        """
        with self._session() as session:
            if task.record_id is not None:
                logger.debug("Finding task with record_id [%d]", task.record_id)
                # Session.get() is the non-legacy replacement for Query.get().
                task_record = session.get(TaskRecord, task.record_id)
                if not task_record:
                    raise Exception("Task with record_id, but no matching Task record!")
                yield (task_record, session)
            else:
                task_record = TaskRecord(task_id=task._task.id, name=task.task_family, host=task.host)
                for k, v in task.parameters.items():
                    task_record.parameters[k] = TaskParameter(name=k, value=v)
                session.add(task_record)
                yield (task_record, session)
            if task.host:
                task_record.host = task.host
        # Read the id only after the session scope has ended: a freshly added record
        # has no primary key until it has been written to the database.
        task.record_id = task_record.id

    def find_all_by_parameters(self, task_name, session=None, **task_params):
        """
        Find tasks with the given task_name and the same parameters as the kwargs.

        This is a generator: matching ``TaskRecord`` objects are yielded ordered by
        event timestamp, and the session scope stays open while it is being iterated.
        """
        with self._session(session) as session:
            query = session.query(TaskRecord).join(TaskEvent).filter(TaskRecord.name == task_name)
            # One aliased join per parameter so that each name/value pair is matched
            # against its own row of the parameters table.
            for k, v in task_params.items():
                alias = sqlalchemy.orm.aliased(TaskParameter)
                query = query.join(alias).filter(alias.name == k, alias.value == v)
            tasks = query.order_by(TaskEvent.ts)
            for task in tasks:
                # Sanity check
                assert all(k in task.parameters and v == str(task.parameters[k].value) for k, v in task_params.items())
                yield task

    def find_all_by_name(self, task_name, session=None):
        """
        Find all tasks with the given task_name.
        """
        return self.find_all_by_parameters(task_name, session)

    def find_latest_runs(self, session=None):
        """
        Return tasks that have been updated in the past 24 hours.

        Results are ordered by event timestamp, newest first.
        """
        with self._session(session) as session:
            yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
            return (
                session.query(TaskRecord)
                .join(TaskEvent)
                .filter(TaskEvent.ts >= yesterday)
                .group_by(TaskRecord.id, TaskEvent.event_name, TaskEvent.ts)
                .order_by(TaskEvent.ts.desc())
                .all()
            )

    def find_all_runs(self, session=None):
        """
        Return all tasks that have been updated.
        """
        with self._session(session) as session:
            return session.query(TaskRecord).all()

    def find_all_events(self, session=None):
        """
        Return all running/failed/done events.
        """
        with self._session(session) as session:
            return session.query(TaskEvent).all()

    def find_task_by_id(self, id, session=None):
        """
        Find task with the given record ID.

        :return: the matching ``TaskRecord``, or ``None`` when there is no such row.
        """
        with self._session(session) as session:
            # Session.get() is the non-legacy replacement for Query.get().
            return session.get(TaskRecord, id)

    def find_task_by_task_id(self, task_id, session=None):
        """
        Find task with the given task ID.

        :return: the last ``TaskRecord`` returned by the query for ``task_id``.
        :raises IndexError: if no record has this task ID.
        """
        with self._session(session) as session:
            return session.query(TaskRecord).filter(TaskRecord.task_id == task_id).all()[-1]
[docs]
class TaskParameter(Base):  # type: ignore
    """
    Table to track luigi.Parameter()s of a Task.
    """

    __tablename__ = "task_parameters"

    # (task_id, name) together form the primary key; task_id references tasks.id.
    task_id = sqlalchemy.Column(sqlalchemy.Integer, sqlalchemy.ForeignKey("tasks.id"), primary_key=True)
    name = sqlalchemy.Column(sqlalchemy.String(128), primary_key=True)
    value = sqlalchemy.Column(sqlalchemy.Text())

    def __repr__(self):
        # %s rather than %d for task_id: instances are built without a task_id
        # (only name and value), and %d would raise TypeError on None.
        return "TaskParameter(task_id=%s, name=%s, value=%s)" % (self.task_id, self.name, self.value)
[docs]
class TaskEvent(Base):  # type: ignore
    """
    Table to track when a task is scheduled, starts, finishes, and fails.
    """

    __tablename__ = "task_events"

    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
    # References tasks.id; indexed.
    task_id = sqlalchemy.Column(sqlalchemy.Integer, sqlalchemy.ForeignKey("tasks.id"), index=True)
    event_name = sqlalchemy.Column(sqlalchemy.String(20))
    # Time of the event; required and indexed.
    ts = sqlalchemy.Column(sqlalchemy.TIMESTAMP, index=True, nullable=False)

    def __repr__(self):
        # The closing parenthesis was missing from the original format string.
        return "TaskEvent(task_id=%s, event_name=%s, ts=%s)" % (self.task_id, self.event_name, self.ts)
[docs]
class TaskRecord(Base):  # type: ignore
    """
    Base table to track information about a luigi.Task.

    Rows of the related tables are reachable through task.events, task.parameters, etc.
    """

    __tablename__ = "tasks"

    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
    task_id = sqlalchemy.Column(sqlalchemy.String(200), index=True)
    name = sqlalchemy.Column(sqlalchemy.String(128), index=True)
    host = sqlalchemy.Column(sqlalchemy.String(128))

    # Dict-like collection of TaskParameter rows, keyed by each row's ``name``.
    parameters = sqlalchemy.orm.relationship(
        "TaskParameter",
        collection_class=sqlalchemy.orm.collections.attribute_keyed_dict("name"),
        cascade="all, delete-orphan",
    )

    # TaskEvent rows ordered by timestamp descending, then id descending;
    # each event gets a ``task`` back-reference.
    events = sqlalchemy.orm.relationship(
        "TaskEvent",
        order_by=(sqlalchemy.desc(TaskEvent.ts), sqlalchemy.desc(TaskEvent.id)),
        backref="task",
    )

    def __repr__(self):
        return "TaskRecord(name={}, host={})".format(self.name, self.host)
def _upgrade_schema(engine):
    """
    Ensure the database schema is up to date with the codebase.

    Two upgrades are applied: adding the indexed ``tasks.task_id`` column when it is
    missing, and widening ``task_parameters.value`` to TEXT using dialect-specific DDL.

    :param engine: SQLAlchemy engine of the underlying database.
    """
    # sqlalchemy.inspect() replaces the deprecated Inspector.from_engine().
    inspector = sqlalchemy.inspect(engine)
    dialect_name = engine.dialect.name
    # engine.begin() commits when the block succeeds and rolls back on error.
    # A bare engine.connect() never commits by itself, so on databases with
    # transactional DDL the upgrades would be rolled back when it closes.
    with engine.begin() as conn:
        # Upgrade 1. Add task_id column and index to tasks
        task_columns = [column["name"] for column in inspector.get_columns("tasks")]
        if "task_id" not in task_columns:
            logger.warning("Upgrading DbTaskHistory schema: Adding tasks.task_id")
            conn.execute(text("ALTER TABLE tasks ADD COLUMN task_id VARCHAR(200)"))
            conn.execute(text("CREATE INDEX ix_task_id ON tasks (task_id)"))

        # Upgrade 2. Alter value column to be TEXT, note that this is idempotent so no if-guard
        if "mysql" in dialect_name:
            conn.execute(text("ALTER TABLE task_parameters MODIFY COLUMN value TEXT"))
        elif "oracle" in dialect_name:
            conn.execute(text("ALTER TABLE task_parameters MODIFY value TEXT"))
        elif "mssql" in dialect_name:
            conn.execute(text("ALTER TABLE task_parameters ALTER COLUMN value TEXT"))
        elif "postgresql" in dialect_name:
            # Only alter the column when its reflected type is not already TEXT.
            value_column = [x for x in inspector.get_columns("task_parameters") if x["name"] == "value"][0]
            if str(value_column["type"]) != "TEXT":
                conn.execute(text("ALTER TABLE task_parameters ALTER COLUMN value TYPE TEXT"))
        elif "sqlite" in dialect_name:
            # SQLite does not support changing column types. A new database file will
            # need to be used to pick up this migration change, so only warn here.
            for row in conn.execute(text("PRAGMA table_info(task_parameters);")).fetchall():
                row_as_dict = row._mapping
                if row_as_dict["name"] == "value" and row_as_dict["type"] != "TEXT":
                    logger.warning("SQLite can not change column types. Please use a new database to pickup column type changes.")
        else:
            # Log the dialect's name rather than the repr of the dialect object.
            logger.warning("SQLAlchemy dialect %s could not be migrated to the TEXT type", dialect_name)