## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.# Note: Any AirflowException raised is expected to cause the TaskInstance# to be marked in an ERROR state"""Exceptions used by Airflow."""from__future__importannotationsimportwarningsfromhttpimportHTTPStatusfromtypingimportTYPE_CHECKING,Any,NamedTuplefromairflow.utils.trigger_ruleimportTriggerRuleifTYPE_CHECKING:importdatetimefromcollections.abcimportSizedfromairflow.modelsimportDAG,DagRun
class AirflowException(Exception):
    """
    Root of the Airflow exception hierarchy.

    Every custom Airflow error type should derive from this class.
    """
class AirflowConfigException(AirflowException):
    """Raised when a configuration problem is detected."""
class AirflowSensorTimeout(AirflowException):
    """Raised when sensor polling exceeds its timeout."""
class AirflowRescheduleException(AirflowException):
    """
    Raised to request that the task be re-scheduled at a later time.

    :param reschedule_date: The date when the task should be rescheduled
    """

    def __init__(self, reschedule_date):
        super().__init__()
        # Stored so the scheduler can pick up the requested reschedule time.
        self.reschedule_date = reschedule_date
class InvalidStatsNameException(AirflowException):
    """Raised when a stats name is invalid."""
# Important to inherit BaseException instead of AirflowException->Exception, since this Exception is used
# to explicitly interrupt ongoing task. Code that does normal error-handling should not treat
# such interrupt as an error that can be handled normally. (Compare with KeyboardInterrupt)
class AirflowTaskTimeout(BaseException):
    """
    Raised when task execution times out.

    Inherits BaseException (not Exception) so ordinary ``except Exception``
    handlers do not swallow the interrupt.
    """
class AirflowTaskTerminated(BaseException):
    """
    Raised when task execution is terminated.

    Inherits BaseException (not Exception) so ordinary ``except Exception``
    handlers do not swallow the interrupt.
    """
class AirflowWebServerTimeout(AirflowException):
    """Raised when a web-server request times out."""
class AirflowSkipException(AirflowException):
    """Raised to mark the current task as skipped."""
class AirflowFailException(AirflowException):
    """Raised to fail the task immediately, with no retries."""
class AirflowOptionalProviderFeatureException(AirflowException):
    """Raised by providers when imports for optional provider features are missing."""
class AirflowInternalRuntimeError(BaseException):
    """
    Airflow internal runtime error.

    Indicates that something really terrible happens during the Airflow execution.

    :meta private:
    """
class XComNotFound(AirflowException):
    """Raised when an XCom reference is resolved against a non-existent XCom."""

    def __init__(self, dag_id: str, task_id: str, key: str) -> None:
        super().__init__()
        # Remember the full XCom coordinates for the error message below.
        self.key = key
        self.task_id = task_id
        self.dag_id = dag_id

    def __str__(self) -> str:
        return f'XComArg result from {self.task_id} at {self.dag_id} with key="{self.key}" is not found!'
class UnmappableOperator(AirflowException):
    """Raised when an operator does not implement support for mapping."""
class XComForMappingNotPushed(AirflowException):
    """Raised when a mapped downstream's dependency fails to push an XCom for task mapping."""

    def __str__(self) -> str:
        return "did not push XCom for task mapping"
class UnmappableXComTypePushed(AirflowException):
    """Raised when an unmappable type is pushed as a mapped downstream's dependency."""

    def __init__(self, value: Any, *values: Any) -> None:
        super().__init__(value, *values)

    def __str__(self) -> str:
        # Render nested containers as e.g. "list[dict]" by wrapping each
        # inner type name in brackets, outermost first.
        names = [type(item).__qualname__ for item in self.args]
        typename = names[0]
        for inner in names[1:]:
            typename = f"{typename}[{inner}]"
        return f"unmappable return type {typename!r}"
class UnmappableXComLengthPushed(AirflowException):
    """Raised when the pushed value is too long to map as a downstream's dependency."""

    def __init__(self, value: Sized, max_length: int) -> None:
        super().__init__(value)
        self.max_length = max_length
        self.value = value

    def __str__(self) -> str:
        return f"unmappable return value length: {len(self.value)} > {self.max_length}"
class AirflowDagCycleException(AirflowException):
    """Raised when a cycle is detected in a DAG definition."""
class AirflowDagDuplicatedIdException(AirflowException):
    """Raised when a DAG's ID is already used by another DAG."""

    def __init__(self, dag_id: str, incoming: str, existing: str) -> None:
        super().__init__(dag_id, incoming, existing)
        self.existing = existing
        self.incoming = incoming
        self.dag_id = dag_id

    def __str__(self) -> str:
        return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"
class AirflowDagInconsistent(AirflowException):
    """Raised when a DAG has inconsistent attributes."""
