luigi.contrib.sqla module¶
Support for SQLAlchemy. Provides SQLAlchemyTarget for storing in databases supported by SQLAlchemy. The user would be responsible for installing the required database driver to connect using SQLAlchemy.
Minimal example of a job to copy data to database using SQLAlchemy is as shown below:
from sqlalchemy import String
import luigi
from luigi.contrib import sqla
class SQLATask(sqla.CopyToTable):
# columns defines the table schema, with each element corresponding
# to a column in the format (args, kwargs) which will be sent to
# the sqlalchemy.Column(*args, **kwargs)
columns = [
(["item", String(64)], {"primary_key": True}),
(["property", String(64)], {})
]
connection_string = "sqlite://" # in memory SQLite database
table = "item_property" # name of the table to store data
def rows(self):
for row in [("item1", "property1"), ("item2", "property2")]:
yield row
if __name__ == '__main__':
task = SQLATask()
luigi.build([task], local_scheduler=True)
If the target table where the data needs to be copied already exists, then the column schema definition can be skipped and instead the reflect flag can be set as True. Here is a modified version of the above example:
from sqlalchemy import String
import luigi
from luigi.contrib import sqla
class SQLATask(sqla.CopyToTable):
# If database table is already created, then the schema can be loaded
# by setting the reflect flag to True
reflect = True
connection_string = "sqlite://" # in memory SQLite database
table = "item_property" # name of the table to store data
def rows(self):
for row in [("item1", "property1"), ("item2", "property2")]:
yield row
if __name__ == '__main__':
task = SQLATask()
luigi.build([task], local_scheduler=True)
In the above examples, the data that needs to be copied was directly provided by overriding the rows method. Alternately, if the data comes from another task, the modified example would look as shown below:
from sqlalchemy import String
import luigi
from luigi.contrib import sqla
from luigi.mock import MockTarget
class BaseTask(luigi.Task):
def output(self):
return MockTarget("BaseTask")
def run(self):
out = self.output().open("w")
TASK_LIST = ["item%d\tproperty%d\n" % (i, i) for i in range(10)]
for task in TASK_LIST:
out.write(task)
out.close()
class SQLATask(sqla.CopyToTable):
# columns defines the table schema, with each element corresponding
# to a column in the format (args, kwargs) which will be sent to
# the sqlalchemy.Column(*args, **kwargs)
columns = [
(["item", String(64)], {"primary_key": True}),
(["property", String(64)], {})
]
connection_string = "sqlite://" # in memory SQLite database
table = "item_property" # name of the table to store data
def requires(self):
return BaseTask()
if __name__ == '__main__':
task1, task2 = SQLATask(), BaseTask()
luigi.build([task1, task2], local_scheduler=True)
In the above example, the output from BaseTask is copied into the database. Here we did not have to implement the rows method because by default rows implementation assumes every line is a row with column values separated by a tab. One can define column_separator option for the task if the values are say comma separated instead of tab separated.
You can pass in database specific connection arguments by setting the connect_args dictionary. The options will be passed directly to the DBAPI’s connect method as keyword arguments.
The other option to sqla.CopyToTable that can be of help with performance aspect is the chunk_size. The default is 5000. This is the number of rows that will be inserted in a transaction at a time. Depending on the size of the inserts, this value can be tuned for performance.
See here for a tutorial on building task pipelines using luigi and using SQLAlchemy in workflow pipelines.
Author: Gouthaman Balaraman Date: 01/02/2015
- class luigi.contrib.sqla.SQLAlchemyTarget(connection_string, target_table, update_id, echo=False, connect_args=None)[source]¶
Bases:
Target
Database target using SQLAlchemy.
This will rarely have to be directly instantiated by the user.
Typical usage would be to override luigi.contrib.sqla.CopyToTable class to create a task to write to the database.
Constructor for the SQLAlchemyTarget.
- Parameters:
connection_string (str) – SQLAlchemy connection string
target_table (str) – The table name for the data
update_id (str) – An identifier for this data set
echo (bool) – Flag to setup SQLAlchemy logging
connect_args (dict) – A dictionary of connection arguments
- Returns:
- marker_table = None¶
- class Connection(engine, pid)¶
Bases:
tuple
Create new instance of Connection(engine, pid)
- engine¶
Alias for field number 0
- pid¶
Alias for field number 1
- property engine¶
Return an engine instance, creating it if it doesn’t exist.
Recreate the engine connection if it wasn’t originally created by the current process.
- class luigi.contrib.sqla.CopyToTable(*args, **kwargs)[source]¶
Bases:
Task
An abstract task for inserting a data set into SQLAlchemy RDBMS
Usage:
subclass and override the required connection_string, table and columns attributes.
optionally override the schema attribute to use a different schema for the target table.
- echo = False¶
- connect_args = {}¶
- abstract property connection_string¶
- abstract property table¶
- columns = []¶
- schema = ''¶
- column_separator = '\t'¶
- chunk_size = 5000¶
- reflect = False¶
- create_table(engine)[source]¶
Override to provide code for creating the target table.
By default it will be created using types specified in columns. If the table exists, then it binds to the existing table.
If overridden, use the provided connection object for setting up the table in order to create the table and insert data using the same transaction. :param engine: The sqlalchemy engine instance :type engine: object
- output()[source]¶
The output that this Task produces.
The output of the Task determines if the Task needs to be run–the task is considered finished iff the outputs all exist. Subclasses should override this method to return a single
Target
or a list ofTarget
instances.- Implementation note
If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.
See Task.output
- rows()[source]¶
Return/yield tuples or lists corresponding to each row to be inserted.
This method can be overridden for custom file types or formats.
- copy(conn, ins_rows, table_bound)[source]¶
This method does the actual insertion of the rows of data given by ins_rows into the database. A task that needs row updates instead of insertions should overload this method. :param conn: The sqlalchemy connection object :param ins_rows: The dictionary of rows with the keys in the format _<column_name>. For example if you have a table with a column name “property”, then the key in the dictionary would be “_property”. This format is consistent with the bindparam usage in sqlalchemy. :param table_bound: The object referring to the table :return: