Source code for airflow.providers.google.cloud.triggers.gcs
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator

from aiohttp import ClientSession

from airflow.providers.google.cloud.hooks.gcs import GCSAsyncHook
from airflow.triggers.base import BaseTrigger, TriggerEvent
class GCSBlobTrigger(BaseTrigger):
    """
    A trigger that fires when it finds the requested file or folder present in the given bucket.

    :param bucket: the bucket in the google cloud storage where the objects are residing.
    :param object_name: the file or folder present in the bucket
    :param google_cloud_conn_id: reference to the Google Connection
    :param poke_interval: polling period in seconds to check for file/folder
    :param hook_params: extra keyword arguments passed to ``GCSAsyncHook`` when the hook is created
    """

    def __init__(
        self,
        bucket: str,
        object_name: str,
        poke_interval: float,
        google_cloud_conn_id: str,
        hook_params: dict[str, Any],
    ):
        super().__init__()
        self.bucket = bucket
        self.object_name = object_name
        self.poke_interval = poke_interval
        self.google_cloud_conn_id: str = google_cloud_conn_id
        self.hook_params = hook_params

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serializes GCSBlobTrigger arguments and classpath."""
        return (
            "airflow.providers.google.cloud.triggers.gcs.GCSBlobTrigger",
            {
                "bucket": self.bucket,
                "object_name": self.object_name,
                "poke_interval": self.poke_interval,
                "google_cloud_conn_id": self.google_cloud_conn_id,
                "hook_params": self.hook_params,
            },
        )

    async def run(self) -> AsyncIterator["TriggerEvent"]:
        """
        Poll the bucket until the relevant file/folder is found.

        Yields a single ``TriggerEvent``: ``status`` is ``"success"`` once the object
        exists, or ``"error"`` (with the exception text as ``message``) if any
        exception is raised while creating the hook or polling.
        """
        try:
            hook = self._get_async_hook()
            while True:
                res = await self._object_exists(
                    hook=hook, bucket_name=self.bucket, object_name=self.object_name
                )
                if res == "success":
                    yield TriggerEvent({"status": "success", "message": res})
                    # Stop after the first success event; without this return the
                    # loop would keep sleeping, polling and emitting more events.
                    return
                await asyncio.sleep(self.poke_interval)
        except Exception as e:
            yield TriggerEvent({"status": "error", "message": str(e)})
            return

    def _get_async_hook(self) -> GCSAsyncHook:
        """Create the ``GCSAsyncHook`` from the connection id and ``hook_params``."""
        return GCSAsyncHook(gcp_conn_id=self.google_cloud_conn_id, **self.hook_params)

    async def _object_exists(self, hook: GCSAsyncHook, bucket_name: str, object_name: str) -> str:
        """
        Checks for the existence of a file in Google Cloud Storage.

        :param hook: the async hook used to obtain the storage client
        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :param object_name: The name of the blob_name to check in the Google cloud
            storage bucket.
        :return: ``"success"`` if the blob exists, otherwise ``"pending"``
        """
        # A fresh HTTP session is opened for each check and closed on exit.
        async with ClientSession() as s:
            client = await hook.get_storage_client(s)
            bucket = client.get_bucket(bucket_name)
            object_response = await bucket.blob_exists(blob_name=object_name)
            if object_response:
                return "success"
            return "pending"