## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.## Note: Any AirflowException raised is expected to cause the TaskInstance# to be marked in an ERROR state"""Exceptions used by Airflow"""importdatetimeimportwarningsfromtypingimportAny,Dict,List,NamedTuple,Optionalfromairflow.api_connexion.exceptionsimportNotFoundasApiConnextionNotFoundfromairflow.utils.code_utilsimportprepare_code_snippetfromairflow.utils.platformimportis_tty
[docs]classAirflowException(Exception):""" Base class for all Airflow's errors. Each custom exception should be derived from this class """
[docs]classAirflowNotFoundException(AirflowException,ApiConnextionNotFound):"""Raise when the requested object/resource is not available in the system"""
"""Raise when there is a timeout on sensor polling"""
[docs]classAirflowRescheduleException(AirflowException):""" Raise when the task should be re-scheduled at a later time. :param reschedule_date: The date when the task should be rescheduled :type reschedule_date: datetime.datetime """def__init__(self,reschedule_date):super().__init__()self.reschedule_date=reschedule_date
"""Raise when there is a cycle in Dag definition"""
[docs]classAirflowDagDuplicatedIdException(AirflowException):"""Raise when a Dag's ID is already used by another Dag"""def__init__(self,dag_id:str,incoming:str,existing:str)->None:super().__init__(dag_id,incoming,existing)self.dag_id=dag_idself.incoming=incomingself.existing=existing
[docs]def__str__(self)->str:returnf"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"
"""Raise when creating a DAG run for DAG which already has DAG run entry"""
[docs]classDagFileExists(AirflowBadRequest):"""Raise when a DAG ID is still in DagBag i.e., DAG file is in DAG folder"""def__init__(self,*args,**kwargs):super().__init__(*args,**kwargs)warnings.warn("DagFileExists is deprecated and will be removed.",DeprecationWarning,stacklevel=2)
"""Raise when task max_active_tasks limit is reached"""
[docs]classBackfillUnfinished(AirflowException):""" Raises when not all tasks succeed in backfill. :param message: The human-readable description of the exception :param ti_status: The information about all task statuses """def__init__(self,message,ti_status):super().__init__(message)self.ti_status=ti_status
[docs]classFileSyntaxError(NamedTuple):"""Information about a single error in a file."""
[docs]def__str__(self):returnf"{self.message}. Line number: s{str(self.line_no)},"
[docs]classAirflowFileParseException(AirflowException):""" Raises when connection or variable file can not be parsed :param msg: The human-readable description of the exception :param file_path: A processed file that contains errors :param parse_errors: File syntax errors """def__init__(self,msg:str,file_path:str,parse_errors:List[FileSyntaxError])->None:super().__init__(msg)self.msg=msgself.file_path=file_pathself.parse_errors=parse_errors
"""Raise when multiple values are found for the same conn_id"""
[docs]classTaskDeferred(BaseException):""" Special exception raised to signal that the operator it was raised from wishes to defer until a trigger fires. """def__init__(self,*,trigger,method_name:str,kwargs:Optional[Dict[str,Any]]=None,timeout:Optional[datetime.timedelta]=None,):super().__init__()self.trigger=triggerself.method_name=method_nameself.kwargs=kwargsself.timeout=timeout# Check timeout type at runtimeifself.timeoutisnotNoneandnothasattr(self.timeout,"total_seconds"):raiseValueError("Timeout value must be a timedelta")