SnowparkOperator¶
Use the SnowparkOperator to run Snowpark Python code in a Snowflake database.
Warning
Snowpark does not support Python 3.12 yet.
Currently, this operator does not support the Snowpark pandas API, because Airflow uses a conflicting pandas version. Consider using the Snowpark pandas API with other Snowpark decorators or operators.
Tip
The @task.snowpark decorator is recommended over the SnowparkOperator to run Snowpark Python code.
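For illustration, a minimal sketch of the decorator form might look like this (the table name is a placeholder, and the snippet assumes a provider version that ships the @task.snowpark decorator):

from airflow.decorators import task
from snowflake.snowpark import Session


@task.snowpark
def count_rows(session: Session):
    # The Snowpark session is injected automatically, just as with the operator
    return session.table("sample_product_data").count()  # placeholder table name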
Prerequisite Tasks¶
To use this operator, you must do a few things:
Install the provider package via pip:

pip install 'apache-airflow-providers-snowflake'

Detailed information is available in the Installation documentation.
Using the Operator¶
Use the snowflake_conn_id argument to specify the connection to use. If not specified, snowflake_default will be used.
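For example, a sketch of pointing the operator at a non-default connection (my_snowflake_conn is a placeholder connection ID):

from airflow.providers.snowflake.operators.snowpark import SnowparkOperator


def run_query(session):
    # Any Snowpark code; the session is created from the configured connection
    session.sql("SELECT 1").collect()


run_query_operator = SnowparkOperator(
    task_id="run_query",
    python_callable=run_query,
    snowflake_conn_id="my_snowflake_conn",  # falls back to snowflake_default if omitted
)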
An example usage of the SnowparkOperator is as follows:
from airflow.providers.snowflake.operators.snowpark import SnowparkOperator
from snowflake.snowpark import Session


def setup_data(session: Session):
    # The Snowpark session object is injected as an argument
    data = [
        (1, 0, 5, "Product 1", "prod-1", 1, 10),
        (2, 1, 5, "Product 1A", "prod-1-A", 1, 20),
        (3, 1, 5, "Product 1B", "prod-1-B", 1, 30),
        (4, 0, 10, "Product 2", "prod-2", 2, 40),
        (5, 4, 10, "Product 2A", "prod-2-A", 2, 50),
        (6, 4, 10, "Product 2B", "prod-2-B", 2, 60),
        (7, 0, 20, "Product 3", "prod-3", 3, 70),
        (8, 7, 20, "Product 3A", "prod-3-A", 3, 80),
        (9, 7, 20, "Product 3B", "prod-3-B", 3, 90),
        (10, 0, 50, "Product 4", "prod-4", 4, 100),
        (11, 10, 50, "Product 4A", "prod-4-A", 4, 100),
        (12, 10, 50, "Product 4B", "prod-4-B", 4, 100),
    ]
    columns = ["id", "parent_id", "category_id", "name", "serial_number", "key", "3rd"]
    df = session.create_dataframe(data, schema=columns)
    table_name = "sample_product_data"
    df.write.save_as_table(table_name, mode="overwrite")
    return table_name


setup_data_operator = SnowparkOperator(
    task_id="setup_data",
    python_callable=setup_data,
    dag=dag,
)


def check_num_rows(table_name: str):
    # Alternatively, retrieve the Snowpark session object using `get_active_session`
    from snowflake.snowpark.context import get_active_session

    session = get_active_session()
    df = session.table(table_name)
    assert df.count() == 12


check_num_rows_operator = SnowparkOperator(
    task_id="check_num_rows",
    python_callable=check_num_rows,
    op_kwargs={"table_name": "{{ task_instance.xcom_pull(task_ids='setup_data') }}"},
    dag=dag,
)

setup_data_operator >> check_num_rows_operator
As the example demonstrates, there are two ways to use the Snowpark session object in your Python function:
- Pass the Snowpark session object to the function as a keyword argument named session. The Snowpark session will be automatically injected into the function, allowing you to use it as you normally would.
- Use the get_active_session function from Snowpark to retrieve the Snowpark session object inside the function.
Note
Parameters passed to the operator take priority over parameters already defined in the Airflow connection metadata (such as schema, role, database, and so forth).
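As a sketch of that behavior (assuming the operator accepts the same connection-override arguments as the other Snowflake operators; the names below are placeholders), per-task overrides might look like:

SnowparkOperator(
    task_id="setup_data_in_other_db",
    python_callable=setup_data,
    database="MY_DATABASE",  # overrides the database set in the connection
    schema="MY_SCHEMA",  # overrides the schema set in the connection
    role="MY_ROLE",  # overrides the role set in the connection
)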