Source code for airflow.example_dags.example_python_operator
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Example DAG demonstrating the usage of the PythonOperator."""
import time
from pprint import pprint

from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
from airflow.utils.dates import days_ago

# Default arguments handed to the DAG via ``default_args``.
args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='example_python_operator',
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
) as dag:

    # [START howto_operator_python]
    def print_context(ds, **kwargs):
        """Print the Airflow context and ds variable from the context.

        :param ds: value printed on its own line after the keyword arguments
        :param kwargs: remaining keyword arguments, pretty-printed as a dict
        :return: a fixed message string
        """
        pprint(kwargs)
        print(ds)
        return 'Whatever you return gets printed in the logs'

    run_this = PythonOperator(
        task_id='print_the_context',
        python_callable=print_context,
    )
    # [END howto_operator_python]

    # [START howto_operator_python_kwargs]
    def my_sleeping_function(random_base):
        """Sleep for ``random_base`` seconds; runs within the DAG execution.

        :param random_base: number of seconds passed to ``time.sleep``
        """
        time.sleep(random_base)

    # Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
    for i in range(5):
        task = PythonOperator(
            task_id='sleep_for_' + str(i),
            python_callable=my_sleeping_function,
            op_kwargs={'random_base': float(i) / 10},
        )

        # Each sleeping task is declared downstream of ``run_this``.
        run_this >> task
    # [END howto_operator_python_kwargs]

    # [START howto_operator_python_venv]
    def callable_virtualenv():
        """
        Example function that will be performed in a virtual environment.

        Importing at the function level ensures that it will not attempt to
        import the library before it is installed.
        """
        from time import sleep

        from colorama import Back, Fore, Style

        print(Fore.RED + 'some red text')
        print(Back.GREEN + 'and with a green background')
        print(Style.DIM + 'and in dim text')
        print(Style.RESET_ALL)
        for _ in range(10):
            # flush=True so each progress line is emitted before the sleep.
            print(Style.DIM + 'Please wait...', flush=True)
            sleep(10)
        print('Finished')

    virtualenv_task = PythonVirtualenvOperator(
        task_id="virtualenv_python",
        python_callable=callable_virtualenv,
        # colorama is imported only inside callable_virtualenv, so it is
        # listed here rather than imported at the top of this module.
        requirements=["colorama==0.4.0"],
        system_site_packages=False,
    )
    # [END howto_operator_python_venv]