Source code for airflow.providers.openlineage.utils.selective_enable
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportloggingfromtypingimportTypeVarfromairflow.modelsimportDAG,Operator,Paramfromairflow.models.xcom_argimportXComArg
def enable_lineage(obj: T) -> T:
    """
    Mark ``obj`` as selectively enabled for OpenLineage.

    Stores ``ENABLE_OL_PARAM`` under ``ENABLE_OL_PARAM_NAME`` in ``obj.params``.

    * For an ``XComArg`` the flag is applied to ``obj.operator``; the
      ``XComArg`` itself is not modified.
    * For a ``DAG`` the flag is first applied to every task in
      ``obj.task_dict`` and then to the DAG itself.

    :param obj: DAG, operator or ``XComArg`` to enable.
    :return: The same ``obj`` that was passed in, so the call can be chained.
    """
    if isinstance(obj, XComArg):
        # Only the wrapped operator carries the flag; nothing is written on the XComArg.
        enable_lineage(obj.operator)
    else:
        if isinstance(obj, DAG):
            # Push the flag down to every task before flagging the DAG itself.
            for child_task in obj.task_dict.values():
                enable_lineage(child_task)
        obj.params[ENABLE_OL_PARAM_NAME] = ENABLE_OL_PARAM
    return obj
def disable_lineage(obj: T) -> T:
    """
    Mark ``obj`` as selectively disabled for OpenLineage.

    Stores ``DISABLE_OL_PARAM`` under ``ENABLE_OL_PARAM_NAME`` in ``obj.params``.

    * For an ``XComArg`` the flag is applied to ``obj.operator``; the
      ``XComArg`` itself is not modified.
    * For a ``DAG`` the flag is first applied to every task in
      ``obj.task_dict`` and then to the DAG itself.

    :param obj: DAG, operator or ``XComArg`` to disable.
    :return: The same ``obj`` that was passed in, so the call can be chained.
    """
    if isinstance(obj, XComArg):
        # Only the wrapped operator carries the flag; nothing is written on the XComArg.
        disable_lineage(obj.operator)
    else:
        if isinstance(obj, DAG):
            # Push the flag down to every task before flagging the DAG itself.
            for child_task in obj.task_dict.values():
                disable_lineage(child_task)
        obj.params[ENABLE_OL_PARAM_NAME] = DISABLE_OL_PARAM
    return obj
def is_task_lineage_enabled(task: Operator) -> bool:
    """
    Tell whether the task's selective-enable OpenLineage param is exactly ``True``.

    A missing param, or any value other than ``True``, yields ``False``. When
    the param is exactly ``False`` a debug message is logged as well.

    :param task: Operator whose ``params`` are inspected.
    :return: ``True`` only if ``task.params`` holds ``True`` under ``ENABLE_OL_PARAM_NAME``.
    """
    selective_flag = task.params.get(ENABLE_OL_PARAM_NAME)
    # Identity checks on purpose: an absent param (None) is neither enabled nor logged.
    if selective_flag is False:
        log.debug(
            "OpenLineage event emission suppressed. Task for this functionality is selectively disabled."
        )
    return selective_flag is True
def is_dag_lineage_enabled(dag: DAG) -> bool:
    """
    Tell whether a DAG is selectively enabled to emit OpenLineage events.

    The DAG counts as enabled when its own selective-enable param is exactly
    ``True``, or when at least one task in ``dag.tasks`` is enabled according
    to ``is_task_lineage_enabled``. A DAG-level value of ``False`` only
    triggers a debug message; the tasks are still consulted afterwards.

    :param dag: DAG whose ``params`` and tasks are inspected.
    :return: ``True`` if the DAG or any of its tasks is selectively enabled.
    """
    dag_flag = dag.params.get(ENABLE_OL_PARAM_NAME)
    if dag_flag is False:
        log.debug(
            "OpenLineage event emission suppressed. DAG for this functionality is selectively disabled."
        )
    if dag_flag is True:
        # DAG-level opt-in wins; no need to look at individual tasks.
        return True
    return any(is_task_lineage_enabled(member) for member in dag.tasks)