Source code for tests.system.openlineage.example_openlineage_all_facets_dag

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
DAG with a custom operator that emits all importable openlineage.client.facet_v2 facets that OL does not overwrite.

It checks:
    - all run facets
    - all job facets
    - all input dataset facets
    - all output dataset facets
    - one custom facet per entity
    - all optional parameters are populated for comprehensive serialization coverage
"""

from __future__ import annotations

from datetime import datetime

from openlineage.client.event_v2 import InputDataset, OutputDataset
from openlineage.client.facet_v2 import (
    base_subset_dataset,
    catalog_dataset,
    column_lineage_dataset,
    data_quality_assertions_dataset,
    data_quality_metrics_dataset,
    data_quality_metrics_input_dataset,
    dataset_type_dataset,
    dataset_version_dataset,
    datasource_dataset,
    documentation_dataset,
    documentation_job,
    environment_variables_run,
    error_message_run,
    execution_parameters_run,
    external_query_run,
    extraction_error_run,
    hierarchy_dataset,
    input_statistics_input_dataset,
    job_dependencies_run,
    lifecycle_state_change_dataset,
    output_statistics_output_dataset,
    ownership_dataset,
    ownership_job,
    schema_dataset,
    source_code_job,
    source_code_location_job,
    sql_job,
    storage_dataset,
    symlinks_dataset,
    tags_dataset,
    tags_job,
    tags_run,
    test_run as test_run_facet_module,
)

from airflow import DAG
from airflow.providers.common.compat.sdk import BaseOperator
from airflow.providers.openlineage.extractors.base import OperatorLineage

from system.openlineage.expected_events import get_expected_event_file_path
from system.openlineage.operator import OpenLineageTestOperator


class AllFacetsOperator(BaseOperator):
    """
    Operator with an empty ``execute`` that reports a fixed, hard-coded set of OpenLineage facets.

    The lineage returned by ``get_openlineage_facets_on_start`` consists of one input dataset,
    one output dataset, a dict of run facets and a dict of job facets. Each part is assembled
    by its own private builder so the individual payloads can be read in isolation.
    """

    def execute(self, context):
        """Perform no work; the method body is intentionally empty."""

    def get_openlineage_facets_on_start(self) -> OperatorLineage:
        """
        Return the hard-coded lineage for this operator.

        :return: ``OperatorLineage`` holding a single input dataset, a single output dataset,
            and the run and job facet dictionaries produced by the private builders.
        """
        return OperatorLineage(
            inputs=[self._build_input_dataset()],
            outputs=[self._build_output_dataset()],
            run_facets=self._build_run_facets(),
            job_facets=self._build_job_facets(),
        )

    @staticmethod
    def _build_input_dataset() -> InputDataset:
        """Build the ``s3://all-facets-bucket`` input dataset together with all of its facets."""
        schema_field = schema_dataset.SchemaDatasetFacetFields
        quality_assertion = data_quality_assertions_dataset.Assertion
        dataset_tag = tags_dataset.TagsDatasetFacetFields

        return InputDataset(
            namespace="s3://all-facets-bucket",
            name="input/data.csv",
            facets={
                "custom_input_ds_facet": {"key": "value"},  # type: ignore[dict-item]
                "schema": schema_dataset.SchemaDatasetFacet(
                    fields=[
                        schema_field(
                            name="id",
                            type="INTEGER",
                            description="Unique row identifier",
                            ordinal_position=1,
                        ),
                        schema_field(
                            name="name",
                            type="VARCHAR",
                            description="Full name of the entity",
                            ordinal_position=2,
                        ),
                        # Nested record: exercises the recursive ``fields`` parameter.
                        schema_field(
                            name="address",
                            type="STRUCT",
                            description="Nested address record",
                            ordinal_position=3,
                            fields=[
                                schema_field(
                                    name="street",
                                    type="VARCHAR",
                                    description="Street address line",
                                    ordinal_position=1,
                                ),
                                schema_field(
                                    name="city",
                                    type="VARCHAR",
                                    description="City name",
                                    ordinal_position=2,
                                ),
                            ],
                        ),
                    ]
                ),
                "dataSource": datasource_dataset.DatasourceDatasetFacet(
                    name="all-facets-source",
                    uri="s3://all-facets-bucket",
                ),
                "columnLineage": column_lineage_dataset.ColumnLineageDatasetFacet(
                    fields={
                        "name": column_lineage_dataset.Fields(
                            inputFields=[
                                column_lineage_dataset.InputField(
                                    namespace="s3://upstream",
                                    name="upstream/data.csv",
                                    field="full_name",
                                    transformations=[
                                        column_lineage_dataset.Transformation(
                                            type="DIRECT",
                                            subtype="IDENTITY",
                                            description="Direct copy of the full_name field",
                                            masking=False,
                                        )
                                    ],
                                )
                            ],
                            transformationDescription="Column passed through without modification",
                            transformationType="IDENTITY",
                        ),
                    },
                ),
                "documentation": documentation_dataset.DocumentationDatasetFacet(
                    description="Input dataset for all-facets comprehensive serialization test",
                    contentType="text/plain",
                ),
                "inputStatistics": input_statistics_input_dataset.InputStatisticsInputDatasetFacet(  # type: ignore[dict-item]
                    rowCount=1000,
                    size=8192,
                    fileCount=1,
                ),
                "dataQualityAssertions": data_quality_assertions_dataset.DataQualityAssertionsDatasetFacet(  # type: ignore[dict-item]
                    assertions=[
                        # First assertion populates every optional parameter.
                        quality_assertion(
                            assertion="not_null",
                            success=True,
                            column="id",
                            severity="ERROR",
                            name="id_not_null_check",
                            description="Checks that id column has no null values",
                            expected="0 nulls",
                            actual="0 nulls",
                            content='{"nullCount": 0}',
                            contentType="application/json",
                            params={"sample_size": "1000"},
                        ),
                        quality_assertion(
                            assertion="row_count_above_threshold",
                            success=True,
                            severity="WARNING",
                            name="row_count_check",
                            description="Checks that row count is above the minimum threshold",
                            expected=">= 100",
                            actual="1000",
                        ),
                    ],
                ),
                "ownership": ownership_dataset.OwnershipDatasetFacet(
                    owners=[
                        ownership_dataset.Owner(name=owner_name, type=owner_type)
                        for owner_name, owner_type in (
                            ("team:data-engineering", "team"),
                            ("user:jane.smith@example.com", "user"),
                        )
                    ],
                ),
                "tags": tags_dataset.TagsDatasetFacet(
                    tags=[
                        dataset_tag(
                            key="env",
                            value="test",
                            source="airflow-system-test",
                        ),
                        dataset_tag(
                            key="source",
                            value="s3",
                            source="airflow-system-test",
                        ),
                        # Only this tag sets the optional ``field`` parameter.
                        dataset_tag(
                            key="format",
                            value="csv",
                            source="airflow-system-test",
                            field="id",
                        ),
                    ],
                ),
                "catalog": catalog_dataset.CatalogDatasetFacet(
                    framework="iceberg",
                    type="TABLE",
                    name="all_facets_input",
                    metadataUri="s3://metastore/all_facets.json",
                    warehouseUri="s3://all-facets-bucket/warehouse",
                    source="s3://all-facets-bucket/catalog.json",
                    catalogProperties={"location": "s3://all-facets-bucket/warehouse/all_facets_input"},
                ),
                "dataQualityMetrics": data_quality_metrics_dataset.DataQualityMetricsDatasetFacet(
                    columnMetrics={
                        "id": data_quality_metrics_dataset.ColumnMetrics(
                            nullCount=0,
                            distinctCount=1000,
                            sum=500500.0,
                            count=1000,
                            min=1.0,
                            max=1000.0,
                            quantiles={"0.25": 250.0, "0.5": 500.0, "0.75": 750.0},
                        ),
                        "name": data_quality_metrics_dataset.ColumnMetrics(
                            nullCount=5,
                            distinctCount=980,
                            count=1000,
                        ),
                    },
                    rowCount=1000,
                    bytes=8192,
                    fileCount=1,
                ),
                "dataQualityMetricsInput": data_quality_metrics_input_dataset.DataQualityMetricsInputDatasetFacet(  # type: ignore[dict-item]
                    columnMetrics={
                        "id": data_quality_metrics_input_dataset.ColumnMetrics(
                            nullCount=0,
                            distinctCount=1000,
                            sum=500500.0,
                            count=1000,
                            min=1.0,
                            max=1000.0,
                            quantiles={"0.25": 250.0, "0.5": 500.0, "0.75": 750.0},
                        ),
                    },
                    rowCount=1000,
                    bytes=8192,
                    fileCount=1,
                ),
                "datasetType": dataset_type_dataset.DatasetTypeDatasetFacet(
                    datasetType="TABLE",
                    subType="ICEBERG_TABLE",
                ),
                "hierarchy": hierarchy_dataset.HierarchyDatasetFacet(
                    hierarchy=[
                        hierarchy_dataset.HierarchyDatasetFacetLevel(type=level_type, name=level_name)
                        for level_type, level_name in (
                            ("catalog", "aws-glue"),
                            ("database", "analytics"),
                            ("schema", "public"),
                        )
                    ],
                ),
                "inputSubset": base_subset_dataset.InputSubsetInputDatasetFacet(  # type: ignore[dict-item]
                    inputCondition=base_subset_dataset.PartitionSubsetCondition(
                        partitions=[
                            base_subset_dataset.Partition(
                                dimensions={"business_date": "2024-10-15", "country": "PL"},
                                identifier="2024-01-01/us-east-1",
                            )
                        ],
                        type="partition",
                    ),
                ),
            },
        )

    @staticmethod
    def _build_output_dataset() -> OutputDataset:
        """Build the ``snowflake://account`` output dataset together with all of its facets."""
        return OutputDataset(
            namespace="snowflake://account",
            name="analytics.public.all_facets_output",
            facets={
                "custom_output_ds_facet": {"key": "value"},  # type: ignore[dict-item]
                "outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(  # type: ignore[dict-item]
                    rowCount=500,
                    size=4096,
                    fileCount=1,
                ),
                "storage": storage_dataset.StorageDatasetFacet(
                    storageLayer="snowflake",
                    fileFormat="table",
                ),
                "symlinks": symlinks_dataset.SymlinksDatasetFacet(
                    identifiers=[
                        symlinks_dataset.Identifier(
                            namespace="snowflake://account",
                            name="analytics.public.all_facets_output_alias",
                            type="TABLE",
                        ),
                    ],
                ),
                "version": dataset_version_dataset.DatasetVersionDatasetFacet(
                    datasetVersion="v1.0.0",
                ),
                "lifecycleStateChange": lifecycle_state_change_dataset.LifecycleStateChangeDatasetFacet(
                    lifecycleStateChange=lifecycle_state_change_dataset.LifecycleStateChange.CREATE,
                    previousIdentifier=lifecycle_state_change_dataset.PreviousIdentifier(
                        name="analytics.public.all_facets_output_v0",
                        namespace="snowflake://account",
                    ),
                ),
                "outputSubset": base_subset_dataset.OutputSubsetOutputDatasetFacet(  # type: ignore[dict-item]
                    outputCondition=base_subset_dataset.PartitionSubsetCondition(
                        partitions=[
                            base_subset_dataset.Partition(
                                dimensions={"business_date": "2024-10-15", "country": "PL"},
                                identifier="2024-01-01",
                            )
                        ],
                        type="partition",
                    ),
                ),
            },
        )

    @staticmethod
    def _build_run_facets() -> dict:
        """Build the run facet dictionary: one custom dict facet plus the facet_v2 run facets."""
        env_var = environment_variables_run.EnvironmentVariable
        execution_parameter = execution_parameters_run.ExecutionParameter

        return {
            "custom_run_facet": {"key": "value"},
            "tags": tags_run.TagsRunFacet(
                tags=[
                    tags_run.TagsRunFacetFields(
                        key="test_type",
                        value="all_facets",
                        source="airflow-system-test",
                    ),
                ],
            ),
            "externalQuery": external_query_run.ExternalQueryRunFacet(
                externalQueryId="all-facets-query-id-001",
                source="snowflake://account",
            ),
            "testRun": test_run_facet_module.TestRunFacet(
                tests=[
                    test_run_facet_module.TestExecution(
                        name="all_facets_test",
                        status="success",
                        severity="WARNING",
                        type="integration",
                        description="Checks all facets are emitted and serialized correctly",
                        expected="all_facets_present",
                        actual="all_facets_present",
                        content='{"facetCount": 37}',
                        contentType="application/json",
                        params={"batch": "1"},
                    ),
                ],
            ),
            "environmentVariables": environment_variables_run.EnvironmentVariablesRunFacet(
                environmentVariables=[
                    env_var(name=var_name, value=var_value)
                    for var_name, var_value in (
                        ("SPARK_MASTER", "yarn"),
                        ("JAVA_HOME", "/usr/lib/jvm/java-11-openjdk"),
                    )
                ],
            ),
            "errorMessage": error_message_run.ErrorMessageRunFacet(
                message="Non-fatal warning detected during extraction phase",
                programmingLanguage="python",
                stackTrace=(
                    "Traceback (most recent call last):\n"
                    ' File "extract.py", line 42, in run\n'
                    ' raise ValueError("Partial extraction failure")\n'
                    "ValueError: Partial extraction failure"
                ),
            ),
            "executionParameters": execution_parameters_run.ExecutionParametersRunFacet(
                parameters=[
                    execution_parameter(
                        key="executor-cores",
                        name="executor-cores",
                        description="Number of CPU cores per executor",
                        value="4",
                    ),
                    execution_parameter(
                        key="executor-memory",
                        name="executor-memory",
                        description="Memory allocated per executor in gigabytes",
                        value="8g",
                    ),
                ],
            ),
            "extractionError": extraction_error_run.ExtractionErrorRunFacet(
                totalTasks=10,
                failedTasks=1,
                errors=[
                    extraction_error_run.Error(
                        errorMessage="Unable to parse column 'event_ts' as TIMESTAMP",
                        stackTrace=(
                            "Traceback (most recent call last):\n"
                            ' File "schema.py", line 18, in parse_column\n'
                            ' raise TypeError("Cannot cast to TIMESTAMP")\n'
                            "TypeError: Cannot cast to TIMESTAMP"
                        ),
                        task="parse_schema",
                        taskNumber=3,
                    ),
                ],
            ),
            "jobDependencies": job_dependencies_run.JobDependenciesRunFacet(
                upstream=[
                    job_dependencies_run.JobDependency(
                        job=job_dependencies_run.JobIdentifier(
                            namespace="default",
                            name="upstream_etl_job",
                        ),
                        run=job_dependencies_run.RunIdentifier(
                            runId="00000000-0000-0000-0000-000000000001",
                        ),
                        dependency_type="WAIT",
                        sequence_trigger_rule="ALL_DONE",
                        status_trigger_rule="ALL_SUCCESS",
                    ),
                ],
                downstream=[],
                trigger_rule="ALL_SUCCESS",
            ),
        }

    @staticmethod
    def _build_job_facets() -> dict:
        """Build the job facet dictionary: one custom dict facet plus the facet_v2 job facets."""
        return {
            "custom_job_facet": {"key": "value"},
            "tags": tags_job.TagsJobFacet(
                tags=[
                    tags_job.TagsJobFacetFields(
                        key=tag_key,
                        value=tag_value,
                        source="airflow-system-test",
                    )
                    for tag_key, tag_value in (
                        ("domain", "data-platform"),
                        ("team", "data-engineering"),
                    )
                ],
            ),
            "sourceCodeLocation": source_code_location_job.SourceCodeLocationJobFacet(
                type="git",
                url="https://github.com/apache/airflow/blob/main/providers/openlineage/tests/system/openlineage/example_openlineage_all_facets_dag.py",
                repoUrl="https://github.com/apache/airflow",
                path="providers/openlineage/tests/system/openlineage/example_openlineage_all_facets_dag.py",
                version="main",
                tag="v2.10.0",
                branch="main",
                pullRequestNumber="12345",
            ),
            "documentation": documentation_job.DocumentationJobFacet(
                description="All-facets test operator emitting every available OL facet",
            ),
            "sql": sql_job.SQLJobFacet(
                query="SELECT id, name FROM analytics.public.all_facets_output WHERE id IS NOT NULL",
                dialect="snowflake_sql",
            ),
            "ownership": ownership_job.OwnershipJobFacet(
                owners=[
                    ownership_job.Owner(name=owner_name, type=owner_type)
                    for owner_name, owner_type in (
                        ("data-engineering-team", "team"),
                        ("user:john.doe@example.com", "user"),
                    )
                ],
            ),
            "sourceCode": source_code_job.SourceCodeJobFacet(
                language="python",
                sourceCode=(
                    "def transform(df):\n return df.select('id', 'name').filter(df.id.isNotNull())\n"
                ),
            ),
        }


# Identifier of the DAG; also the key passed to get_expected_event_file_path() below.
DAG_ID = "openlineage_all_facets_dag"

# schedule=None: the DAG has no schedule of its own; retries are disabled for every task.
with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
    default_args={"retries": 0},
) as dag:
    # Task whose operator returns the hard-coded lineage facets.
    all_facets_task = AllFacetsOperator(task_id="all_facets_task")

    # NOTE(review): presumably compares emitted events with the file at file_path --
    # confirm against OpenLineageTestOperator.
    check_events = OpenLineageTestOperator(
        task_id="check_events", file_path=get_expected_event_file_path(DAG_ID)
    )

    # check_events is downstream of all_facets_task.
    all_facets_task >> check_events


from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: contributing-docs/testing/system_tests.rst)
test_run = get_test_run(dag)

Was this entry helpful?