Source code for airflow.providers.google.cloud.triggers.gcs

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator

from aiohttp import ClientSession

from airflow.providers.google.cloud.hooks.gcs import GCSAsyncHook
from airflow.triggers.base import BaseTrigger, TriggerEvent


[docs]class GCSBlobTrigger(BaseTrigger): """ A trigger that fires and it finds the requested file or folder present in the given bucket. :param bucket: the bucket in the google cloud storage where the objects are residing. :param object_name: the file or folder present in the bucket :param google_cloud_conn_id: reference to the Google Connection :param poke_interval: polling period in seconds to check for file/folder """ def __init__( self, bucket: str, object_name: str, poke_interval: float, google_cloud_conn_id: str, hook_params: dict[str, Any], ): super().__init__() self.bucket = bucket self.object_name = object_name self.poke_interval = poke_interval self.google_cloud_conn_id: str = google_cloud_conn_id self.hook_params = hook_params
[docs] def serialize(self) -> tuple[str, dict[str, Any]]: """Serializes GCSBlobTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.gcs.GCSBlobTrigger", { "bucket": self.bucket, "object_name": self.object_name, "poke_interval": self.poke_interval, "google_cloud_conn_id": self.google_cloud_conn_id, "hook_params": self.hook_params,
}, )
[docs] async def run(self) -> AsyncIterator["TriggerEvent"]: """Simple loop until the relevant file/folder is found.""" try: hook = self._get_async_hook() while True: res = await self._object_exists( hook=hook, bucket_name=self.bucket, object_name=self.object_name ) if res == "success": yield TriggerEvent({"status": "success", "message": res}) await asyncio.sleep(self.poke_interval) except Exception as e: yield TriggerEvent({"status": "error", "message": str(e)}) return
def _get_async_hook(self) -> GCSAsyncHook: return GCSAsyncHook(gcp_conn_id=self.google_cloud_conn_id, **self.hook_params) async def _object_exists(self, hook: GCSAsyncHook, bucket_name: str, object_name: str) -> str: """ Checks for the existence of a file in Google Cloud Storage. :param bucket_name: The Google Cloud Storage bucket where the object is. :param object_name: The name of the blob_name to check in the Google cloud storage bucket. """ async with ClientSession() as s: client = await hook.get_storage_client(s) bucket = client.get_bucket(bucket_name) object_response = await bucket.blob_exists(blob_name=object_name) if object_response: return "success" return "pending"

Was this entry helpful?