# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Simple REST server that takes commands in a JSON payload
Interface to the :py:class:`~luigi.scheduler.Scheduler` class.
See :doc:`/central_scheduler` for more info.
"""
#
# Description: Added code for visualizing how long each task runs
# before it reaches its next status (failed or done).
# At "{base_url}/tasklist", all completed (failed or done) tasks are shown.
# At "{base_url}/tasklist", a user can select one specific task to see
# how its running time has changed over time.
# At "{base_url}/tasklist/{task_name}", a multi-bar graph is rendered
# that shows the changes in running time for the selected task
# up to its next status (failed or done).
# This visualization lets us see how the running time of a specific task
# has changed over time.
#
# Copyright 2015 Naver Corp.
# Author Yeseul Park (yeseul.park@navercorp.com)
#
import atexit
import datetime
import json
import logging
import os
import signal
import sys
import time
import pkg_resources
import tornado.httpserver
import tornado.ioloop
import tornado.netutil
import tornado.web
from luigi import Config, parameter
from luigi.scheduler import Scheduler, RPC_METHODS
# Module-level logger; records are emitted under the "luigi.server" name.
logger = logging.getLogger("luigi.server")
[docs]
class cors(Config):
    """
    Configuration parameters controlling the CORS headers sent by the server.

    Each parameter's ``description`` documents the header or switch it maps to.
    """

    enabled = parameter.BoolParameter(
        default=False, description='Enables CORS support.')
    allowed_origins = parameter.ListParameter(
        default=[],
        description='A list of allowed origins. Used only if `allow_any_origin` is false.')
    allow_any_origin = parameter.BoolParameter(
        default=False, description='Accepts requests from any origin.')
    allow_null_origin = parameter.BoolParameter(
        default=False,
        description='Allows the request to set `null` value of the `Origin` header.')
    max_age = parameter.IntParameter(
        default=86400, description='Content of `Access-Control-Max-Age`.')
    allowed_methods = parameter.Parameter(
        default='GET, OPTIONS',
        description='Content of `Access-Control-Allow-Methods`.')
    allowed_headers = parameter.Parameter(
        default='Accept, Content-Type, Origin',
        description='Content of `Access-Control-Allow-Headers`.')
    exposed_headers = parameter.Parameter(
        default='', description='Content of `Access-Control-Expose-Headers`.')
    allow_credentials = parameter.BoolParameter(
        default=False,
        description='Indicates that the actual request can include user credentials.')

    def __init__(self, *args, **kwargs):
        super(cors, self).__init__(*args, **kwargs)
        # Keep only concrete origins as a set: the special values '*' and
        # 'null' are filtered out of the configured list.
        special_values = ['*', 'null']
        self.allowed_origins = {
            origin for origin in self.allowed_origins
            if origin not in special_values
        }
[docs]
class RPCHandler(tornado.web.RequestHandler):
    """
    Request handler that dispatches HTTP calls to scheduler methods.

    Only method names listed in ``RPC_METHODS`` are dispatched. When the
    ``cors`` configuration is enabled, CORS headers are added to responses.
    """

    def __init__(self, *args, **kwargs):
        super(RPCHandler, self).__init__(*args, **kwargs)
        self._cors_config = cors()

    def initialize(self, scheduler):
        """Store the scheduler instance that RPC calls are forwarded to."""
        self._scheduler = scheduler

    def options(self, *args):
        """Answer an OPTIONS request with 204, adding preflight headers if CORS is enabled."""
        if self._cors_config.enabled:
            self._handle_cors_preflight()

        self.set_status(204)
        self.finish()

    def get(self, method):
        """
        Invoke the scheduler method called ``method`` and write its result.

        Keyword arguments for the call are read as a JSON object from the
        ``data`` request argument (defaulting to ``{}``). Responds with 404
        when ``method`` is not in ``RPC_METHODS`` or is missing on the scheduler.
        """
        if method not in RPC_METHODS:
            self.send_error(404)
            return

        arguments = json.loads(self.get_argument('data', default="{}"))

        if not hasattr(self._scheduler, method):
            self.send_error(404)
            return

        rpc_call = getattr(self._scheduler, method)
        result = rpc_call(**arguments)

        if self._cors_config.enabled:
            self._handle_cors()

        self.write({"response": result})  # wrap all json response in a dictionary

    # POST requests are served by the same implementation as GET.
    post = get

    def _handle_cors_preflight(self):
        """Set the preflight CORS headers when the request's Origin is accepted."""
        origin = self.request.headers.get('Origin')
        if not origin:
            return

        config = self._cors_config
        if origin == 'null':
            if not config.allow_null_origin:
                return
            allow_origin = 'null'
        elif config.allow_any_origin:
            allow_origin = '*'
        elif origin in config.allowed_origins:
            allow_origin = origin
        else:
            return

        self.set_header('Access-Control-Allow-Origin', allow_origin)
        self._set_other_cors_headers()

    def _handle_cors(self):
        """Set ``Access-Control-Allow-Origin`` on a regular response when the Origin is accepted."""
        origin = self.request.headers.get('Origin')
        if not origin:
            return

        config = self._cors_config
        if origin == 'null':
            if config.allow_null_origin:
                self.set_header('Access-Control-Allow-Origin', 'null')
            return

        if config.allow_any_origin:
            self.set_header('Access-Control-Allow-Origin', '*')
            return

        if origin in config.allowed_origins:
            # The echoed value depends on the request, so also send Vary.
            self.set_header('Access-Control-Allow-Origin', origin)
            self.set_header('Vary', 'Origin')

    def _set_other_cors_headers(self):
        """Set the remaining preflight headers from the CORS configuration."""
        config = self._cors_config

        self.set_header('Access-Control-Max-Age', str(config.max_age))
        self.set_header('Access-Control-Allow-Methods', config.allowed_methods)
        self.set_header('Access-Control-Allow-Headers', config.allowed_headers)

        # The two optional headers are only emitted when configured.
        if config.allow_credentials:
            self.set_header('Access-Control-Allow-Credentials', 'true')
        if config.exposed_headers:
            self.set_header('Access-Control-Expose-Headers', config.exposed_headers)
