Source code for airflow.example_dags.example_python_operator
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
virtual environment.
"""
import logging
import shutil
import time
from datetime import datetime
from pprint import pprint

from airflow import DAG
from airflow.decorators import task

log = logging.getLogger(__name__)

with DAG(
    dag_id='example_python_operator',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    # [START howto_operator_python]
    @task(task_id="print_the_context")
    def print_context(ds=None, **kwargs):
        """Print the Airflow context and ds variable from the context."""
        pprint(kwargs)
        print(ds)
        return 'Whatever you return gets printed in the logs'

    run_this = print_context()
    # [END howto_operator_python]

    # [START howto_operator_python_kwargs]
    # Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
    for i in range(5):

        # The task_id is derived from the loop index so each of the five
        # generated tasks gets a distinct id.
        @task(task_id=f'sleep_for_{i}')
        def my_sleeping_function(random_base):
            """Sleep for ``random_base`` seconds when the task executes."""
            time.sleep(random_base)

        # True division already yields a float: 0.0, 0.1, ..., 0.4.
        sleeping_task = my_sleeping_function(random_base=i / 10)

        # Every sleeping task runs downstream of the context-printing task.
        run_this >> sleeping_task
    # [END howto_operator_python_kwargs]

    # The virtualenv task is only declared when a ``virtualenv`` executable
    # is found on PATH; otherwise a warning is logged and the task is skipped.
    if not shutil.which("virtualenv"):
        log.warning("The virtualenv_python example task requires virtualenv, please install it.")
    else:
        # [START howto_operator_python_venv]
        @task.virtualenv(
            task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
        )
        def callable_virtualenv():
            """
            Example function that will be performed in a virtual environment.

            Importing at the function level ensures that it will not attempt to import the
            library before it is installed.
            """
            from time import sleep

            from colorama import Back, Fore, Style

            print(Fore.RED + 'some red text')
            print(Back.GREEN + 'and with a green background')
            print(Style.DIM + 'and in dim text')
            print(Style.RESET_ALL)
            for _ in range(10):
                print(Style.DIM + 'Please wait...', flush=True)
                sleep(10)
            print('Finished')

        virtualenv_task = callable_virtualenv()
        # [END howto_operator_python_venv]