Source code for airflow.providers.apache.hive.sensors.named_hive_partition
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import TYPE_CHECKING, Any, List, Sequence, Tuple

from airflow.sensors.base import BaseSensorOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context

class NamedHivePartitionSensor(BaseSensorOperator):
    """
    Waits for a set of partitions to show up in Hive.

    :param partition_names: List of fully qualified names of the
        partitions to wait for. A fully qualified name is of the
        form ``schema.table/pk1=pv1/pk2=pv2``, for example,
        default.users/ds=2016-01-01. This is passed as is to the metastore
        Thrift client ``get_partitions_by_name`` method. Note that
        you cannot use logical or comparison operators as in
        HivePartitionSensor.
    :param metastore_conn_id: Reference to the
        :ref:`metastore thrift service connection id <howto/connection:hive_metastore>`.
    """

    template_fields: Sequence[str] = ('partition_names',)

    def __init__(
        self,
        *,
        partition_names: List[str],
        metastore_conn_id: str = 'metastore_default',
        poke_interval: int = 60 * 3,
        hook: Any = None,
        **kwargs: Any,
    ):
        super().__init__(poke_interval=poke_interval, **kwargs)
        self.next_index_to_poke = 0
        if isinstance(partition_names, str):
            raise TypeError('partition_names must be an array of strings')

        self.metastore_conn_id = metastore_conn_id
        self.partition_names = partition_names
        self.hook = hook
        if self.hook and metastore_conn_id != 'metastore_default':
            self.log.warning(
                'A hook was passed but a non default metastore_conn_id=%s was used', metastore_conn_id
            )

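    # Illustrative note (not part of the original source): ``partition_names`` must be
    # a list of strings, e.g. ``partition_names=['default.users/ds=2016-01-01']``;
    # passing a single bare string raises the TypeError above.
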
    @staticmethod
    def parse_partition_name(partition: str) -> Tuple[Any, ...]:
        """Get schema, table, and partition info."""
        first_split = partition.split('.', 1)
        if len(first_split) == 1:
            schema = 'default'
            table_partition = max(first_split)  # poor man's "first"
        else:
            schema, table_partition = first_split
        second_split = table_partition.split('/', 1)
        if len(second_split) == 1:
            raise ValueError(f'Could not parse {partition} into table, partition')
        else:
            table, partition = second_split
        return schema, table, partition

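    # Illustrative examples (not part of the original source) of how
    # ``parse_partition_name`` splits fully qualified partition names:
    #   parse_partition_name('default.users/ds=2016-01-01')  -> ('default', 'users', 'ds=2016-01-01')
    #   parse_partition_name('users/ds=2016-01-01')          -> ('default', 'users', 'ds=2016-01-01')
    #   parse_partition_name('users')                         -> raises ValueError (no '/' separator)
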
    def poke_partition(self, partition: str) -> Any:
        """Check for a named partition."""
        if not self.hook:
            from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook

            self.hook = HiveMetastoreHook(metastore_conn_id=self.metastore_conn_id)

        schema, table, partition = self.parse_partition_name(partition)

        self.log.info('Poking for %s.%s/%s', schema, table, partition)
        return self.hook.check_for_named_partition(schema, table, partition)
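
A minimal usage sketch, not part of the module above: the dag_id, task_id, schedule, and table names are made-up examples, and a Hive metastore connection registered as ``metastore_default`` is assumed. The sensor keeps poking until every listed partition exists in the metastore.

from datetime import datetime

from airflow import DAG
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor

with DAG(
    dag_id='example_wait_for_hive_partitions',  # example name, not from the source
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    wait_for_partitions = NamedHivePartitionSensor(
        task_id='wait_for_partitions',
        partition_names=[
            'default.users/ds={{ ds }}',   # example tables; partition_names is templated
            'default.events/ds={{ ds }}',
        ],
        metastore_conn_id='metastore_default',
        poke_interval=60 * 3,
    )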