# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Save Rendered Template Fields."""
from __future__ import annotations

import os
from typing import TYPE_CHECKING

import sqlalchemy_jsonfield
from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, text
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import Session, relationship

from airflow.configuration import conf
from airflow.models.base import Base, StringID
from airflow.models.taskinstance import TaskInstance
from airflow.serialization.helpers import serialize_template_field
from airflow.settings import json
from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import tuple_not_in_condition

if TYPE_CHECKING:
    from sqlalchemy.sql import FromClause
    )

    # We don't need a DB-level FK here, as we already have one to TI (which has one to DR), but by
    # defining the relationship we can more easily find the execution date for these rows
    @classmethod
    @provide_session
    def get_templated_fields(cls, ti: TaskInstance, session: Session = NEW_SESSION) -> dict | None:
        """
        Get templated fields for a TaskInstance from the RenderedTaskInstanceFields table.

        :param ti: Task Instance
        :param session: SqlAlchemy Session
        :return: Rendered templated TI fields
        """
        result = (
            session.query(cls.rendered_fields)
            .filter(
                cls.dag_id == ti.dag_id,
                cls.task_id == ti.task_id,
                cls.run_id == ti.run_id,
                cls.map_index == ti.map_index,
            )
            .one_or_none()
        )

        if result:
            rendered_fields = result.rendered_fields
            return rendered_fields
        else:
            return None
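    # Illustrative usage sketch, not part of the upstream module: fetching the rendered fields for a
    # task instance, e.g. from a maintenance script. `my_ti` is a hypothetical TaskInstance object
    # obtained elsewhere (for instance from a DagRun).
    #
    #   rendered = RenderedTaskInstanceFields.get_templated_fields(ti=my_ti)
    #   if rendered is not None:
    #       print(rendered.get("bash_command"))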
    @classmethod
    @provide_session
    def get_k8s_pod_yaml(cls, ti: TaskInstance, session: Session = NEW_SESSION) -> dict | None:
        """
        Get rendered Kubernetes Pod YAML for a TaskInstance from the RenderedTaskInstanceFields table.

        :param ti: Task Instance
        :param session: SqlAlchemy Session
        :return: Kubernetes Pod YAML
        """
        result = (
            session.query(cls.k8s_pod_yaml)
            .filter(
                cls.dag_id == ti.dag_id,
                cls.task_id == ti.task_id,
                cls.run_id == ti.run_id,
                cls.map_index == ti.map_index,
            )
            .one_or_none()
        )
        return result.k8s_pod_yaml if result else None
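    # Illustrative usage sketch, not part of the upstream module: this mirrors how a caller (such as
    # a pod-spec view in the UI) might read the stored pod YAML; `my_ti` is a hypothetical
    # TaskInstance.
    #
    #   pod_yaml = RenderedTaskInstanceFields.get_k8s_pod_yaml(ti=my_ti)
    #   if pod_yaml is None:
    #       # No pod spec was recorded for this task instance.
    #       ...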
    @provide_session
    def write(self, session: Session = None):
        """Write instance to database.

        :param session: SqlAlchemy Session
        """
        session.merge(self)
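    # Illustrative usage sketch, not part of the upstream module: rows are typically created from a
    # TaskInstance after templating, persisted with `write`, then trimmed with `delete_old_records`.
    # `ti` and `session` below are hypothetical stand-ins for the running TaskInstance and an open
    # SQLAlchemy session.
    #
    #   rtif = RenderedTaskInstanceFields(ti=ti)
    #   rtif.write(session=session)
    #   RenderedTaskInstanceFields.delete_old_records(
    #       task_id=ti.task_id, dag_id=ti.dag_id, session=session
    #   )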
    @classmethod
    @provide_session
    def delete_old_records(
        cls,
        task_id: str,
        dag_id: str,
        num_to_keep: int = conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0),
        session: Session = NEW_SESSION,
    ) -> None:
        """
        Keep only the last ``num_to_keep`` records for a task by deleting the others.

        For a mapped task, either all of its rows are deleted or none of them are, so we don't end up
        with partial data for a set of mapped Task Instances left in the database.

        :param task_id: Task ID
        :param dag_id: Dag ID
        :param num_to_keep: Number of records to keep
        :param session: SqlAlchemy Session
        """
        from airflow.models.dagrun import DagRun

        if num_to_keep <= 0:
            return

        tis_to_keep_query = (
            session.query(cls.dag_id, cls.task_id, cls.run_id)
            .filter(cls.dag_id == dag_id, cls.task_id == task_id)
            .join(cls.dag_run)
            .distinct()
            .order_by(DagRun.execution_date.desc())
            .limit(num_to_keep)
        )

        cls._do_delete_old_records(
            dag_id=dag_id,
            task_id=task_id,
            ti_clause=tis_to_keep_query.subquery(),
            session=session,
        )
        session.flush()
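    # Illustrative note, not part of the upstream module: the default for `num_to_keep` comes from
    # the ``[core] max_num_rendered_ti_fields_per_task`` option in airflow.cfg (30 in the shipped
    # defaults at the time of writing; check the configuration reference for your version), e.g.
    #
    #   [core]
    #   max_num_rendered_ti_fields_per_task = 30
    #
    # A value of 0 or less short-circuits the method above and disables the cleanup entirely.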
    @classmethod
    @retry_db_transaction
    def _do_delete_old_records(
        cls,
        *,
        task_id: str,
        dag_id: str,
        ti_clause: FromClause,
        session: Session,
    ) -> None:
        # This query might deadlock occasionally; if it fails it is retried (see decorator)
        session.query(cls).filter(
            cls.dag_id == dag_id,
            cls.task_id == task_id,
            tuple_not_in_condition(
                (cls.dag_id, cls.task_id, cls.run_id),
                session.query(ti_clause.c.dag_id, ti_clause.c.task_id, ti_clause.c.run_id),
            ),
        ).delete(synchronize_session=False)
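    # Illustrative note, not part of the upstream module: `tuple_not_in_condition` is assumed here to
    # build a composite ``(dag_id, task_id, run_id) NOT IN (<keep-subquery>)`` predicate, with a
    # fallback for backends that lack row-value comparisons. On a backend that does support them, a
    # hypothetical plain-SQLAlchemy equivalent of that filter clause would look roughly like:
    #
    #   from sqlalchemy import tuple_
    #
    #   tuple_(cls.dag_id, cls.task_id, cls.run_id).not_in(
    #       session.query(ti_clause.c.dag_id, ti_clause.c.task_id, ti_clause.c.run_id)
    #   )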