DatabricksSqlOperator¶
Use the DatabricksSqlOperator to execute SQL on a Databricks SQL endpoint or a Databricks cluster.
Using the Operator¶
The operator executes the given SQL queries against the configured endpoint. There are three ways to specify SQL queries (each is demonstrated in the Examples section below):
- A simple string with a SQL statement.
- A list of strings representing SQL statements.
- The name of a file containing SQL queries. The file must have a .sql extension, and each query should end with a semicolon followed by a new line.
| Parameter | Input |
|---|---|
| sql: str or list[str] | Required parameter specifying the queries to execute. |
| sql_endpoint_name: str | Optional name of the Databricks SQL endpoint to use. If not specified, http_path must be provided as described below. |
| http_path: str | Optional HTTP path for the Databricks SQL endpoint or Databricks cluster. If not specified, it should be provided in the Databricks connection, or the sql_endpoint_name parameter must be specified. |
| parameters: dict[str, any] | Optional parameters that will be used to substitute variable(s) in the SQL query. |
| session_configuration: dict[str, str] | Optional dict specifying Spark configuration parameters that will be set for the session. |
| output_path: str | Optional path to the file to which results will be written. |
| output_format: str | Name of the format which will be used to write results. Supported values are (case-insensitive): CSV, JSON, JSONL. Default is CSV. |
| csv_params: dict[str, any] | Optional dictionary with parameters to customize the Python CSV writer. |
| do_xcom_push: boolean | Whether to push query results (the last query, if multiple queries are provided) to XCom. Default: false. |
Examples¶
Selecting data¶
An example usage of the DatabricksSqlOperator to select data from a table is as follows:
# Example of using the Databricks SQL Operator to select data.
select = DatabricksSqlOperator(
    databricks_conn_id=connection_id,
    sql_endpoint_name=sql_endpoint_name,
    task_id='select_data',
    sql="select * from default.my_airflow_table",
)
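The parameters and do_xcom_push options from the table above are not covered by the other examples on this page. Below is a minimal sketch, assuming the underlying databricks-sql-connector accepts pyformat-style %(name)s placeholders (the exact placeholder syntax depends on the connector version); the task name and filter column are chosen for illustration:
# Hedged sketch: substitute a variable into the query and push the (last) query
# result to XCom. The %(row_id)s placeholder assumes pyformat-style parameter
# binding; newer connector versions may use named :row_id markers instead.
select_with_params = DatabricksSqlOperator(
    databricks_conn_id=connection_id,
    sql_endpoint_name=sql_endpoint_name,
    task_id='select_with_params',
    sql="select * from default.my_airflow_table where id = %(row_id)s",
    parameters={"row_id": 1},
    do_xcom_push=True,
)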
Selecting data into a file¶
An example usage of the DatabricksSqlOperator to select data from a table and store it in a file is as follows:
# Example of using the Databricks SQL Operator to select data into a file with JSONL format.
select_into_file = DatabricksSqlOperator(
    databricks_conn_id=connection_id,
    sql_endpoint_name=sql_endpoint_name,
    task_id='select_data_into_file',
    sql="select * from default.my_airflow_table",
    output_path="/tmp/1.jsonl",
    output_format="jsonl",
)
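To write CSV instead, output_format can be combined with csv_params. A minimal sketch follows; the "delimiter" key is a standard Python csv.writer parameter, assumed here to be passed through unchanged:
# Hedged sketch: write results as CSV, customizing the Python CSV writer.
# The "delimiter" key is a standard csv.writer parameter; it is assumed to be
# forwarded as-is via csv_params.
select_into_csv = DatabricksSqlOperator(
    databricks_conn_id=connection_id,
    sql_endpoint_name=sql_endpoint_name,
    task_id='select_data_into_csv',
    sql="select * from default.my_airflow_table",
    output_path="/tmp/1.csv",
    output_format="csv",
    csv_params={"delimiter": ";"},
)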
Executing multiple statements¶
An example usage of the DatabricksSqlOperator to perform multiple SQL statements is as follows:
# Example of using the Databricks SQL Operator to perform multiple operations.
create = DatabricksSqlOperator(
    databricks_conn_id=connection_id,
    sql_endpoint_name=sql_endpoint_name,
    task_id='create_and_populate_table',
    sql=[
        "drop table if exists default.my_airflow_table",
        "create table default.my_airflow_table(id int, v string)",
        "insert into default.my_airflow_table values (1, 'test 1'), (2, 'test 2')",
    ],
)
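If the statements depend on session-level Spark settings, the session_configuration option from the table above can be added. A minimal sketch follows; the configuration key and value shown are illustrative, not prescribed by this page:
# Hedged sketch: set a Spark session configuration parameter for these queries.
# The key/value pair below is illustrative only.
insert_with_conf = DatabricksSqlOperator(
    databricks_conn_id=connection_id,
    sql_endpoint_name=sql_endpoint_name,
    task_id='insert_with_session_conf',
    session_configuration={"ansi_mode": "true"},
    sql="insert into default.my_airflow_table values (3, 'test 3')",
)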
Executing multiple statements from a file¶
An example usage of the DatabricksSqlOperator to perform statements from a file is as follows:
# Example of using the Databricks SQL Operator to execute statements from a file.
# SQL statements should be in a file named test.sql.
create_file = DatabricksSqlOperator(
    databricks_conn_id=connection_id,
    sql_endpoint_name=sql_endpoint_name,
    task_id='create_and_populate_from_file',
    sql="test.sql",
)
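For reference, a sketch of what test.sql might contain, following the rule above that each query ends with a semicolon and a new line (the statements mirror the multiple-statements example):
drop table if exists default.my_airflow_table;
create table default.my_airflow_table(id int, v string);
insert into default.my_airflow_table values (1, 'test 1'), (2, 'test 2');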