airflow.providers.openlineage.utils.sql

Module Contents

Classes

ColumnIndex

Enumerates the indices of columns in information schema view.

TableSchema

Temporary object used to construct OpenLineage Dataset.

Functions

get_table_schemas(hook, namespace, schema, database, ...)

Query database for table schemas.

parse_query_result(cursor)

Fetch results from DB-API 2.0 cursor and creates list of table schemas.

create_information_schema_query(columns, ...[, ...])

Create query for getting table schemas from information schema.

create_filter_clauses(mapping, information_schema_table)

Create comprehensive filter clauses for all tables in one database.

Attributes

TablesHierarchy

class airflow.providers.openlineage.utils.sql.ColumnIndex[source]

Bases: enum.IntEnum

Enumerates the indices of columns in information schema view.

SCHEMA = 0[source]
TABLE_NAME = 1[source]
COLUMN_NAME = 2[source]
ORDINAL_POSITION = 3[source]
UDT_NAME = 4[source]
DATABASE = 5[source]
airflow.providers.openlineage.utils.sql.TablesHierarchy[source]
class airflow.providers.openlineage.utils.sql.TableSchema[source]

Temporary object used to construct OpenLineage Dataset.

table: str[source]
schema: str | None[source]
database: str | None[source]
fields: list[openlineage.client.facet.SchemaField][source]
to_dataset(namespace, database=None, schema=None)[source]
airflow.providers.openlineage.utils.sql.get_table_schemas(hook, namespace, schema, database, in_query, out_query)[source]

Query database for table schemas.

Uses provided hook. Responsibility to provide queries for this function is on particular extractors. If query for input or output table isn’t provided, the query is skipped.

airflow.providers.openlineage.utils.sql.parse_query_result(cursor)[source]

Fetch results from DB-API 2.0 cursor and creates list of table schemas.

For each row it creates TableSchema.

airflow.providers.openlineage.utils.sql.create_information_schema_query(columns, information_schema_table_name, tables_hierarchy, uppercase_names=False, use_flat_cross_db_query=False, sqlalchemy_engine=None)[source]

Create query for getting table schemas from information schema.

airflow.providers.openlineage.utils.sql.create_filter_clauses(mapping, information_schema_table, uppercase_names=False)[source]

Create comprehensive filter clauses for all tables in one database.

Parameters
  • mapping (dict) – a nested dictionary of database, schema names and list of tables in each

  • information_schema_table (sqlalchemy.Table) – sqlalchemy.Table instance used to construct clauses For most SQL dbs it contains table_name and table_schema columns, therefore it is expected the table has them defined.

  • uppercase_names (bool) – if True use schema and table names uppercase

Was this entry helpful?