airflow.providers.openlineage.sqlparser

Module Contents

Classes

GetTableSchemasParams

get_table_schemas params.

DatabaseInfo

Contains database specific information needed to process SQL statement parse result.

SQLParser

Interface for openlineage-sql.

Functions

default_normalize_name_method(name)

from_table_meta(table_meta, database, namespace, ...)

Attributes

DEFAULT_NAMESPACE

DEFAULT_INFORMATION_SCHEMA_COLUMNS

DEFAULT_INFORMATION_SCHEMA_TABLE_NAME

airflow.providers.openlineage.sqlparser.DEFAULT_NAMESPACE = 'default'
airflow.providers.openlineage.sqlparser.DEFAULT_INFORMATION_SCHEMA_COLUMNS = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'udt_name']
airflow.providers.openlineage.sqlparser.DEFAULT_INFORMATION_SCHEMA_TABLE_NAME = 'information_schema.columns'
airflow.providers.openlineage.sqlparser.default_normalize_name_method(name)
class airflow.providers.openlineage.sqlparser.GetTableSchemasParams

Bases: airflow.typing_compat.TypedDict

get_table_schemas params.

normalize_name: Callable[[str], str]
is_cross_db: bool
information_schema_columns: list[str]
information_schema_table: str
use_flat_cross_db_query: bool
is_uppercase_names: bool
database: str | None
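
The keys listed above can be pictured as an ordinary typed dictionary. The following is a minimal sketch only: `TableSchemaParams` is a hypothetical stand-in built on the standard library's `typing.TypedDict`, not the provider's own class, and the example values simply reuse the module defaults documented on this page.

```python
from typing import Callable, Optional, TypedDict


class TableSchemaParams(TypedDict):
    """Hypothetical stand-in mirroring the documented GetTableSchemasParams keys."""

    normalize_name: Callable[[str], str]
    is_cross_db: bool
    information_schema_columns: list[str]
    information_schema_table: str
    use_flat_cross_db_query: bool
    is_uppercase_names: bool
    database: Optional[str]


# Example values; the column list and table name are the module defaults
# documented on this page, the rest are illustrative choices.
params: TableSchemaParams = {
    "normalize_name": str.lower,
    "is_cross_db": False,
    "information_schema_columns": [
        "table_schema",
        "table_name",
        "column_name",
        "ordinal_position",
        "udt_name",
    ],
    "information_schema_table": "information_schema.columns",
    "use_flat_cross_db_query": False,
    "is_uppercase_names": False,
    "database": None,
}
```

At runtime a TypedDict is a plain dict, so a value like `params` can be unpacked as keyword arguments into a function that accepts these names.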
class airflow.providers.openlineage.sqlparser.DatabaseInfo

Contains database specific information needed to process SQL statement parse result.

Parameters
  • scheme – Scheme part of URI in OpenLineage namespace.

  • authority – Authority part of URI in OpenLineage namespace. In most cases this should be the {host}:{port} part of the Airflow connection. See: https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md

  • database – Takes precedence over parsed database name.

  • information_schema_columns – List of column names from the information schema table.

  • information_schema_table_name – Information schema table name.

  • use_flat_cross_db_query – Specifies whether a single information schema table should be used for cross-database queries (e.g. for Redshift).

  • is_information_schema_cross_db – Specifies if information schema contains cross-database data.

  • is_uppercase_names – Specifies if database accepts only uppercase names (e.g. Snowflake).

  • normalize_name_method – Method to normalize database, schema and table names. Defaults to name.lower().

scheme: str
authority: str | None
database: str | None
information_schema_columns: list[str]
information_schema_table_name: str
use_flat_cross_db_query: bool = False
is_information_schema_cross_db: bool = False
is_uppercase_names: bool = False
normalize_name_method: Callable[[str], str]
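
To illustrate how the fields above fit together, here is a minimal sketch using a standard-library dataclass. `DatabaseInfoSketch`, `lower_name`, and `namespace_uri` are hypothetical names, not part of the provider. The defaults for `authority`, `database`, and the information schema fields are assumptions, and so is the `scheme://authority` namespace layout, which follows the OpenLineage naming convention linked above.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional

# Module defaults as documented on this page.
DEFAULT_COLUMNS = ["table_schema", "table_name", "column_name", "ordinal_position", "udt_name"]
DEFAULT_TABLE = "information_schema.columns"


def lower_name(name: str) -> str:
    """Stand-in for the documented default normalization, name.lower()."""
    return name.lower()


@dataclass
class DatabaseInfoSketch:
    """Hypothetical stand-in mirroring the documented DatabaseInfo fields."""

    scheme: str
    authority: Optional[str] = None  # assumed default
    database: Optional[str] = None  # assumed default
    information_schema_columns: list[str] = field(default_factory=lambda: list(DEFAULT_COLUMNS))
    information_schema_table_name: str = DEFAULT_TABLE
    use_flat_cross_db_query: bool = False
    is_information_schema_cross_db: bool = False
    is_uppercase_names: bool = False
    normalize_name_method: Callable[[str], str] = lower_name


def namespace_uri(info: DatabaseInfoSketch) -> str:
    """Assumed layout: 'scheme://authority', or the scheme alone without an authority."""
    return f"{info.scheme}://{info.authority}" if info.authority else info.scheme


# The host and port below are made-up example values.
info = DatabaseInfoSketch(scheme="postgres", authority="db.example.com:5432")
print(namespace_uri(info))
```

A database that only accepts uppercase names could be described in the same way by setting `is_uppercase_names=True` and supplying a different `normalize_name_method`.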
airflow.providers.openlineage.sqlparser.from_table_meta(table_meta, database, namespace, is_uppercase)
class airflow.providers.openlineage.sqlparser.SQLParser(dialect=None, default_schema=None)

Bases: airflow.utils.log.logging_mixin.LoggingMixin

Interface for openlineage-sql.

Parameters
  • dialect (str | None) – dialect specific to the database

  • default_schema (str | None) – schema applied to each table with no schema parsed

parse(sql)

Parse a single or a list of SQL statements.

parse_table_schemas(hook, inputs, outputs, database_info, namespace=DEFAULT_NAMESPACE, database=None, sqlalchemy_engine=None)

Parse schemas for input and output tables.

get_metadata_from_parser(inputs, outputs, database_info, namespace=DEFAULT_NAMESPACE, database=None)
attach_column_lineage(datasets, database, parse_result)

Attaches column lineage facet to the list of datasets.

Note that each dataset currently receives the same column lineage information. This is expected to change as the OpenLineage SQL parser improves.

generate_openlineage_metadata_from_sql(sql, hook, database_info, database=None, sqlalchemy_engine=None, use_connection=True)

Parse SQL statement(s) and generate OpenLineage metadata.

Generated OpenLineage metadata contains:

  • input tables with schemas parsed

  • output tables with schemas parsed

  • run facets

  • job facets.

Parameters
  • sql (list[str] | str) – a SQL statement or a list of SQL statements to be parsed

  • hook (airflow.hooks.base.BaseHook) – Airflow Hook used to connect to the database

  • database_info (DatabaseInfo) – database specific information

  • database (str | None) – when passed it takes precedence over parsed database name

  • sqlalchemy_engine (sqlalchemy.engine.Engine | None) – when passed, engine’s dialect is used to compile SQL queries

static create_namespace(database_info)
classmethod normalize_sql(sql)

Make sure to return a semicolon-separated SQL statement.
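
As a rough illustration of what "semicolon-separated" means here, the sketch below joins statements with a semicolon and a newline after trimming trailing semicolons and whitespace. `normalize_sql_sketch` is a hypothetical helper, not the provider's method. Both the exact separator and the choice not to split a single input string into several statements are assumptions.

```python
from typing import Union


def normalize_sql_sketch(sql: Union[list[str], str]) -> str:
    """Join one or more SQL statements into a single semicolon-separated string."""
    # Treat a single string as a one-element list (it is not split further here).
    statements = [sql] if isinstance(sql, str) else list(sql)
    # Drop surrounding whitespace and any trailing semicolon from each statement.
    cleaned = [stmt.strip().rstrip(";").rstrip() for stmt in statements]
    # Skip statements that end up empty, then join with ";" plus a newline.
    return ";\n".join(stmt for stmt in cleaned if stmt)


print(normalize_sql_sketch(["SELECT 1;", "SELECT 2"]))
```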

classmethod split_sql_string(sql)

Split SQL string into list of statements.

Tries to use DbApiHook.split_sql_string if available. Otherwise, uses the same logic.

create_information_schema_query(tables, normalize_name, is_cross_db, information_schema_columns, information_schema_table, is_uppercase_names, use_flat_cross_db_query, database=None, sqlalchemy_engine=None)

Create SELECT statement to query information schema table.
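
The general shape of such a query can be sketched with plain string building. This is only an illustration under stated assumptions: `information_schema_query_sketch` is a hypothetical helper, the `(schema, table)` tuple input and the `table_schema` / `table_name` filter columns are assumptions, and it does not reproduce how the provider compiles its statement. Values are interpolated directly for readability, so production code should rely on bound parameters or a query builder instead.

```python
def information_schema_query_sketch(
    tables: list[tuple[str, str]],
    columns: list[str],
    information_schema_table: str,
) -> str:
    """Build a SELECT over an information schema table for (schema, table) pairs."""
    select = f"SELECT {', '.join(columns)} FROM {information_schema_table}"
    # One condition per requested table; the conditions are OR-ed together.
    conditions = [
        f"(table_schema = '{schema}' AND table_name = '{table}')"
        for schema, table in tables
    ]
    if not conditions:
        return select
    return f"{select} WHERE {' OR '.join(conditions)}"


query = information_schema_query_sketch(
    tables=[("public", "orders")],
    columns=["table_schema", "table_name", "column_name", "ordinal_position", "udt_name"],
    information_schema_table="information_schema.columns",
)
print(query)
```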
