SQL Operators

These operators perform various queries against a SQL database, including column- and table-level data quality checks.

Check SQL Table Columns

Use the SQLColumnCheckOperator to run data quality checks against the columns of a given table. In addition to a connection ID and a table name, you must supply a column_mapping that maps each column to the checks to run on it. A column mapping is a set of three nested dictionaries, for example:

column_mapping = {
    "col_name": {
        "null_check": {
            "equal_to": 0,
        },
        "min": {
            "greater_than": 5,
            "leq_to": 10,
            "tolerance": 0.2,
        },
        "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
    }
}

Here, col_name is the name of the column to run checks on, and each entry in its dictionary is a check. The valid checks are:

- null_check: checks the number of NULL values in the column
- distinct_check: checks the number of distinct values in the column
- unique_check: checks the number of distinct values in the column against the number of rows
- min: checks the minimum value in the column
- max: checks the maximum value in the column
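To make these definitions concrete, the sketch below expresses each check as a plain SQL aggregate over one column and runs it against a small in-memory SQLite table. The CHECK_SQL mapping and the run_column_checks helper are hypothetical illustrations, not part of Airflow, and the SQL that SQLColumnCheckOperator actually generates may differ. In particular, this sketch assumes unique_check is computed as the non-NULL row count minus the distinct count, so a result of 0 means there are no duplicates.

```python
import sqlite3

# Hypothetical mapping from check name to an SQL aggregate over one column.
# Illustrative only: not the SQL that SQLColumnCheckOperator generates.
CHECK_SQL = {
    "null_check": "SUM(CASE WHEN {col} IS NULL THEN 1 ELSE 0 END)",
    "distinct_check": "COUNT(DISTINCT {col})",
    # 0 when every non-NULL value is unique; > 0 means duplicates exist.
    "unique_check": "COUNT({col}) - COUNT(DISTINCT {col})",
    "min": "MIN({col})",
    "max": "MAX({col})",
}


def run_column_checks(conn, table, column, checks):
    """Return {check_name: result} for the requested checks on one column.

    Table and column names are interpolated unescaped, so use trusted input only.
    """
    select_list = ", ".join(CHECK_SQL[c].format(col=column) for c in checks)
    row = conn.execute(f"SELECT {select_list} FROM {table}").fetchone()
    return dict(zip(checks, row))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (amount INTEGER)")
conn.executemany("INSERT INTO orders VALUES (?)", [(7,), (9,), (9,), (None,)])
results = run_column_checks(conn, "orders", "amount", list(CHECK_SQL))
print(results)
# {'null_check': 1, 'distinct_check': 2, 'unique_check': 1, 'min': 7, 'max': 9}
```

Computing all checks for a column in a single SELECT keeps the sketch to one scan of the table per column.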

Each entry in a check's dictionary is either a condition for success of the check or the tolerance. The conditions for success are:

- greater_than: result > value
- geq_to: result >= value
- less_than: result < value
- leq_to: result <= value
- equal_to: result == value
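Because a misspelled check or condition name is easy to miss in a nested dictionary, it can help to check the names before handing a mapping to the operator. The following is a hypothetical client-side helper, find_mapping_errors, that only compares names against the checks and conditions listed above; it is a sketch, not the operator's own validation.

```python
# Names taken from the lists of valid checks and conditions above.
VALID_CHECKS = {"null_check", "distinct_check", "unique_check", "min", "max"}
CONDITIONS = {"greater_than", "geq_to", "less_than", "leq_to", "equal_to"}


def find_mapping_errors(column_mapping):
    """Return a list of problems; an empty list means every name is recognized.

    Hypothetical helper: it checks names only, not values or condition combinations.
    """
    errors = []
    for column, checks in column_mapping.items():
        for check, spec in checks.items():
            if check not in VALID_CHECKS:
                errors.append(f"{column}: unknown check {check!r}")
                continue
            for key in spec:
                # "tolerance" is allowed alongside the success conditions.
                if key != "tolerance" and key not in CONDITIONS:
                    errors.append(f"{column}.{check}: unknown condition {key!r}")
    return errors


column_mapping = {
    "col_name": {
        "null_check": {"equal_to": 0},
        "min": {"greater_than": 5, "leq_to": 10, "tolerance": 0.2},
        "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
    }
}
print(find_mapping_errors(column_mapping))  # []
print(find_mapping_errors({"col_name": {"min": {"greater_then": 5}}}))
# ["col_name.min: unknown condition 'greater_then'"]
```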

When specifying conditions, equal_to cannot be combined with any other condition. A lower-bound condition and an upper-bound condition may be specified in the same check. The tolerance is the fraction (for example, 0.2 for 20%) by which the result may fall outside the bounds and still be considered successful.
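One way to read these rules is sketched below with a hypothetical passes helper, not part of Airflow. It assumes the tolerance is a fraction of each bound that widens the accepted range: lower bounds are multiplied by (1 - tolerance) and upper bounds by (1 + tolerance). The operator's exact arithmetic may differ. Under this assumption, the "min" check from the column mapping example ({"greater_than": 5, "leq_to": 10, "tolerance": 0.2}) accepts results in the range (4, 12].

```python
def passes(result, conditions):
    """Return True if `result` satisfies every condition, widened by `tolerance`.

    Assumption: tolerance scales each bound outward (0.2 means 20%). For negative
    bounds the multiplication moves the bound the other way; this sketch does not
    special-case that. With equal_to 0, any tolerance still requires exactly 0.
    """
    tol = conditions.get("tolerance") or 0
    bounds = {k: v for k, v in conditions.items() if k != "tolerance"}
    if "equal_to" in bounds:
        # equal_to is not compatible with other conditions.
        if len(bounds) > 1:
            raise ValueError("equal_to cannot be combined with other conditions")
        target = bounds["equal_to"]
        return target * (1 - tol) <= result <= target * (1 + tol)
    ok = True
    # Lower bounds are relaxed downward.
    if "greater_than" in bounds:
        ok = ok and result > bounds["greater_than"] * (1 - tol)
    if "geq_to" in bounds:
        ok = ok and result >= bounds["geq_to"] * (1 - tol)
    # Upper bounds are relaxed upward.
    if "less_than" in bounds:
        ok = ok and result < bounds["less_than"] * (1 + tol)
    if "leq_to" in bounds:
        ok = ok and result <= bounds["leq_to"] * (1 + tol)
    return ok


min_check = {"greater_than": 5, "leq_to": 10, "tolerance": 0.2}
print(passes(4.5, min_check))  # True: above the relaxed lower bound of 4
print(passes(3.9, min_check))  # False
```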

The following example shows how to instantiate a SQLColumnCheckOperator task.

tests/system/providers/common/sql/example_sql_column_table_check.py

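from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator

# AIRFLOW_DB_METADATA_TABLE and the connection ID (conn_id) are defined
# elsewhere in the example file referenced above.
column_check = SQLColumnCheckOperator(
    task_id="column_check",
    table=AIRFLOW_DB_METADATA_TABLE,
    column_mapping={
        "id": {
            # No NULL ids are allowed.
            "null_check": {
                "equal_to": 0,
                "tolerance": 0,
            },
            # Expect exactly one distinct id value.
            "distinct_check": {
                "equal_to": 1,
            },
        }
    },
)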

Check SQL Table Values

Use the SQLTableCheckOperator to run data quality checks against a given table. In addition to a connection ID and a table name, you must supply a checks dictionary that maps each check name to the test to run against the table. A checks argument is a set of two nested dictionaries, for example:

checks = {
    "row_count_check": {
        "check_statement": "COUNT(*) = 1000",
    },
    "column_sum_check": {"check_statement": "col_a + col_b < col_c"},
}

The top-level keys are the check names, which are referenced in the templated query that the operator builds. Under each check name, the key must be check_statement, and its value is a SQL expression that evaluates to a boolean (any string or int that airflow.operators.sql.parse_boolean resolves to a boolean).
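To illustrate how a check name and its check_statement might fit into a templated query, the sketch below wraps each statement in a CASE expression aliased to the check name and takes the MIN, so a row-level check passes only if every row satisfies it, while an aggregate check such as COUNT(*) yields a single row. TEMPLATE and run_table_checks are hypothetical names, and this is not the query SQLTableCheckOperator actually builds; it runs against an in-memory SQLite table.

```python
import sqlite3

# Hypothetical template; the check name is used as the column alias.
# Names are interpolated unescaped, so use trusted input only.
TEMPLATE = (
    "SELECT MIN({check_name}) FROM "
    "(SELECT CASE WHEN {check_statement} THEN 1 ELSE 0 END AS {check_name} "
    "FROM {table}) AS sq"
)


def run_table_checks(conn, table, checks):
    """Return {check_name: passed} for each entry in a checks dictionary."""
    results = {}
    for name, spec in checks.items():
        sql = TEMPLATE.format(
            check_name=name, check_statement=spec["check_statement"], table=table
        )
        (value,) = conn.execute(sql).fetchone()
        # MIN over no rows is NULL (None), which this sketch treats as a failure.
        results[name] = value == 1
    return results


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (col_a INTEGER, col_b INTEGER, col_c INTEGER)")
conn.executemany("INSERT INTO t VALUES (?, ?, ?)", [(1, 2, 5), (2, 2, 3)])
checks = {
    "row_count_check": {"check_statement": "COUNT(*) = 2"},
    "column_sum_check": {"check_statement": "col_a + col_b < col_c"},
}
print(run_table_checks(conn, "t", checks))
# {'row_count_check': True, 'column_sum_check': False}
```

Here column_sum_check fails because the second row has 2 + 2, which is not less than 3.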

The following example shows how to instantiate a SQLTableCheckOperator task.

tests/system/providers/common/sql/example_sql_column_table_check.py

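from airflow.providers.common.sql.operators.sql import SQLTableCheckOperator

# AIRFLOW_DB_METADATA_TABLE and the connection ID (conn_id) are defined
# elsewhere in the example file referenced above.
row_count_check = SQLTableCheckOperator(
    task_id="row_count_check",
    table=AIRFLOW_DB_METADATA_TABLE,
    checks={
        # Passes only if the table contains exactly one row.
        "row_count_check": {
            "check_statement": "COUNT(*) = 1",
        }
    },
)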
