# Source code for airflow.providers.openlineage.plugins.facets

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from attrs import define
from deprecated import deprecated
from openlineage.client.facet import BaseFacet
from openlineage.client.utils import RedactMixin

from airflow.exceptions import AirflowProviderDeprecationWarning


@deprecated(
    reason="To be removed in the next release. Make sure to use information from AirflowRunFacet instead.",
    category=AirflowProviderDeprecationWarning,
)
@define(slots=False)
class AirflowMappedTaskRunFacet(BaseFacet):
    """
    Run facet containing information about mapped tasks.

    Deprecated: the decorator above emits ``AirflowProviderDeprecationWarning``
    and points users at ``AirflowRunFacet`` instead.
    """

    # Map index of the task instance (copied from ``task_instance.map_index``).
    mapIndex: int
    # Dotted ``module.ClassName`` path of the task's operator class.
    operatorClass: str

    # Deliberately un-annotated so attrs keeps it as a plain class attribute
    # rather than an init field.
    # NOTE(review): presumably consumed by the OpenLineage redaction logic to
    # exempt ``operatorClass`` from redaction; confirm against RedactMixin.
    _additional_skip_redact = ["operatorClass"]

    @classmethod
    def from_task_instance(cls, task_instance):
        """
        Build the facet from a task instance.

        :param task_instance: object exposing ``task`` and ``map_index``
            attributes.
        :return: a new facet whose ``mapIndex`` is ``task_instance.map_index``
            and whose ``operatorClass`` is the ``module.name`` path of the
            class returned by ``get_operator_class`` for the task.
        """
        # Kept as a function-level import, as in the original.
        # NOTE(review): presumably avoids a circular import; confirm.
        from airflow.providers.openlineage.utils.utils import get_operator_class

        # Resolve the operator class once and reuse it for both name parts.
        operator_class = get_operator_class(task_instance.task)
        return cls(
            mapIndex=task_instance.map_index,
            operatorClass=f"{operator_class.__module__}.{operator_class.__name__}",
        )
@define(slots=False)
class AirflowRunFacet(BaseFacet):
    """
    Composite Airflow run facet.

    Groups five required attributes into one facet: four dictionaries
    (``dag``, ``dagRun``, ``task``, ``taskInstance``) and one string
    (``taskUuid``). Field order is part of the generated ``__init__``
    signature and must not be rearranged.
    """

    # NOTE(review): the contents of these dictionaries are not defined in this
    # module; presumably each holds serialized attributes of the Airflow
    # object it is named after. Confirm against the code that builds the facet.
    dag: dict
    dagRun: dict
    task: dict
    taskInstance: dict
    # String identifier for the task run.
    # NOTE(review): exact format is decided by the caller; confirm.
    taskUuid: str
@define(slots=False)
class UnknownOperatorInstance(RedactMixin):
    """
    Describes an unknown operator.

    This specifies the (class) name of the operator and its properties.
    """

    # (Class) name of the operator.
    name: str
    # Properties of the operator, keyed by property name.
    properties: dict[str, object]
    # Kind of the described item; defaults to "operator". The attribute name
    # shadows the ``type`` builtin but is part of the public interface.
    type: str = "operator"

    # Un-annotated, so attrs keeps it as a plain class attribute and not an
    # init field.
    # NOTE(review): presumably read by RedactMixin to leave ``name`` and
    # ``type`` unredacted; confirm against the OpenLineage client.
    _skip_redact = ["name", "type"]
@deprecated(
    reason="To be removed in the next release. Make sure to use information from AirflowRunFacet instead.",
    category=AirflowProviderDeprecationWarning,
)
@define(slots=False)
class UnknownOperatorAttributeRunFacet(BaseFacet):
    """
    RunFacet that describes unknown operators in an Airflow DAG.

    Deprecated: the decorator above emits ``AirflowProviderDeprecationWarning``
    and points users at ``AirflowRunFacet`` instead.
    """

    # One ``UnknownOperatorInstance`` entry per unknown operator described.
    unknownItems: list[UnknownOperatorInstance]

Was this entry helpful?