Source code for airflow.example_dags.example_python_operator
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Example DAG demonstrating the usage of the classic Python operators to execute Python functions natively andwithin a virtual environment."""from__future__importannotationsimportloggingimportsysimporttimefrompprintimportpprintimportpendulumfromairflow.models.dagimportDAGfromairflow.operators.pythonimport(ExternalPythonOperator,PythonOperator,PythonVirtualenvOperator,is_venv_installed,)
[docs]defprint_context(ds=None,**kwargs):"""Print the Airflow context and ds variable from the context."""print("::group::All kwargs")pprint(kwargs)print("::endgroup::")print("::group::Context variable ds")print(ds)print("::endgroup::")return"Whatever you return gets printed in the logs"
run_this=PythonOperator(task_id="print_the_context",python_callable=print_context)# [END howto_operator_python]# [START howto_operator_python_render_sql]deflog_sql(**kwargs):log.info("Python task decorator query: %s",str(kwargs["templates_dict"]["query"]))log_the_sql=PythonOperator(task_id="log_sql_query",python_callable=log_sql,templates_dict={"query":"sql/sample.sql"},templates_exts=[".sql"],)# [END howto_operator_python_render_sql]# [START howto_operator_python_kwargs]# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectivelydefmy_sleeping_function(random_base):"""This is a function that will run within the DAG execution"""time.sleep(random_base)foriinrange(5):sleeping_task=PythonOperator(task_id=f"sleep_for_{i}",python_callable=my_sleeping_function,op_kwargs={"random_base":i/10})run_this>>log_the_sql>>sleeping_task# [END howto_operator_python_kwargs]ifnotis_venv_installed():log.warning("The virtalenv_python example task requires virtualenv, please install it.")else:# [START howto_operator_python_venv]defcallable_virtualenv():""" Example function that will be performed in a virtual environment. Importing at the module level ensures that it will not attempt to import the library before it is installed. """fromtimeimportsleepfromcoloramaimportBack,Fore,Styleprint(Fore.RED+"some red text")print(Back.GREEN+"and with a green background")print(Style.DIM+"and in dim text")print(Style.RESET_ALL)for_inrange(4):print(Style.DIM+"Please wait...",flush=True)sleep(1)print("Finished")virtualenv_task=PythonVirtualenvOperator(task_id="virtualenv_python",python_callable=callable_virtualenv,requirements=["colorama==0.4.0"],system_site_packages=False,)# [END howto_operator_python_venv]sleeping_task>>virtualenv_task# [START howto_operator_external_python]defcallable_external_python():""" Example function that will be performed in a virtual environment. Importing at the module level ensures that it will not attempt to import the library before it is installed. """importsysfromtimeimportsleepprint(f"Running task via {sys.executable}")print("Sleeping")for_inrange(4):print("Please wait...",flush=True)sleep(1)print("Finished")external_python_task=ExternalPythonOperator(task_id="external_python",python_callable=callable_external_python,python=PATH_TO_PYTHON_BINARY,)# [END howto_operator_external_python]run_this>>external_python_task>>virtualenv_task