# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This library is a wrapper around ftplib and pysftp.
It makes it convenient to move data to and from (S)FTP servers.
There is an example of how to use it (example/ftp_experiment_outputs.py).
You can also find unit tests for each class.
Be aware that plain FTP does not provide secure communication.
"""
import datetime
import ftplib
import os
import random
import tempfile
import io
import luigi
import luigi.local_target
import luigi.format
import luigi.target
from luigi.format import FileWrapper
import logging
logger = logging.getLogger('luigi-interface')
class RemoteFileSystem(luigi.target.FileSystem):
    """
    File system interface to a remote FTP, FTPS or SFTP server.

    Plain FTP/FTPS traffic goes through :mod:`ftplib`; SFTP goes through the
    optional ``pysftp`` package, which is imported lazily on first use.

    Every public operation (``exists``, ``remove``, ``put``, ``get``,
    ``listdir``) opens a fresh connection, does its work, and closes the
    connection again -- also when the operation raises.
    """

    def __init__(self, host, username=None, password=None, port=None,
                 tls=False, timeout=60, sftp=False, pysftp_conn_kwargs=None):
        """
        Store the connection settings; no connection is opened here.

        :param host: server to connect to.
        :param username: login name handed to the FTP/SFTP login.
        :param password: password handed to the FTP/SFTP login.
        :param port: server port. Defaults to 22 for SFTP and 21 otherwise.
        :param tls: use ``ftplib.FTP_TLS`` and protect the data channel
                    (only consulted for non-SFTP connections).
        :param timeout: timeout passed to the ``ftplib`` connect call.
        :param sftp: use SFTP (via ``pysftp``) instead of FTP.
        :param pysftp_conn_kwargs: extra keyword arguments forwarded to
                                   ``pysftp.Connection``.
        """
        self.host = host
        self.username = username
        self.password = password
        self.tls = tls
        self.timeout = timeout
        self.sftp = sftp
        self.pysftp_conn_kwargs = pysftp_conn_kwargs or {}
        if port is None:
            self.port = 22 if self.sftp else 21
        else:
            self.port = port

    def _connect(self):
        """
        Log in to the server and store the connection in ``self.conn``.
        """
        if self.sftp:
            self._sftp_connect()
        else:
            self._ftp_connect()

    def _sftp_connect(self):
        """
        Open an SFTP connection with ``pysftp``.

        :raises ImportError: if ``pysftp`` is not installed.
        """
        try:
            import pysftp
        except ImportError:
            logger.warning('Please install pysftp to use SFTP.')
            # Without the module there is nothing to connect with; surface the
            # real cause instead of failing later on an unbound name.
            raise

        self.conn = pysftp.Connection(self.host, username=self.username, password=self.password,
                                      port=self.port, **self.pysftp_conn_kwargs)

    def _ftp_connect(self):
        """
        Open an FTP (or FTPS when ``self.tls`` is set) connection and log in.
        """
        if self.tls:
            self.conn = ftplib.FTP_TLS()
        else:
            self.conn = ftplib.FTP()
        self.conn.connect(self.host, self.port, timeout=self.timeout)
        self.conn.login(self.username, self.password)
        if self.tls:
            # switch the data connection to protected mode as well
            self.conn.prot_p()

    def _close(self):
        """
        Close the current connection.
        """
        if self.sftp:
            self._sftp_close()
        else:
            self._ftp_close()

    def _sftp_close(self):
        self.conn.close()

    def _ftp_close(self):
        self.conn.quit()

    def exists(self, path, mtime=None):
        """
        Return `True` if file or directory at `path` exist, False otherwise.

        Additional check on modified time when mtime is passed in.

        Return False if the file's modified time is older mtime.
        """
        self._connect()
        try:
            if self.sftp:
                return self._sftp_exists(path, mtime)
            return self._ftp_exists(path, mtime)
        finally:
            # always release the connection, even if the check raised
            self._close()

    def _sftp_exists(self, path, mtime):
        """
        SFTP implementation of :meth:`exists`.

        ``mtime`` is compared against ``st_mtime`` of the remote ``stat``
        result.
        """
        # A missing path must yield False; calling stat() on it would raise.
        if not self.conn.exists(path):
            return False
        if mtime:
            return self.conn.stat(path).st_mtime > mtime
        return True

    def _ftp_exists(self, path, mtime):
        """
        FTP implementation of :meth:`exists`.

        The parent directory is listed and searched for either the full path
        or the bare file name. When ``mtime`` is given, the ``MDTM`` reply is
        parsed into a ``datetime`` and compared with it.
        """
        dirname, fn = os.path.split(path)
        files = self.conn.nlst(dirname)

        if path not in files and fn not in files:
            return False
        if not mtime:
            return True

        mdtm = self.conn.sendcmd('MDTM ' + path)
        # the timestamp follows the 4-character reply prefix
        modified = datetime.datetime.strptime(mdtm[4:], "%Y%m%d%H%M%S")
        return modified > mtime

    def remove(self, path, recursive=True):
        """
        Remove file or directory at location ``path``.

        :param path: a path within the FileSystem to remove.
        :type path: str
        :param recursive: if the path is a directory, recursively remove the directory and
                          all of its descendants. Defaults to ``True``.
        :type recursive: bool
        """
        self._connect()
        try:
            if self.sftp:
                self._sftp_remove(path, recursive)
            else:
                self._ftp_remove(path, recursive)
        finally:
            # always release the connection, even if the removal raised
            self._close()

    def _sftp_remove(self, path, recursive):
        """
        SFTP implementation of :meth:`remove`.

        :raises RuntimeError: if ``path`` is not a regular file and
                              ``recursive`` is false.
        """
        if self.conn.isfile(path):
            self.conn.unlink(path)
            return

        if not recursive:
            raise RuntimeError("Path is not a regular file, and recursive option is not set")

        directories = []
        # walk the tree, and execute call backs when files,
        # directories and unknown types are encountered
        # files must be removed first.  then directories can be removed
        # after the files are gone.
        self.conn.walktree(path, self.conn.unlink, directories.append, self.conn.unlink)
        for directory in reversed(directories):
            self.conn.rmdir(directory)
        self.conn.rmdir(path)

    def _ftp_remove(self, path, recursive):
        """
        FTP implementation of :meth:`remove`.
        """
        if recursive:
            self._rm_recursive(self.conn, path)
            return

        try:
            # try delete file
            self.conn.delete(path)
        except ftplib.all_errors:
            # it is a folder, delete it
            self.conn.rmd(path)

    def _rm_recursive(self, ftp, path):
        """
        Recursively delete a directory tree on a remote server.

        Source: https://gist.github.com/artlogic/2632647
        """
        wd = ftp.pwd()

        # check if it is a file first, because some FTP servers don't return
        # correctly on ftp.nlst(file)
        try:
            ftp.cwd(path)
        except ftplib.all_errors:
            # this is a file, we will just delete the file
            ftp.delete(path)
            return

        try:
            names = ftp.nlst()
        except ftplib.all_errors:
            # some FTP servers complain when you try and list non-existent paths
            return

        for name in names:
            if os.path.split(name)[1] in ('.', '..'):
                continue

            try:
                ftp.cwd(name)   # if we can cwd to it, it's a folder
                ftp.cwd(wd)     # don't try a nuke a folder we're in
                ftp.cwd(path)   # then go back to where we were
                self._rm_recursive(ftp, name)
            except ftplib.all_errors:
                ftp.delete(name)

        try:
            ftp.cwd(wd)  # do not delete the folder that we are in
            ftp.rmd(path)
        except ftplib.all_errors as e:
            # best effort: report through the module logger rather than stdout
            logger.warning('_rm_recursive: Could not remove %s: %s', path, e)

    def put(self, local_path, path, atomic=True):
        """
        Put file from local filesystem to (s)FTP.

        :param local_path: file on the local file system to upload.
        :param path: destination path on the server.
        :param atomic: upload under a temporary name and rename to ``path``
                       once the transfer has finished.
        """
        self._connect()
        try:
            if self.sftp:
                self._sftp_put(local_path, path, atomic)
            else:
                self._ftp_put(local_path, path, atomic)
        finally:
            # always release the connection, even if the upload raised
            self._close()

    def _sftp_put(self, local_path, path, atomic):
        """
        SFTP implementation of :meth:`put`; creates the target directory first.
        """
        normpath = os.path.normpath(path)
        directory = os.path.dirname(normpath)
        self.conn.makedirs(directory)

        if atomic:
            tmp_path = os.path.join(directory, 'luigi-tmp-{:09d}'.format(random.randrange(0, 10_000_000_000)))
        else:
            tmp_path = normpath

        self.conn.put(local_path, tmp_path)

        if atomic:
            self.conn.rename(tmp_path, normpath)

    def _ftp_put(self, local_path, path, atomic):
        """
        FTP implementation of :meth:`put`; creates missing folders one by one.
        """
        normpath = os.path.normpath(path)
        folder = os.path.dirname(normpath)

        # create paths if do not exists
        for subfolder in folder.split(os.sep):
            if subfolder and subfolder not in self.conn.nlst():
                self.conn.mkd(subfolder)

            self.conn.cwd(subfolder)

        # go back to ftp root folder
        self.conn.cwd("/")

        # random file name
        if atomic:
            tmp_path = folder + os.sep + 'luigi-tmp-%09d' % random.randrange(0, 10_000_000_000)
        else:
            tmp_path = normpath

        # close the local file deterministically once the upload is done
        with open(local_path, 'rb') as local_file:
            self.conn.storbinary('STOR %s' % tmp_path, local_file)

        if atomic:
            self.conn.rename(tmp_path, normpath)

    def get(self, path, local_path):
        """
        Download file from (s)FTP to local filesystem.

        The data is written to a temporary file next to ``local_path`` and
        renamed to ``local_path`` after the download has finished.
        """
        normpath = os.path.normpath(local_path)
        folder = os.path.dirname(normpath)
        if folder and not os.path.exists(folder):
            os.makedirs(folder)

        tmp_local_path = local_path + '-luigi-tmp-%09d' % random.randrange(0, 10_000_000_000)

        # download file
        self._connect()
        try:
            if self.sftp:
                self._sftp_get(path, tmp_local_path)
            else:
                self._ftp_get(path, tmp_local_path)
        finally:
            # always release the connection, even if the download raised
            self._close()

        os.rename(tmp_local_path, local_path)

    def _sftp_get(self, path, tmp_local_path):
        self.conn.get(path, tmp_local_path)

    def _ftp_get(self, path, tmp_local_path):
        # close (and thereby flush) the local file before the caller renames it
        with open(tmp_local_path, 'wb') as local_file:
            self.conn.retrbinary('RETR %s' % path, local_file.write)

    def listdir(self, path='.'):
        """
        Gets an list of the contents of path in (s)FTP
        """
        self._connect()
        try:
            if self.sftp:
                return self._sftp_listdir(path)
            return self._ftp_listdir(path)
        finally:
            # always release the connection, even if the listing raised
            self._close()

    def _sftp_listdir(self, path):
        return self.conn.listdir(remotepath=path)

    def _ftp_listdir(self, path):
        return self.conn.nlst(path)
class AtomicFtpFile(luigi.target.AtomicLocalFile):
    """
    Local temporary file whose content is uploaded to the remote server.

    Data is written to a temporary file first; ``move_to_final_destination``
    then pushes that file to the remote ``path`` through the file system's
    ``put``. When that hook runs, and the cleanup of the temporary file, are
    handled by the base class (presumably on ``close()`` -- not visible here).
    """

    def __init__(self, fs, path):
        """
        Create the atomic file.

        :param fs: file system object used for the upload; it must provide
                   ``put(local_path, path)``.
        :param path: destination path on the remote server.
        :type path: str
        """
        # keep the file system before the base initialiser runs
        self._fs = fs
        super().__init__(path)

    @property
    def fs(self):
        """The file system object used for the upload."""
        return self._fs

    def move_to_final_destination(self):
        """Upload the temporary file to its remote destination."""
        self._fs.put(self.tmp_path, self.path)
class RemoteTarget(luigi.target.FileSystemTarget):
    """
    Target for a file that lives on a remote (S)FTP server.

    Reading and writing both go through intermediate files on the local
    system. On Python2, the intermediate files used for reading may not be
    cleaned up.
    """

    def __init__(
        self, path, host, format=None, username=None,
        password=None, port=None, mtime=None, tls=False,
        timeout=60, sftp=False, pysftp_conn_kwargs=None
    ):
        """
        Store the target settings and build the backing ``RemoteFileSystem``.

        :param path: path of the file on the remote server.
        :param host: server to connect to.
        :param format: luigi format used to wrap the file objects; the luigi
                       default format is used when omitted.
        :param mtime: optional modification time forwarded to the file
                      system's ``exists`` check.

        The remaining arguments are handed to ``RemoteFileSystem`` unchanged.
        """
        self.path = path
        self.mtime = mtime
        self.format = luigi.format.get_default_format() if format is None else format
        self.tls = tls
        self.timeout = timeout
        self.sftp = sftp
        self._fs = RemoteFileSystem(host, username, password, port, tls, timeout, sftp, pysftp_conn_kwargs)

    @property
    def fs(self):
        """The ``RemoteFileSystem`` this target operates on."""
        return self._fs

    def open(self, mode):
        """
        Open the FileSystem target.

        This method returns a file-like object which can either be read from or written to depending
        on the specified mode.

        :param mode: the mode `r` opens the FileSystemTarget in read-only mode, whereas `w` will
                     open the FileSystemTarget in write mode. Subclasses can implement
                     additional options.
        :type mode: str
        """
        if mode == 'w':
            atomic_file = AtomicFtpFile(self._fs, self.path)
            return self.format.pipe_writer(atomic_file)

        if mode != 'r':
            raise Exception("mode must be 'r' or 'w' (got: %s)" % mode)

        # relative name of the local copy: remote path plus a random suffix
        local_name = '{}-luigi-tmp-{:09d}'.format(
            self.path.lstrip('/'), random.randrange(0, 10_000_000_000)
        )

        try:
            # keep a reference to the TemporaryDirectory on the instance,
            # because the directory is removed once the object is collected
            self.__temp_dir = tempfile.TemporaryDirectory(
                prefix="luigi-contrib-ftp_"
            )
        except AttributeError:
            # TemporaryDirectory only exists on Python3; on Python2 fall back to
            # a fixed folder in the system temp dir, which is not cleaned up
            local_root = os.path.join(tempfile.gettempdir(), 'luigi-contrib-ftp')
        else:
            local_root = self.__temp_dir.name

        self.__tmp_path = os.path.join(local_root, local_name)

        # fetch the remote file into the local copy
        self._fs.get(self.path, self.__tmp_path)

        raw_file = io.FileIO(self.__tmp_path, 'r')
        wrapped = FileWrapper(io.BufferedReader(raw_file))
        return self.format.pipe_reader(wrapped)

    def exists(self):
        """Check the remote path through the file system, honouring ``mtime``."""
        return self.fs.exists(self.path, self.mtime)

    def put(self, local_path, atomic=True):
        """Upload ``local_path`` to this target's remote path."""
        self.fs.put(local_path, self.path, atomic)

    def get(self, local_path):
        """Download this target's remote file to ``local_path``."""
        self.fs.get(self.path, local_path)