Source code for airflow.providers.google.cloud.triggers.gcs
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasynciofromdatetimeimportdatetimefromtypingimportAny,AsyncIteratorfromaiohttpimportClientSessionfromairflow.providers.google.cloud.hooks.gcsimportGCSAsyncHookfromairflow.triggers.baseimportBaseTrigger,TriggerEventfromairflow.utilsimporttimezone
class GCSBlobTrigger(BaseTrigger):
    """
    A trigger that fires once it finds the requested file or folder present in the given bucket.

    :param bucket: the bucket in the google cloud storage where the objects are residing.
    :param object_name: the file or folder present in the bucket
    :param google_cloud_conn_id: reference to the Google Connection
    :param poke_interval: polling period in seconds to check for file/folder
    :param hook_params: Extra config params to be passed to the underlying hook.
        Should match the desired hook constructor params.
    """

    def __init__(
        self,
        bucket: str,
        object_name: str,
        poke_interval: float,
        google_cloud_conn_id: str,
        hook_params: dict[str, Any],
    ):
        super().__init__()
        self.bucket = bucket
        self.object_name = object_name
        self.poke_interval = poke_interval
        self.google_cloud_conn_id: str = google_cloud_conn_id
        self.hook_params = hook_params

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serializes GCSBlobTrigger arguments and classpath."""
        return (
            "airflow.providers.google.cloud.triggers.gcs.GCSBlobTrigger",
            {
                "bucket": self.bucket,
                "object_name": self.object_name,
                "poke_interval": self.poke_interval,
                "google_cloud_conn_id": self.google_cloud_conn_id,
                "hook_params": self.hook_params,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Loop until the relevant file/folder is found.

        Yields exactly one ``TriggerEvent``: a ``success`` event once the object
        exists, or an ``error`` event carrying the exception message if polling fails.
        """
        try:
            hook = self._get_async_hook()
            while True:
                res = await self._object_exists(
                    hook=hook, bucket_name=self.bucket, object_name=self.object_name
                )
                if res == "success":
                    yield TriggerEvent({"status": "success", "message": res})
                    # Stop after firing; otherwise the generator keeps polling
                    # and may emit duplicate events until it is cancelled.
                    return
                await asyncio.sleep(self.poke_interval)
        except Exception as e:
            yield TriggerEvent({"status": "error", "message": str(e)})
            return

    def _get_async_hook(self) -> GCSAsyncHook:
        """Build the async GCS hook from the connection id and extra hook params."""
        return GCSAsyncHook(gcp_conn_id=self.google_cloud_conn_id, **self.hook_params)

    async def _object_exists(self, hook: GCSAsyncHook, bucket_name: str, object_name: str) -> str:
        """
        Checks for the existence of a file in Google Cloud Storage.

        :param hook: The async hook used to obtain the storage client.
        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :param object_name: The name of the blob_name to check in the Google cloud
            storage bucket.
        :return: ``"success"`` if the blob exists, ``"pending"`` otherwise.
        """
        # A fresh HTTP session is opened per check and closed when the block exits.
        async with ClientSession() as s:
            client = await hook.get_storage_client(s)
            bucket = client.get_bucket(bucket_name)
            object_response = await bucket.blob_exists(blob_name=object_name)
            if object_response:
                return "success"
            return "pending"
