import logging
import os
import luigi.format
from luigi.contrib.hdfs.config import load_hadoop_cmd
from luigi.contrib.hdfs import config as hdfs_config
from luigi.contrib.hdfs.clients import remove, rename, mkdir, listdir, exists
from luigi.contrib.hdfs.error import HDFSCliError
logger = logging.getLogger('luigi-interface')


class HdfsAtomicWriteError(IOError):
    pass


class HdfsReadPipe(luigi.format.InputPipeProcessWrapper):
    """
    File-like object for reading a file from HDFS by streaming ``hadoop fs -cat``.
    """

    def __init__(self, path):
        super(HdfsReadPipe, self).__init__(load_hadoop_cmd() + ['fs', '-cat', path])
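
# Usage sketch (hypothetical path; assumes the Hadoop CLI is configured and
# the file exists on HDFS):
#
#     with HdfsReadPipe('/tmp/example.txt') as pipe:
#         for line in pipe:
#             print(line)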


class HdfsAtomicWritePipe(luigi.format.OutputPipeProcessWrapper):
    """
    File-like object for writing to HDFS.

    The referenced file is first written to a temporary location and then
    renamed to the final location on close(). If close() isn't called,
    the temporary file is cleaned up when this object is garbage collected.

    TODO: if this is buggy, change it so it first writes to a
    local temporary file and then uploads it on completion.
    """

    def __init__(self, path):
        self.path = path
        self.tmppath = hdfs_config.tmppath(self.path)
        parent_dir = os.path.dirname(self.tmppath)
        mkdir(parent_dir, parents=True, raise_if_exists=False)
        super(HdfsAtomicWritePipe, self).__init__(load_hadoop_cmd() + ['fs', '-put', '-', self.tmppath])

    def abort(self):
        logger.info("Aborting %s('%s'). Removing temporary file '%s'",
                    self.__class__.__name__, self.path, self.tmppath)
        super(HdfsAtomicWritePipe, self).abort()
        remove(self.tmppath, skip_trash=True)

    def close(self):
        super(HdfsAtomicWritePipe, self).close()
        try:
            if exists(self.path):
                remove(self.path)
        except Exception as ex:
            # str has no .contains() method; a substring test is what was
            # intended here. Swallow CLI errors and "file not found" races,
            # re-raise anything else with its original traceback.
            if isinstance(ex, HDFSCliError) or "FileNotFoundException" in str(ex):
                pass
            else:
                raise
        if not all(result['result'] for result in rename(self.tmppath, self.path) or []):
            raise HdfsAtomicWriteError('Atomic write to {} failed'.format(self.path))
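
# Usage sketch (hypothetical path): data only becomes visible at the target
# path once close() performs the rename; abort() discards the temporary file
# instead.
#
#     pipe = HdfsAtomicWritePipe('/tmp/output.txt')
#     try:
#         pipe.write(b'hello world\n')
#     except Exception:
#         pipe.abort()
#         raise
#     else:
#         pipe.close()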


class HdfsAtomicWriteDirPipe(luigi.format.OutputPipeProcessWrapper):
    """
    Writes a data<data_extension> file to a directory at <path>.
    """

    def __init__(self, path, data_extension=""):
        self.path = path
        self.tmppath = hdfs_config.tmppath(self.path)
        self.datapath = self.tmppath + ("/data%s" % data_extension)
        super(HdfsAtomicWriteDirPipe, self).__init__(load_hadoop_cmd() + ['fs', '-put', '-', self.datapath])

    def abort(self):
        logger.info("Aborting %s('%s'). Removing temporary dir '%s'",
                    self.__class__.__name__, self.path, self.tmppath)
        super(HdfsAtomicWriteDirPipe, self).abort()
        remove(self.tmppath, skip_trash=True)

    def close(self):
        super(HdfsAtomicWriteDirPipe, self).close()
        try:
            if exists(self.path):
                remove(self.path)
        except Exception as ex:
            # str has no .contains() method; a substring test is what was
            # intended here.
            if isinstance(ex, HDFSCliError) or "FileNotFoundException" in str(ex):
                pass
            else:
                raise
        # it's unlikely to fail in this way but better safe than sorry
        if not all(result['result'] for result in rename(self.tmppath, self.path) or []):
            raise HdfsAtomicWriteError('Atomic write to {} failed'.format(self.path))
        # If self.path already existed, rename() moves the temporary directory
        # *into* it rather than replacing it; detect that by looking for the
        # temporary directory's basename inside self.path and fail loudly.
        if os.path.basename(self.tmppath) in map(os.path.basename, listdir(self.path)):
            remove(self.path)
            raise HdfsAtomicWriteError('Atomic write to {} failed'.format(self.path))
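

# `Plain` and `PlainDir` below require the PlainFormat and PlainDirFormat
# classes, which fall outside this excerpt. A minimal sketch of the two
# formats, assumed from the luigi.format.Format interface and wiring in the
# pipe classes above:


class PlainFormat(luigi.format.Format):

    input = 'bytes'
    output = 'hdfs'

    def hdfs_writer(self, path):
        return self.pipe_writer(path)

    def hdfs_reader(self, path):
        return self.pipe_reader(path)

    def pipe_reader(self, path):
        return HdfsReadPipe(path)

    def pipe_writer(self, output_pipe):
        return HdfsAtomicWritePipe(output_pipe)


class PlainDirFormat(luigi.format.Format):

    input = 'bytes'
    output = 'hdfs'

    def hdfs_writer(self, path):
        return self.pipe_writer(path)

    def hdfs_reader(self, path):
        return self.pipe_reader(path)

    def pipe_reader(self, path):
        # exclude underscore-prefixed files/folders (e.g. _SUCCESS)
        return HdfsReadPipe("%s/[^_]*" % path)

    def pipe_writer(self, path):
        return HdfsAtomicWriteDirPipe(path)
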
Plain = PlainFormat()
PlainDir = PlainDirFormat()
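
# Usage sketch (hypothetical path): HdfsTarget in luigi.contrib.hdfs accepts a
# `format` argument, so a task can write through these formats atomically:
#
#     from luigi.contrib.hdfs import HdfsTarget
#
#     target = HdfsTarget('/tmp/report.txt', format=Plain)
#     with target.open('w') as f:
#         f.write(b'one line\n')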