DatabricksSqlOperator

Use the DatabricksSqlOperator to execute SQL on a Databricks SQL endpoint or a Databricks cluster.

Using the Operator

The operator executes the given SQL queries against the configured endpoint. There are three ways of specifying SQL queries:

  1. A simple string with a SQL statement.

  2. A list of strings, each representing a SQL statement.

  3. The name of a file containing the SQL queries. The file must have the .sql extension, and each query should end with ;<new_line> (a semicolon followed by a new line).

Parameters

sql: str or list[str]

Required parameter specifying the queries to execute.

sql_endpoint_name: str

Optional name of the Databricks SQL endpoint to use. If not specified, http_path should be provided.

http_path: str

Optional HTTP path for the Databricks SQL endpoint or Databricks cluster. If not specified, it should be provided in the Databricks connection, or the sql_endpoint_name parameter must be set.

parameters: dict[str, any]

Optional parameters that will be used to substitute variables in the SQL query (see the sketch after this parameter list).

session_configuration: dict[str,str]

Optional dict specifying Spark configuration parameters that will be set for the session.

output_path: str

Optional path to the file to which results will be written.

output_format: str

Optional name of the format in which results will be written. Supported values (case-insensitive): JSON (an array of JSON objects), JSONL (each row as a JSON object on a separate line), and CSV (the default).

csv_params: dict[str, any]

Optional dictionary with parameters to customize the Python CSV writer.

do_xcom_push: bool

Whether to push the query results (of the last query, if multiple queries are provided) to XCom. Default: False.
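For illustration, the parameters argument can be combined with http_path instead of an endpoint name. This is a minimal sketch, not taken from the example DAG: the warehouse path, table name, and pyformat-style placeholder are assumptions, and the exact placeholder syntax depends on the version of databricks-sql-connector in use.

    # Substitute a value into the query via `parameters`, connecting through an
    # explicit HTTP path rather than an endpoint name (both values are hypothetical).
    select_filtered = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        http_path="/sql/1.0/warehouses/abcdef1234567890",
        task_id='select_filtered_data',
        sql="select * from default.my_airflow_table where id = %(id)s",
        parameters={"id": 1},
    )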

Examples

Selecting data

An example usage of the DatabricksSqlOperator to select data from a table is as follows:

airflow/providers/databricks/example_dags/example_databricks_sql.py[source]

    # Example of using the Databricks SQL Operator to select data.
    select = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id='select_data',
        sql="select * from default.my_airflow_table",
    )
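If do_xcom_push were enabled on such a task, a downstream task could read the returned rows from XCom. A minimal sketch, not part of the example DAG: the counting query, task names, and callback are illustrative assumptions.

    from airflow.operators.python import PythonOperator

    # Push the query results to XCom (results of the last query if several are given).
    select_to_xcom = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id='select_data_to_xcom',
        sql="select count(*) as cnt from default.my_airflow_table",
        do_xcom_push=True,
    )

    def print_result(ti):
        # Pull whatever the operator pushed for its task (the query results).
        print(ti.xcom_pull(task_ids='select_data_to_xcom'))

    show_result = PythonOperator(task_id='print_result', python_callable=print_result)

    select_to_xcom >> show_result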

Selecting data into a file

An example usage of the DatabricksSqlOperator to select data from a table and store in a file is as follows:

airflow/providers/databricks/example_dags/example_databricks_sql.py[source]

    # Example of using the Databricks SQL Operator to select data into a file with JSONL format.
    select_into_file = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id='select_data_into_file',
        sql="select * from default.my_airflow_table",
        output_path="/tmp/1.jsonl",
        output_format="jsonl",
    )
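Results can also be written as CSV, with the writer customized through csv_params. A minimal sketch, not from the example DAG: csv_params is assumed to accept keyword arguments of Python's csv writer, and the semicolon delimiter is just an illustrative choice.

    # Write the results to a CSV file, using a semicolon as the field delimiter.
    select_into_csv = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id='select_data_into_csv',
        sql="select * from default.my_airflow_table",
        output_path="/tmp/1.csv",
        output_format="csv",
        csv_params={"delimiter": ";"},
    )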

Executing multiple statements

An example usage of the DatabricksSqlOperator to perform multiple SQL statements is as follows:

airflow/providers/databricks/example_dags/example_databricks_sql.py[source]

    # Example of using the Databricks SQL Operator to perform multiple operations.
    create = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id='create_and_populate_table',
        sql=[
            "drop table if exists default.my_airflow_table",
            "create table default.my_airflow_table(id int, v string)",
            "insert into default.my_airflow_table values (1, 'test 1'), (2, 'test 2')",
        ],
    )

Executing multiple statements from a file

An example usage of the DatabricksSqlOperator to perform statements from a file is as follows:

airflow/providers/databricks/example_dags/example_databricks_sql.py[source]

    # Example of using the Databricks SQL Operator to execute statements from a file.
    # The SQL statements should be in a file named test.sql
    create_file = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id='create_and_populate_from_file',
        sql="test.sql",
    )
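The file itself contains plain SQL, with each statement ending in a semicolon followed by a new line. A hypothetical test.sql matching the table used above might look like this:

    drop table if exists default.my_airflow_table;
    create table default.my_airflow_table(id int, v string);
    insert into default.my_airflow_table values (1, 'test 1'), (2, 'test 2');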
