Implementing OpenLineage in Operators¶
OpenLineage makes it easy to add lineage to your data pipelines by supporting direct modification of Airflow Operators. When you can modify an Operator, adding lineage extraction can be as simple as adding a single method to it. See OpenLineage methods for more details.
There may be Operators that you cannot modify (e.g. from third-party providers) but still want lineage extracted from. To handle this situation, OpenLineage allows you to provide a custom Extractor for any Operator. See Custom Extractors for more details.
If neither of the above can be implemented, there is a fallback: manually annotated lineage. Airflow allows Operators to track lineage by specifying their inputs and outputs via inlets and outlets. See Manually annotated lineage for more details.
Extraction precedence¶
Since there are multiple ways to implement OpenLineage support for an Operator, it's important to keep in mind the order in which OpenLineage looks for lineage data:
Extractor - OpenLineage first checks whether a custom Extractor is specified for the Operator's class name. Any custom Extractor registered by the user takes precedence over the default Extractors defined in Airflow provider source code (e.g. BashExtractor).
OpenLineage methods - if no Extractor is explicitly specified for the Operator's class name, the DefaultExtractor is used, which looks for OpenLineage methods on the Operator.
Inlets and Outlets - if no OpenLineage methods are defined on the Operator, its inlets and outlets are checked.
If all the above options are missing, no lineage data is extracted from the Operator. You will still receive OpenLineage events enriched with things like general Airflow facets, proper event time and type, but the inputs/outputs will be empty and Operator-specific facets will be missing.
OpenLineage methods¶
This approach is recommended when dealing with your own Operators, where you can directly implement OpenLineage methods. When dealing with Operators that you cannot modify (e.g. from third-party providers) but still want lineage extracted from them, see Custom Extractors.
OpenLineage defines a few methods for implementation in Operators. Those are referred to as OpenLineage methods.
def get_openlineage_facets_on_start() -> OperatorLineage: ...
def get_openlineage_facets_on_complete(ti: TaskInstance) -> OperatorLineage: ...
def get_openlineage_facets_on_failure(ti: TaskInstance) -> OperatorLineage: ...
The OpenLineage methods are called when the task instance changes state:
RUNNING -> get_openlineage_facets_on_start()
SUCCESS -> get_openlineage_facets_on_complete()
FAILED -> get_openlineage_facets_on_failure()
At least one of the following methods must be implemented: get_openlineage_facets_on_start() or get_openlineage_facets_on_complete().
For more details on what methods are called when others are missing, see How to properly implement OpenLineage methods?.
Instead of returning a complete OpenLineage event, the provider defines an OperatorLineage structure to be returned by Operators:
@define
class OperatorLineage:
    inputs: list[Dataset] = Factory(list)
    outputs: list[Dataset] = Factory(list)
    run_facets: dict[str, RunFacet] = Factory(dict)
    job_facets: dict[str, BaseFacet] = Factory(dict)
The OpenLineage integration itself takes care of enriching it with things like general Airflow facets and the proper event time and type, creating a complete OpenLineage RunEvent.
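For illustration, here is a minimal sketch of an Operator that knows its datasets up front; the class, its parameters and the connection details are hypothetical, and only get_openlineage_facets_on_start is implemented:

from airflow.models.baseoperator import BaseOperator


class MyCopyOperator(BaseOperator):
    """Hypothetical Operator that copies data between two tables known up front."""

    def __init__(self, source_table: str, target_table: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        ...  # the actual copy logic is not relevant for lineage here

    def get_openlineage_facets_on_start(self):
        # Import OpenLineage objects locally, so the Operator still works
        # when the OpenLineage provider is not installed.
        from airflow.providers.common.compat.openlineage.facet import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db-host:5432", name=self.source_table)],
            outputs=[Dataset(namespace="postgres://db-host:5432", name=self.target_table)],
        )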
How to properly implement OpenLineage methods?¶
There are a couple of things worth noting when implementing OpenLineage in Operators.
First, do not import OpenLineage-related objects at the module's top level, but inside the OpenLineage method itself. This allows users to use your provider even if they do not have the OpenLineage provider installed.
Second, make sure your provider returns OpenLineage-compliant dataset names. This allows OpenLineage consumers to properly match information about datasets gathered from different sources. The naming convention is described in the OpenLineage naming docs.
Third, the OpenLineage implementation should not waste the time of users who do not use it. This means avoiding heavy processing or network calls in the execute method that execute itself does not need. A better option is to save the relevant information in Operator attributes and then use it in the OpenLineage method. A good example is BigQueryExecuteQueryOperator: it saves the job_ids of the queries that were executed, so get_openlineage_facets_on_complete can then call the BigQuery API, ask for the lineage of those tables, and transform it to the OpenLineage format.
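A minimal sketch of this pattern (with a hypothetical Operator and hypothetical helper methods, not the actual BigQueryExecuteQueryOperator code) could look like this:

from airflow.models.baseoperator import BaseOperator


class MyQueryOperator(BaseOperator):
    """Hypothetical Operator illustrating the 'save identifiers now, extract lineage later' pattern."""

    def __init__(self, sql: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.sql = sql
        self.job_id = None

    def execute(self, context):
        # Save only the lightweight identifier needed for lineage;
        # no lineage-related processing or extra network calls happen here.
        self.job_id = self._run_query(self.sql)  # _run_query: hypothetical helper

    def get_openlineage_facets_on_complete(self, task_instance):
        from airflow.providers.openlineage.extractors import OperatorLineage

        # Only when lineage is actually requested, use the saved job_id
        # to look up inputs/outputs (e.g. via the service's API).
        inputs, outputs = self._lineage_from_job_id(self.job_id)  # hypothetical helper
        return OperatorLineage(inputs=inputs, outputs=outputs)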
Fourth, it's not necessary to implement all the methods. If all the datasets are known before execute is called and there is no relevant runtime data, there may be no point in implementing get_openlineage_facets_on_complete - the get_openlineage_facets_on_start method can provide all the data. Conversely, if everything is unknown before execute, there may be no point in writing an _on_start method.
Similarly, if there is no relevant failure data, or the failure conditions are unknown, implementing get_openlineage_facets_on_failure is probably not worth it. In general:
If there is no on_failure method, the on_complete method gets called instead.
If there is neither an on_failure nor an on_complete method, on_start gets called instead (both at task start and at task completion).
If there is no on_start method, the lineage information will not be included in the START event, and the on_complete method will be called upon task completion.
How to test OpenLineage methods?¶
Unit testing the OpenLineage integration in Operators is very similar to testing the Operators themselves. The objective of these tests is to make sure the get_openlineage_* methods return a proper OperatorLineage data structure with the relevant fields filled. It's recommended to mock any external calls.
Test authors also need to remember that the conditions under which the different OpenLineage methods are called differ: get_openlineage_facets_on_start is called before execute and, as such, must not depend on values that are set there.
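A minimal pytest-style sketch of such a unit test, using the hypothetical MyCopyOperator from the earlier sketch, could look like this:

def test_get_openlineage_facets_on_start():
    # Instantiate the Operator the same way a DAG would, but never call execute().
    operator = MyCopyOperator(
        task_id="copy_task",
        source_table="public.source_table",
        target_table="public.target_table",
    )

    lineage = operator.get_openlineage_facets_on_start()

    assert len(lineage.inputs) == 1
    assert len(lineage.outputs) == 1
    assert lineage.inputs[0].name == "public.source_table"
    assert lineage.outputs[0].name == "public.target_table"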
See Troubleshooting for details on how to troubleshoot OpenLineage locally.
There is no existing framework for system testing the OpenLineage integration, but the easiest way to achieve it is by comparing emitted events (e.g. captured with FileTransport) against expected ones.
The objective of the author of an OpenLineage system test is to provide the expected dictionary of event keys. Event keys identify the event sent from a particular Operator and method: they have the structure <dag_id>.<task_id>.event.<event_type>, so it's always possible to identify a particular event sent from a particular task this way.
The provided event structure does not have to contain all the fields that are in the resulting event. Only the fields provided by the test author are compared; this allows checking only the fields a particular test cares about. It also allows skipping fields that are (semi-)randomly generated, like runId or eventTime, or that are always the same in the context of OpenLineage in Airflow, like producer.
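For illustration, an expected-events mapping for a hypothetical DAG and task could look like the sketch below; only the listed fields would be compared against the captured events, and the exact comparison mechanism depends on how your system test consumes them:

# Keys follow the <dag_id>.<task_id>.event.<event_type> structure described above;
# values are partial event bodies containing only the fields the test cares about.
expected_events = {
    "my_dag.my_task.event.start": {
        "eventType": "START",
        "job": {"name": "my_dag.my_task"},
    },
    "my_dag.my_task.event.complete": {
        "eventType": "COMPLETE",
        "outputs": [{"namespace": "gs://my-bucket", "name": "dir/file.csv"}],
    },
}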
Example¶
Here's an example of a properly implemented get_openlineage_facets_on_complete method, for GcsToGcsOperator.
As some processing is done in the execute method and there is no relevant failure data, implementing this single method is enough.
def get_openlineage_facets_on_complete(self, task_instance):
    """
    Implementing _on_complete because execute method does preprocessing on internals.

    This means we won't have to normalize self.source_object and self.source_objects,
    destination bucket and so on.
    """
    from airflow.providers.common.compat.openlineage.facet import Dataset
    from airflow.providers.openlineage.extractors import OperatorLineage

    return OperatorLineage(
        inputs=[
            Dataset(namespace=f"gs://{self.source_bucket}", name=source)
            for source in sorted(self.resolved_source_objects)
        ],
        outputs=[
            Dataset(namespace=f"gs://{self.destination_bucket}", name=target)
            for target in sorted(self.resolved_target_objects)
        ],
    )
For more examples of implemented OpenLineage methods, check out the source code of Supported classes.
Custom Extractors¶
This approach is recommended when dealing with Operators that you cannot modify (e.g. from third-party providers) but still want lineage extracted from. If you want to extract lineage from your own Operators, you may prefer directly implementing OpenLineage methods as described in OpenLineage methods.
This approach works by detecting which Airflow Operators your DAG is using and extracting lineage data from them using the corresponding Extractor class.
Interface¶
Custom Extractors have to derive from BaseExtractor and implement at least two methods: _execute_extraction and get_operator_classnames.
BaseExtractor defines two methods, extract and extract_on_complete, that are called and used to provide the actual lineage data. The difference is that extract is called before the Operator's execute method, while extract_on_complete is called after it.
By default, extract calls the _execute_extraction method implemented in the custom Extractor, and extract_on_complete calls the extract method. If you want to provide some additional information available only after the task execution, you can override the extract_on_complete method. This can be used to extract any additional information that the Operator sets on its own properties. A good example is SnowflakeOperator, which sets query_ids after execution.
get_operator_classnames is a classmethod that is used to provide the list of Operators that your Extractor can get lineage from.
For example:
@classmethod
def get_operator_classnames(cls) -> List[str]:
    return ['PostgresOperator']
If the name of the Operator matches one of the names on the list, the Extractor will be instantiated - with the Operator provided in the Extractor's self.operator property - and both the extract and extract_on_complete methods will be called.
Both methods return the OperatorLineage structure:
@define
class OperatorLineage:
    """Structure returned from lineage extraction."""

    inputs: list[Dataset] = Factory(list)
    outputs: list[Dataset] = Factory(list)
    run_facets: dict[str, RunFacet] = Factory(dict)
    job_facets: dict[str, BaseFacet] = Factory(dict)
Inputs and outputs are lists of plain OpenLineage datasets (openlineage.client.event_v2.Dataset).
run_facets and job_facets are dictionaries of optional RunFacets and JobFacets that will be attached to the run and the job respectively - for example, you might want to attach a SQLJobFacet if your Operator is executing SQL.
To learn more about facets in OpenLineage see Custom Facets.
Registering Custom Extractor¶
The OpenLineage integration does not know that you've provided an Extractor unless you register it.
This can be done by using the extractors option in the Airflow configuration.
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000"}
extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
The AIRFLOW__OPENLINEAGE__EXTRACTORS environment variable is equivalent.
AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'
Optionally, you can separate them with whitespace. This is useful if you're providing them as part of a YAML file.
AIRFLOW__OPENLINEAGE__EXTRACTORS: >-
  full.path.to.FirstExtractor;
  full.path.to.SecondExtractor
Remember to make sure that the path is importable for both the scheduler and the worker.
Debugging Custom Extractor¶
There are two common problems associated with custom Extractors.
The first is a wrong path provided to the extractors option in the Airflow configuration. The path needs to be exactly the same as the one you'd use from your code.
If the path is wrong or not importable from the worker, the plugin will fail to load the Extractor and proper OpenLineage events for that Operator won't be emitted.
The second, and maybe more insidious, is imports from Airflow. Because OpenLineage code gets instantiated when the Airflow worker itself starts, any import from Airflow can be unnoticeably cyclical. This causes OpenLineage extraction to fail.
To avoid this issue, import from Airflow only locally - in the _execute_extraction or extract_on_complete methods.
If you need imports for type checking, guard them behind typing.TYPE_CHECKING.
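For example, a type-only import in a custom Extractor module could be guarded like this (a sketch with a hypothetical Extractor; TaskInstance stands in for whatever Airflow object you only need for annotations):

from typing import TYPE_CHECKING

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage

if TYPE_CHECKING:
    # Imported only for type annotations; skipped at runtime, so it cannot
    # contribute to import cycles when the Airflow worker starts.
    from airflow.models.taskinstance import TaskInstance


class MyExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["MyOperator"]

    def _execute_extraction(self) -> OperatorLineage:
        return OperatorLineage()

    def extract_on_complete(self, task_instance: "TaskInstance") -> OperatorLineage:
        return self.extract()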
Testing Custom Extractor¶
As with all code, custom Extractors should be tested. This section provides some information about the most important data structures to write tests for and some notes on troubleshooting. We assume prior knowledge of writing custom Extractors. To learn more about how Operators and Extractors work together under the hood, check out Custom Extractors.
When testing an Extractor, we first want to verify that the OperatorLineage object is being created, specifically that it is built with the correct input and output datasets and the relevant facets. This is done in OpenLineage via pytest, with appropriate mocking and patching of connections and objects.
Check out example tests.
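A minimal sketch of such a test, written against the hypothetical ExampleExtractor shown in the Example section below and assuming the BaseExtractor constructor that receives the Operator, might look like this:

from unittest import mock

from airflow.providers.openlineage.extractors import OperatorLineage


def test_example_extractor_builds_operator_lineage():
    # A mocked Operator stands in for the real one, so no external systems are called.
    operator = mock.MagicMock()
    operator.bq_table_reference = "my-project.my_dataset.my_table"
    operator.s3_path = "s3://my-bucket"
    operator.s3_file_name = "exports/result.csv"

    extractor = ExampleExtractor(operator)
    lineage = extractor.extract()

    assert isinstance(lineage, OperatorLineage)
    assert lineage.inputs[0].name == "my-project.my_dataset.my_table"
    assert lineage.outputs[0].namespace == "s3://my-bucket"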
Testing each facet is also important, as data or graphs in the UI can render incorrectly if the facets are wrong. For example, if the facet name is created incorrectly in the Extractor, then the Operator’s task will not show up in the lineage graph, creating a gap in pipeline observability.
Even with unit tests, an Extractor may still not operate as expected. The easiest way to tell that data isn't coming through correctly is when UI elements do not show up as expected in the Lineage tab.
See Troubleshooting for details on how to troubleshoot OpenLineage locally.
Example¶
This is an example of a simple Extractor for an Operator that executes an export query in BigQuery and saves the result to a file in S3.
Some information is known before the Operator's execute method is called, so we can already extract some lineage in the _execute_extraction method. After the Operator's execute method is called, in extract_on_complete, we can simply attach some additional facets (e.g. with the BigQuery job ID) to what we've prepared earlier. This way, we get all possible information from the Operator.
Please note that this is just an example. There are OpenLineage built-in features that can facilitate different processes, like extracting column-level lineage and inputs/outputs from a SQL query with the SQL parser.
from typing import Any

from airflow.models.baseoperator import BaseOperator
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.common.compat.openlineage.facet import (
    Dataset,
    ExternalQueryRunFacet,
    SQLJobFacet,
)


class ExampleOperator(BaseOperator):
    def __init__(self, query, bq_table_reference, s3_path, s3_file_name, **kwargs) -> None:
        super().__init__(**kwargs)
        self.query = query
        self.bq_table_reference = bq_table_reference
        self.s3_path = s3_path
        self.s3_file_name = s3_file_name
        self._job_id = None

    def execute(self, context) -> Any:
        # run_query is a placeholder for whatever client call actually executes the query.
        self._job_id = run_query(query=self.query)


class ExampleExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls):
        return ["ExampleOperator"]

    def _execute_extraction(self) -> OperatorLineage:
        """Define what we know before the Operator's execute is called."""
        return OperatorLineage(
            inputs=[Dataset(namespace="bigquery", name=self.operator.bq_table_reference)],
            outputs=[Dataset(namespace=self.operator.s3_path, name=self.operator.s3_file_name)],
            job_facets={
                "sql": SQLJobFacet(
                    query="EXPORT INTO ... OPTIONS(FORMAT=csv, SEP=';' ...) AS SELECT * FROM ... "
                )
            },
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage:
        """Add what we received after the Operator's execute call."""
        lineage_metadata = self.extract()
        lineage_metadata.run_facets = {
            "parent": ExternalQueryRunFacet(externalQueryId=task_instance.task._job_id, source="bigquery")
        }
        return lineage_metadata
For more examples of OpenLineage Extractors, check out the source code of BashExtractor or PythonExtractor.
Manually annotated lineage¶
This approach is rarely recommended and only in very specific cases, when it's impossible to extract lineage information from the Operator itself. If you want to extract lineage from your own Operators, you may prefer directly implementing OpenLineage methods as described in OpenLineage methods. When dealing with Operators that you cannot modify (e.g. from third-party providers) but still want lineage extracted from them, see Custom Extractors.
Airflow allows Operators to track lineage by specifying the input and outputs of the Operators via inlets and outlets. OpenLineage will, by default, use inlets and outlets as input/output datasets if it cannot find any successful extraction from the OpenLineage methods or the Extractors.
Airflow allows inlets and outlets to be a Table, Column, File or User entity, and so does OpenLineage.
Example¶
An Operator inside the Airflow DAG can be annotated with inlets and outlets like in the below example:
"""Example DAG demonstrating the usage of the extraction via Inlets and Outlets."""
import pendulum
from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.lineage.entities import Table, File, Column, User
t1 = Table(
cluster="c1",
database="d1",
name="t1",
owners=[User(email="jdoe@ok.com", first_name="Joe", last_name="Doe")],
)
t2 = Table(
cluster="c1",
database="d1",
name="t2",
columns=[
Column(name="col1", description="desc1", data_type="type1"),
Column(name="col2", description="desc2", data_type="type2"),
],
owners=[
User(email="mike@company.com", first_name="Mike", last_name="Smith"),
User(email="theo@company.com", first_name="Theo"),
User(email="smith@company.com", last_name="Smith"),
User(email="jane@company.com"),
],
)
t3 = Table(
cluster="c1",
database="d1",
name="t3",
columns=[
Column(name="col3", description="desc3", data_type="type3"),
Column(name="col4", description="desc4", data_type="type4"),
],
)
t4 = Table(cluster="c1", database="d1", name="t4")
f1 = File(url="s3://bucket/dir/file1")
with DAG(
dag_id="example_operator",
schedule="@once",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
task1 = BashOperator(
task_id="task_1_with_inlet_outlet",
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
inlets=[t1, t2],
outlets=[t3],
)
task2 = BashOperator(
task_id="task_2_with_inlet_outlet",
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
inlets=[t3, f1],
outlets=[t4],
)
task1 >> task2
if __name__ == "__main__":
dag.cli()
Conversion from the Airflow Table entity to the OpenLineage Dataset is made in the following way:
- CLUSTER of the table entity becomes the namespace of OpenLineage's Dataset
- The name of the dataset is formed by {{DATABASE}}.{{NAME}}, where DATABASE and NAME are attributes specified by Airflow's Table entity.
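For illustration, applying these rules to the t1 table from the DAG above would yield a dataset roughly equivalent to the following sketch (not the exact conversion code used by the provider):

from openlineage.client.event_v2 import Dataset

# CLUSTER becomes the namespace; DATABASE and NAME form the dataset name.
t1_as_dataset = Dataset(namespace="c1", name="d1.t1")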
Custom Facets¶
To learn more about facets in OpenLineage, please refer to facet documentation. Also check out available facets and a blog post about extending with facets.
The OpenLineage spec might not contain all the facets you need to write your Extractor, in which case you will have to create your own custom facets.
You can also inject your own custom facets into the lineage event's run facets using the custom_run_facets Airflow configuration option.
Steps to be taken:
Write a function that returns the custom facets. You can write as many custom facet functions as needed.
Register the functions using the custom_run_facets Airflow configuration.
The Airflow OpenLineage listener will automatically execute these functions during lineage event generation and append their return values to the run facets of the lineage event.
Writing a custom facet function¶
Input arguments: The function should accept two input arguments: TaskInstance and TaskInstanceState.
Function body: Perform the logic needed to generate the custom facets. The custom facets must inherit from RunFacet for the _producer and _schemaURL to be automatically added to the facet.
Return value: The custom facets to be added to the lineage event. The return type should be dict[str, RunFacet] or None. You may choose to return None if you do not want to add custom facets for certain criteria.
Example custom facet function
import attrs
from airflow.models.taskinstance import TaskInstance, TaskInstanceState
from airflow.providers.common.compat.openlineage.facet import RunFacet


@attrs.define
class MyCustomRunFacet(RunFacet):
    """Define a custom facet."""

    name: str
    jobState: str
    uniqueName: str
    displayName: str
    dagId: str
    taskId: str
    cluster: str
    custom_metadata: dict


def get_my_custom_facet(
    task_instance: TaskInstance, ti_state: TaskInstanceState
) -> dict[str, RunFacet] | None:
    operator_name = task_instance.task.operator_name
    custom_metadata = {}
    if operator_name == "BashOperator":
        return None
    if ti_state == TaskInstanceState.FAILED:
        custom_metadata["custom_key_failed"] = "custom_value"
    job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
    return {
        "additional_run_facet": MyCustomRunFacet(
            name="test-lineage-namespace",
            jobState=task_instance.state,
            uniqueName=job_unique_name,
            displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
            dagId=task_instance.dag_id,
            taskId=task_instance.task_id,
            cluster="TEST",
            custom_metadata=custom_metadata,
        )
    }
Register the custom facet functions¶
Use the custom_run_facets Airflow configuration option to register the custom run facet functions by passing a string of semicolon-separated full import paths to the functions.
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function
The AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS environment variable is equivalent.
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
Note
The custom facet functions are executed both at the START and COMPLETE/FAIL of the TaskInstance and added to the corresponding OpenLineage event.
When creating conditions on the TaskInstance state, you should use the second argument provided (TaskInstanceState), which contains the state the task should be in. This may differ from ti.current_state(), as the OpenLineage listener may be called before the TaskInstance's state is updated in the Airflow database.
When the path to a single function is registered more than once, it will still be executed only once.
When duplicate custom facet keys are returned by multiple registered functions, the result of a random one will be added to the lineage event. Please avoid using duplicate facet keys, as it can produce unexpected behaviour.
Job Hierarchy¶
Apache Airflow features an inherent job hierarchy: DAGs, large and independently schedulable units, comprise smaller, executable tasks.
OpenLineage reflects this structure in its Job Hierarchy model.
Upon DAG scheduling, a START event is emitted.
Subsequently, following Airflow’s task order, each task triggers:
START events at TaskInstance start.
COMPLETE/FAILED events upon completion.
Finally, upon DAG termination, a completion event (COMPLETE or FAILED) is emitted.
TaskInstance events’ ParentRunFacet references the originating DAG run.
Troubleshooting¶
When testing code locally, Marquez can be used to inspect the data being emitted—or not being emitted. Using Marquez will allow you to figure out if the error is being caused by the Extractor or the API. If data is being emitted from the Extractor as expected but isn’t making it to the UI, then the Extractor is fine and an issue should be opened up in OpenLineage. However, if data is not being emitted properly, it is likely that more unit tests are needed to cover Extractor behavior. Marquez can help you pinpoint which facets are not being formed properly so you know where to add test coverage.
Debug settings¶
For debugging purposes, ensure that the Airflow logging level is set to DEBUG and that debug_mode is enabled for the OpenLineage integration.
This will increase the detail in Airflow logs and include additional environmental information in OpenLineage events.
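For example, both settings can be enabled in airflow.cfg as sketched below (the equivalent environment variables would follow the usual pattern, AIRFLOW__LOGGING__LOGGING_LEVEL and AIRFLOW__OPENLINEAGE__DEBUG_MODE):

[logging]
logging_level = DEBUG

[openlineage]
debug_mode = True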
When seeking help with debugging, always try to provide the following:
Airflow scheduler logs with the logging level set to DEBUG
Airflow worker logs (task logs) with the logging level set to DEBUG
OpenLineage events with debug_mode enabled
Where can I learn more?¶
Check out OpenLineage website.
Visit our GitHub repository.
Watch multiple talks about OpenLineage.
How to contribute¶
We welcome your contributions! OpenLineage is an Open Source project under active development, and we’d love your help!
Sounds fun? Check out our new contributor guide to get started.