# -*- coding: utf-8 -*-
"""
.. Copyright 2012-2015 Spotify AB
Copyright 2018
Copyright 2018 EMBL-European Bioinformatics Institute
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import subprocess
import time
import sys
import logging
import random
import shutil
try:
    # Dill is used for handling pickling and unpickling if there is a difference
# in server setups between the LSF submission node and the nodes in the
# cluster
import dill as pickle
except ImportError:
import pickle
import luigi
import luigi.configuration
from luigi.contrib.hadoop import create_packages_archive
from luigi.contrib import lsf_runner
from luigi.task_status import PENDING, FAILED, DONE, RUNNING, UNKNOWN
"""
LSF batch system Tasks.
=======================
What's LSF? See http://en.wikipedia.org/wiki/Platform_LSF
and https://wiki.med.harvard.edu/Orchestra/IntroductionToLSF
See: https://github.com/spotify/luigi/issues/1936
This extension is modeled after the hadoop.py approach.
I'll be making a few assumptions, and will try to note them.
Going into it, the assumptions are:
- You schedule your jobs on an LSF submission node.
- The 'bjobs' command on an LSF batch submission system returns a standardized format.
- All nodes have access to the code you're running.
- The sysadmin won't get pissed if we run a 'bjobs' check every thirty
seconds or so per job (there are ways of coalescing the bjobs calls if that's not cool).
The procedure:
- Pickle the class
- Construct a bsub argument that runs a generic runner function with the path to the pickled class
- Runner function loads the class from pickle
- Runner function hits the work button on it
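
A minimal usage sketch (the task, module name, paths and parameter values
below are illustrative only, they are not part of this module)::

    import luigi
    from luigi.contrib.lsf import LSFJobTask

    class MyLSFTask(LSFJobTask):

        def output(self):
            return luigi.LocalTarget('/shared/scratch/my_result.txt')

        def work(self):
            # Executed on the LSF compute node via lsf_runner
            with self.output().open('w') as handle:
                handle.write('done\n')

    # e.g. luigi --module my_module MyLSFTask --n-cpu-flag 4 --queue-flag normal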
"""
LOGGER = logging.getLogger('luigi-interface')
def track_job(job_id):
"""
Tracking is done by requesting each job and then searching for whether the job
has one of the following states:
- "RUN",
- "PEND",
- "SSUSP",
- "EXIT"
based on the LSF documentation
"""
cmd = "bjobs -noheader -o stat {}".format(job_id)
track_job_proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, shell=True)
    status = track_job_proc.communicate()[0].decode('utf-8').strip('\n')
return status
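# For reference, a sketch of the interaction track_job() assumes (the job id is
# illustrative): ``bjobs -noheader -o stat 12345`` prints a single status token
# such as RUN, PEND, SSUSP, DONE or EXIT, which is returned to the caller verbatim.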
def kill_job(job_id):
"""
Kill a running LSF job
"""
    subprocess.call(['bkill', str(job_id)])
class LSFJobTask(luigi.Task):
"""
Takes care of uploading and executing an LSF job
"""
n_cpu_flag = luigi.IntParameter(default=2, significant=False)
shared_tmp_dir = luigi.Parameter(default='/tmp', significant=False)
resource_flag = luigi.Parameter(default='mem=8192', significant=False)
memory_flag = luigi.Parameter(default='8192', significant=False)
queue_flag = luigi.Parameter(default='queue_name', significant=False)
runtime_flag = luigi.IntParameter(default=60)
job_name_flag = luigi.Parameter(default='')
poll_time = luigi.FloatParameter(
significant=False, default=5,
description="specify the wait time to poll bjobs for the job status")
save_job_info = luigi.BoolParameter(default=False)
output = luigi.Parameter(default='')
extra_bsub_args = luigi.Parameter(default='')
job_status = None
def fetch_task_failures(self):
"""
Read in the error file from bsub
"""
error_file = os.path.join(self.tmp_dir, "job.err")
if os.path.isfile(error_file):
with open(error_file, "r") as f_err:
errors = f_err.readlines()
else:
            errors = []
return errors
def fetch_task_output(self):
"""
Read in the output file
"""
# Read in the output file
if os.path.isfile(os.path.join(self.tmp_dir, "job.out")):
with open(os.path.join(self.tmp_dir, "job.out"), "r") as f_out:
outputs = f_out.readlines()
else:
            outputs = []
return outputs
def _init_local(self):
base_tmp_dir = self.shared_tmp_dir
random_id = '%016x' % random.getrandbits(64)
task_name = random_id + self.task_id
        # If any parameters are directories, replace the path separators so the
        # task name doesn't turn into a weird nested directory on *nix
        task_name = task_name.replace("/", "::")
# Max filename length
max_filename_length = os.fstatvfs(0).f_namemax
self.tmp_dir = os.path.join(base_tmp_dir, task_name[:max_filename_length])
LOGGER.info("Tmp dir: %s", self.tmp_dir)
os.makedirs(self.tmp_dir)
# Dump the code to be run into a pickle file
LOGGER.debug("Dumping pickled class")
self._dump(self.tmp_dir)
# Make sure that all the class's dependencies are tarred and available
LOGGER.debug("Tarballing dependencies")
# Grab luigi and the module containing the code to be run
packages = [luigi, __import__(self.__module__, None, None, 'dummy')]
create_packages_archive(packages, os.path.join(self.tmp_dir, "packages.tar"))
# Now, pass onto the class's specified init_local() method.
self.init_local()
def init_local(self):
"""
        Implement any work needed to set up internal data structures, etc., here.
You can add extra input using the requires_local/input_local methods.
Anything you set on the object will be pickled and available on the compute nodes.
"""
pass
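    # A sketch of an init_local() override (the attribute and file name are
    # hypothetical): anything assigned to self here is pickled with the task,
    # so it is available to work() on the compute node, e.g.
    #
    #     def init_local(self):
    #         with open('samples.txt') as handle:
    #             self.sample_ids = [line.strip() for line in handle]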
def run(self):
"""
The procedure:
- Pickle the class
- Tarball the dependencies
- Construct a bsub argument that runs a generic runner function with the path to the pickled class
- Runner function loads the class from pickle
- Runner class untars the dependencies
- Runner function hits the button on the class's work() method
"""
self._init_local()
self._run_job()
def work(self):
"""
        Subclass this for where you're doing your actual work.
        Why not run(), like other tasks? Because run() needs to stay something
        the Worker can always call, and it is the logical place to do the LSF
        scheduling.
        So, the work will happen in work().
"""
pass
def _dump(self, out_dir=''):
"""
Dump instance to file.
"""
self.job_file = os.path.join(out_dir, 'job-instance.pickle')
if self.__module__ == '__main__':
            dump_inst = pickle.dumps(self)
            module_name = os.path.basename(sys.argv[0]).rsplit('.', 1)[0]
            # Point the pickled class reference at the real module name
            # rather than __main__ so the runner can unpickle it
            dump_inst = dump_inst.replace(b'(c__main__', b'(c' + module_name.encode())
            with open(self.job_file, "wb") as job_file:
                job_file.write(dump_inst)
        else:
            with open(self.job_file, "wb") as job_file:
                pickle.dump(self, job_file)
def _run_job(self):
"""
Build a bsub argument that will run lsf_runner.py on the directory we've specified.
"""
args = []
if isinstance(self.output(), list):
log_output = os.path.split(self.output()[0].path)
else:
log_output = os.path.split(self.output().path)
args += ["bsub", "-q", self.queue_flag]
args += ["-n", str(self.n_cpu_flag)]
args += ["-M", str(self.memory_flag)]
args += ["-R", "rusage[%s]" % self.resource_flag]
args += ["-W", str(self.runtime_flag)]
if self.job_name_flag:
args += ["-J", str(self.job_name_flag)]
args += ["-o", os.path.join(log_output[0], "job.out")]
args += ["-e", os.path.join(log_output[0], "job.err")]
if self.extra_bsub_args:
args += self.extra_bsub_args.split()
# Find where the runner file is
runner_path = os.path.abspath(lsf_runner.__file__)
args += [runner_path]
args += [self.tmp_dir]
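        # With the default parameter values above, the assembled command looks
        # roughly like this (paths are illustrative):
        #   bsub -q queue_name -n 2 -M 8192 -R rusage[mem=8192] -W 60 \
        #        -o <output_dir>/job.out -e <output_dir>/job.err \
        #        /path/to/lsf_runner.py /tmp/<random_id><task_id>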
# That should do it. Let the world know what we're doing.
LOGGER.info("### LSF SUBMISSION ARGS: %s",
" ".join([str(a) for a in args]))
# Submit the job
run_job_proc = subprocess.Popen(
[str(a) for a in args],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, cwd=self.tmp_dir)
        output = run_job_proc.communicate()[0].decode('utf-8')
        # ASSUMPTION
        # The result will be of the format
        # Job <123> is submitted to queue <myqueue>
        # So get the number in those first brackets.
        # I cannot think of a better workaround that leaves logic on the Task side of things.
        LOGGER.info("### JOB SUBMISSION OUTPUT: %s", output)
        self.job_id = int(output.split("<")[1].split(">")[0])
LOGGER.info(
"Job %ssubmitted as job %s",
self.job_name_flag + ' ',
str(self.job_id)
)
self._track_job()
# If we want to save the job temporaries, then do so
# We'll move them to be next to the job output
if self.save_job_info:
LOGGER.info("Saving up temporary bits")
# dest_dir = self.output().path
            shutil.move(self.tmp_dir, log_output[0])
# Now delete the temporaries, if they're there.
self._finish()
def _track_job(self):
time0 = 0
while True:
# Sleep for a little bit
time.sleep(self.poll_time)
# See what the job's up to
# ASSUMPTION
lsf_status = track_job(self.job_id)
if lsf_status == "RUN":
self.job_status = RUNNING
LOGGER.info("Job is running...")
if time0 == 0:
time0 = int(round(time.time()))
elif lsf_status == "PEND":
self.job_status = PENDING
LOGGER.info("Job is pending...")
elif lsf_status == "DONE" or lsf_status == "EXIT":
# Then the job could either be failed or done.
errors = self.fetch_task_failures()
if not errors:
self.job_status = DONE
LOGGER.info("Job is done")
time1 = int(round(time.time()))
                    # Log an estimate of the run time, accurate to within
                    # +/- self.poll_time
job_name = str(self.job_id)
if self.job_name_flag:
job_name = "%s %s" % (self.job_name_flag, job_name)
LOGGER.info(
"### JOB COMPLETED: %s in %s seconds",
job_name,
str(time1-time0)
)
else:
self.job_status = FAILED
LOGGER.error("Job has FAILED")
LOGGER.error("\n\n")
LOGGER.error("Traceback: ")
for error in errors:
LOGGER.error(error)
break
elif lsf_status == "SSUSP":
self.job_status = PENDING
LOGGER.info("Job is suspended (basically, pending)...")
else:
self.job_status = UNKNOWN
LOGGER.info("Job status is UNKNOWN!")
LOGGER.info("Status is : %s", lsf_status)
break
def _finish(self):
LOGGER.info("Cleaning up temporary bits")
if self.tmp_dir and os.path.exists(self.tmp_dir):
LOGGER.info('Removing directory %s', self.tmp_dir)
shutil.rmtree(self.tmp_dir)
def __del__(self):
pass
# self._finish()
class LocalLSFJobTask(LSFJobTask):
"""
    A local version of LSFJobTask, for easier debugging: work() runs in the
    current process with no bsub submission involved.
"""
def run(self):
self.init_local()
self.work()
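# A debugging sketch (MyLSFTask is the illustrative subclass from the module
# docstring example, not part of this module): mixing it with LocalLSFJobTask
# runs its work() locally while keeping the same parameters and output():
#
#     class MyLocalTask(LocalLSFJobTask, MyLSFTask):
#         pass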