Lineage¶
Note
Lineage support is very experimental and subject to change.
Airflow can help track origins of data, what happens to it and where it moves over time. This can aid having audit trails and data governance, but also debugging of data flows.
Airflow tracks data by means of inlets and outlets of the tasks. Let's work from an example and see how it works.
import datetime
import pendulum
from airflow.lineage import AUTO
from airflow.lineage.entities import File
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
dag = DAG(
dag_id="example_lineage",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule="0 0 * * *",
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
)
f_final = File(url="/tmp/final")
run_this_last = EmptyOperator(task_id="run_this_last", dag=dag, inlets=AUTO, outlets=f_final)
f_in = File(url="/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
f_out = File(url="/tmp/{}/{{{{ data_interval_start }}}}".format(file))
outlets.append(f_out)
run_this = BashOperator(task_id="run_me_first", bash_command="echo 1", dag=dag, inlets=f_in, outlets=outlets)
run_this.set_downstream(run_this_last)
Inlets can be a (list of) upstream task ids or statically defined as an attr annotated object
as is, for example, the File
object. Outlets can only be attr annotated object. Both are rendered
at run time. However, the outlets of a task in case they are inlets to another task will not be re-rendered
for the downstream task.
Note
Operators can add inlets and outlets automatically if the operator supports it.
In the example DAG task run_this
(task_id=run_me_first
) is a BashOperator that takes 3 inlets: CAT1
, CAT2
, CAT3
, that are
generated from a list. Note that data_interval_start
is a templated field and will be rendered when the task is running.
Note
Behind the scenes Airflow prepares the lineage metadata as part of the pre_execute
method of a task. When the task
has finished execution post_execute
is called and lineage metadata is pushed into XCOM. Thus if you are creating
your own operators that override this method make sure to decorate your method with prepare_lineage
and apply_lineage
respectively.
Shorthand notation¶
Shorthand notation is available as well, this works almost equal to unix command line pipes, inputs and outputs.
Note that operator precedence still applies. Also the |
operator will only work when the left hand side either
has outlets defined (e.g. by using add_outlets(..)
or has out of the box support of lineage operator.supports_lineage == True
.
f_in > run_this | (run_this_last > outlets)
Lineage Backend¶
It's possible to push the lineage metrics to a custom backend by providing an instance of a LineageBackend in the config:
[lineage]
backend = my.lineage.CustomBackend
The backend should inherit from airflow.lineage.LineageBackend
.
from airflow.lineage.backend import LineageBackend
class CustomBackend(LineageBackend):
def send_lineage(self, operator, inlets=None, outlets=None, context=None):
...
# Send the info to some external service