luigi.contrib.redshift
Classes
| KillOpenRedshiftSessions | A task for killing any open Redshift sessions in a given database. |
| RedshiftManifestTask | Generic task to generate a manifest file that can be used in S3CopyToTable in order to copy multiple files from your s3 folder into a redshift table at once. |
| RedshiftQuery | Template task for querying an Amazon Redshift database |
| RedshiftTarget | Target for a resource in Redshift. |
| RedshiftUnloadTask | Template task for running UNLOAD on an Amazon Redshift database |
| S3CopyJSONToTable | Template task for inserting a JSON data set into Redshift from s3. |
| S3CopyToTable | Template task for inserting a data set into Redshift from s3. |
- class luigi.contrib.redshift.RedshiftTarget(host, database, user, password, table, update_id, port=None)[source]
Target for a resource in Redshift.
Redshift is similar to postgres with a few adjustments required by redshift.
- Args:
host (str): Postgres server address. Possibly a host:port string.
database (str): Database name.
user (str): Database user.
password (str): Password for specified user.
update_id (str): An identifier for this data set.
port (int): Postgres server port.
- marker_table = 'table_updates'
- DEFAULT_DB_PORT = 5439
- use_db_timestamps = False
- class luigi.contrib.redshift.S3CopyToTable(*args, **kwargs)[source]
Template task for inserting a data set into Redshift from s3.
Usage:
Subclass and override the required attributes:
host,
database,
user,
password,
table,
columns,
s3_load_path.
You can also override the attributes provided by the CredentialsMixin if they are not supplied by your configuration or environment variables.
- abstract property copy_options
Add extra copy options, for example:
TIMEFORMAT 'auto'
IGNOREHEADER 1
TRUNCATECOLUMNS
IGNOREBLANKLINES
DELIMITER ' '
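To show where such options end up, here is a minimal sketch that assembles a COPY statement as a string. It is an illustration only and not luigi's own code: the helper name `build_copy_sql`, the table, the bucket, and the role ARN are all hypothetical.

```python
def build_copy_sql(table, columns, s3_load_path, credentials, copy_options=""):
    """Assemble a Redshift COPY statement as one string (illustrative helper)."""
    # The column list is optional; without it fields are mapped by position.
    colnames = "({})".format(", ".join(columns)) if columns else ""
    parts = [
        "COPY {} {}".format(table, colnames).rstrip(),
        "FROM '{}'".format(s3_load_path),
        "CREDENTIALS '{}'".format(credentials),
    ]
    if copy_options:
        # Extra options are appended verbatim after the credentials clause.
        parts.append(copy_options.strip())
    return "\n".join(parts) + ";"


# Hypothetical option set, one option per line as in the list above.
copy_options = "\n".join([
    "TIMEFORMAT 'auto'",
    "IGNOREHEADER 1",
    "TRUNCATECOLUMNS",
    "IGNOREBLANKLINES",
])

sql = build_copy_sql(
    table="events",
    columns=["id", "created_at"],
    s3_load_path="s3://example-bucket/events/",
    credentials="aws_iam_role=arn:aws:iam::123456789012:role/ExampleRole",
    copy_options=copy_options,
)
print(sql)
```

In a subclass, the same option text would be returned from the copy_options property.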
- property prune_table
Override to return the name of the table that is to be pruned. Intended to be used in conjunction with prune_column and prune_date, i.e. copy to a temp table, prune rows from the production table whose prune_column holds a date greater than prune_date, then insert into the production table from the temp table.
- property prune_column
Override to return the column of prune_table that is to be compared. Intended to be used in conjunction with prune_table and prune_date, i.e. copy to a temp table, prune rows from the production table whose prune_column holds a date greater than prune_date, then insert into the production table from the temp table.
- property prune_date
Override to return the date against which prune_column is to be compared. Intended to be used in conjunction with prune_table and prune_column, i.e. copy to a temp table, prune rows from the production table whose prune_column holds a date greater than prune_date, then insert into the production table from the temp table.
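The copy, prune, insert sequence described for the three prune properties can be sketched with an in-memory SQLite database standing in for Redshift. The table names, column names, and dates are hypothetical, and the strict "greater than" comparison is taken from the description above rather than from luigi's source.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# "production" plays the role of prune_table; "staging" is the temp table
# that the fresh data was copied into.
cur.execute("CREATE TABLE production (id INTEGER, event_date TEXT)")
cur.execute("CREATE TABLE staging (id INTEGER, event_date TEXT)")
cur.executemany(
    "INSERT INTO production VALUES (?, ?)",
    [(1, "2024-01-01"), (2, "2024-01-05"), (3, "2024-01-09")],
)
cur.executemany(
    "INSERT INTO staging VALUES (?, ?)",
    [(20, "2024-01-05"), (30, "2024-01-09"), (40, "2024-01-10")],
)

prune_table = "production"
prune_column = "event_date"
prune_date = "2024-01-03"  # ISO dates compare correctly as strings

# Prune: delete production rows newer than prune_date.
cur.execute(
    "DELETE FROM {} WHERE {} > ?".format(prune_table, prune_column),
    (prune_date,),
)
# Insert: move the freshly copied rows from the temp table into production.
cur.execute("INSERT INTO {} SELECT * FROM staging".format(prune_table))
conn.commit()

rows = cur.execute(
    "SELECT id, event_date FROM production ORDER BY event_date, id"
).fetchall()
print(rows)
```

Rows at or before the prune date survive untouched, while everything newer is replaced by the staged data.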
- property table_attributes
Add extra table attributes, for example:
DISTSTYLE KEY
DISTKEY (MY_FIELD)
SORTKEY (MY_FIELD_2, MY_FIELD_3)
- property table_constraints
Add extra table constraints, for example:
PRIMARY KEY (MY_FIELD, MY_FIELD_2)
UNIQUE KEY (MY_FIELD_3)
- property do_truncate_table
Return True if table should be truncated before copying new data in.
- do_prune()[source]
Return True if prune_table, prune_column, and prune_date are all implemented. If only a subset of the prune variables is overridden, an exception is raised to remind the user to implement all or none. When pruning is enabled, data newer than prune_date is deleted before new data is copied in.
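The all-or-none rule can be expressed as a small standalone function. This is a sketch of the behaviour described above, not luigi's method; the exception type and message are assumptions.

```python
def do_prune(prune_table=None, prune_column=None, prune_date=None):
    """Return True when all prune settings are given, False when none are."""
    values = (prune_table, prune_column, prune_date)
    if all(values):
        return True
    if any(values):
        # Only some of the three were supplied: refuse rather than guess.
        raise ValueError(
            "override all of prune_table, prune_column and prune_date, or none"
        )
    return False


print(do_prune())                                         # nothing set
print(do_prune("production", "event_date", "2024-01-03"))  # everything set
```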
- property table_type
Return table type (e.g. 'temp').
- property queries
Override to return a list of queries to be executed in order.
- create_table(connection)[source]
Override to provide code for creating the target table.
By default it will be created using types (optionally) specified in columns.
If overridden, use the provided connection object for setting up the table in order to create the table and insert data using the same transaction.
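As a rough picture of how column types, table_type, table_constraints, and table_attributes could combine into a CREATE TABLE statement, consider the sketch below. The helper `build_create_table_sql` and the exact statement layout are assumptions made for illustration; luigi's default create_table may differ in detail.

```python
def build_create_table_sql(table, columns, table_type="",
                           table_constraints="", table_attributes=""):
    """Build a CREATE TABLE statement from (name, type) column pairs."""
    if not columns:
        raise ValueError("at least one column is required")
    for col in columns:
        # A bare column name carries no type, so no DDL can be generated.
        if isinstance(col, str) or len(col) < 2:
            raise NotImplementedError(
                "columns need (name, type) pairs to generate the table"
            )
    body = ", ".join("{} {}".format(col[0], col[1]) for col in columns)
    if table_constraints:
        # Constraints live inside the parentheses, after the column list.
        body += ", " + table_constraints
    parts = [
        "CREATE", table_type.upper(), "TABLE", table,
        "({})".format(body),
        table_attributes,  # DISTKEY / SORTKEY etc. follow the parentheses
    ]
    return " ".join(p for p in parts if p) + ";"


ddl = build_create_table_sql(
    table="events",
    columns=[("id", "INT"), ("name", "VARCHAR(64)")],
    table_type="temp",
    table_constraints="PRIMARY KEY (id)",
    table_attributes="DISTKEY (id) SORTKEY (name)",
)
print(ddl)
```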
- run()[source]
If the target table doesn’t exist, self.create_table will be called to attempt to create the table.
- copy(cursor, f)[source]
Defines copying from s3 into redshift.
If both key-based and role-based credentials are provided, role-based will be used.
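The precedence rule can be illustrated with a helper that builds the string passed to the CREDENTIALS clause. The two string formats are the ones Redshift accepts; the function name and its parameter names are hypothetical and are not claimed to match the CredentialsMixin attributes.

```python
def build_credentials(aws_access_key_id=None, aws_secret_access_key=None,
                      aws_account_id=None, aws_arn_role_name=None):
    """Return a Redshift CREDENTIALS string, preferring role-based auth."""
    if aws_account_id and aws_arn_role_name:
        # Role-based credentials win whenever both kinds are supplied.
        return "aws_iam_role=arn:aws:iam::{}:role/{}".format(
            aws_account_id, aws_arn_role_name
        )
    if aws_access_key_id and aws_secret_access_key:
        return "aws_access_key_id={};aws_secret_access_key={}".format(
            aws_access_key_id, aws_secret_access_key
        )
    raise ValueError("no complete set of credentials was provided")


both = build_credentials(
    aws_access_key_id="EXAMPLEKEY",
    aws_secret_access_key="examplesecret",
    aws_account_id="123456789012",
    aws_arn_role_name="ExampleRole",
)
print(both)
```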
- output()[source]
Returns a RedshiftTarget representing the inserted dataset.
Normally you don’t override this.
- init_copy(connection)[source]
Perform pre-copy SQL, such as creating the table, truncating it, or removing data older than a given cutoff.
- class luigi.contrib.redshift.S3CopyJSONToTable(*args, **kwargs)[source]
Template task for inserting a JSON data set into Redshift from s3.
Usage:
Subclass and override the required attributes:
host,
database,
user,
password,
table,
columns,
s3_load_path,
jsonpath,
copy_json_options.
You can also override the attributes provided by the CredentialsMixin if they are not supplied by your configuration or environment variables.
- abstract property jsonpath
Override the jsonpath schema location for the table.
- abstract property copy_json_options
Add extra copy options, for example:
GZIP
LZOP
- class luigi.contrib.redshift.RedshiftManifestTask(*args, **kwargs)[source]
Generic task to generate a manifest file that can be used in S3CopyToTable in order to copy multiple files from your s3 folder into a redshift table at once.
For full description on how to use the manifest file see http://docs.aws.amazon.com/redshift/latest/dg/loading-data-files-using-manifest.html
Usage:
- requires parameters
  - path - s3 path to the generated manifest file, including the name of the generated file to be copied into a redshift table
  - folder_paths - s3 paths to the folders containing files you wish to be copied
Output:
generated manifest file
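For orientation, a Redshift manifest is a JSON document with an `entries` list, where each entry has a `url` and a `mandatory` flag. The sketch below builds one from a list of object URLs that is assumed to have been collected already; listing the S3 folders is left out, and the helper name and URLs are hypothetical.

```python
import json


def build_manifest(s3_urls, mandatory=True):
    """Return a manifest dict in the layout Redshift's COPY expects."""
    return {
        "entries": [{"url": url, "mandatory": mandatory} for url in s3_urls]
    }


# Hypothetical files found under the folders named in folder_paths.
urls = [
    "s3://example-bucket/folder-a/part-0000.csv",
    "s3://example-bucket/folder-b/part-0000.csv",
]

manifest_text = json.dumps(build_manifest(urls), indent=2)
print(manifest_text)
```

The resulting text is what would be written to the location given by path.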
- folder_paths
Parameter whose value is a str, and a base class for other parameter types.
Parameters are objects set on the Task class level to make it possible to parameterize tasks. For instance:
    class MyTask(luigi.Task):
        foo = luigi.Parameter()

    class RequiringTask(luigi.Task):
        def requires(self):
            return MyTask(foo="hello")

        def run(self):
            print(self.requires().foo)  # prints "hello"
This makes it possible to instantiate multiple tasks, e.g. MyTask(foo='bar') and MyTask(foo='baz'). The task will then have the foo attribute set appropriately.
When a task is instantiated, it will first use any argument as the value of the parameter, e.g. if you instantiate a = TaskA(x=44) then a.x == 44. When the value is not provided, the value will be resolved in this order of falling priority:
  - Any value provided on the command line:
    - To the root task (e.g. --param xyz)
    - Then to the class, using the qualified task name syntax (e.g. --TaskA-param xyz).
  - With [TASK_NAME]>PARAM_NAME: <serialized value> syntax. See Parameters from config Ingestion
  - Any default value set using the default flag.
Parameter objects may be reused, but you must then set the positional=False flag.
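The order of falling priority can be pictured as a first-match lookup over the possible sources. This is a conceptual sketch only; the function and argument names are hypothetical, and luigi's real resolution logic lives inside the Parameter class.

```python
_MISSING = object()  # sentinel, so that None or 0 can still be real values


def resolve_parameter(explicit=_MISSING, cli_root=_MISSING,
                      cli_class=_MISSING, config=_MISSING, default=_MISSING):
    """Return the first value that is present, highest priority first."""
    for candidate in (explicit, cli_root, cli_class, config, default):
        if candidate is not _MISSING:
            return candidate
    raise ValueError("no value available for parameter")


# A constructor argument beats everything else.
print(resolve_parameter(explicit=44, config=7, default=0))
# Without one, a config value still beats the default.
print(resolve_parameter(config=7, default=0))
```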
- text_target = True
- class luigi.contrib.redshift.KillOpenRedshiftSessions(*args, **kwargs)[source]
A task for killing any open Redshift sessions in a given database. This is necessary to prevent open user sessions with transactions against the table from blocking drop or truncate table commands.
Usage:
Subclass and override the required host, database, user, and password attributes.
- connection_reset_wait_seconds
Parameter whose value is an int.
- abstract property host
- abstract property database
- abstract property user
- abstract property password
- property update_id
This update id will be a unique identifier for this insert on this table.
- class luigi.contrib.redshift.RedshiftQuery(*args, **kwargs)[source]
Template task for querying an Amazon Redshift database
Usage: Subclass and override the required host, database, user, password, table, and query attributes.
Override the run method if your use case requires some action with the query result.
Task instances require a dynamic update_id, e.g. via parameter(s); otherwise the query will only execute once.
To customize the query signature as recorded in the database marker table, override the update_id property.
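Why a fixed update_id means the query runs only once can be simulated with an in-memory SQLite marker table. This is a simplified model, not luigi's code: the marker table schema, the helper `run_query_once`, and the update_id strings are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the marker table that records finished updates.
conn.execute(
    "CREATE TABLE table_updates (update_id TEXT PRIMARY KEY, target_table TEXT)"
)

executed = []  # records which queries would actually have been sent


def run_query_once(update_id, table, query):
    """Run the query unless this update_id is already marked as done."""
    done = conn.execute(
        "SELECT 1 FROM table_updates WHERE update_id = ?", (update_id,)
    ).fetchone()
    if done:
        return False
    executed.append(query)  # stand-in for executing against Redshift
    conn.execute(
        "INSERT INTO table_updates VALUES (?, ?)", (update_id, table)
    )
    conn.commit()
    return True


first = run_query_once("DailyQuery(date=2024-01-01)", "events", "ANALYZE events")
repeat = run_query_once("DailyQuery(date=2024-01-01)", "events", "ANALYZE events")
next_day = run_query_once("DailyQuery(date=2024-01-02)", "events", "ANALYZE events")
print(first, repeat, next_day, len(executed))
```

Because the date parameter is part of the update_id in this model, each day's instance gets its own marker row and runs once.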
- class luigi.contrib.redshift.RedshiftUnloadTask(*args, **kwargs)[source]
Template task for running UNLOAD on an Amazon Redshift database
Usage: Subclass and override the required host, database, user, password, table, and query attributes.
Optionally, override the autocommit attribute to run the query in autocommit mode; this is necessary to run VACUUM, for example.
Override the run method if your use case requires some action with the query result.
Task instances require a dynamic update_id, e.g. via parameter(s); otherwise the query will only execute once.
To customize the query signature as recorded in the database marker table, override the update_id property.
You can also override the attributes provided by the CredentialsMixin if they are not supplied by your configuration or environment variables.
- property s3_unload_path
Override to return the load path.
- property unload_options
Add extra or override default unload options:
- property unload_query
Default UNLOAD command
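To show how the query, s3_unload_path, and unload_options might fit together, the sketch below assembles an UNLOAD statement as a string. The helper name, the option set, the bucket, and the role ARN are hypothetical, and the layout is not claimed to match luigi's default unload_query. Single quotes inside the query are doubled because UNLOAD takes the query as a quoted string.

```python
def build_unload_sql(query, s3_unload_path, credentials, unload_options=""):
    """Assemble a Redshift UNLOAD statement as one string."""
    # The query is embedded in a quoted literal, so inner quotes are doubled.
    escaped = query.replace("'", "''")
    sql = "UNLOAD ('{}') TO '{}' CREDENTIALS '{}'".format(
        escaped, s3_unload_path, credentials
    )
    if unload_options:
        sql += " " + unload_options.strip()
    return sql + ";"


unload_sql = build_unload_sql(
    query="SELECT id FROM events WHERE kind = 'click'",
    s3_unload_path="s3://example-bucket/unload/events_",
    credentials="aws_iam_role=arn:aws:iam::123456789012:role/ExampleRole",
    unload_options="DELIMITER '|' GZIP ALLOWOVERWRITE",
)
print(unload_sql)
```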