# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportloggingimportosimportstructfromdatetimeimportdatetimefromtypingimportIterablefromsqlalchemyimportBigInteger,Column,String,Textfromsqlalchemy.dialects.mysqlimportMEDIUMTEXTfromsqlalchemy.sql.expressionimportliteralfromairflow.exceptionsimportAirflowException,DagCodeNotFoundfromairflow.models.baseimportBasefromairflow.utilsimporttimezonefromairflow.utils.fileimportcorrect_maybe_zipped,open_maybe_zippedfromairflow.utils.sessionimportprovide_sessionfromairflow.utils.sqlalchemyimportUtcDateTime
class DagCode(Base):
    """A table for DAGs code.

    dag_code table contains code of DAG files synchronized by scheduler.

    For details on dag serialization see SerializedDagModel
    """
@provide_session
def sync_to_db(self, session=None):
    """Writes code into database.

    :param session: ORM Session
    """
    # @provide_session is required here: without it the default session=None
    # would be forwarded positionally to bulk_sync_to_db, which bypasses that
    # method's own @provide_session and leaves it operating on a None session.
    self.bulk_sync_to_db([self.fileloc], session)
@classmethod
@provide_session
def bulk_sync_to_db(cls, filelocs: Iterable[str], session=None):
    """Writes code in bulk into database.

    :param filelocs: file paths of DAGs to sync
    :param session: ORM Session
    """
    filelocs = set(filelocs)
    filelocs_to_hashes = {fileloc: DagCode.dag_fileloc_hash(fileloc) for fileloc in filelocs}
    # Lock the matching rows so concurrent schedulers do not race on updates.
    existing_orm_dag_codes = (
        session.query(DagCode)
        .filter(DagCode.fileloc_hash.in_(filelocs_to_hashes.values()))
        .with_for_update(of=DagCode)
        .all()
    )

    if existing_orm_dag_codes:
        existing_orm_dag_codes_map = {
            orm_dag_code.fileloc: orm_dag_code for orm_dag_code in existing_orm_dag_codes
        }
    else:
        existing_orm_dag_codes_map = {}

    existing_orm_dag_codes_by_fileloc_hashes = {orm.fileloc_hash: orm for orm in existing_orm_dag_codes}
    existing_orm_filelocs = {orm.fileloc for orm in existing_orm_dag_codes_by_fileloc_hashes.values()}
    if not existing_orm_filelocs.issubset(filelocs):
        # A row matched on hash but not on path: two distinct paths hashed to
        # the same fileloc_hash (primary key), which we cannot store.
        conflicting_filelocs = existing_orm_filelocs.difference(filelocs)
        hashes_to_filelocs = {DagCode.dag_fileloc_hash(fileloc): fileloc for fileloc in filelocs}
        message = ""
        for fileloc in conflicting_filelocs:
            filename = hashes_to_filelocs[DagCode.dag_fileloc_hash(fileloc)]
            # BUGFIX: the message previously interpolated a literal placeholder
            # instead of the colliding filename computed above.
            message += (
                f"Filename '{filename}' causes a hash collision in the "
                f"database with '{fileloc}'. Please rename the file."
            )
        raise AirflowException(message)

    existing_filelocs = {dag_code.fileloc for dag_code in existing_orm_dag_codes}
    missing_filelocs = filelocs.difference(existing_filelocs)

    # Insert rows for files never seen before.
    for fileloc in missing_filelocs:
        orm_dag_code = DagCode(fileloc, cls._get_code_from_file(fileloc))
        session.add(orm_dag_code)

    # Refresh rows whose file on disk is newer than the stored version.
    for fileloc in existing_filelocs:
        current_version = existing_orm_dag_codes_by_fileloc_hashes[filelocs_to_hashes[fileloc]]
        file_mod_time = datetime.fromtimestamp(
            os.path.getmtime(correct_maybe_zipped(fileloc)), tz=timezone.utc
        )

        if file_mod_time > current_version.last_updated:
            orm_dag_code = existing_orm_dag_codes_map[fileloc]
            orm_dag_code.last_updated = file_mod_time
            orm_dag_code.source_code = cls._get_code_from_file(orm_dag_code.fileloc)
            session.merge(orm_dag_code)
@classmethod
@provide_session
def remove_deleted_code(cls, alive_dag_filelocs: list[str], session=None):
    """Deletes code not included in alive_dag_filelocs.

    :param alive_dag_filelocs: file paths of alive DAGs
    :param session: ORM Session
    """
    # A row survives if it matches an alive file either by hash or by path.
    alive_hashes = [cls.dag_fileloc_hash(loc) for loc in alive_dag_filelocs]

    log.debug("Deleting code from %s table ", cls.__tablename__)

    stale = session.query(cls).filter(
        cls.fileloc_hash.notin_(alive_hashes), cls.fileloc.notin_(alive_dag_filelocs)
    )
    stale.delete(synchronize_session="fetch")
@classmethod
@provide_session
def has_dag(cls, fileloc: str, session=None) -> bool:
    """Checks a file exist in dag_code table.

    :param fileloc: the file to check
    :param session: ORM Session
    """
    # Query a constant literal instead of a full row: we only care whether
    # any row with this hash exists.
    row = (
        session.query(literal(True))
        .filter(cls.fileloc_hash == cls.dag_fileloc_hash(fileloc))
        .one_or_none()
    )
    return row is not None
@classmethod
def get_code_by_fileloc(cls, fileloc: str) -> str:
    """Returns source code for a given fileloc.

    :param fileloc: file path of a DAG
    :return: source code as string
    """
    # Thin convenience wrapper around :meth:`code`.
    source_code = cls.code(fileloc)
    return source_code
@classmethod
def code(cls, fileloc) -> str:
    """Returns source code for this DagCode object.

    :return: source code as string
    """
    # Delegates to the DB accessor; raises DagCodeNotFound if absent
    # (behavior of _get_code_from_db, defined elsewhere in this module).
    source_code = cls._get_code_from_db(fileloc)
    return source_code
def dag_fileloc_hash(full_filepath: str) -> int:
    """Hashing file location for indexing.

    :param full_filepath: full filepath of DAG file
    :return: hashed full_filepath
    """
    # Hashing is needed because the length of fileloc is 2000 as an Airflow convention,
    # which is over the limit of indexing.
    import hashlib

    digest = hashlib.sha1(full_filepath.encode("utf-8")).digest()
    # Unpack the last 8 bytes as a big-endian unsigned 64-bit int, then drop
    # the low byte: only 7 bytes because MySQL BigInteger can hold only
    # 8 bytes (signed).
    (value,) = struct.unpack(">Q", digest[-8:])
    return value >> 8