# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Provides a database backend to the central scheduler. This lets you see historical runs.
See :ref:`TaskHistory` for information about how to turn out the task history feature.
"""
#
# Description: Added codes for visualization of how long each task takes
# running-time until it reaches the next status (failed or done)
# At "{base_url}/tasklist", all completed(failed or done) tasks are shown.
# At "{base_url}/tasklist", a user can select one specific task to see
# how its running-time has changed over time.
# At "{base_url}/tasklist/{task_name}", it visualizes a multi-bar graph
# that represents the changes of the running-time for a selected task
# up to the next status (failed or done).
# This visualization let us know how the running-time of the specific task
# has changed over time.
#
# Copyright 2015 Naver Corp.
# Author Yeseul Park (yeseul.park@navercorp.com)
#
import datetime
import logging
from contextlib import contextmanager
from luigi import configuration
from luigi import task_history
from luigi.task_status import DONE, FAILED, PENDING, RUNNING
import sqlalchemy
import sqlalchemy.ext.declarative
import sqlalchemy.orm
import sqlalchemy.orm.collections
from sqlalchemy.engine import reflection
Base = sqlalchemy.ext.declarative.declarative_base()
logger = logging.getLogger('luigi-interface')
[docs]
class DbTaskHistory(task_history.TaskHistory):
"""
Task History that writes to a database using sqlalchemy.
Also has methods for useful db queries.
"""
CURRENT_SOURCE_VERSION = 1
@contextmanager
def _session(self, session=None):
if session:
yield session
else:
session = self.session_factory()
try:
yield session
except BaseException:
session.rollback()
raise
else:
session.commit()
def __init__(self):
config = configuration.get_config()
connection_string = config.get('task_history', 'db_connection')
self.engine = sqlalchemy.create_engine(connection_string)
self.session_factory = sqlalchemy.orm.sessionmaker(bind=self.engine, expire_on_commit=False)
Base.metadata.create_all(self.engine)
self.tasks = {} # task_id -> TaskRecord
_upgrade_schema(self.engine)
[docs]
def task_scheduled(self, task):
htask = self._get_task(task, status=PENDING)
self._add_task_event(htask, TaskEvent(event_name=PENDING, ts=datetime.datetime.now()))
[docs]
def task_finished(self, task, successful):
event_name = DONE if successful else FAILED
htask = self._get_task(task, status=event_name)
self._add_task_event(htask, TaskEvent(event_name=event_name, ts=datetime.datetime.now()))
[docs]
def task_started(self, task, worker_host):
htask = self._get_task(task, status=RUNNING, host=worker_host)
self._add_task_event(htask, TaskEvent(event_name=RUNNING, ts=datetime.datetime.now()))
def _get_task(self, task, status, host=None):
if task.id in self.tasks:
htask = self.tasks[task.id]
htask.status = status
if host:
htask.host = host
else:
htask = self.tasks[task.id] = task_history.StoredTask(task, status, host)
return htask
def _add_task_event(self, task, event):
for (task_record, session) in self._find_or_create_task(task):
task_record.events.append(event)
def _find_or_create_task(self, task):
with self._session() as session:
if task.record_id is not None:
logger.debug("Finding task with record_id [%d]", task.record_id)
task_record = session.query(TaskRecord).get(task.record_id)
if not task_record:
raise Exception("Task with record_id, but no matching Task record!")
yield (task_record, session)
else:
task_record = TaskRecord(task_id=task._task.id, name=task.task_family, host=task.host)
for k, v in task.parameters.items():
task_record.parameters[k] = TaskParameter(name=k, value=v)
session.add(task_record)
yield (task_record, session)
if task.host:
task_record.host = task.host
task.record_id = task_record.id
[docs]
def find_all_by_parameters(self, task_name, session=None, **task_params):
"""
Find tasks with the given task_name and the same parameters as the kwargs.
"""
with self._session(session) as session:
query = session.query(TaskRecord).join(TaskEvent).filter(TaskRecord.name == task_name)
for k, v in task_params.items():
alias = sqlalchemy.orm.aliased(TaskParameter)
query = query.join(alias).filter(alias.name == k, alias.value == v)
tasks = query.order_by(TaskEvent.ts)
for task in tasks:
# Sanity check
assert all(k in task.parameters and v == str(task.parameters[k].value) for k, v in task_params.items())
yield task
[docs]
def find_all_by_name(self, task_name, session=None):
"""
Find all tasks with the given task_name.
"""
return self.find_all_by_parameters(task_name, session)
[docs]
def find_latest_runs(self, session=None):
"""
Return tasks that have been updated in the past 24 hours.
"""
with self._session(session) as session:
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
return session.query(TaskRecord).\
join(TaskEvent).\
filter(TaskEvent.ts >= yesterday).\
group_by(TaskRecord.id, TaskEvent.event_name, TaskEvent.ts).\
order_by(TaskEvent.ts.desc()).\
all()
[docs]
def find_all_runs(self, session=None):
"""
Return all tasks that have been updated.
"""
with self._session(session) as session:
return session.query(TaskRecord).all()
[docs]
def find_all_events(self, session=None):
"""
Return all running/failed/done events.
"""
with self._session(session) as session:
return session.query(TaskEvent).all()
[docs]
def find_task_by_id(self, id, session=None):
"""
Find task with the given record ID.
"""
with self._session(session) as session:
return session.query(TaskRecord).get(id)
[docs]
def find_task_by_task_id(self, task_id, session=None):
"""
Find task with the given task ID.
"""
with self._session(session) as session:
return session.query(TaskRecord).filter(TaskRecord.task_id == task_id).all()[-1]
[docs]
class TaskParameter(Base): # type: ignore
"""
Table to track luigi.Parameter()s of a Task.
"""
__tablename__ = 'task_parameters'
task_id = sqlalchemy.Column(sqlalchemy.Integer, sqlalchemy.ForeignKey('tasks.id'), primary_key=True)
name = sqlalchemy.Column(sqlalchemy.String(128), primary_key=True)
value = sqlalchemy.Column(sqlalchemy.Text())
def __repr__(self):
return "TaskParameter(task_id=%d, name=%s, value=%s)" % (self.task_id, self.name, self.value)
[docs]
class TaskEvent(Base): # type: ignore
"""
Table to track when a task is scheduled, starts, finishes, and fails.
"""
__tablename__ = 'task_events'
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
task_id = sqlalchemy.Column(sqlalchemy.Integer, sqlalchemy.ForeignKey('tasks.id'), index=True)
event_name = sqlalchemy.Column(sqlalchemy.String(20))
ts = sqlalchemy.Column(sqlalchemy.TIMESTAMP, index=True, nullable=False)
def __repr__(self):
return "TaskEvent(task_id=%s, event_name=%s, ts=%s" % (self.task_id, self.event_name, self.ts)
[docs]
class TaskRecord(Base): # type: ignore
"""
Base table to track information about a luigi.Task.
References to other tables are available through task.events, task.parameters, etc.
"""
__tablename__ = 'tasks'
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
task_id = sqlalchemy.Column(sqlalchemy.String(200), index=True)
name = sqlalchemy.Column(sqlalchemy.String(128), index=True)
host = sqlalchemy.Column(sqlalchemy.String(128))
parameters = sqlalchemy.orm.relationship(
'TaskParameter',
collection_class=sqlalchemy.orm.collections.attribute_mapped_collection('name'),
cascade="all, delete-orphan")
events = sqlalchemy.orm.relationship(
'TaskEvent',
order_by=(sqlalchemy.desc(TaskEvent.ts), sqlalchemy.desc(TaskEvent.id)),
backref='task')
def __repr__(self):
return "TaskRecord(name=%s, host=%s)" % (self.name, self.host)
def _upgrade_schema(engine):
"""
Ensure the database schema is up to date with the codebase.
:param engine: SQLAlchemy engine of the underlying database.
"""
inspector = reflection.Inspector.from_engine(engine)
with engine.connect() as conn:
# Upgrade 1. Add task_id column and index to tasks
if 'task_id' not in [x['name'] for x in inspector.get_columns('tasks')]:
logger.warning('Upgrading DbTaskHistory schema: Adding tasks.task_id')
conn.execute('ALTER TABLE tasks ADD COLUMN task_id VARCHAR(200)')
conn.execute('CREATE INDEX ix_task_id ON tasks (task_id)')
# Upgrade 2. Alter value column to be TEXT, note that this is idempotent so no if-guard
if 'mysql' in engine.dialect.name:
conn.execute('ALTER TABLE task_parameters MODIFY COLUMN value TEXT')
elif 'oracle' in engine.dialect.name:
conn.execute('ALTER TABLE task_parameters MODIFY value TEXT')
elif 'mssql' in engine.dialect.name:
conn.execute('ALTER TABLE task_parameters ALTER COLUMN value TEXT')
elif 'postgresql' in engine.dialect.name:
if str([x for x in inspector.get_columns('task_parameters')
if x['name'] == 'value'][0]['type']) != 'TEXT':
conn.execute('ALTER TABLE task_parameters ALTER COLUMN value TYPE TEXT')
elif 'sqlite' in engine.dialect.name:
# SQLite does not support changing column types. A database file will need
# to be used to pickup this migration change.
for i in conn.execute('PRAGMA table_info(task_parameters);').fetchall():
if i['name'] == 'value' and i['type'] != 'TEXT':
logger.warning(
'SQLite can not change column types. Please use a new database '
'to pickup column type changes.'
)
else:
logger.warning(
'SQLAlcheny dialect {} could not be migrated to the TEXT type'.format(
engine.dialect
)
)