# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import inspect
import os
import pickle
import sys
import types
import warnings
from tempfile import TemporaryDirectory
from textwrap import dedent
from typing import Any, Callable, Dict, Iterable, List, Optional, Union

import dill

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.models.skipmixin import SkipMixin
from airflow.models.taskinstance import _CURRENT_CONTEXT
from airflow.utils.operator_helpers import determine_kwargs
from airflow.utils.process_utils import execute_in_subprocess
from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script
def task(python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs):
    """
    Deprecated function that calls @task.python and allows users to turn a python function into
    an Airflow task. Please use the following instead:

        from airflow.decorators import task

        @task
        def my_task()

    :param python_callable: A reference to an object that is callable
    :type python_callable: python callable
    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
        in your function (templated)
    :type op_kwargs: dict
    :param op_args: a list of positional arguments that will get unpacked when
        calling your callable (templated)
    :type op_args: list
    :param multiple_outputs: if set, function return value will be
        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
        Defaults to False.
    :type multiple_outputs: bool
    :return:
    """
    # To maintain backwards compatibility, we import the task object into this file
    # This prevents breakages in dags that use `from airflow.operators.python import task`
    from airflow.decorators.python import python_task

    warnings.warn(
        """airflow.operators.python.task is deprecated. Please use the following instead

        from airflow.decorators import task

        @task
        def my_task()""",
        DeprecationWarning,
        stacklevel=2,
    )
    return python_task(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)
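# A minimal usage sketch (not part of the original module): the deprecated ``task``
# function above just forwards to the TaskFlow decorator, so new DAGs should import it
# from ``airflow.decorators`` directly. The function name and return values below are
# hypothetical.
#
#     from airflow.decorators import task
#
#     @task(multiple_outputs=True)
#     def extract():
#         # with multiple_outputs=True, each dict key becomes its own XCom value
#         return {"order_id": 42, "amount": 13.5}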
class PythonOperator(BaseOperator):
    """
    Executes a Python callable.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:PythonOperator`

    :param python_callable: A reference to an object that is callable
    :type python_callable: python callable
    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
        in your function
    :type op_kwargs: dict (templated)
    :param op_args: a list of positional arguments that will get unpacked when
        calling your callable
    :type op_args: list (templated)
    :param templates_dict: a dictionary where the values are templates that
        will get templated by the Airflow engine sometime between
        ``__init__`` and ``execute``, and are made available
        in your callable's context after the template has been applied. (templated)
    :type templates_dict: dict[str]
    :param templates_exts: a list of file extensions to resolve while
        processing templated fields, for example ``['.sql', '.hql']``
    :type templates_exts: list[str]
    """
    def __init__(
        self,
        *,
        python_callable: Callable,
        op_args: Optional[List] = None,
        op_kwargs: Optional[Dict] = None,
        templates_dict: Optional[Dict] = None,
        templates_exts: Optional[List[str]] = None,
        **kwargs,
    ) -> None:
        if kwargs.get("provide_context"):
            warnings.warn(
                "provide_context is deprecated as of 2.0 and is no longer required",
                DeprecationWarning,
                stacklevel=2,
            )
            kwargs.pop('provide_context', None)
        super().__init__(**kwargs)
        if not callable(python_callable):
            raise AirflowException('`python_callable` param must be callable')
        self.python_callable = python_callable
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}
        self.templates_dict = templates_dict
        if templates_exts:
            self.template_ext = templates_exts
    def execute(self, context: Dict):
        context.update(self.op_kwargs)
        context['templates_dict'] = self.templates_dict

        self.op_kwargs = determine_kwargs(self.python_callable, self.op_args, context)

        return_value = self.execute_callable()
        self.log.info("Done. Returned value was: %s", return_value)
        return return_value
    def execute_callable(self):
        """
        Calls the python callable with the given arguments.

        :return: the return value of the call.
        :rtype: any
        """
        return self.python_callable(*self.op_args, **self.op_kwargs)
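# A minimal usage sketch (not part of the original module) showing how PythonOperator
# is typically wired into a DAG. The dag_id, task_id, and callable are hypothetical;
# op_kwargs are merged into the context and matched against the callable's signature
# via determine_kwargs in execute() above.
#
#     from datetime import datetime
#     from airflow import DAG
#
#     def greet(name, ds=None):
#         print(f"Hello {name}, the execution date is {ds}")
#         return name
#
#     with DAG(dag_id="example_python", start_date=datetime(2021, 1, 1)) as dag:
#         greet_task = PythonOperator(
#             task_id="greet",
#             python_callable=greet,
#             op_kwargs={"name": "Airflow"},
#         )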
class BranchPythonOperator(PythonOperator, SkipMixin):
    """
    Allows a workflow to "branch" or follow a path after the execution
    of this task.

    It derives from the PythonOperator and expects a Python function that returns
    a single task_id or list of task_ids to follow. The task_id(s) returned
    should point to a task directly downstream from {self}. All other "branches"
    or directly downstream tasks are marked with a state of ``skipped`` so that
    these paths can't move forward. The ``skipped`` states are propagated
    downstream to allow for the DAG state to fill up and the DAG run's state
    to be inferred.
    """
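# A minimal usage sketch (not part of the original module): the branching callable
# returns the task_id (or list of task_ids) to follow, and every other task directly
# downstream of the branch is skipped. The task ids and weekday logic are hypothetical.
#
#     from datetime import datetime
#
#     def choose_branch():
#         return "process_weekday" if datetime.now().weekday() < 5 else "process_weekend"
#
#     branch = BranchPythonOperator(task_id="branching", python_callable=choose_branch)
#     branch >> [process_weekday, process_weekend]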
class ShortCircuitOperator(PythonOperator, SkipMixin):
    """
    Allows a workflow to continue only if a condition is met. Otherwise, the
    workflow "short-circuits" and downstream tasks are skipped.

    The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
    condition and short-circuits the workflow if the condition is False. Any
    downstream tasks are marked with a state of "skipped". If the condition is
    True, downstream tasks proceed as normal.

    The condition is determined by the result of `python_callable`.
    """
    def execute(self, context: Dict):
        condition = super().execute(context)
        self.log.info("Condition result is %s", condition)

        if condition:
            self.log.info('Proceeding with downstream tasks...')
            return

        self.log.info('Skipping downstream tasks...')

        downstream_tasks = context['task'].get_flat_relatives(upstream=False)
        self.log.debug("Downstream task_ids %s", downstream_tasks)

        if downstream_tasks:
            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

        self.log.info("Done.")
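# A minimal usage sketch (not part of the original module): the truthiness of the
# callable's return value decides whether downstream tasks run, as implemented in
# execute() above. The task ids and freshness check are hypothetical.
#
#     def data_is_fresh():
#         # a falsy return value short-circuits the workflow: downstream tasks are skipped
#         return False
#
#     check = ShortCircuitOperator(task_id="check_freshness", python_callable=data_is_fresh)
#     check >> load_task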
class PythonVirtualenvOperator(PythonOperator):
    """
    Allows one to run a function in a virtualenv that is created and destroyed
    automatically (with certain caveats).

    The function must be defined using def, and not be
    part of a class. All imports must happen inside the function
    and no variables outside of the scope may be referenced. A global scope
    variable named virtualenv_string_args will be available (populated by
    string_args). In addition, one can pass arguments through op_args and op_kwargs,
    and one can use a return value.
    Note that if your virtualenv runs in a different Python major version than Airflow,
    you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to
    Airflow through plugins. You can use string_args though.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:PythonVirtualenvOperator`

    :param python_callable: A python function with no references to outside variables,
        defined with def, which will be run in a virtualenv
    :type python_callable: function
    :param requirements: A list of requirements as specified in a pip install command
    :type requirements: list[str]
    :param python_version: The Python version to run the virtualenv with. Note that
        both 2 and 2.7 are acceptable forms.
    :type python_version: Optional[Union[str, int, float]]
    :param use_dill: Whether to use dill to serialize
        the args and result (pickle is default). This allows more complex types
        but requires you to include dill in your requirements.
    :type use_dill: bool
    :param system_site_packages: Whether to include
        system_site_packages in your virtualenv.
        See virtualenv documentation for more information.
    :type system_site_packages: bool
    :param op_args: A list of positional arguments to pass to python_callable.
    :type op_args: list
    :param op_kwargs: A dict of keyword arguments to pass to python_callable.
    :type op_kwargs: dict
    :param string_args: Strings that are present in the global var virtualenv_string_args,
        available to python_callable at runtime as a list[str]. Note that args are split
        by newline.
    :type string_args: list[str]
    :param templates_dict: a dictionary where the values are templates that
        will get templated by the Airflow engine sometime between
        ``__init__`` and ``execute``, and are made available
        in your callable's context after the template has been applied
    :type templates_dict: dict of str
    :param templates_exts: a list of file extensions to resolve while
        processing templated fields, for example ``['.sql', '.hql']``
    :type templates_exts: list[str]
    """
    def __init__(
        self,
        *,
        python_callable: Callable,
        requirements: Optional[Iterable[str]] = None,
        python_version: Optional[Union[str, int, float]] = None,
        use_dill: bool = False,
        system_site_packages: bool = True,
        op_args: Optional[List] = None,
        op_kwargs: Optional[Dict] = None,
        string_args: Optional[Iterable[str]] = None,
        templates_dict: Optional[Dict] = None,
        templates_exts: Optional[List[str]] = None,
        **kwargs,
    ):
        if (
            not isinstance(python_callable, types.FunctionType)
            or isinstance(python_callable, types.LambdaType)
            and python_callable.__name__ == "<lambda>"
        ):
            raise AirflowException('PythonVirtualenvOperator only supports functions for python_callable arg')
        if (
            python_version
            and str(python_version)[0] != str(sys.version_info.major)
            and (op_args or op_kwargs)
        ):
            raise AirflowException(
                "Passing op_args or op_kwargs is not supported across different Python "
                "major versions for PythonVirtualenvOperator. Please use string_args."
            )
        super().__init__(
            python_callable=python_callable,
            op_args=op_args,
            op_kwargs=op_kwargs,
            templates_dict=templates_dict,
            templates_exts=templates_exts,
            **kwargs,
        )
        self.requirements = list(requirements or [])
        self.string_args = string_args or []
        self.python_version = python_version
        self.use_dill = use_dill
        self.system_site_packages = system_site_packages
        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
            self.requirements.append('dill')
        self.pickling_library = dill if self.use_dill else pickle
    def _read_result(self, filename):
        if os.stat(filename).st_size == 0:
            return None
        with open(filename, 'rb') as file:
            try:
                return self.pickling_library.load(file)
            except ValueError:
                self.log.error(
                    "Error deserializing result. Note that result deserialization "
                    "is not supported across major Python versions."
                )
                raise
    def __deepcopy__(self, memo):
        # module objects can't be copied _at all_
        memo[id(self.pickling_library)] = self.pickling_library
        return super().__deepcopy__(memo)
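# A minimal usage sketch (not part of the original module): the callable must be fully
# self-contained, with all imports inside the function body, since it is serialized and
# executed in a freshly created virtualenv. The requirement pin and task_id are hypothetical.
#
#     def render_report():
#         import pandas as pd  # imported inside the function so it resolves in the virtualenv
#         return pd.__version__
#
#     venv_task = PythonVirtualenvOperator(
#         task_id="render_report",
#         python_callable=render_report,
#         requirements=["pandas==1.1.5"],
#         system_site_packages=False,
#     )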
def get_current_context() -> Dict[str, Any]:
    """
    Obtain the execution context for the currently executing operator without
    altering user method's signature.
    This is the simplest method of retrieving the execution context dictionary.

    **Old style:**

    .. code:: python

        def my_task(**context):
            ti = context["ti"]

    **New style:**

    .. code:: python

        from airflow.operators.python import get_current_context

        def my_task():
            context = get_current_context()
            ti = context["ti"]

    Current context will only have a value if this method is called after an
    operator has started to execute.
    """
    if not _CURRENT_CONTEXT:
        raise AirflowException(
            "Current context was requested but no context was found! "
            "Are you running within an airflow task?"
        )
    return _CURRENT_CONTEXT[-1]
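# A minimal usage sketch (not part of the original module): get_current_context() lets a
# plain callable reach the execution context without declaring **kwargs, provided it runs
# inside an operator. The task_id and callable below are hypothetical.
#
#     def log_run_info():
#         context = get_current_context()
#         ti = context["ti"]
#         print(ti.task_id, context["ds"])
#
#     info = PythonOperator(task_id="log_run_info", python_callable=log_run_info)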