airflow.providers.openlineage.sqlparser

Module Contents

Classes

GetTableSchemasParams

get_table_schemas params.

DatabaseInfo

Contains database-specific information needed to process the result of parsing a SQL statement.

SQLParser

Interface for openlineage-sql.

Attributes

DEFAULT_NAMESPACE

DEFAULT_INFORMATION_SCHEMA_COLUMNS

DEFAULT_INFORMATION_SCHEMA_TABLE_NAME

airflow.providers.openlineage.sqlparser.DEFAULT_NAMESPACE = 'default'
airflow.providers.openlineage.sqlparser.DEFAULT_INFORMATION_SCHEMA_COLUMNS = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'udt_name']
airflow.providers.openlineage.sqlparser.DEFAULT_INFORMATION_SCHEMA_TABLE_NAME = 'information_schema.columns'
airflow.providers.openlineage.sqlparser.default_normalize_name_method(name)
class airflow.providers.openlineage.sqlparser.GetTableSchemasParams

Bases: airflow.typing_compat.TypedDict

get_table_schemas params.

normalize_name: Callable[[str], str]
is_cross_db: bool
information_schema_columns: list[str]
information_schema_table: str
is_uppercase_names: bool
database: str | None
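
To illustrate the shape of this dictionary, the sketch below defines a stand-in TypedDict with the same keys using only the standard library and fills it with sample values. The class name TableSchemasParamsSketch and the values are hypothetical; the real class is the one documented above.

```python
from typing import Callable, List, Optional, TypedDict


class TableSchemasParamsSketch(TypedDict):
    """Stand-in mirroring the keys listed above (not the provider's class)."""

    normalize_name: Callable[[str], str]
    is_cross_db: bool
    information_schema_columns: List[str]
    information_schema_table: str
    is_uppercase_names: bool
    database: Optional[str]


# Sample values; the column list and table name reuse the module defaults.
params: TableSchemasParamsSketch = {
    "normalize_name": str.lower,
    "is_cross_db": False,
    "information_schema_columns": [
        "table_schema",
        "table_name",
        "column_name",
        "ordinal_position",
        "udt_name",
    ],
    "information_schema_table": "information_schema.columns",
    "is_uppercase_names": False,
    "database": None,
}
```

A type checker will flag missing or misspelled keys in such a dictionary, which is the main benefit of typing the parameters this way.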
class airflow.providers.openlineage.sqlparser.DatabaseInfo

Contains database-specific information needed to process the result of parsing a SQL statement.

Parameters
  • scheme – Scheme part of URI in OpenLineage namespace.

  • authority – Authority part of URI in OpenLineage namespace. In most cases it should be the {host}:{port} part of the Airflow connection. See: https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md

  • database – Takes precedence over parsed database name.

  • information_schema_columns – List of column names from the information schema table.

  • information_schema_table_name – Information schema table name.

  • is_information_schema_cross_db – Specifies if information schema contains cross-database data.

  • is_uppercase_names – Specifies if database accepts only uppercase names (e.g. Snowflake).

  • normalize_name_method – Method to normalize database, schema and table names. Defaults to name.lower().

scheme: str
authority: str | None
database: str | None
information_schema_columns: list[str]
information_schema_table_name: str
is_information_schema_cross_db: bool = False
is_uppercase_names: bool = False
normalize_name_method: Callable[[str], str]
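
The fields above can be pictured with a small standard-library stand-in. This is only a sketch: DatabaseInfoSketch and lower_name are hypothetical names, and the defaults chosen for authority, database, and the information schema fields are assumptions based on the module-level constants and the parameter descriptions, not a copy of the provider's class.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional

# Values copied from the module attributes documented on this page.
DEFAULT_INFORMATION_SCHEMA_COLUMNS = [
    "table_schema",
    "table_name",
    "column_name",
    "ordinal_position",
    "udt_name",
]
DEFAULT_INFORMATION_SCHEMA_TABLE_NAME = "information_schema.columns"


def lower_name(name: str) -> str:
    """Hypothetical default normalizer, matching the documented name.lower()."""
    return name.lower()


@dataclass
class DatabaseInfoSketch:
    """Stand-in with the same field names as DatabaseInfo (defaults are assumed)."""

    scheme: str
    authority: Optional[str] = None
    database: Optional[str] = None
    information_schema_columns: List[str] = field(
        default_factory=lambda: list(DEFAULT_INFORMATION_SCHEMA_COLUMNS)
    )
    information_schema_table_name: str = DEFAULT_INFORMATION_SCHEMA_TABLE_NAME
    is_information_schema_cross_db: bool = False
    is_uppercase_names: bool = False
    normalize_name_method: Callable[[str], str] = lower_name


# Example values are made up for illustration.
info = DatabaseInfoSketch(
    scheme="postgres",
    authority="db.example.com:5432",
    database="analytics",
)
```

A database that only accepts uppercase names would set is_uppercase_names=True and leave the remaining fields at their defaults.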
class airflow.providers.openlineage.sqlparser.SQLParser(dialect=None, default_schema=None)

Interface for openlineage-sql.

Parameters
  • dialect (str | None) – dialect specific to the database

  • default_schema (str | None) – schema applied to each table with no schema parsed

parse(sql)

Parse a single SQL statement or a list of SQL statements.

parse_table_schemas(hook, inputs, outputs, database_info, namespace=DEFAULT_NAMESPACE, database=None, sqlalchemy_engine=None)

Parse schemas for input and output tables.

generate_openlineage_metadata_from_sql(sql, hook, database_info, database=None, sqlalchemy_engine=None)

Parses SQL statement(s) and generates OpenLineage metadata.

Generated OpenLineage metadata contains:

  • input tables with schemas parsed

  • output tables with schemas parsed

  • run facets

  • job facets.

Parameters
  • sql (list[str] | str) – a SQL statement or list of SQL statements to be parsed

  • hook (airflow.hooks.base.BaseHook) – Airflow Hook used to connect to the database

  • database_info (DatabaseInfo) – database-specific information

  • database (str | None) – when passed it takes precedence over parsed database name

  • sqlalchemy_engine (sqlalchemy.engine.Engine | None) – when passed, the engine’s dialect is used to compile SQL queries

static create_namespace(database_info)
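
create_namespace has no description on this page. Since DatabaseInfo describes scheme and authority as the two parts of a URI in the OpenLineage namespace, one plausible reading is that the namespace is formed by joining them. The helper below is a hypothetical sketch of that idea; the function name and the fallback to the bare scheme when no authority is set are assumptions, not confirmed behaviour.

```python
from typing import Optional


def namespace_sketch(scheme: str, authority: Optional[str]) -> str:
    """Join scheme and authority as 'scheme://authority'.

    Assumption: when authority is missing, the scheme alone is used.
    """
    return f"{scheme}://{authority}" if authority else scheme


# Example values are made up for illustration.
with_authority = namespace_sketch("postgres", "db.example.com:5432")
without_authority = namespace_sketch("sqlite", None)
```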
classmethod normalize_sql(sql)

Makes sure to return semicolon-separated SQL statements.

classmethod split_sql_string(sql)

Split SQL string into list of statements.

Tries to use DbApiHook.split_sql_string if available. Otherwise, uses the same logic.
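
The relationship between normalize_sql and split_sql_string can be sketched with plain string handling. This is a deliberately naive illustration, not the provider's logic: it splits on every semicolon, so it would break on semicolons inside string literals or comments, and the ";\n" separator is an assumption. Both function names are hypothetical.

```python
from typing import List, Union


def split_statements_sketch(sql: Union[str, List[str]]) -> List[str]:
    """Split one SQL string (or a list of them) into individual statements.

    Naive: splits on every ';' and drops empty fragments.
    """
    chunks = [sql] if isinstance(sql, str) else sql
    statements: List[str] = []
    for chunk in chunks:
        for part in chunk.split(";"):
            part = part.strip()
            if part:
                statements.append(part)
    return statements


def normalize_sql_sketch(sql: Union[str, List[str]]) -> str:
    """Return the statements joined into one semicolon-separated string."""
    return ";\n".join(split_statements_sketch(sql))
```

With this sketch, a string and a list holding the same statements normalize to the same text, which is the property the parser needs before handing SQL to openlineage-sql.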

classmethod create_information_schema_query(tables, normalize_name, is_cross_db, information_schema_columns, information_schema_table, is_uppercase_names, database=None, sqlalchemy_engine=None)

Creates a SELECT statement to query the information schema table.
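
As a rough picture of what such a query might look like, the sketch below builds a parameterized SELECT over the information schema table for a list of (schema, table) pairs. It is a simplified stand-in under stated assumptions: the function name, the tuple input format, the "?" placeholder style, and the filter on table_schema and table_name are all hypothetical, and the is_cross_db, database, and sqlalchemy_engine parameters are left out entirely.

```python
from typing import Callable, List, Tuple


def information_schema_query_sketch(
    tables: List[Tuple[str, str]],
    columns: List[str],
    information_schema_table: str,
    normalize_name: Callable[[str], str] = str.lower,
    is_uppercase_names: bool = False,
) -> Tuple[str, List[str]]:
    """Return (query_text, bind_params) selecting column metadata for the tables."""

    def fix(name: str) -> str:
        # Normalize first, then upper-case for databases that require it.
        name = normalize_name(name)
        return name.upper() if is_uppercase_names else name

    conditions: List[str] = []
    params: List[str] = []
    for schema, table in tables:
        conditions.append("(table_schema = ? AND table_name = ?)")
        params.extend([fix(schema), fix(table)])

    query = f"SELECT {', '.join(columns)} FROM {information_schema_table}"
    if conditions:
        query += " WHERE " + " OR ".join(conditions)
    return query, params


# Example values are made up for illustration.
query, params = information_schema_query_sketch(
    tables=[("Public", "Orders")],
    columns=["table_schema", "table_name", "column_name"],
    information_schema_table="information_schema.columns",
)
```

Names are passed as bind parameters rather than interpolated into the SQL text, which keeps the sketch safe to adapt even when table names come from parsed user SQL.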