[docs]
class BaseTaskHistoryHandler(tornado.web.RequestHandler):
    """Common base for the task-history pages: keeps the scheduler and locates templates."""

    def initialize(self, scheduler):
        """Store the scheduler whose ``task_history`` the subclasses query."""
        self._scheduler = scheduler

    def get_template_path(self):
        """Return the path of the ``templates`` resource of this package."""
        template_dir = pkg_resources.resource_filename(__name__, 'templates')
        return template_dir
[docs]
class AllRunHandler(BaseTaskHistoryHandler):
    """Render ``menu.html`` with the names of all task runs."""

    def get(self):
        """Collect the name of every run in the task history and render the menu."""
        runs = self._scheduler.task_history.find_all_runs()
        # show all tasks with their name list to be selected
        # why all tasks? the duration of the event history of a selected task
        # can be more than 24 hours.
        self.render("menu.html", tasknames=[run.name for run in runs])
[docs]
class SelectedRunHandler(BaseTaskHistoryHandler):
    """Render ``history.html`` with the event history of the task called ``name``."""

    def get(self, name):
        """
        Gather every recorded event whose task has the requested name.

        Two JSON documents are passed to the template: ``statusResults``
        (events grouped by event name) and ``taskResults`` (a flat list).
        """
        history = self._scheduler.task_history
        selected_name = str(name)
        events_by_status = {}
        event_rows = []

        # all task runs, and the event history of all tasks
        runs = history.find_all_runs()
        events = history.find_all_events()

        # lookup table: task record id -> task name
        name_by_id = {run.id: str(run.name) for run in runs}

        for event in events:
            # skip events that do not belong to the selected task name
            if name_by_id.get(event.task_id) != selected_name:
                continue

            status = str(event.event_name)
            event_id = str(event.id)
            task_id = str(event.task_id)
            raw_ts = str(event.ts)
            timestamp = from_utc(raw_ts)

            # per-status entry: y starts at 0 and next_process starts empty
            events_by_status.setdefault(status, []).append({
                'id': event_id, 'task_id': task_id,
                'x': timestamp, 'y': 0, 'next_process': ''})

            # flat entry describing the same event
            event_rows.append({
                'id': event_id, 'taskName': selected_name, 'task_id': task_id,
                'status': status, 'datetime': raw_ts,
                'timestamp': timestamp})

        status_json = tornado.escape.xhtml_unescape(str(json.dumps(events_by_status)))
        rows_json = tornado.escape.xhtml_unescape(str(json.dumps(event_rows)))
        self.render('history.html', name=name, statusResults=status_json, taskResults=rows_json)
