luigi.contrib.s3 module¶
Implementation of Simple Storage Service support.
S3Target
is a subclass of the Target class to support S3 file
system operations. The boto3 library is required to use S3 targets.
-
class
luigi.contrib.s3.
S3Client
(aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, **kwargs)[source]¶ Bases:
luigi.target.FileSystem
boto3-powered S3 client.
-
DEFAULT_PART_SIZE
= 8388608¶
-
DEFAULT_THREADS
= 100¶
-
s3
¶
-
remove
(path, recursive=True)[source]¶ Remove a file or directory from S3. :param path: File or directory to remove :param recursive: Boolean indicator to remove object and children :return: Boolean indicator denoting success of the removal of 1 or more files
-
move
(source_path, destination_path, **kwargs)[source]¶ Rename/move an object from one S3 location to another. :param source_path: The s3:// path of the directory or key to copy from :param destination_path: The s3:// path of the directory or key to copy to :param kwargs: Keyword arguments are passed to the boto3 function copy
-
put
(local_path, destination_s3_path, **kwargs)[source]¶ Put an object stored locally to an S3 path. :param local_path: Path to source local file :param destination_s3_path: URL for target S3 location :param kwargs: Keyword arguments are passed to the boto function put_object
-
put_string
(content, destination_s3_path, **kwargs)[source]¶ Put a string to an S3 path. :param content: Data str :param destination_s3_path: URL for target S3 location :param kwargs: Keyword arguments are passed to the boto3 function put_object
-
put_multipart
(local_path, destination_s3_path, part_size=8388608, **kwargs)[source]¶ Put an object stored locally to an S3 path using S3 multi-part upload (for files > 8Mb). :param local_path: Path to source local file :param destination_s3_path: URL for target S3 location :param part_size: Part size in bytes. Default: 8388608 (8MB) :param kwargs: Keyword arguments are passed to the boto function upload_fileobj as ExtraArgs
-
copy
(source_path, destination_path, threads=100, start_time=None, end_time=None, part_size=8388608, **kwargs)[source]¶ Copy object(s) from one S3 location to another. Works for individual keys or entire directories. When files are larger than part_size, multipart uploading will be used. :param source_path: The s3:// path of the directory or key to copy from :param destination_path: The s3:// path of the directory or key to copy to :param threads: Optional argument to define the number of threads to use when copying (min: 3 threads) :param start_time: Optional argument to copy files with modified dates after start_time :param end_time: Optional argument to copy files with modified dates before end_time :param part_size: Part size in bytes :param kwargs: Keyword arguments are passed to the boto function copy as ExtraArgs :returns tuple (number_of_files_copied, total_size_copied_in_bytes)
-
get
(s3_path, destination_local_path)[source]¶ Get an object stored in S3 and write it to a local path.
-
get_as_bytes
(s3_path)[source]¶ Get the contents of an object stored in S3 as bytes
Parameters: s3_path – URL for target S3 location Returns: File contents as pure bytes
-
get_as_string
(s3_path, encoding='utf-8')[source]¶ Get the contents of an object stored in S3 as string.
Parameters: - s3_path – URL for target S3 location
- encoding – Encoding to decode bytes to string
Returns: File contents as a string
-
is_dir
(path)¶ Is the parameter S3 path a directory?
-
mkdir
(path, parents=True, raise_if_exists=False)[source]¶ Create directory at location
path
Creates the directory at
path
and implicitly create parent directories if they do not already exist.Parameters: - path (str) – a path within the FileSystem to create as a directory.
- parents (bool) – Create parent directories when necessary. When parents=False and the parent directory doesn’t exist, raise luigi.target.MissingParentDirectory
- raise_if_exists (bool) – raise luigi.target.FileAlreadyExists if the folder already exists.
-
listdir
(path, start_time=None, end_time=None, return_key=False)[source]¶ Get an iterable with S3 folder contents. Iterable contains paths relative to queried path. :param path: URL for target S3 location :param start_time: Optional argument to list files with modified (offset aware) datetime after start_time :param end_time: Optional argument to list files with modified (offset aware) datetime before end_time :param return_key: Optional argument, when set to True will return boto3’s ObjectSummary (instead of the filename)
-
-
class
luigi.contrib.s3.
AtomicS3File
(path, s3_client, **kwargs)[source]¶ Bases:
luigi.target.AtomicLocalFile
An S3 file that writes to a temp file and puts to S3 on close.
Parameters: kwargs – Keyword arguments are passed to the boto function initiate_multipart_upload
-
class
luigi.contrib.s3.
S3Target
(path, format=None, client=None, **kwargs)[source]¶ Bases:
luigi.target.FileSystemTarget
Target S3 file object
Parameters: kwargs – Keyword arguments are passed to the boto function initiate_multipart_upload -
fs
= None¶
-
open
(mode='r')[source]¶ Open the FileSystem target.
This method returns a file-like object which can either be read from or written to depending on the specified mode.
Parameters: mode (str) – the mode r opens the FileSystemTarget in read-only mode, whereas w will open the FileSystemTarget in write mode. Subclasses can implement additional options. Using b is not supported; initialize with format=Nop instead.
-
-
class
luigi.contrib.s3.
S3FlagTarget
(path, format=None, client=None, flag='_SUCCESS')[source]¶ Bases:
luigi.contrib.s3.S3Target
Defines a target directory with a flag-file (defaults to _SUCCESS) used to signify job success.
This checks for two things:
- the path exists (just like the S3Target)
- the _SUCCESS file exists within the directory.
Because Hadoop outputs into a directory and not a single file, the path is assumed to be a directory.
This is meant to be a handy alternative to AtomicS3File.
The AtomicFile approach can be burdensome for S3 since there are no directories, per se.
If we have 1,000,000 output files, then we have to rename 1,000,000 objects.
Initializes a S3FlagTarget.
Parameters: - path (str) – the directory where the files are stored.
- client –
- flag (str) –
-
fs
= None¶
-
class
luigi.contrib.s3.
S3EmrTarget
(*args, **kwargs)[source]¶ Bases:
luigi.contrib.s3.S3FlagTarget
Deprecated. Use
S3FlagTarget
-
class
luigi.contrib.s3.
S3PathTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.ExternalTask
A external task that to require existence of a path in S3.
-
path
= Parameter¶
-
output
()[source]¶ The output that this Task produces.
The output of the Task determines if the Task needs to be run–the task is considered finished iff the outputs all exist. Subclasses should override this method to return a single
Target
or a list ofTarget
instances.- Implementation note
- If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.
See Task.output
-
-
class
luigi.contrib.s3.
S3EmrTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.ExternalTask
An external task that requires the existence of EMR output in S3.
-
path
= Parameter¶
-
output
()[source]¶ The output that this Task produces.
The output of the Task determines if the Task needs to be run–the task is considered finished iff the outputs all exist. Subclasses should override this method to return a single
Target
or a list ofTarget
instances.- Implementation note
- If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.
See Task.output
-
-
class
luigi.contrib.s3.
S3FlagTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.ExternalTask
An external task that requires the existence of EMR output in S3.
-
path
= Parameter¶
-
flag
= OptionalParameter (defaults to None)¶
-
output
()[source]¶ The output that this Task produces.
The output of the Task determines if the Task needs to be run–the task is considered finished iff the outputs all exist. Subclasses should override this method to return a single
Target
or a list ofTarget
instances.- Implementation note
- If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.
See Task.output
-