DatabricksSqlOperator
Use the DatabricksSqlOperator to execute SQL on a Databricks SQL warehouse or a Databricks cluster.
Using the Operator
The operator executes the given SQL queries against the configured warehouse. The only required parameters are:
- sql: SQL queries to execute. There are three ways of specifying SQL queries:
  - A simple string with a SQL statement.
  - A list of strings representing SQL statements.
  - The name of a file with SQL queries. The file must have the .sql extension, and each query should end with ;<new_line>
- One of sql_warehouse_name (name of the Databricks SQL warehouse to use) or http_path (HTTP path for the Databricks SQL warehouse or Databricks cluster). Note that the examples below pass the warehouse name as sql_endpoint_name.
Other parameters are optional and can be found in the class documentation.
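The three accepted forms of sql can be illustrated with a small stand-alone sketch. The helper normalize_sql below is hypothetical and is not part of the Databricks provider; it only shows, assuming that statements in a file are separated by a semicolon followed by a newline, how each form reduces to a list of statements.

```python
from pathlib import Path
from typing import List, Union


def normalize_sql(sql: Union[str, List[str]], base_dir: str = ".") -> List[str]:
    """Hypothetical helper: reduce the three accepted forms of `sql` to a list.

    Illustration only, not the provider's implementation.
    """
    if isinstance(sql, list):
        # Form 2: a list of strings, one statement per item.
        return [s.strip() for s in sql if s.strip()]
    if sql.endswith(".sql"):
        # Form 3: a file name with the .sql extension.
        text = Path(base_dir, sql).read_text()
        # Assumption: each statement ends with ";" followed by a newline.
        parts = text.split(";\n")
        return [p.strip().rstrip(";").strip() for p in parts if p.strip()]
    # Form 1: a single SQL statement.
    return [sql.strip()]
```

Whichever form is passed in, the result is an ordered list of statements, which is a convenient way to think about what the operator will run.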
Examples
Selecting data
An example usage of the DatabricksSqlOperator to select data from a table is as follows:
# Example of using the Databricks SQL Operator to select data.
select = DatabricksSqlOperator(
    databricks_conn_id=connection_id,
    sql_endpoint_name=sql_endpoint_name,
    task_id="select_data",
    sql="select * from default.my_airflow_table",
)
Selecting data into a file
An example usage of the DatabricksSqlOperator to select data from a table and store it in a file is as follows:
# Example of using the Databricks SQL Operator to select data into a file with JSONL format.
select_into_file = DatabricksSqlOperator(
    databricks_conn_id=connection_id,
    sql_endpoint_name=sql_endpoint_name,
    task_id="select_data_into_file",
    sql="select * from default.my_airflow_table",
    output_path="/tmp/1.jsonl",
    output_format="jsonl",
)
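A file written with output_format="jsonl" can be read back line by line. The sketch below assumes the standard JSON Lines layout (one JSON value per line); the function name read_jsonl is illustrative and not part of the provider, and the exact shape of each row object is not specified here.

```python
import json
from typing import Any, Iterator


def read_jsonl(path: str) -> Iterator[Any]:
    """Yield one parsed JSON value per non-empty line of the file."""
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if line:  # skip blank lines
                yield json.loads(line)
```

For instance, list(read_jsonl("/tmp/1.jsonl")) would load every row from the path used in the example above into memory, while iterating over read_jsonl(...) directly keeps only one row in memory at a time.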
Executing multiple statements
An example usage of the DatabricksSqlOperator to execute multiple SQL statements is as follows:
# Example of using the Databricks SQL Operator to perform multiple operations.
create = DatabricksSqlOperator(
    databricks_conn_id=connection_id,
    sql_endpoint_name=sql_endpoint_name,
    task_id="create_and_populate_table",
    sql=[
        "drop table if exists default.my_airflow_table",
        "create table default.my_airflow_table(id int, v string)",
        "insert into default.my_airflow_table values (1, 'test 1'), (2, 'test 2')",
    ],
)
Executing multiple statements from a file
An example usage of the DatabricksSqlOperator to execute statements from a file is as follows:
# Example of using the Databricks SQL Operator to execute statements from a file.
# The SQL statements should be in a file named test.sql
create_file = DatabricksSqlOperator(
    databricks_conn_id=connection_id,
    sql_endpoint_name=sql_endpoint_name,
    task_id="create_and_populate_from_file",
    sql="test.sql",
)
DatabricksSqlSensor
Use the DatabricksSqlSensor to run a sensor against a table accessible via a Databricks SQL warehouse or an interactive cluster.
Using the Sensor
The sensor executes the SQL statement supplied by the user. The only required parameters are:
- sql: SQL query to execute for the sensor.
- One of sql_warehouse_name (name of the Databricks SQL warehouse to use) or http_path (HTTP path for the Databricks SQL warehouse or Databricks cluster).
Other parameters are optional and can be found in the class documentation.
Examples
Configuring the Databricks connection to be used with the sensor:
# Connection ID and SQL warehouse name for the Databricks workspace.
connection_id = "databricks_default"
sql_warehouse_name = "Starter Warehouse"
Poking a specific table for the existence of data or a partition:
# Example of using the Databricks SQL Sensor to check existence of data in a table.
sql_sensor = DatabricksSqlSensor(
    databricks_conn_id=connection_id,
    sql_warehouse_name=sql_warehouse_name,
    catalog="hive_metastore",
    task_id="sql_sensor_task",
    sql="select * from hive_metastore.temp.sample_table_3 limit 1",
    timeout=60 * 2,
)
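The poking behaviour can be pictured as a loop that re-runs the query until it returns data or the timeout expires. The sketch below is a simplified stand-in, not the sensor's implementation: poke_until_rows, run_query, and poke_interval are hypothetical names, and it assumes the check succeeds as soon as the query returns at least one row. On timeout this function simply returns False, whereas Airflow marks a timed-out sensor task as failed.

```python
import time
from typing import Callable, Sequence


def poke_until_rows(
    run_query: Callable[[], Sequence],
    timeout: float,
    poke_interval: float = 1.0,
) -> bool:
    """Return True once run_query() yields rows, False if the timeout elapses first."""
    deadline = time.monotonic() + timeout
    while True:
        # Assumption: any non-empty result means the data exists.
        if len(run_query()) > 0:
            return True
        # Give up if the next poke would start after the deadline.
        if time.monotonic() + poke_interval > deadline:
            return False
        time.sleep(poke_interval)
```

With timeout=60 * 2, as in the example above, such a loop would keep checking for up to two minutes before giving up.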