Source code for luigi.contrib.hdfs.target

# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Provides access to HDFS using the :py:class:`HdfsTarget`, a subclass of
:py:class:`~luigi.target.FileSystemTarget`.
"""

import random
import warnings

import luigi
from luigi.target import FileSystemTarget
from luigi.contrib.hdfs.config import tmppath
from luigi.contrib.hdfs import format as hdfs_format
from luigi.contrib.hdfs import clients as hdfs_clients
from luigi.six.moves import range
from luigi.six.moves.urllib import parse as urlparse

class HdfsTarget(FileSystemTarget):
    """
    A target on HDFS.

    Reading and writing go through the configured format's pipe reader/writer;
    filesystem operations are delegated to an HDFS client (``fs``).
    """

    def __init__(self, path=None, format=None, is_tmp=False, fs=None):
        """
        Initializes an HdfsTarget.

        :param path: the HDFS path. May only be omitted when ``is_tmp`` is
                     True, in which case a temporary path is generated.
        :type path: str
        :param format: the luigi format to read/write with. Defaults to the
                       globally configured default format piped into Plain.
        :param is_tmp: if True, the target is removed (skipping trash) when
                       this object is garbage collected.
        :type is_tmp: bool
        :param fs: the HDFS client to use. Defaults to the autoconfigured
                   client.
        :raises ValueError: if the path component contains a colon, which is
                            not allowed in HDFS filenames.
        """
        if path is None:
            assert is_tmp
            path = tmppath()
        super(HdfsTarget, self).__init__(path)

        if format is None:
            format = luigi.format.get_default_format() >> hdfs_format.Plain

        # Deprecated "old-style" formats expose hdfs_reader/hdfs_writer
        # methods instead of declaring ``output == 'hdfs'``.
        old_format = (
            (
                hasattr(format, 'hdfs_writer') or
                hasattr(format, 'hdfs_reader')
            ) and
            not hasattr(format, 'output')
        )

        if not old_format and getattr(format, 'output', '') != 'hdfs':
            format = format >> hdfs_format.Plain

        if old_format:
            warnings.warn(
                'hdfs_writer and hdfs_reader method for format is deprecated,'
                'specify the property output of your format as \'hdfs\' instead',
                DeprecationWarning,
                stacklevel=2
            )

            if hasattr(format, 'hdfs_writer'):
                format_writer = format.hdfs_writer
            else:
                w_format = format >> hdfs_format.Plain
                format_writer = w_format.pipe_writer

            if hasattr(format, 'hdfs_reader'):
                format_reader = format.hdfs_reader
            else:
                r_format = format >> hdfs_format.Plain
                format_reader = r_format.pipe_reader

            format = hdfs_format.CompatibleHdfsFormat(
                format_writer,
                format_reader,
            )
        else:
            format = hdfs_format.CompatibleHdfsFormat(
                format.pipe_writer,
                format.pipe_reader,
                getattr(format, 'input', None),
            )

        self.format = format
        self.is_tmp = is_tmp
        # Only the path component of a URI may be validated for colons;
        # scheme/netloc (e.g. "hdfs://host:port") legitimately contain them.
        (scheme, netloc, path, query, fragment) = urlparse.urlsplit(path)
        if ":" in path:
            raise ValueError('colon is not allowed in hdfs filenames')
        self._fs = fs or hdfs_clients.get_autoconfig_client()

    def __del__(self):
        # TODO: not sure is_tmp belongs in Targets construction arguments
        if self.is_tmp and self.exists():
            self.remove(skip_trash=True)

    @property
    def fs(self):
        """The HDFS client backing this target."""
        return self._fs

    def glob_exists(self, expected_files):
        """
        Return True iff listing ``self.path`` yields exactly
        ``expected_files`` entries.
        """
        return len(list(self.fs.listdir(self.path))) == expected_files

    def open(self, mode='r'):
        """
        Open the target for reading (``'r'``) or writing (``'w'``).

        :raises ValueError: on any other mode.
        """
        if mode not in ('r', 'w'):
            raise ValueError("Unsupported open mode '%s'" % mode)
        if mode == 'r':
            return self.format.pipe_reader(self.path)
        else:
            return self.format.pipe_writer(self.path)

    def remove(self, skip_trash=False):
        """Remove the target from HDFS."""
        self.fs.remove(self.path, skip_trash=skip_trash)

    def rename(self, path, raise_if_exists=False):
        """
        Does not change self.path.

        Unlike ``move_dir()``, ``rename()`` might cause nested directories.

        See spotify/luigi#522
        """
        if isinstance(path, HdfsTarget):
            path = path.path
        if raise_if_exists and self.fs.exists(path):
            raise RuntimeError('Destination exists: %s' % path)
        self.fs.rename(self.path, path)

    def move(self, path, raise_if_exists=False):
        """
        Alias for ``rename()``
        """
        self.rename(path, raise_if_exists=raise_if_exists)

    def move_dir(self, path):
        """
        Move using :py:class:`~luigi.contrib.hdfs.abstract_client.HdfsFileSystem.rename_dont_move`

        New since after luigi v2.1: Does not change self.path

        One could argue that the implementation should use the
        mkdir+raise_if_exists approach, but we at Spotify have had more
        trouble with that over just using plain mv.  See spotify/luigi#557
        """
        self.fs.rename_dont_move(self.path, path)

    def copy(self, dst_dir):
        """
        Copy to destination directory.
        """
        self.fs.copy(self.path, dst_dir)

    def is_writable(self):
        """
        Currently only works with hadoopcli
        """
        if "/" in self.path:
            # example path: /log/ap/2013-01-17/00
            parts = self.path.split("/")
            # start with the full path and then up the tree until we can check
            length = len(parts)
            for part in range(length):
                path = "/".join(parts[0:length - part]) + "/"
                if self.fs.exists(path):
                    # if the path exists and we can write there, great!
                    if self._is_writable(path):
                        return True
                    # if it exists and we can't =( sad panda
                    else:
                        return False
            # We went through all parts of the path and we still couldn't find
            # one that exists.
            return False

    def _is_writable(self, path):
        # Probe writability by creating and removing a uniquely named file.
        # 10 ** 10, not 1e10: randrange rejects float arguments on
        # Python >= 3.12 (deprecated since 3.10).
        test_path = path + '.test_write_access-%09d' % random.randrange(10 ** 10)
        try:
            self.fs.touchz(test_path)
            self.fs.remove(test_path, recursive=False)
            return True
        except hdfs_clients.HDFSCliError:
            return False
class HdfsFlagTarget(HdfsTarget):
    """
    Defines a target directory with a flag-file (defaults to `_SUCCESS`) used
    to signify job success.

    This checks for two things:

    * the path exists (just like the HdfsTarget)
    * the _SUCCESS file exists within the directory.

    Because Hadoop outputs into a directory and not a single file,
    the path is assumed to be a directory.
    """

    def __init__(self, path, format=None, client=None, flag='_SUCCESS'):
        """
        Initializes a HdfsFlagTarget.

        :param path: the directory where the files are stored.
        :type path: str
        :param format: the luigi format to use; forwarded to ``HdfsTarget``.
        :param client: the HDFS client to use; forwarded to ``HdfsTarget``
                       as its ``fs`` argument.
        :param flag: the flag file name that marks job success.
        :type flag: str
        :raises ValueError: if ``path`` does not end with a slash.
        """
        if path[-1] != "/":
            raise ValueError("HdfsFlagTarget requires the path to be to a "
                             "directory. It must end with a slash ( / ).")
        # Pass the client by keyword: HdfsTarget's third positional
        # parameter is ``is_tmp``, so passing ``client`` positionally would
        # silently drop the client and mark the target as temporary.
        super(HdfsFlagTarget, self).__init__(path, format, fs=client)
        self.flag = flag

    def exists(self):
        """
        Return True iff the flag file exists inside the target directory.
        (``self.path`` ends with "/", so simple concatenation is safe.)
        """
        hadoopSemaphore = self.path + self.flag
        return self.fs.exists(hadoopSemaphore)