[docs]
def from_utc(utcTime, fmt=None):
    """
    Parse a timestamp string and return it as integer seconds since the epoch.

    :param utcTime: the timestamp string to parse.
    :param fmt: a ``strptime`` format to use; when ``None``, the formats
        ``"%Y-%m-%d %H:%M:%S.%f"`` and ``"%Y-%m-%d %H:%M:%S"`` are tried in order.
    :return: ``int`` number of seconds (fractional seconds are discarded).
    :raises ValueError: if none of the candidate formats matches.

    NOTE(review): the parsed value goes through ``time.mktime``, which
    interprets it as local time despite the function name -- confirm that
    this matches how the timestamps are recorded.
    """
    candidates = ["%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"] if fmt is None else [fmt]

    for candidate in candidates:
        try:
            parsed = datetime.datetime.strptime(utcTime, candidate)
        except ValueError:
            # this format does not match; try the next one
            continue
        return int(time.mktime(parsed.timetuple()))

    raise ValueError("No UTC format matches {}".format(utcTime))
[docs]
class RecentRunHandler(BaseTaskHistoryHandler):
    """Render ``recent.html`` with the latest task runs."""

    def get(self):
        """Look up the latest runs and render them while the session is open."""
        history = self._scheduler.task_history
        with history._session(None) as session:
            latest_runs = history.find_latest_runs(session)
            self.render("recent.html", tasks=latest_runs)
[docs]
class ByNameHandler(BaseTaskHistoryHandler):
    """Render ``recent.html`` with the runs found for a task name."""

    def get(self, name):
        """Look up all runs called ``name`` and render them while the session is open."""
        history = self._scheduler.task_history
        with history._session(None) as session:
            matching_runs = history.find_all_by_name(name, session)
            self.render("recent.html", tasks=matching_runs)
[docs]
class ByIdHandler(BaseTaskHistoryHandler):
    """Render ``show.html`` for the task found by its id."""

    def get(self, id):
        """Look up the task with the given ``id`` and render it while the session is open."""
        history = self._scheduler.task_history
        with history._session(None) as session:
            found = history.find_task_by_id(id, session)
            self.render("show.html", task=found)
[docs]
class ByTaskIdHandler(BaseTaskHistoryHandler):
    """Render ``show.html`` for the task found by its task id."""

    def get(self, task_id):
        """Look up the task with the given ``task_id`` and render it while the session is open."""
        history = self._scheduler.task_history
        with history._session(None) as session:
            found = history.find_task_by_task_id(task_id, session)
            self.render("show.html", task=found)
[docs]
class ByParamsHandler(BaseTaskHistoryHandler):
    """Render ``recent.html`` with the runs matching a task name and parameters."""

    def get(self, name):
        """
        Look up runs called ``name`` filtered by the given parameters.

        The parameters are read as a JSON object from the ``data`` request
        argument (defaulting to ``{}``) and forwarded as keyword arguments.
        """
        arguments = json.loads(self.get_argument('data', default="{}"))
        history = self._scheduler.task_history
        with history._session(None) as session:
            matching_runs = history.find_all_by_parameters(name, session=session, **arguments)
            self.render("recent.html", tasks=matching_runs)
[docs]
class RootPathHandler(BaseTaskHistoryHandler):
    """Handler for the root path: redirects GET requests and answers HEAD requests."""

    def get(self):
        """Redirect to the visualiser's index page using a relative URL."""
        # The target deliberately has no leading slash, so the redirect is
        # resolved relative to wherever the server is mounted (as in a
        # reverse proxy setup).
        #
        # For example, with luigi served at my.app.com/my/luigi/, the
        # redirect from / goes to my.app.com/my/luigi/static/visualiser/index.html
        # rather than the absolute my.app.com/static/visualiser/index.html.
        target = "static/visualiser/index.html"
        self.redirect(target)

    def head(self):
        """HEAD endpoint for health checking the scheduler"""
        self.set_status(204)
        self.finish()