class AirflowClusterPolicyViolation(AirflowException):
    """Raised on a violation of a Cluster Policy in a DAG definition."""
class AirflowClusterPolicySkipDag(AirflowException):
    """Raised by a Cluster Policy to request that a dag be skipped."""
class AirflowClusterPolicyError(AirflowException):
    """Raised for Cluster Policy errors other than AirflowClusterPolicyViolation or AirflowClusterPolicySkipDag."""
class AirflowTimetableInvalid(AirflowException):
    """Raised when a DAG's timetable is invalid."""
class DagNotFound(AirflowNotFoundException):
    """Raised when a DAG is not available in the system."""
class DagCodeNotFound(AirflowNotFoundException):
    """Raised when DAG code is not available in the system."""
class DagRunNotFound(AirflowNotFoundException):
    """Raised when a DAG Run is not available in the system."""
class DagRunAlreadyExists(AirflowBadRequest):
    """Raise when creating a DAG run for DAG which already has DAG run entry."""

    def __init__(self, dag_run: DagRun, execution_date: datetime.datetime, run_id: str) -> None:
        super().__init__(f"A DAG Run already exists for DAG {dag_run.dag_id} at {execution_date} with run id {run_id}")
        # Keep the conflicting run's details so callers (and serialize()) can inspect them.
        self.dag_run = dag_run
        self.execution_date = execution_date
        self.run_id = run_id

    def serialize(self):
        """Return a (classpath, args, kwargs) triple describing this exception, with a fresh DagRun copy."""
        cls = self.__class__
        # Note the DagRun object will be detached here and fails serialization, we need to create a new one
        from airflow.models import DagRun

        dag_run = DagRun(
            state=self.dag_run.state,
            dag_id=self.dag_run.dag_id,
            run_id=self.dag_run.run_id,
            external_trigger=self.dag_run.external_trigger,
            run_type=self.dag_run.run_type,
            execution_date=self.dag_run.execution_date,
        )
        # The primary key is not settable through the constructor; copy it over explicitly.
        dag_run.id = self.dag_run.id
        return (
            f"{cls.__module__}.{cls.__name__}",
            (),
            {"dag_run": dag_run, "execution_date": self.execution_date, "run_id": self.run_id},
        )
