## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Save Rendered Template Fields"""importosfromtypingimportOptionalimportsqlalchemy_jsonfieldfromsqlalchemyimportColumn,String,and_,not_,tuple_fromsqlalchemy.ormimportSessionfromairflow.configurationimportconffromairflow.models.baseimportID_LEN,Basefromairflow.models.taskinstanceimportTaskInstancefromairflow.serialization.helpersimportserialize_template_fieldfromairflow.settingsimportjsonfromairflow.utils.sessionimportprovide_sessionfromairflow.utils.sqlalchemyimportUtcDateTime
[docs]defget_templated_fields(cls,ti:TaskInstance,session:Session=None)->Optional[dict]:""" Get templated field for a TaskInstance from the RenderedTaskInstanceFields table. :param ti: Task Instance :param session: SqlAlchemy Session :return: Rendered Templated TI field """result=(session.query(cls.rendered_fields).filter(cls.dag_id==ti.dag_id,cls.task_id==ti.task_id,cls.execution_date==ti.execution_date).one_or_none())ifresult:rendered_fields=result.rendered_fieldsreturnrendered_fieldselse:returnNone
@classmethod@provide_session
[docs]defget_k8s_pod_yaml(cls,ti:TaskInstance,session:Session=None)->Optional[dict]:""" Get rendered Kubernetes Pod Yaml for a TaskInstance from the RenderedTaskInstanceFields table. :param ti: Task Instance :param session: SqlAlchemy Session :return: Kubernetes Pod Yaml """result=(session.query(cls.k8s_pod_yaml).filter(cls.dag_id==ti.dag_id,cls.task_id==ti.task_id,cls.execution_date==ti.execution_date).one_or_none())returnresult.k8s_pod_yamlifresultelseNone
@provide_session
[docs]defwrite(self,session:Session=None):"""Write instance to database :param session: SqlAlchemy Session """session.merge(self)
@classmethod@provide_session
[docs]defdelete_old_records(cls,task_id:str,dag_id:str,num_to_keep=conf.getint("core","max_num_rendered_ti_fields_per_task",fallback=0),session:Session=None,):""" Keep only Last X (num_to_keep) number of records for a task by deleting others :param task_id: Task ID :param dag_id: Dag ID :param num_to_keep: Number of Records to keep :param session: SqlAlchemy Session """ifnum_to_keep<=0:returntis_to_keep_query=(session.query(cls.dag_id,cls.task_id,cls.execution_date).filter(cls.dag_id==dag_id,cls.task_id==task_id).order_by(cls.execution_date.desc()).limit(num_to_keep))ifsession.bind.dialect.namein["postgresql","sqlite"]:# Fetch Top X records given dag_id & task_id ordered by Execution Datesubq1=tis_to_keep_query.subquery('subq1')session.query(cls).filter(cls.dag_id==dag_id,cls.task_id==task_id,tuple_(cls.dag_id,cls.task_id,cls.execution_date).notin_(subq1),).delete(synchronize_session=False)elifsession.bind.dialect.namein["mysql"]:# Fetch Top X records given dag_id & task_id ordered by Execution Datesubq1=tis_to_keep_query.subquery('subq1')# Second Subquery# Workaround for MySQL Limitation (https://stackoverflow.com/a/19344141/5691525)# Limitation: This version of MySQL does not yet support# LIMIT & IN/ALL/ANY/SOME subquerysubq2=session.query(subq1.c.dag_id,subq1.c.task_id,subq1.c.execution_date).subquery('subq2')session.query(cls).filter(cls.dag_id==dag_id,cls.task_id==task_id,tuple_(cls.dag_id,cls.task_id,cls.execution_date).notin_(subq2),).delete(synchronize_session=False)else:# Fetch Top X records given dag_id & task_id ordered by Execution Datetis_to_keep=tis_to_keep_query.all()filter_tis=[not_(and_(cls.dag_id==ti.dag_id,cls.task_id==ti.task_id,cls.execution_date==ti.execution_date,))fortiintis_to_keep]session.query(cls).filter(and_(*filter_tis)).delete(synchronize_session=False)