[docs]
class MetricsHandler(tornado.web.RequestHandler):
    """Serve the output of the scheduler's metrics collector."""

    def initialize(self, scheduler):
        """Store the scheduler whose metrics collector is queried."""
        self._scheduler = scheduler

    def get(self):
        """Write the latest metrics; nothing is written when the collector returns none."""
        collector = self._scheduler._state._metrics_collector
        latest = collector.generate_latest()
        if not latest:
            return

        # Let the collector configure this handler before the body is written.
        collector.configure_http_handler(self)
        self.write(latest)
[docs]
def app(scheduler):
    """
    Build the Tornado application serving the API and the history pages.

    :param scheduler: scheduler instance handed to every handler's ``initialize``.
    :return: the configured ``tornado.web.Application``.
    """
    static_dir = os.path.join(os.path.dirname(__file__), "static")
    settings = {
        "static_path": static_dir,
        "unescape": tornado.escape.xhtml_unescape,
        "compress_response": True,
    }

    # URL pattern -> handler class; all handlers take the same init argument.
    routes = [
        (r'/api/(.*)', RPCHandler),
        (r'/', RootPathHandler),
        (r'/tasklist', AllRunHandler),
        (r'/tasklist/(.*?)', SelectedRunHandler),
        (r'/history', RecentRunHandler),
        (r'/history/by_name/(.*?)', ByNameHandler),
        (r'/history/by_id/(.*?)', ByIdHandler),
        (r'/history/by_task_id/(.*?)', ByTaskIdHandler),
        (r'/history/by_params/(.*?)', ByParamsHandler),
        (r'/metrics', MetricsHandler),
    ]
    handlers = [
        (pattern, handler_class, {"scheduler": scheduler})
        for pattern, handler_class in routes
    ]

    return tornado.web.Application(handlers, **settings)
def _init_api(scheduler, api_port=None, address=None, unix_socket=None):
    """
    Create the application and attach an HTTP server to newly bound sockets.

    :param scheduler: scheduler instance passed on to :func:`app`.
    :param api_port: port to bind when no unix socket is given.
    :param address: address to bind when no unix socket is given.
    :param unix_socket: path of a unix socket; takes precedence when not ``None``.
    :return: list with the socket name of every bound socket.
    """
    api_app = app(scheduler)

    if unix_socket is None:
        bound_sockets = tornado.netutil.bind_sockets(api_port, address=address)
    else:
        bound_sockets = [tornado.netutil.bind_unix_socket(unix_socket)]

    http_server = tornado.httpserver.HTTPServer(api_app)
    http_server.add_sockets(bound_sockets)

    # Return the bound socket names. Useful for connecting client in test scenarios.
    return [sock.getsockname() for sock in bound_sockets]
[docs]
def run(api_port=8082, address=None, unix_socket=None, scheduler=None):
    """
    Runs one instance of the API server.

    A new ``Scheduler`` is created when none is given. The scheduler state is
    loaded, the API is bound, a periodic prune is scheduled, shutdown hooks
    are installed, and the Tornado IO loop is started.
    """
    if scheduler is None:
        scheduler = Scheduler()

    # load scheduler state
    scheduler.load()

    _init_api(scheduler=scheduler, api_port=api_port, address=address, unix_socket=unix_socket)

    # prune work DAG every 60 seconds
    prune_timer = tornado.ioloop.PeriodicCallback(scheduler.prune, 60000)
    prune_timer.start()

    def exit_handler():
        logger.info("Scheduler instance shutting down")
        scheduler.dump()
        stop()

    # Dump the scheduler state and stop the loop at interpreter exit.
    atexit.register(exit_handler)

    def shutdown_handler(signum, frame):
        exit_handler()
        sys.exit(0)

    # SIGBREAK is used on Windows ('nt') where the other platforms use SIGQUIT.
    quit_signal = signal.SIGBREAK if os.name == 'nt' else signal.SIGQUIT
    for signum in (signal.SIGINT, signal.SIGTERM, quit_signal):
        signal.signal(signum, shutdown_handler)

    logger.info("Scheduler starting up")

    tornado.ioloop.IOLoop.instance().start()
[docs]
def stop():
    """Stop the Tornado IO loop returned by ``IOLoop.instance()``."""
    io_loop = tornado.ioloop.IOLoop.instance()
    io_loop.stop()
# Start the API server with its default arguments when run as a script.
if __name__ == "__main__":
    run()