class DagFileExists(AirflowBadRequest):
    """Raised when a DAG ID is still in DagBag, i.e. the DAG file is in the DAG folder."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Kept only for backwards compatibility; warn anyone still raising it.
        warnings.warn("DagFileExists is deprecated and will be removed.", DeprecationWarning, stacklevel=2)
class FailStopDagInvalidTriggerRule(AirflowException):
    """Raised when a dag has 'fail_stop' enabled yet has a non-default trigger rule."""

    _allowed_rules = (TriggerRule.ALL_SUCCESS, TriggerRule.ALL_DONE_SETUP_SUCCESS)

    @classmethod
    def check(cls, *, dag: DAG | None, trigger_rule: TriggerRule):
        """
        Check that fail_stop dag tasks have allowable trigger rules.

        :meta private:
        """
        # Only fail_stop dags are constrained; everything else passes.
        if dag is None or not dag.fail_stop:
            return
        if trigger_rule not in cls._allowed_rules:
            raise cls()

    def __str__(self) -> str:
        return f"A 'fail-stop' dag can only have {TriggerRule.ALL_SUCCESS} trigger rule"
class DuplicateTaskIdFound(AirflowException):
    """Raised when a Task with a duplicate task_id is defined in the same DAG."""
class TaskAlreadyInTaskGroup(AirflowException):
    """Raised when a Task cannot be added to a TaskGroup because it already belongs to another TaskGroup."""

    def __init__(self, task_id: str, existing_group_id: str | None, new_group_id: str) -> None:
        super().__init__(task_id, new_group_id)
        self.task_id = task_id
        self.existing_group_id = existing_group_id
        self.new_group_id = new_group_id

    def __str__(self) -> str:
        # existing_group_id is None when the task sits directly in the DAG's root group.
        where = (
            "the DAG's root group"
            if self.existing_group_id is None
            else f"group {self.existing_group_id!r}"
        )
        return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {where})"
class SerializationError(AirflowException):
    """Raised when an attempt to serialize something fails."""
class ParamValidationError(AirflowException):
    """Raised when DAG params are invalid."""
class TaskNotFound(AirflowNotFoundException):
    """Raised when a Task is not available in the system."""
class TaskInstanceNotFound(AirflowNotFoundException):
    """Raised when a task instance is not available in the system."""
class PoolNotFound(AirflowNotFoundException):
    """Raised when a Pool is not available in the system."""
class NoAvailablePoolSlot(AirflowException):
    """Raised when a pool does not have enough free slots."""
class DagConcurrencyLimitReached(AirflowException):
    """Raised when the DAG max_active_tasks limit is reached."""
class TaskConcurrencyLimitReached(AirflowException):
    """Raised when the task max_active_tasks limit is reached."""
class BackfillUnfinished(AirflowException):
    """
    Raised when not all tasks succeed in a backfill.

    :param message: The human-readable description of the exception
    :param ti_status: The information about all task statuses
    """

    def __init__(self, message, ti_status):
        super().__init__(message)
        # Snapshot of per-task-instance statuses at the time of failure.
        self.ti_status = ti_status
class FileSyntaxError(NamedTuple):
    """
    Information about a single syntax error in a file.

    :param line_no: line on which the error occurred, or ``None`` when unknown
    :param message: human-readable description of the error
    """

    # These fields are required: __str__ below reads both of them, and without
    # them the NamedTuple could not be instantiated with error details at all.
    line_no: int | None
    message: str

    def __str__(self) -> str:
        # Fixed garbled message: previously rendered a stray "s" before the
        # line number and ended with a dangling comma.
        return f"{self.message}. Line number: {self.line_no}."
class AirflowFileParseException(AirflowException):
    """
    Raised when a connection or variable file cannot be parsed.

    :param msg: The human-readable description of the exception
    :param file_path: A processed file that contains errors
    :param parse_errors: File syntax errors
    """

    def __init__(self, msg: str, file_path: str, parse_errors: list[FileSyntaxError]) -> None:
        super().__init__(msg)
        self.parse_errors = parse_errors
        self.file_path = file_path
        self.msg = msg
class ConnectionNotUnique(AirflowException):
    """Raised when multiple values are found for the same connection ID."""
class TaskDeferred(BaseException):
    """
    Signal an operator moving to deferred state.

    Special exception raised to signal that the operator it was raised from
    wishes to defer until a trigger fires. Triggers can send execution back to
    the task or end the task instance directly. If the trigger should end the
    task instance itself, ``method_name`` does not matter and can be None;
    otherwise, provide the name of the method that should be used when
    resuming execution in the task.
    """

    def __init__(
        self,
        *,
        trigger,
        method_name: str,
        kwargs: dict[str, Any] | None = None,
        timeout: datetime.timedelta | None = None,
    ):
        super().__init__()
        # Validate the timeout type up front; anything timedelta-like
        # (exposing total_seconds) is accepted.
        if timeout is not None and not hasattr(timeout, "total_seconds"):
            raise ValueError("Timeout value must be a timedelta")
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs
        self.timeout = timeout
class TaskDeferralError(AirflowException):
    """Raised when a task fails during deferral for some reason."""
# The try/except handling is needed after we moved all k8s classes to cncf.kubernetes provider
# These two exceptions are used internally by Kubernetes Executor but also by PodGenerator, so we need
# to leave them here in case older version of cncf.kubernetes provider is used to run KubernetesPodOperator
# and it raises one of those exceptions. The code should be backwards compatible even if you import
# and try/except the exception using direct imports from airflow.exceptions.
# 1) if you have old provider, both provider and pod generator will throw the "airflow.exceptions" exception.
# 2) if you have new provider, both provider and pod generator will throw the
# "airflow.providers.cncf.kubernetes" as it will be imported here from the provider.
try:
    from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException
except ImportError:
    # Fallback definition used only when the provider's class is unavailable.
    class PodMutationHookException(AirflowException):  # type: ignore[no-redef]
        """Raised when exception happens during Pod Mutation Hook execution."""
class PodReconciliationError(AirflowException):  # type: ignore[no-redef]
    """Raised when an error is encountered while merging pod configs."""

    # NOTE(review): the no-redef ignore suggests this is also a fallback for a
    # provider-supplied class (like PodMutationHookException above) — confirm
    # whether it should sit inside a try/except ImportError as well.
class RemovedInAirflow3Warning(DeprecationWarning):
    """Issued for usage of deprecated features that will be removed in Airflow3."""

    # Indicates the provider version that started raising this deprecation warning.
    # Restored: the source contained only the bare attribute docstring with no
    # attribute for it to document, so reads of this attribute would fail.
    deprecated_since: str | None = None
class DeserializingResultError(ValueError):
    """Raised when a pickling library fails to deserialize a pickle file."""

    def __str__(self):
        # __cause__ is set when this error is raised with "from <exc>".
        cause = str(self.__cause__)
        return (
            "Error deserializing result. Note that result deserialization "
            "is not supported across major Python versions. Cause: " + cause
        )
class UnknownExecutorException(ValueError):
    """Raised when attempting to load an executor that is not configured."""