luigi.contrib.s3
Implementation of Simple Storage Service support.
S3Target is a subclass of the Target class to support S3 file
system operations. The boto3 library is required to use S3 targets.
Classes
- AtomicS3File: An S3 file that writes to a temp file and puts to S3 on close.
- S3Client: boto3-powered S3 client.
- S3EmrTarget: Deprecated.
- S3EmrTask: An external task that requires the existence of EMR output in S3.
- S3FlagTarget: Defines a target directory with a flag-file (defaults to _SUCCESS) used to signify job success.
- S3FlagTask: An external task that requires the existence of EMR output in S3.
- S3PathTask: An external task that requires the existence of a path in S3.
- S3Target: Target S3 file object.
Exceptions
- class luigi.contrib.s3.S3Client(aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, **kwargs)
boto3-powered S3 client.
- DEFAULT_PART_SIZE = 8388608
- DEFAULT_THREADS = 100
- property s3
- remove(path, recursive=True)
Remove a file or directory from S3.
- Parameters:
path – File or directory to remove
recursive – Boolean indicator to remove object and children
- Returns:
Boolean indicator denoting success of the removal of 1 or more files
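To make the documented return value concrete, here is a minimal sketch that models a bucket as a flat set of keys. InMemoryStore is hypothetical and not part of luigi; it assumes a "directory" is every key under path/, which is how S3 prefixes behave, and it makes no attempt to reproduce whatever luigi does for edge cases such as a non-recursive removal of a directory.

```python
# Hypothetical in-memory model of recursive removal (not luigi code).
class InMemoryStore:
    def __init__(self, keys):
        # Full object keys such as "data/2024/part-0000".
        self.keys = set(keys)

    def remove(self, path, recursive=True):
        """Remove `path` (and, if recursive, every key under `path/`).

        Returns True if at least one key was deleted.
        """
        prefix = path.rstrip("/") + "/"
        doomed = {k for k in self.keys if k == path}  # exact key match
        if recursive:
            doomed |= {k for k in self.keys if k.startswith(prefix)}
        self.keys -= doomed
        return len(doomed) > 0


store = InMemoryStore(["data/a.csv", "data/b.csv", "other.txt"])
print(store.remove("data"))         # True: two keys under data/ removed
print(sorted(store.keys))           # ['other.txt']
print(store.remove("missing.txt"))  # False: nothing matched
```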
- move(source_path, destination_path, **kwargs)
Rename/move an object from one S3 location to another.
- Parameters:
source_path – The s3:// path of the directory or key to copy from
destination_path – The s3:// path of the directory or key to copy to
kwargs – Keyword arguments are passed to the boto3 function copy
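S3 has no native rename operation, so a move is presumably a copy followed by deleting the source (passing kwargs to copy is consistent with that). The sketch below assumes that approach and works on an in-memory dict keyed by (bucket, key) rather than a real bucket; split_s3_path and move are hypothetical helpers, not luigi functions.

```python
from urllib.parse import urlsplit


def split_s3_path(path):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key').

    Simplification: keys containing '?' or '#' would be misparsed by urlsplit.
    """
    parts = urlsplit(path)
    if parts.scheme != "s3":
        raise ValueError(f"not an s3:// path: {path!r}")
    return parts.netloc, parts.path.lstrip("/")


def move(objects, source_path, destination_path):
    """Copy-then-delete 'rename' on a dict of {(bucket, key): bytes}."""
    src = split_s3_path(source_path)
    dst = split_s3_path(destination_path)
    objects[dst] = objects[src]  # copy first ...
    del objects[src]             # ... then remove the original


objects = {("my-bucket", "in/x.txt"): b"hello"}
move(objects, "s3://my-bucket/in/x.txt", "s3://my-bucket/out/x.txt")
print(objects)  # {('my-bucket', 'out/x.txt'): b'hello'}
```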
- put(local_path, destination_s3_path, **kwargs)
Put an object stored locally to an S3 path.
- Parameters:
local_path – Path to source local file
destination_s3_path – URL for target S3 location
kwargs – Keyword arguments are passed to the boto function put_object
- put_string(content, destination_s3_path, **kwargs)
Put a string to an S3 path.
- Parameters:
content – Data str
destination_s3_path – URL for target S3 location
kwargs – Keyword arguments are passed to the boto3 function put_object
- put_multipart(local_path, destination_s3_path, part_size=8388608, **kwargs)
Put an object stored locally to an S3 path using S3 multi-part upload (for files > 8 MiB).
- Parameters:
local_path – Path to source local file
destination_s3_path – URL for target S3 location
part_size – Part size in bytes. Default: 8388608 (8 MiB)
kwargs – Keyword arguments are passed to the boto function upload_fileobj as ExtraArgs
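The part-size arithmetic behind a multipart upload can be sketched as follows. plan_parts is a hypothetical helper that only computes byte ranges; it assumes every part except the last has exactly part_size bytes and does not talk to S3.

```python
DEFAULT_PART_SIZE = 8388608  # 8 MiB, the documented default


def plan_parts(file_size, part_size=DEFAULT_PART_SIZE):
    """Return (offset, length) byte ranges covering `file_size` bytes."""
    if part_size <= 0:
        raise ValueError("part_size must be positive")
    # Integer ceiling division; at least one part even for empty files.
    n_parts = max(1, -(-file_size // part_size))
    return [
        (i * part_size, min(part_size, file_size - i * part_size))
        for i in range(n_parts)
    ]


for offset, length in plan_parts(20 * 1024 * 1024):
    print(offset, length)
# 0 8388608
# 8388608 8388608
# 16777216 4194304
```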
- copy(source_path, destination_path, threads=100, start_time=None, end_time=None, part_size=8388608, **kwargs)
Copy object(s) from one S3 location to another. Works for individual keys or entire directories. When files are larger than part_size, multipart uploading will be used.
- Parameters:
source_path – The s3:// path of the directory or key to copy from
destination_path – The s3:// path of the directory or key to copy to
threads – Optional argument to define the number of threads to use when copying (min: 3 threads)
start_time – Optional argument to copy files with modified dates after start_time
end_time – Optional argument to copy files with modified dates before end_time
part_size – Part size in bytes
kwargs – Keyword arguments are passed to the boto function copy as ExtraArgs
- Returns:
tuple (number_of_files_copied, total_size_copied_in_bytes)
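The start_time/end_time filter and the returned tuple can be illustrated without S3. This sketch assumes strict comparisons (modified strictly after start_time and strictly before end_time) and takes object metadata as plain (key, size, last_modified) tuples; select_for_copy is hypothetical. Timestamps should be timezone-aware, since Python raises TypeError when comparing naive and aware datetimes.

```python
from datetime import datetime, timezone


def select_for_copy(objects, start_time=None, end_time=None):
    """Return (number_of_files, total_size_in_bytes) for matching objects.

    `objects` is a list of (key, size_in_bytes, last_modified) tuples;
    either time bound may be None, meaning "no bound".
    """
    selected = [
        (key, size)
        for key, size, modified in objects
        if (start_time is None or modified > start_time)
        and (end_time is None or modified < end_time)
    ]
    return len(selected), sum(size for _, size in selected)


utc = timezone.utc
objects = [
    ("logs/a", 100, datetime(2024, 1, 1, tzinfo=utc)),
    ("logs/b", 250, datetime(2024, 2, 1, tzinfo=utc)),
]
print(select_for_copy(objects, start_time=datetime(2024, 1, 15, tzinfo=utc)))  # (1, 250)
```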
- get(s3_path, destination_local_path)
Get an object stored in S3 and write it to a local path.
- get_as_bytes(s3_path)
Get the contents of an object stored in S3 as bytes
- Parameters:
s3_path – URL for target S3 location
- Returns:
File contents as pure bytes
- get_as_string(s3_path, encoding='utf-8')
Get the contents of an object stored in S3 as string.
- Parameters:
s3_path – URL for target S3 location
encoding – Encoding to decode bytes to string
- Returns:
File contents as a string
- is_dir(path)
Is the parameter S3 path a directory?
- mkdir(path, parents=True, raise_if_exists=False)
Create a directory at location path.
Creates the directory at path and implicitly creates parent directories if they do not already exist.
- Parameters:
path (str) – a path within the FileSystem to create as a directory.
parents (bool) – Create parent directories when necessary. When parents=False and the parent directory doesn’t exist, raise luigi.target.MissingParentDirectory
raise_if_exists (bool) – raise luigi.target.FileAlreadyExists if the folder already exists.
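Because S3 has no real directories, mkdir can only simulate one. The sketch below assumes a common convention rather than luigi's exact behaviour: a "directory" exists if any key starts with path/, and creating one adds a zero-byte path/ marker key. The two exception classes are local stand-ins for the luigi.target exceptions named above, and keys are written without the s3://bucket prefix for brevity.

```python
class FileAlreadyExists(Exception):
    """Local stand-in for luigi.target.FileAlreadyExists (illustration only)."""


class MissingParentDirectory(Exception):
    """Local stand-in for luigi.target.MissingParentDirectory (illustration only)."""


def mkdir(keys, path, parents=True, raise_if_exists=False):
    """Model mkdir on a flat set of keys (assumed marker-key convention)."""
    def is_dir(p):
        prefix = p.rstrip("/") + "/"
        return any(k.startswith(prefix) for k in keys)

    if is_dir(path):
        if raise_if_exists:
            raise FileAlreadyExists(path)
        return  # already a "directory": nothing to do
    parent = path.rstrip("/").rpartition("/")[0]
    if not parents and parent and not is_dir(parent):
        raise MissingParentDirectory(parent)
    # Parents need no keys of their own: they exist implicitly as prefixes.
    keys.add(path.rstrip("/") + "/")


keys = set()
mkdir(keys, "a/b/c")               # a/ and a/b/ are implied by the prefix
print(sorted(keys))                # ['a/b/c/']
mkdir(keys, "a/b", parents=False)  # already a "directory": no-op
print(sorted(keys))                # ['a/b/c/']
```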
- listdir(path, start_time=None, end_time=None, return_key=False)
Get an iterable with S3 folder contents. Iterable contains absolute paths for which queried path is a prefix.
- Parameters:
path – URL for target S3 location
start_time – Optional argument to list files with modified (offset aware) datetime after start_time
end_time – Optional argument to list files with modified (offset aware) datetime before end_time
return_key – Optional argument, when set to True will return boto3’s ObjectSummary (instead of the filename)
- list(path, start_time=None, end_time=None, return_key=False)
Get an iterable with S3 folder contents. Iterable contains paths relative to queried path.
- Parameters:
path – URL for target S3 location
start_time – Optional argument to list files with modified (offset aware) datetime after start_time
end_time – Optional argument to list files with modified (offset aware) datetime before end_time
return_key – Optional argument, when set to True will return boto3’s ObjectSummary (instead of the filename)
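The difference between listdir (absolute paths) and list (paths relative to the queried path) can be sketched on a plain list of keys. Both helpers are hypothetical; they assume the queried path is treated as a directory (a trailing "/" is appended before prefix matching), and list_relative is named that way only to avoid shadowing Python's built-in list.

```python
def listdir(keys, path):
    """Absolute paths of all keys under `path`, in sorted order."""
    prefix = path.rstrip("/") + "/"
    return [k for k in sorted(keys) if k.startswith(prefix)]


def list_relative(keys, path):
    """The same keys as listdir(), but relative to the queried path."""
    prefix = path.rstrip("/") + "/"
    return [k[len(prefix):] for k in listdir(keys, path)]


keys = ["s3://bucket/data/a.csv", "s3://bucket/data/sub/b.csv", "s3://bucket/other.txt"]
print(listdir(keys, "s3://bucket/data"))
# ['s3://bucket/data/a.csv', 's3://bucket/data/sub/b.csv']
print(list_relative(keys, "s3://bucket/data"))
# ['a.csv', 'sub/b.csv']
```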
- class luigi.contrib.s3.AtomicS3File(path, s3_client, **kwargs)
An S3 file that writes to a temp file and puts to S3 on close.
- Parameters:
kwargs – Keyword arguments are passed to the boto function initiate_multipart_upload
- class luigi.contrib.s3.S3Target(path, format=None, client=None, **kwargs)
Target S3 file object
- Parameters:
kwargs – Keyword arguments are passed to the boto function initiate_multipart_upload
Initializes a FileSystemTarget instance.
- Parameters:
path – the path associated with this FileSystemTarget.
- fs = None
- open(mode='r')
Open the FileSystem target.
This method returns a file-like object which can either be read from or written to depending on the specified mode.
- Parameters:
mode (str) – the mode r opens the FileSystemTarget in read-only mode, whereas w will open the FileSystemTarget in write mode. Subclasses can implement additional options. Using b is not supported; initialize with format=Nop instead.
- class luigi.contrib.s3.S3FlagTarget(path, format=None, client=None, flag='_SUCCESS')
Defines a target directory with a flag-file (defaults to _SUCCESS) used to signify job success.
This checks for two things:
1. the path exists (just like the S3Target);
2. the flag file (_SUCCESS by default) exists within the directory.
Because Hadoop outputs into a directory and not a single file, the path is assumed to be a directory.
This is meant to be a handy alternative to AtomicS3File.
The AtomicFile approach can be burdensome for S3 since there are no directories, per se.
If we have 1,000,000 output files, then we have to rename 1,000,000 objects.
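The two existence checks described above can be modelled on a set of keys. flag_target_exists is a hypothetical helper, not the S3FlagTarget implementation; note that in a flat key space the second check already implies the first.

```python
def flag_target_exists(keys, path, flag="_SUCCESS"):
    """Sketch of the S3FlagTarget checks on a set of s3:// keys."""
    if not path.endswith("/"):
        path += "/"  # the path is assumed to be a directory
    dir_exists = any(k.startswith(path) for k in keys)  # check 1
    flag_exists = (path + flag) in keys                  # check 2
    return dir_exists and flag_exists


keys = {
    "s3://bucket/out/part-00000",
    "s3://bucket/out/_SUCCESS",
    "s3://bucket/tmp/part-00000",  # job still running: no flag yet
}
print(flag_target_exists(keys, "s3://bucket/out/"))  # True
print(flag_target_exists(keys, "s3://bucket/tmp/"))  # False
```

Writing one small flag object at the end of a job is what lets this approach avoid renaming every output object.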
Initializes an S3FlagTarget.
- Parameters:
path (str) – the directory where the files are stored.
client – the S3 client to use.
flag (str) – the name of the flag file (defaults to _SUCCESS).
- fs = None
- class luigi.contrib.s3.S3EmrTarget(*args, **kwargs)
Deprecated. Use S3FlagTarget instead.
Initializes an S3FlagTarget.
- Parameters:
path (str) – the directory where the files are stored.
client – the S3 client to use.
flag (str) – the name of the flag file.
- class luigi.contrib.s3.S3PathTask(*args, **kwargs)
An external task that requires the existence of a path in S3.
- path
Parameter whose value is a str, and a base class for other parameter types.
Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
    import luigi
    class MyTask(luigi.Task):
        foo = luigi.Parameter()
    class RequiringTask(luigi.Task):
        def requires(self):
            return MyTask(foo="hello")
        def run(self):
            print(self.requires().foo)  # prints "hello"
This makes it possible to instantiate multiple tasks, e.g. MyTask(foo='bar') and MyTask(foo='baz'). The task will then have the foo attribute set appropriately.
When a task is instantiated, it will first use any argument as the value of the parameter, e.g. if you instantiate a = TaskA(x=44) then a.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:
1. Any value provided on the command line:
   - to the root task (e.g. --param xyz);
   - then to the class, using the qualified task name syntax (e.g. --TaskA-param xyz).
2. Any value from the configuration, using the [TASK_NAME]>PARAM_NAME: <serialized value> syntax. See "Parameters from config Ingestion".
3. Any default value set using the default flag.
Parameter objects may be reused, but you must then set the positional=False flag.
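The fallback order above can be sketched as a first-match lookup over the sources, highest priority first. resolve is hypothetical and deliberately simplified: it models the command line and the config file as plain dicts and ignores details such as flag-name normalisation and parsing string values into typed values.

```python
_MISSING = object()  # sentinel: "no value from this source"


def resolve(task_name, param_name, *, kwargs=None, cli=None, is_root=False,
            config=None, default=_MISSING):
    """Return the first available value for `param_name`.

    kwargs: arguments passed to the task constructor, e.g. {"x": 44}
    cli:    command-line flags, e.g. {"--x": "1", "--TaskA-x": "2"}
    config: config file as nested dicts, e.g. {"TaskA": {"x": "3"}}
    """
    kwargs, cli, config = kwargs or {}, cli or {}, config or {}
    candidates = [
        kwargs.get(param_name, _MISSING),                     # TaskA(x=44)
        cli.get(f"--{param_name}", _MISSING) if is_root else _MISSING,
        cli.get(f"--{task_name}-{param_name}", _MISSING),     # --TaskA-x
        config.get(task_name, {}).get(param_name, _MISSING),  # [TaskA] x: ...
        default,                                              # default=...
    ]
    for value in candidates:
        if value is not _MISSING:
            return value
    raise LookupError(f"no value for {task_name}.{param_name}")


print(resolve("TaskA", "x", kwargs={"x": 44}, cli={"--x": "1"}, is_root=True))  # 44
print(resolve("TaskA", "x", cli={"--TaskA-x": "2"}, config={"TaskA": {"x": "3"}}))  # 2
```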
- output()
The output that this Task produces.
The output of the Task determines if the Task needs to be run: the task is considered finished iff the outputs all exist. Subclasses should override this method to return a single Target or a list of Target instances.
- Implementation note
If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.
See Task.output
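The completeness rule ("finished iff the outputs all exist") amounts to an all() over the targets. FakeTarget and is_complete are hypothetical stand-ins; the sketch also assumes that a task with no outputs is treated as incomplete rather than trivially finished.

```python
class FakeTarget:
    """Hypothetical stand-in for a Target: only exists() matters here."""

    def __init__(self, exists):
        self._exists = exists

    def exists(self):
        return self._exists


def is_complete(output):
    """True iff every target returned by output() exists."""
    targets = list(output) if isinstance(output, (list, tuple)) else [output]
    if not targets:
        # Assumption: no outputs means "not complete", not vacuously true.
        return False
    return all(t.exists() for t in targets)


print(is_complete(FakeTarget(True)))                       # True
print(is_complete([FakeTarget(True), FakeTarget(False)]))  # False
```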
- class luigi.contrib.s3.S3EmrTask(*args, **kwargs)
An external task that requires the existence of EMR output in S3.
- path
Parameter whose value is a str, and a base class for other parameter types.
Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
    import luigi
    class MyTask(luigi.Task):
        foo = luigi.Parameter()
    class RequiringTask(luigi.Task):
        def requires(self):
            return MyTask(foo="hello")
        def run(self):
            print(self.requires().foo)  # prints "hello"
This makes it possible to instantiate multiple tasks, e.g. MyTask(foo='bar') and MyTask(foo='baz'). The task will then have the foo attribute set appropriately.
When a task is instantiated, it will first use any argument as the value of the parameter, e.g. if you instantiate a = TaskA(x=44) then a.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:
1. Any value provided on the command line:
   - to the root task (e.g. --param xyz);
   - then to the class, using the qualified task name syntax (e.g. --TaskA-param xyz).
2. Any value from the configuration, using the [TASK_NAME]>PARAM_NAME: <serialized value> syntax. See "Parameters from config Ingestion".
3. Any default value set using the default flag.
Parameter objects may be reused, but you must then set the positional=False flag.
- output()
The output that this Task produces.
The output of the Task determines if the Task needs to be run: the task is considered finished iff the outputs all exist. Subclasses should override this method to return a single Target or a list of Target instances.
- Implementation note
If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.
See Task.output
- class luigi.contrib.s3.S3FlagTask(*args, **kwargs)
An external task that requires the existence of EMR output in S3.
- path
Parameter whose value is a str, and a base class for other parameter types.
Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
    import luigi
    class MyTask(luigi.Task):
        foo = luigi.Parameter()
    class RequiringTask(luigi.Task):
        def requires(self):
            return MyTask(foo="hello")
        def run(self):
            print(self.requires().foo)  # prints "hello"
This makes it possible to instantiate multiple tasks, e.g. MyTask(foo='bar') and MyTask(foo='baz'). The task will then have the foo attribute set appropriately.
When a task is instantiated, it will first use any argument as the value of the parameter, e.g. if you instantiate a = TaskA(x=44) then a.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:
1. Any value provided on the command line:
   - to the root task (e.g. --param xyz);
   - then to the class, using the qualified task name syntax (e.g. --TaskA-param xyz).
2. Any value from the configuration, using the [TASK_NAME]>PARAM_NAME: <serialized value> syntax. See "Parameters from config Ingestion".
3. Any default value set using the default flag.
Parameter objects may be reused, but you must then set the positional=False flag.
- flag
Class to parse optional parameters.
- output()
The output that this Task produces.
The output of the Task determines if the Task needs to be run: the task is considered finished iff the outputs all exist. Subclasses should override this method to return a single Target or a list of Target instances.
- Implementation note
If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.
See Task.output