#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Note: Any AirflowException raised is expected to cause the TaskInstance
#       to be marked in an ERROR state
"""Exceptions used by Airflow."""
from __future__ import annotations

import datetime
import warnings
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, NamedTuple, Sized

if TYPE_CHECKING:
    from airflow.models import DagRun
class AirflowTermSignal(Exception):
    """Raise when we receive a TERM signal."""


class AirflowConfigException(AirflowException):
    """Raise when there is a configuration problem."""


class AirflowSensorTimeout(AirflowException):
    """Raise when there is a timeout on sensor polling."""


class AirflowRescheduleException(AirflowException):
    """
    Raise when the task should be re-scheduled at a later time.

    :param reschedule_date: The date when the task should be rescheduled
    """

    def __init__(self, reschedule_date):
        super().__init__()
        self.reschedule_date = reschedule_date
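# Illustrative sketch (not part of this module): AirflowRescheduleException is normally raised
# by the sensor machinery (BaseSensorOperator in "reschedule" mode) rather than by typical user
# code. The helper name and the 5-minute interval below are hypothetical example values.
def _reschedule_in_five_minutes() -> None:
    # Free the worker slot and ask the scheduler to run the sensor again at the given time.
    next_poke = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(minutes=5)
    raise AirflowRescheduleException(reschedule_date=next_poke)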
class InvalidStatsNameException(AirflowException):
    """Raise when the stats name is invalid."""


class AirflowTaskTimeout(AirflowException):
    """Raise when the task execution times out."""


class AirflowWebServerTimeout(AirflowException):
    """Raise when the web server times out."""


class AirflowSkipException(AirflowException):
    """Raise when the task should be skipped."""


class AirflowFailException(AirflowException):
    """Raise when the task should be failed without retrying."""
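# Illustrative sketch (not part of this module): a task callable that uses the two exceptions
# above. In a DAG file it would typically be wrapped with @task or passed to PythonOperator;
# the function name and the "corrupt" field are hypothetical.
def _check_upstream_data(records: list[dict] | None = None) -> int:
    if not records:
        # Mark this task instance as skipped instead of failed.
        raise AirflowSkipException("No records to process; skipping.")
    if any(record.get("corrupt") for record in records):
        # Fail immediately; configured retries are not attempted.
        raise AirflowFailException("Corrupt records detected; retrying will not help.")
    return len(records)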
class AirflowOptionalProviderFeatureException(AirflowException):
    """Raised by providers when imports are missing for optional provider features."""
class XComNotFound(AirflowException):
    """Raise when an XCom reference is being resolved against a non-existent XCom."""

    def __init__(self, dag_id: str, task_id: str, key: str) -> None:
        super().__init__()
        self.dag_id = dag_id
        self.task_id = task_id
        self.key = key

    def __str__(self) -> str:
        return f'XComArg result from {self.task_id} at {self.dag_id} with key="{self.key}" is not found!'


class UnmappableOperator(AirflowException):
    """Raise when an operator is not implemented to be mappable."""


class XComForMappingNotPushed(AirflowException):
    """Raise when a mapped downstream's dependency fails to push XCom for task mapping."""

    def __str__(self) -> str:
        return "did not push XCom for task mapping"


class UnmappableXComTypePushed(AirflowException):
    """Raise when an unmappable type is pushed as a mapped downstream's dependency."""

    def __init__(self, value: Any, *values: Any) -> None:
        super().__init__(value, *values)

    def __str__(self) -> str:
        typename = type(self.args[0]).__qualname__
        for arg in self.args[1:]:
            typename = f"{typename}[{type(arg).__qualname__}]"
        return f"unmappable return type {typename!r}"


class UnmappableXComLengthPushed(AirflowException):
    """Raise when the pushed value is too large to map as a downstream's dependency."""

    def __init__(self, value: Sized, max_length: int) -> None:
        super().__init__(value)
        self.value = value
        self.max_length = max_length

    def __str__(self) -> str:
        return f"unmappable return value length: {len(self.value)} > {self.max_length}"
class AirflowDagCycleException(AirflowException):
    """Raise when there is a cycle in DAG definition."""


class AirflowDagDuplicatedIdException(AirflowException):
    """Raise when a DAG's ID is already used by another DAG."""

    def __init__(self, dag_id: str, incoming: str, existing: str) -> None:
        super().__init__(dag_id, incoming, existing)
        self.dag_id = dag_id
        self.incoming = incoming
        self.existing = existing

    def __str__(self) -> str:
        return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"


class AirflowDagInconsistent(AirflowException):
    """Raise when a DAG has inconsistent attributes."""


class AirflowClusterPolicyViolation(AirflowException):
    """Raise when there is a violation of a Cluster Policy in DAG definition."""


class AirflowClusterPolicyError(AirflowException):
    """Raise when a Cluster Policy fails with an error other than AirflowClusterPolicyViolation."""


class AirflowTimetableInvalid(AirflowException):
    """Raise when a DAG has an invalid timetable."""


class DagNotFound(AirflowNotFoundException):
    """Raise when a DAG is not available in the system."""


class DagCodeNotFound(AirflowNotFoundException):
    """Raise when a DAG code is not available in the system."""


class DagRunNotFound(AirflowNotFoundException):
    """Raise when a DAG Run is not available in the system."""
class DagRunAlreadyExists(AirflowBadRequest):
    """Raise when creating a DAG run for a DAG which already has a DAG run entry."""

    def __init__(self, dag_run: DagRun, execution_date: datetime.datetime, run_id: str) -> None:
        super().__init__(
            f"A DAG Run already exists for DAG {dag_run.dag_id} at {execution_date} with run id {run_id}"
        )
        self.dag_run = dag_run


class DagFileExists(AirflowBadRequest):
    """Raise when a DAG ID is still in DagBag, i.e. the DAG file is in the DAG folder."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        warnings.warn("DagFileExists is deprecated and will be removed.", DeprecationWarning, stacklevel=2)


class DuplicateTaskIdFound(AirflowException):
    """Raise when a Task with duplicate task_id is defined in the same DAG."""


class TaskAlreadyInTaskGroup(AirflowException):
    """Raise when a Task cannot be added to a TaskGroup since it already belongs to another TaskGroup."""

    def __init__(self, task_id: str, existing_group_id: str | None, new_group_id: str) -> None:
        super().__init__(task_id, new_group_id)
        self.task_id = task_id
        self.existing_group_id = existing_group_id
        self.new_group_id = new_group_id

    def __str__(self) -> str:
        if self.existing_group_id is None:
            existing_group = "the DAG's root group"
        else:
            existing_group = f"group {self.existing_group_id!r}"
        return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {existing_group})"
class SerializationError(AirflowException):
    """A problem occurred when trying to serialize something."""


class ParamValidationError(AirflowException):
    """Raise when DAG params are invalid."""


class TaskNotFound(AirflowNotFoundException):
    """Raise when a Task is not available in the system."""


class TaskInstanceNotFound(AirflowNotFoundException):
    """Raise when a task instance is not available in the system."""


class PoolNotFound(AirflowNotFoundException):
    """Raise when a Pool is not available in the system."""


class NoAvailablePoolSlot(AirflowException):
    """Raise when there are not enough slots available in the pool."""


class DagConcurrencyLimitReached(AirflowException):
    """Raise when the DAG max_active_tasks limit is reached."""


class TaskConcurrencyLimitReached(AirflowException):
    """Raise when the task max_active_tasks limit is reached."""


class BackfillUnfinished(AirflowException):
    """
    Raise when not all tasks succeed in a backfill.

    :param message: The human-readable description of the exception
    :param ti_status: The information about all task statuses
    """

    def __init__(self, message, ti_status):
        super().__init__(message)
        self.ti_status = ti_status
class FileSyntaxError(NamedTuple):
    """Information about a single error in a file."""

    line_no: int | None
    message: str

    def __str__(self):
        return f"{self.message}. Line number: {self.line_no}."


class AirflowFileParseException(AirflowException):
    """
    Raise when a connection or variable file cannot be parsed.

    :param msg: The human-readable description of the exception
    :param file_path: A processed file that contains errors
    :param parse_errors: File syntax errors
    """

    def __init__(self, msg: str, file_path: str, parse_errors: list[FileSyntaxError]) -> None:
        super().__init__(msg)
        self.msg = msg
        self.file_path = file_path
        self.parse_errors = parse_errors
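# Illustrative sketch (not part of this module): how a loader for a connections or variables
# file might report syntax problems using the two classes above. The helper name, file path,
# and error messages are hypothetical.
def _report_parse_errors() -> None:
    errors = [
        FileSyntaxError(line_no=3, message="Invalid line format"),
        FileSyntaxError(line_no=None, message="Unexpected end of file"),
    ]
    raise AirflowFileParseException(
        "Failed to load the variables file.",
        file_path="/opt/airflow/variables.env",
        parse_errors=errors,
    )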
class ConnectionNotUnique(AirflowException):
    """Raise when multiple values are found for the same connection ID."""


class TaskDeferred(BaseException):
    """
    Signal an operator moving to deferred state.

    Special exception raised to signal that the operator it was raised from
    wishes to defer until a trigger fires.
    """

    def __init__(
        self,
        *,
        trigger,
        method_name: str,
        kwargs: dict[str, Any] | None = None,
        timeout: datetime.timedelta | None = None,
    ):
        super().__init__()
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs
        self.timeout = timeout
        # Check timeout type at runtime
        if self.timeout is not None and not hasattr(self.timeout, "total_seconds"):
            raise ValueError("Timeout value must be a timedelta")
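# Illustrative sketch (not part of this module): a deferrable operator hands control to the
# triggerer by raising TaskDeferred; BaseOperator.defer() raises it on the operator's behalf.
# The operator name and the one-hour delay are hypothetical, and in real code the class would
# live in its own file rather than here.
from airflow.models.baseoperator import BaseOperator
from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone


class WaitOneHourOperator(BaseOperator):
    def execute(self, context):
        # defer() packages the trigger and callback method name into a TaskDeferred exception.
        self.defer(
            trigger=DateTimeTrigger(moment=timezone.utcnow() + datetime.timedelta(hours=1)),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # Resumed in a fresh task instance once the trigger fires; nothing else to do here.
        return None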