Source code for luigi.contrib.hdfs.snakebite_client

# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A luigi file system client that wraps around snakebite

Originally written by Alan Brenner <alan@magnetic.com> github.com/alanbbr
"""


from luigi.contrib.hdfs import config as hdfs_config
from luigi.contrib.hdfs import abstract_client as hdfs_abstract_client
from luigi import six
import luigi.contrib.target
import logging
import datetime
import os

logger = logging.getLogger('luigi-interface')


[docs]class SnakebiteHdfsClient(hdfs_abstract_client.HdfsFileSystem): """ A hdfs client using snakebite. Since Snakebite has a python API, it'll be about 100 times faster than the hadoop cli client, which does shell out to a java program on each file system operation. """ def __init__(self): super(SnakebiteHdfsClient, self).__init__() self._bite = None self.pid = -1
[docs] @staticmethod def list_path(path): if isinstance(path, list) or isinstance(path, tuple): return path # TODO: Should this be: # isinstance(path, (six.text_type, six.binary_type))? if isinstance(path, six.string_types): return [path, ] return [str(path), ]
[docs] def get_bite(self): """ If Luigi has forked, we have a different PID, and need to reconnect. """ config = hdfs_config.hdfs() if self.pid != os.getpid() or not self._bite: client_kwargs = dict(filter( lambda k_v: k_v[1] is not None and k_v[1] != '', six.iteritems({ 'hadoop_version': config.client_version, 'effective_user': config.effective_user, }) )) if config.snakebite_autoconfig: """ This is fully backwards compatible with the vanilla Client and can be used for a non HA cluster as well. This client tries to read ``${HADOOP_PATH}/conf/hdfs-site.xml`` to get the address of the namenode. The behaviour is the same as Client. """ from snakebite.client import AutoConfigClient self._bite = AutoConfigClient(**client_kwargs) else: from snakebite.client import Client self._bite = Client(config.namenode_host, config.namenode_port, **client_kwargs) return self._bite
[docs] def exists(self, path): """ Use snakebite.test to check file existence. :param path: path to test :type path: string :return: boolean, True if path exists in HDFS """ return self.get_bite().test(path, exists=True)
[docs] def move(self, path, dest): """ Use snakebite.rename, if available. :param path: source file(s) :type path: either a string or sequence of strings :param dest: destination file (single input) or directory (multiple) :type dest: string :return: list of renamed items """ parts = dest.rstrip('/').split('/') if len(parts) > 1: dir_path = '/'.join(parts[0:-1]) if not self.exists(dir_path): self.mkdir(dir_path, parents=True) return list(self.get_bite().rename(self.list_path(path), dest))
[docs] def rename_dont_move(self, path, dest): """ Use snakebite.rename_dont_move, if available. :param path: source path (single input) :type path: string :param dest: destination path :type dest: string :return: True if succeeded :raises: snakebite.errors.FileAlreadyExistsException """ from snakebite.errors import FileAlreadyExistsException try: self.get_bite().rename2(path, dest, overwriteDest=False) except FileAlreadyExistsException: # Unfortunately python2 don't allow exception chaining. raise luigi.target.FileAlreadyExists()
[docs] def remove(self, path, recursive=True, skip_trash=False): """ Use snakebite.delete, if available. :param path: delete-able file(s) or directory(ies) :type path: either a string or a sequence of strings :param recursive: delete directories trees like \\*nix: rm -r :type recursive: boolean, default is True :param skip_trash: do or don't move deleted items into the trash first :type skip_trash: boolean, default is False (use trash) :return: list of deleted items """ return list(self.get_bite().delete(self.list_path(path), recurse=recursive))
[docs] def chmod(self, path, permissions, recursive=False): """ Use snakebite.chmod, if available. :param path: update-able file(s) :type path: either a string or sequence of strings :param permissions: \\*nix style permission number :type permissions: octal :param recursive: change just listed entry(ies) or all in directories :type recursive: boolean, default is False :return: list of all changed items """ if type(permissions) == str: permissions = int(permissions, 8) return list(self.get_bite().chmod(self.list_path(path), permissions, recursive))
[docs] def chown(self, path, owner, group, recursive=False): """ Use snakebite.chown/chgrp, if available. One of owner or group must be set. Just setting group calls chgrp. :param path: update-able file(s) :type path: either a string or sequence of strings :param owner: new owner, can be blank :type owner: string :param group: new group, can be blank :type group: string :param recursive: change just listed entry(ies) or all in directories :type recursive: boolean, default is False :return: list of all changed items """ bite = self.get_bite() if owner: if group: return all(bite.chown(self.list_path(path), "%s:%s" % (owner, group), recurse=recursive)) return all(bite.chown(self.list_path(path), owner, recurse=recursive)) return list(bite.chgrp(self.list_path(path), group, recurse=recursive))
[docs] def count(self, path): """ Use snakebite.count, if available. :param path: directory to count the contents of :type path: string :return: dictionary with content_size, dir_count and file_count keys """ try: res = self.get_bite().count(self.list_path(path)).next() dir_count = res['directoryCount'] file_count = res['fileCount'] content_size = res['spaceConsumed'] except StopIteration: dir_count = file_count = content_size = 0 return {'content_size': content_size, 'dir_count': dir_count, 'file_count': file_count}
[docs] def copy(self, path, destination): """ Raise a NotImplementedError exception. """ raise NotImplementedError("SnakebiteClient in luigi doesn't implement copy")
[docs] def put(self, local_path, destination): """ Raise a NotImplementedError exception. """ raise NotImplementedError("Snakebite doesn't implement put")
[docs] def get(self, path, local_destination): """ Use snakebite.copyToLocal, if available. :param path: HDFS file :type path: string :param local_destination: path on the system running Luigi :type local_destination: string """ return list(self.get_bite().copyToLocal(self.list_path(path), local_destination))
[docs] def get_merge(self, path, local_destination): """ Using snakebite getmerge to implement this. :param path: HDFS directory :param local_destination: path on the system running Luigi :return: merge of the directory """ return list(self.get_bite().getmerge(path=path, dst=local_destination))
[docs] def mkdir(self, path, parents=True, mode=0o755, raise_if_exists=False): """ Use snakebite.mkdir, if available. Snakebite's mkdir method allows control over full path creation, so by default, tell it to build a full path to work like ``hadoop fs -mkdir``. :param path: HDFS path to create :type path: string :param parents: create any missing parent directories :type parents: boolean, default is True :param mode: \\*nix style owner/group/other permissions :type mode: octal, default 0755 """ result = list(self.get_bite().mkdir(self.list_path(path), create_parent=parents, mode=mode)) if raise_if_exists and "ile exists" in result[0].get('error', ''): raise luigi.target.FileAlreadyExists("%s exists" % (path, )) return result
[docs] def listdir(self, path, ignore_directories=False, ignore_files=False, include_size=False, include_type=False, include_time=False, recursive=False): """ Use snakebite.ls to get the list of items in a directory. :param path: the directory to list :type path: string :param ignore_directories: if True, do not yield directory entries :type ignore_directories: boolean, default is False :param ignore_files: if True, do not yield file entries :type ignore_files: boolean, default is False :param include_size: include the size in bytes of the current item :type include_size: boolean, default is False (do not include) :param include_type: include the type (d or f) of the current item :type include_type: boolean, default is False (do not include) :param include_time: include the last modification time of the current item :type include_time: boolean, default is False (do not include) :param recursive: list subdirectory contents :type recursive: boolean, default is False (do not recurse) :return: yield with a string, or if any of the include_* settings are true, a tuple starting with the path, and include_* items in order """ bite = self.get_bite() for entry in bite.ls(self.list_path(path), recurse=recursive): if ignore_directories and entry['file_type'] == 'd': continue if ignore_files and entry['file_type'] == 'f': continue rval = [entry['path'], ] if include_size: rval.append(entry['length']) if include_type: rval.append(entry['file_type']) if include_time: rval.append(datetime.datetime.fromtimestamp(entry['modification_time'] / 1000)) if len(rval) > 1: yield tuple(rval) else: yield rval[0]
[docs] def touchz(self, path): """ Raise a NotImplementedError exception. """ raise NotImplementedError("SnakebiteClient in luigi doesn't implement touchz")