## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Save Rendered Template Fields"""from__future__importannotationsimportosimportsqlalchemy_jsonfieldfromsqlalchemyimportColumn,ForeignKeyConstraint,Integer,PrimaryKeyConstraint,and_,not_,text,tuple_fromsqlalchemy.ext.associationproxyimportassociation_proxyfromsqlalchemy.ormimportSession,relationshipfromairflow.configurationimportconffromairflow.models.baseimportBase,StringIDfromairflow.models.taskinstanceimportTaskInstancefromairflow.serialization.helpersimportserialize_template_fieldfromairflow.settingsimportjsonfromairflow.utils.retriesimportretry_db_transactionfromairflow.utils.sessionimportNEW_SESSION,provide_session
)# We don't need a DB level FK here, as we already have that to TI (which has one to DR) but by defining# the relationship we can more easily find the execution date for these rows
# NOTE(review): no decorators are visible for this method in this chunk, although it
# takes ``cls`` and defaults ``session`` to NEW_SESSION -- confirm they precede it.
def get_templated_fields(cls, ti: TaskInstance, session: Session = NEW_SESSION) -> dict | None:
    """
    Look up the rendered template fields stored for a TaskInstance.

    The row is matched on the task instance's ``dag_id``, ``task_id``,
    ``run_id`` and ``map_index``.

    :param ti: Task Instance
    :param session: SqlAlchemy Session
    :return: Rendered Templated TI field, or None when no (truthy) row is found
    """
    ti_key_criteria = (
        cls.dag_id == ti.dag_id,
        cls.task_id == ti.task_id,
        cls.run_id == ti.run_id,
        cls.map_index == ti.map_index,
    )
    row = session.query(cls.rendered_fields).filter(*ti_key_criteria).one_or_none()
    return row.rendered_fields if row else None
@classmethod
@provide_session
def get_k8s_pod_yaml(cls, ti: TaskInstance, session: Session = NEW_SESSION) -> dict | None:
    """
    Look up the rendered Kubernetes Pod Yaml stored for a TaskInstance.

    The row is matched on the task instance's ``dag_id``, ``task_id``,
    ``run_id`` and ``map_index``.

    :param ti: Task Instance
    :param session: SqlAlchemy Session
    :return: Kubernetes Pod Yaml, or None when no (truthy) row is found
    """
    pod_yaml_query = session.query(cls.k8s_pod_yaml).filter(
        cls.dag_id == ti.dag_id,
        cls.task_id == ti.task_id,
        cls.run_id == ti.run_id,
        cls.map_index == ti.map_index,
    )
    row = pod_yaml_query.one_or_none()
    if not row:
        return None
    return row.k8s_pod_yaml
@provide_session
def write(self, session: Session = NEW_SESSION):
    """
    Write instance to database by merging it into the given session.

    :param session: SqlAlchemy Session
    """
    # The default is the NEW_SESSION sentinel (as in the sibling getters) rather than
    # a bare ``None`` that contradicts the ``Session`` annotation.
    session.merge(self)
@classmethod
@provide_session
def delete_old_records(
    cls,
    task_id: str,
    dag_id: str,
    num_to_keep=conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0),
    session: Session = NEW_SESSION,
):
    """
    Keep only Last X (num_to_keep) number of records for a task by deleting others.

    In the case of data for a mapped task either all of the rows or none of the rows will be deleted, so
    we don't end up with partial data for a set of mapped Task Instances left in the database.

    Nothing is deleted when ``num_to_keep`` is zero or negative.

    :param task_id: Task ID
    :param dag_id: Dag ID
    :param num_to_keep: Number of Records to keep
    :param session: SqlAlchemy Session
    """
    from airflow.models.dagrun import DagRun

    if num_to_keep <= 0:
        return

    # Keys (dag_id, task_id, run_id) of the newest ``num_to_keep`` runs of this task,
    # ordered by the execution date of the related DagRun.
    tis_to_keep_query = (
        session.query(cls.dag_id, cls.task_id, cls.run_id)
        .filter(cls.dag_id == dag_id, cls.task_id == task_id)
        .join(cls.dag_run)
        .distinct()
        .order_by(DagRun.execution_date.desc())
        .limit(num_to_keep)
    )

    dialect_name = session.bind.dialect.name
    if dialect_name in ["postgresql", "sqlite"]:
        # Fetch Top X records given dag_id & task_id ordered by Execution Date
        subq1 = tis_to_keep_query.subquery()
        excluded = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.run_id)
        session.query(cls).filter(
            cls.dag_id == dag_id,
            cls.task_id == task_id,
            tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(excluded),
        ).delete(synchronize_session=False)
    elif dialect_name in ["mysql"]:
        cls._remove_old_rendered_ti_fields_mysql(dag_id, session, task_id, tis_to_keep_query)
    else:
        # Fetch Top X records given dag_id & task_id ordered by Execution Date
        tis_to_keep = tis_to_keep_query.all()

        filter_tis = [
            not_(
                and_(
                    cls.dag_id == ti.dag_id,
                    cls.task_id == ti.task_id,
                    cls.run_id == ti.run_id,
                )
            )
            for ti in tis_to_keep
        ]

        # Scope the delete to this dag_id/task_id like the other branches do. The
        # "not one of the kept rows" conditions alone also match rows that belong
        # to every other DAG and task, which would then be deleted as well.
        # Passing the criteria positionally also avoids an empty ``and_()`` when
        # there are no rows to keep.
        session.query(cls).filter(
            cls.dag_id == dag_id,
            cls.task_id == task_id,
            *filter_tis,
        ).delete(synchronize_session=False)

    session.flush()
@classmethod
@retry_db_transaction
def _remove_old_rendered_ti_fields_mysql(cls, dag_id, session, task_id, tis_to_keep_query):
    """
    Delete the rows of one task that are not selected by ``tis_to_keep_query`` (MySQL variant).

    :param dag_id: Dag ID whose rows are pruned
    :param session: SqlAlchemy Session
    :param task_id: Task ID whose rows are pruned
    :param tis_to_keep_query: query yielding the (dag_id, task_id, run_id) keys to keep
    """
    # Fetch Top X records given dag_id & task_id ordered by Execution Date
    keep_limited = tis_to_keep_query.subquery('subq1')

    # Second Subquery
    # Workaround for MySQL Limitation (https://stackoverflow.com/a/19344141/5691525)
    # Limitation: This version of MySQL does not yet support
    # LIMIT & IN/ALL/ANY/SOME subquery
    keep_wrapped = session.query(
        keep_limited.c.dag_id,
        keep_limited.c.task_id,
        keep_limited.c.run_id,
    ).subquery('subq2')

    row_key = tuple_(cls.dag_id, cls.task_id, cls.run_id)
    stale_rows = session.query(cls).filter(
        cls.dag_id == dag_id,
        cls.task_id == task_id,
        row_key.notin_(keep_wrapped),
    )

    # This query might deadlock occasionally and it should be retried if fails (see decorator)
    stale_rows.delete(synchronize_session=False)