Source code for airflow.example_dags.example_python_operator
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
virtual environment.
"""
from __future__ import annotations

import logging
import shutil
import sys
import tempfile
import time
from pprint import pprint

import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import ExternalPythonOperator, PythonVirtualenvOperator
# NOTE(review): later code in this file uses the result of ``print_context()``
# as a task (``run_this >> ...``), which suggests a ``@task`` decorator was
# lost from this excerpt -- confirm against the upstream example.
def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context.

    :param ds: value printed on its own line after the context; defaults to ``None``.
    :param kwargs: remaining keyword arguments, pretty-printed as a dict.
    :return: a fixed string; as that string says, the return value is shown in the logs.
    """
    pprint(kwargs)
    print(ds)
    return "Whatever you return gets printed in the logs"
# NOTE(review): ``log``, ``PATH_TO_PYTHON_BINARY`` and ``x`` are referenced below
# but are not defined in the visible part of this file; they are expected to be
# module-level names defined outside this excerpt -- confirm.
run_this = print_context()
# [END howto_operator_python]

# [START howto_operator_python_render_sql]
@task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
def log_sql(**kwargs):
    """Log the ``query`` entry of the ``templates_dict`` keyword argument."""
    logging.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))


log_the_sql = log_sql()
# [END howto_operator_python_render_sql]

# [START howto_operator_python_kwargs]
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):

    @task(task_id=f"sleep_for_{i}")
    def my_sleeping_function(random_base):
        """This is a function that will run within the DAG execution"""
        time.sleep(random_base)

    sleeping_task = my_sleeping_function(random_base=float(i) / 10)

    # Every sleeping task is placed downstream of the SQL-logging task.
    run_this >> log_the_sql >> sleeping_task
# [END howto_operator_python_kwargs]

if not shutil.which("virtualenv"):
    log.warning("The virtualenv_python example task requires virtualenv, please install it.")
else:
    # NOTE(review): the original indentation was lost. Everything up to the final
    # dependency chain is kept in this branch because that chain references
    # ``virtual_classic`` and ``sleeping_task >> virtualenv_task`` references
    # ``virtualenv_task``, both of which only exist here -- confirm against upstream.

    # [START howto_operator_python_venv]
    @task.virtualenv(
        task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
    )
    def callable_virtualenv():
        """
        Example function that will be performed in a virtual environment.

        Importing at the function level ensures that it will not attempt to import the
        library before it is installed.
        """
        from time import sleep

        from colorama import Back, Fore, Style

        print(Fore.RED + "some red text")
        print(Back.GREEN + "and with a green background")
        print(Style.DIM + "and in dim text")
        print(Style.RESET_ALL)
        for _ in range(4):
            print(Style.DIM + "Please wait...", flush=True)
            sleep(1)
        print("Finished")

    virtualenv_task = callable_virtualenv()
    # [END howto_operator_python_venv]

    # ``sleeping_task`` is the last task created by the loop above (``sleep_for_4``).
    sleeping_task >> virtualenv_task

    # [START howto_operator_external_python]
    @task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
    def callable_external_python():
        """
        Example function that will be performed by the Python binary passed as ``python``.

        Importing at the function level keeps the imports inside the callable, so they
        are resolved where the function body actually runs.
        """
        import sys
        from time import sleep

        print(f"Running task via {sys.executable}")
        print("Sleeping")
        for _ in range(4):
            print("Please wait...", flush=True)
            sleep(1)
        print("Finished")

    external_python_task = callable_external_python()
    # [END howto_operator_external_python]

    # [START howto_operator_external_python_classic]
    external_classic = ExternalPythonOperator(
        task_id="external_python_classic",
        python=PATH_TO_PYTHON_BINARY,
        python_callable=x,
    )
    # [END howto_operator_external_python_classic]

    # [START howto_operator_python_venv_classic]
    virtual_classic = PythonVirtualenvOperator(
        task_id="virtualenv_classic",
        requirements="colorama==0.4.0",
        python_callable=x,
    )
    # [END howto_operator_python_venv_classic]

    run_this >> external_classic >> external_python_task >> virtual_classic