# Source code for airflow.lineage

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from functools import wraps

from airflow import configuration as conf
from airflow.lineage.datasets import DataSet
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string

from itertools import chain

# XCom key under which a task's serialized outlets are pushed by
# apply_lineage and pulled again by prepare_lineage.
PIPELINE_OUTLETS = "pipeline_outlets"
# XCom key under which a task's serialized inlets are pushed by apply_lineage.
PIPELINE_INLETS = "pipeline_inlets"

# Module-level logger, used by _get_backend to report configuration problems.
log = LoggingMixin().log


def _get_backend():
    """
    Resolve the lineage backend named by the ``[lineage] backend`` option.

    :return: the object imported from the configured dotted path, or
        ``None`` when the option is missing from the configuration or the
        path cannot be imported (both cases are only logged at debug level).
    """
    try:
        backend_path = conf.get("lineage", "backend")
        return import_string(backend_path)
    except ImportError as import_err:
        log.debug("Cannot import %s due to %s", backend_path, import_err)
    except conf.AirflowConfigException:
        log.debug("Could not find lineage backend key in config")

    # Reached only when one of the handlers above ran.
    return None


def apply_lineage(func):
    """
    Saves the lineage to XCom and if configured to do so sends it
    to the backend.

    Decorator for a method called as ``func(self, context, *args, **kwargs)``.
    The lineage backend is resolved once, when the decorator is applied, not
    on every call.

    :param func: the method to wrap
    :return: a wrapper that calls ``func``, pushes the non-empty inlets and
        outlets of ``self`` to XCom, forwards them to the backend (if any)
        and finally returns whatever ``func`` returned
    """
    backend = _get_backend()

    @wraps(func)
    def wrapper(self, context, *args, **kwargs):
        self.log.debug("Lineage called with inlets: %s, outlets: %s",
                       self.inlets, self.outlets)
        ret_val = func(self, context, *args, **kwargs)

        # Outlets are pushed before inlets. Empty collections are neither
        # serialized nor pushed, and context['ti'] is only looked up when
        # there is actually something to push.
        for key, datasets in ((PIPELINE_OUTLETS, self.outlets),
                              (PIPELINE_INLETS, self.inlets)):
            if datasets:
                self.xcom_push(context,
                               key=key,
                               value=[x.as_dict() for x in datasets],
                               execution_date=context['ti'].execution_date)

        if backend:
            # The backend receives the dataset objects themselves, not the
            # serialized dictionaries stored in XCom.
            backend.send_lineage(operator=self, inlets=self.inlets,
                                 outlets=self.outlets, context=context)

        return ret_val

    return wrapper
def _pull_outlets_as_datasets(operator, context, task_ids):
    """
    Pull the outlets stored in XCom for ``task_ids`` and rebuild them as
    DataSet objects.

    :param operator: object providing ``xcom_pull`` and ``dag_id``
    :param context: the context passed to the decorated method
    :param task_ids: ids of the tasks whose outlets should be pulled
    :return: list of dataset objects built via ``DataSet.map_type``
    """
    pulled = operator.xcom_pull(context,
                                task_ids=task_ids,
                                dag_id=operator.dag_id,
                                key=PIPELINE_OUTLETS)
    # The pull result is treated as a sequence of lists; falsy entries
    # (presumably tasks that pushed no outlets) are skipped while flattening.
    serialized = [item for sublist in pulled if sublist for item in sublist]
    return [DataSet.map_type(i['typeName'])(data=i['attributes'])
            for i in serialized]


def prepare_lineage(func):
    """
    Prepares the lineage inlets and outlets. Inlets can be:

    * "auto" -> picks up any outlets from direct upstream tasks that have
      outlets defined, as such that if A -> B -> C and B does not have
      outlets but A does, these are provided as inlets.
    * "list of task_ids" -> picks up outlets from the upstream task_ids
    * "list of datasets" -> manually defined list of DataSet

    Decorator for a method called as ``func(self, context, *args, **kwargs)``.

    :param func: the method to wrap
    :return: a wrapper that fills ``self.inlets`` and ``self.outlets``, hands
        the context to every dataset and then returns ``func``'s result
    """
    @wraps(func)
    def wrapper(self, context, *args, **kwargs):
        self.log.debug("Preparing lineage inlets and outlets")

        explicit_ids = set(self._inlets['task_ids'])

        # Explicitly listed task ids only count when they are upstream of
        # this task (directly or indirectly).
        task_ids = explicit_ids.intersection(
            self.get_flat_relative_ids(upstream=True)
        )
        if task_ids:
            self.inlets.extend(
                _pull_outlets_as_datasets(self, context, task_ids)
            )

        if self._inlets['auto']:
            # dont append twice: take the direct upstream tasks minus the ones
            # that were listed explicitly. A symmetric difference would also
            # select listed ids that are not direct upstream tasks, pulling
            # their outlets a second time (or for non-upstream tasks).
            task_ids = set(self.upstream_task_ids).difference(explicit_ids)
            if task_ids:
                self.inlets.extend(
                    _pull_outlets_as_datasets(self, context, task_ids)
                )

        if self._inlets['datasets']:
            self.inlets.extend(self._inlets['datasets'])

        # outlets
        if self._outlets['datasets']:
            self.outlets.extend(self._outlets['datasets'])

        self.log.debug("inlets: %s, outlets: %s", self.inlets, self.outlets)

        for dataset in chain(self.inlets, self.outlets):
            dataset.set_context(context)

        return func(self, context, *args, **kwargs)

    return wrapper