class GCSCheckBlobUpdateTimeTrigger(BaseTrigger):
    """
    A trigger that makes an async call to GCS to check whether the object is updated in a bucket.

    :param bucket: google cloud storage bucket name cloud storage where the objects are residing.
    :param object_name: the file or folder present in the bucket
    :param target_date: context datetime to compare with blob object updated time
    :param poke_interval: polling period in seconds to check for file/folder
    :param google_cloud_conn_id: reference to the Google Connection
    :param hook_params: dict object that has delegate_to and impersonation_chain
    """

    def __init__(
        self,
        bucket: str,
        object_name: str,
        target_date: datetime,
        poke_interval: float,
        google_cloud_conn_id: str,
        hook_params: dict[str, Any],
    ):
        super().__init__()
        self.bucket = bucket
        self.object_name = object_name
        self.target_date = target_date
        self.poke_interval = poke_interval
        self.google_cloud_conn_id: str = google_cloud_conn_id
        self.hook_params = hook_params

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serializes GCSCheckBlobUpdateTimeTrigger arguments and classpath."""
        return (
            "airflow.providers.google.cloud.triggers.gcs.GCSCheckBlobUpdateTimeTrigger",
            {
                "bucket": self.bucket,
                "object_name": self.object_name,
                "target_date": self.target_date,
                "poke_interval": self.poke_interval,
                "google_cloud_conn_id": self.google_cloud_conn_id,
                "hook_params": self.hook_params,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Loop until the object updated time is greater than target datetime.

        Yields exactly one ``TriggerEvent``: the result produced by
        ``_is_blob_updated_after`` once it reports completion (success, or an error
        when the object is missing), or an ``error`` event if polling raises.
        """
        try:
            hook = self._get_async_hook()
            while True:
                status, res = await self._is_blob_updated_after(
                    hook=hook,
                    bucket_name=self.bucket,
                    object_name=self.object_name,
                    target_date=self.target_date,
                )
                if status:
                    yield TriggerEvent(res)
                    # Stop after firing; otherwise the generator keeps polling
                    # and may emit duplicate events until it is cancelled.
                    return
                await asyncio.sleep(self.poke_interval)
        except Exception as e:
            yield TriggerEvent({"status": "error", "message": str(e)})
            return

    def _get_async_hook(self) -> GCSAsyncHook:
        """Build the async GCS hook from the connection id and extra hook params."""
        return GCSAsyncHook(gcp_conn_id=self.google_cloud_conn_id, **self.hook_params)

    async def _is_blob_updated_after(
        self, hook: GCSAsyncHook, bucket_name: str, object_name: str, target_date: datetime
    ) -> tuple[bool, dict[str, Any]]:
        """
        Checks if the object in the bucket is updated.

        :param hook: GCSAsyncHook Hook class
        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :param object_name: The name of the blob_name to check in the Google cloud
            storage bucket.
        :param target_date: context datetime to compare with blob object updated time
        :return: A ``(done, event_payload)`` tuple. ``done`` is True when polling
            should stop (object updated after ``target_date``, or object not found);
            ``event_payload`` is the dict to wrap in a ``TriggerEvent``.
        """
        async with ClientSession() as session:
            client = await hook.get_storage_client(session)
            bucket = client.get_bucket(bucket_name)
            blob = await bucket.get_blob(blob_name=object_name)
            if blob is None:
                # A missing object is reported as a terminal error, not retried.
                res = {
                    "message": f"Object ({object_name}) not found in Bucket ({bucket_name})",
                    "status": "error",
                }
                return True, res

            # Blob updated time is in string format so converting the string format
            # to datetime object to compare the last updated time
            blob_updated_date = blob.updated  # type: ignore[attr-defined]
            blob_updated_time = datetime.strptime(blob_updated_date, "%Y-%m-%dT%H:%M:%S.%fZ").replace(
                tzinfo=timezone.utc
            )

            # A naive target date is treated as UTC so the comparison is between
            # two timezone-aware datetimes.
            if not target_date.tzinfo:
                target_date = target_date.replace(tzinfo=timezone.utc)
            self.log.info("Verify object date: %s > %s", blob_updated_time, target_date)
            if blob_updated_time > target_date:
                return True, {"status": "success", "message": "success"}
            return False, {"status": "pending", "message": "pending"}
class GCSPrefixBlobTrigger(GCSBlobTrigger):
    """
    Looks for objects in bucket matching a prefix.

    If none found, sleep for interval and check again. Otherwise, return matches.

    :param bucket: the bucket in the google cloud storage where the objects are residing.
    :param prefix: The prefix of the blob_names to match in the Google cloud storage bucket
    :param google_cloud_conn_id: reference to the Google Connection
    :param poke_interval: polling period in seconds to check
    :param hook_params: Extra config params to be passed to the underlying hook.
        Should match the desired hook constructor params.
    """

    def __init__(
        self,
        bucket: str,
        prefix: str,
        poke_interval: float,
        google_cloud_conn_id: str,
        hook_params: dict[str, Any],
    ):
        # The prefix is passed to the parent as ``object_name``; it is also kept
        # under its own attribute for serialization and listing.
        super().__init__(
            bucket=bucket,
            object_name=prefix,
            poke_interval=poke_interval,
            google_cloud_conn_id=google_cloud_conn_id,
            hook_params=hook_params,
        )
        self.prefix = prefix

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serializes GCSPrefixBlobTrigger arguments and classpath."""
        return (
            "airflow.providers.google.cloud.triggers.gcs.GCSPrefixBlobTrigger",
            {
                "bucket": self.bucket,
                "prefix": self.prefix,
                "poke_interval": self.poke_interval,
                "google_cloud_conn_id": self.google_cloud_conn_id,
                "hook_params": self.hook_params,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Loop until the matches are found for the given prefix on the bucket.

        Yields exactly one ``TriggerEvent``: a ``success`` event carrying the
        matching blob names under ``"matches"``, or an ``error`` event carrying the
        exception message if polling fails.
        """
        try:
            hook = self._get_async_hook()
            while True:
                self.log.info(
                    "Checking for existence of blobs with prefix %s in bucket %s", self.prefix, self.bucket
                )
                res = await self._list_blobs_with_prefix(
                    hook=hook, bucket_name=self.bucket, prefix=self.prefix
                )
                if res:
                    yield TriggerEvent(
                        {"status": "success", "message": "Successfully completed", "matches": res}
                    )
                    # Stop after firing; otherwise the generator keeps polling
                    # and may emit duplicate events until it is cancelled.
                    return
                await asyncio.sleep(self.poke_interval)
        except Exception as e:
            yield TriggerEvent({"status": "error", "message": str(e)})
            return

    async def _list_blobs_with_prefix(self, hook: GCSAsyncHook, bucket_name: str, prefix: str) -> list[str]:
        """
        Returns names of blobs which match the given prefix for a given bucket.

        :param hook: The async hook to use for listing the blobs
        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :param prefix: The prefix of the blob_names to match in the Google cloud
            storage bucket.
        :return: The response of the bucket's ``list_blobs`` call for the prefix.
        """
        # A fresh HTTP session is opened per listing and closed when the block exits.
        async with ClientSession() as session:
            client = await hook.get_storage_client(session)
            bucket = client.get_bucket(bucket_name)
            object_response = await bucket.list_blobs(prefix=prefix)
            return object_response