Source code for airflow.providers.apache.hive.sensors.named_hive_partition
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Sequence

from airflow.sensors.base import BaseSensorOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context
class NamedHivePartitionSensor(BaseSensorOperator):
    """
    Waits for a set of partitions to show up in Hive.

    :param partition_names: List of fully qualified names of the
        partitions to wait for. A fully qualified name is of the
        form ``schema.table/pk1=pv1/pk2=pv2``, for example,
        default.users/ds=2016-01-01. This is passed as is to the metastore
        Thrift client ``get_partitions_by_name`` method. Note that
        you cannot use logical or comparison operators as in
        HivePartitionSensor.
    :param metastore_conn_id: Reference to the
        :ref:`metastore thrift service connection id <howto/connection:hive_metastore>`.
    """
    def __init__(
        self,
        *,
        partition_names: list[str],
        metastore_conn_id: str = "metastore_default",
        poke_interval: int = 60 * 3,
        hook: Any = None,
        **kwargs: Any,
    ):
        super().__init__(poke_interval=poke_interval, **kwargs)

        self.next_index_to_poke = 0
        if isinstance(partition_names, str):
            raise TypeError("partition_names must be an array of strings")

        self.metastore_conn_id = metastore_conn_id
        self.partition_names = partition_names
        self.hook = hook
        if self.hook and metastore_conn_id != "metastore_default":
            self.log.warning(
                "A hook was passed but a non default metastore_conn_id=%s was used",
                metastore_conn_id,
            )

    @staticmethod
    def parse_partition_name(partition: str) -> tuple[Any, ...]:
        """Get schema, table, and partition info."""
        first_split = partition.split(".", 1)
        if len(first_split) == 1:
            schema = "default"
            table_partition = max(first_split)  # poor man's "first element"
        else:
            schema, table_partition = first_split
        second_split = table_partition.split("/", 1)
        if len(second_split) == 1:
            raise ValueError(f"Could not parse {partition} into table, partition")
        else:
            table, partition = second_split
        return schema, table, partition
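    # For illustration: parse_partition_name resolves a fully qualified name into
    # (schema, table, partition), falling back to the "default" schema when the
    # schema prefix is omitted and raising ValueError when no partition spec is given:
    #   parse_partition_name("default.users/ds=2016-01-01")  -> ("default", "users", "ds=2016-01-01")
    #   parse_partition_name("users/ds=2016-01-01/hour=00")  -> ("default", "users", "ds=2016-01-01/hour=00")
    #   parse_partition_name("users")                         -> ValueError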
    def poke_partition(self, partition: str) -> Any:
        """Check for a named partition."""
        if not self.hook:
            from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook

            self.hook = HiveMetastoreHook(metastore_conn_id=self.metastore_conn_id)

        schema, table, partition = self.parse_partition_name(partition)

        self.log.info("Poking for %s.%s/%s", schema, table, partition)
        return self.hook.check_for_named_partition(schema, table, partition)
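A minimal usage sketch follows. It is not part of the module above; the DAG id, start date, and partition names are placeholder values, and the connection id and poke interval simply restate the defaults shown in ``__init__``.

# Usage sketch (placeholder DAG id, dates, and partition names).
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor

with DAG(dag_id="example_hive_partition_wait", start_date=datetime(2016, 1, 1)) as dag:
    wait_for_partitions = NamedHivePartitionSensor(
        task_id="wait_for_partitions",
        # Fully qualified names: schema.table/pk1=pv1/pk2=pv2
        partition_names=[
            "default.users/ds=2016-01-01",
            "default.events/ds=2016-01-01/hour=00",
        ],
        metastore_conn_id="metastore_default",  # default shown in __init__ above
        poke_interval=60 * 3,                   # default shown in __init__ above
    )

Note that ``partition_names`` must be a list; passing a single string raises ``TypeError`` in ``__init__``, and each name is checked against the metastore via ``check_for_named_partition``.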