Source code for airflow.contrib.hooks.gcp_pubsub_hook

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from uuid import uuid4

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook


def _format_subscription(project, subscription):
    """Return the fully-qualified Pub/Sub subscription path.

    :param project: the GCP project ID that owns the subscription
    :type project: str
    :param subscription: the bare subscription name (no path prefix)
    :type subscription: str
    :return: ``projects/<project>/subscriptions/<subscription>``
    :rtype: str
    """
    template = 'projects/{}/subscriptions/{}'
    return template.format(project, subscription)
def _format_topic(project, topic):
    """Return the fully-qualified Pub/Sub topic path.

    :param project: the GCP project ID that owns the topic
    :type project: str
    :param topic: the bare topic name (no path prefix)
    :type topic: str
    :return: ``projects/<project>/topics/<topic>``
    :rtype: str
    """
    template = 'projects/{}/topics/{}'
    return template.format(project, topic)
class PubSubException(Exception):
    """Error raised by the Pub/Sub hook.

    Used both to wrap a failed API request and to report that a resource
    unexpectedly exists / does not exist when the caller asked for that
    condition to be treated as a failure.
    """
class PubSubHook(GoogleCloudBaseHook):
    """Hook for accessing Google Pub/Sub.

    The Connection referenced by ``gcp_conn_id`` is used (through the base
    hook) to build the authorized API client. The GCP project is *not*
    taken from the Connection here: every method receives the project
    explicitly and uses it to build the fully-qualified resource name.
    """

    def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None):
        super(PubSubHook, self).__init__(gcp_conn_id, delegate_to=delegate_to)
        # Retry count handed to every request's ``execute()``. The value is
        # looked up through the base hook's ``_get_field``; 5 is passed as
        # the second argument (presumably the fallback -- see base hook).
        self.num_retries = self._get_field('num_retries', 5)

    def get_conn(self):
        """Returns a Pub/Sub service object.

        :rtype: googleapiclient.discovery.Resource
        """
        http_authorized = self._authorize()
        return build(
            'pubsub', 'v1', http=http_authorized, cache_discovery=False)

    def publish(self, project, topic, messages):
        """Publishes messages to a Pub/Sub topic.

        :param project: the GCP project ID in which to publish
        :type project: str
        :param topic: the Pub/Sub topic to which to publish; do not
            include the ``projects/{project}/topics/`` prefix.
        :type topic: str
        :param messages: messages to publish; if the data field in a
            message is set, it should already be base64 encoded.
        :type messages: list of PubSub messages; see
            http://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
        :raises PubSubException: if the API request fails
        """
        body = {'messages': messages}
        full_topic = _format_topic(project, topic)
        request = self.get_conn().projects().topics().publish(
            topic=full_topic, body=body)
        try:
            request.execute(num_retries=self.num_retries)
        except HttpError as e:
            raise PubSubException(
                'Error publishing to topic {}'.format(full_topic), e)

    def create_topic(self, project, topic, fail_if_exists=False):
        """Creates a Pub/Sub topic, if it does not already exist.

        :param project: the GCP project ID in which to create the topic
        :type project: str
        :param topic: the Pub/Sub topic name to create; do not
            include the ``projects/{project}/topics/`` prefix.
        :type topic: str
        :param fail_if_exists: if set, raise an exception if the topic
            already exists; otherwise only a warning is logged
        :type fail_if_exists: bool
        :raises PubSubException: if the topic already exists and
            ``fail_if_exists`` is set, or if the API request fails for
            any other reason
        """
        service = self.get_conn()
        full_topic = _format_topic(project, topic)
        try:
            service.projects().topics().create(
                name=full_topic, body={}).execute(num_retries=self.num_retries)
        except HttpError as e:
            # Status code 409 indicates that the topic already exists.
            if str(e.resp['status']) == '409':
                message = 'Topic already exists: {}'.format(full_topic)
                self.log.warning(message)
                if fail_if_exists:
                    raise PubSubException(message)
            else:
                raise PubSubException(
                    'Error creating topic {}'.format(full_topic), e)

    def delete_topic(self, project, topic, fail_if_not_exists=False):
        """Deletes a Pub/Sub topic if it exists.

        :param project: the GCP project ID in which to delete the topic
        :type project: str
        :param topic: the Pub/Sub topic name to delete; do not
            include the ``projects/{project}/topics/`` prefix.
        :type topic: str
        :param fail_if_not_exists: if set, raise an exception if the topic
            does not exist; otherwise only a warning is logged
        :type fail_if_not_exists: bool
        :raises PubSubException: if the topic does not exist and
            ``fail_if_not_exists`` is set, or if the API request fails for
            any other reason
        """
        service = self.get_conn()
        full_topic = _format_topic(project, topic)
        try:
            service.projects().topics().delete(
                topic=full_topic).execute(num_retries=self.num_retries)
        except HttpError as e:
            # Status code 404 indicates that the topic was not found.
            if str(e.resp['status']) == '404':
                message = 'Topic does not exist: {}'.format(full_topic)
                self.log.warning(message)
                if fail_if_not_exists:
                    raise PubSubException(message)
            else:
                raise PubSubException(
                    'Error deleting topic {}'.format(full_topic), e)

    def create_subscription(self, topic_project, topic, subscription=None,
                            subscription_project=None, ack_deadline_secs=10,
                            fail_if_exists=False):
        """Creates a Pub/Sub subscription, if it does not already exist.

        :param topic_project: the GCP project ID of the topic that the
            subscription will be bound to.
        :type topic_project: str
        :param topic: the Pub/Sub topic name that the subscription will be
            bound to; do not include the ``projects/{project}/topics/``
            prefix.
        :type topic: str
        :param subscription: the Pub/Sub subscription name. If empty, a
            random name will be generated using the uuid module
        :type subscription: str
        :param subscription_project: the GCP project ID where the
            subscription will be created. If unspecified,
            ``topic_project`` will be used.
        :type subscription_project: str
        :param ack_deadline_secs: Number of seconds that a subscriber has
            to acknowledge each message pulled from the subscription
        :type ack_deadline_secs: int
        :param fail_if_exists: if set, raise an exception if the
            subscription already exists; otherwise only a warning is logged
        :type fail_if_exists: bool
        :return: subscription name which will be the system-generated
            value if the ``subscription`` parameter is not supplied
        :rtype: str
        :raises PubSubException: if the subscription already exists and
            ``fail_if_exists`` is set, or if the API request fails for
            any other reason
        """
        service = self.get_conn()
        full_topic = _format_topic(topic_project, topic)
        if not subscription:
            subscription = 'sub-{}'.format(uuid4())
        if not subscription_project:
            subscription_project = topic_project
        full_subscription = _format_subscription(subscription_project,
                                                 subscription)
        body = {
            'topic': full_topic,
            'ackDeadlineSeconds': ack_deadline_secs
        }
        try:
            service.projects().subscriptions().create(
                name=full_subscription,
                body=body).execute(num_retries=self.num_retries)
        except HttpError as e:
            # Status code 409 indicates that the subscription already exists.
            if str(e.resp['status']) == '409':
                message = 'Subscription already exists: {}'.format(
                    full_subscription)
                self.log.warning(message)
                if fail_if_exists:
                    raise PubSubException(message)
            else:
                raise PubSubException(
                    'Error creating subscription {}'.format(full_subscription),
                    e)
        # The bare name (not the full path) is returned, including when the
        # subscription already existed and only a warning was logged.
        return subscription

    def delete_subscription(self, project, subscription,
                            fail_if_not_exists=False):
        """Deletes a Pub/Sub subscription, if it exists.

        :param project: the GCP project ID where the subscription exists
        :type project: str
        :param subscription: the Pub/Sub subscription name to delete; do not
            include the ``projects/{project}/subscriptions/`` prefix.
        :type subscription: str
        :param fail_if_not_exists: if set, raise an exception if the
            subscription does not exist; otherwise only a warning is logged
        :type fail_if_not_exists: bool
        :raises PubSubException: if the subscription does not exist and
            ``fail_if_not_exists`` is set, or if the API request fails for
            any other reason
        """
        service = self.get_conn()
        full_subscription = _format_subscription(project, subscription)
        try:
            service.projects().subscriptions().delete(
                subscription=full_subscription).execute(
                    num_retries=self.num_retries)
        except HttpError as e:
            # Status code 404 indicates that the subscription was not found
            if str(e.resp['status']) == '404':
                message = 'Subscription does not exist: {}'.format(
                    full_subscription)
                self.log.warning(message)
                if fail_if_not_exists:
                    raise PubSubException(message)
            else:
                raise PubSubException(
                    'Error deleting subscription {}'.format(full_subscription),
                    e)

    def pull(self, project, subscription, max_messages,
             return_immediately=False):
        """Pulls up to ``max_messages`` messages from Pub/Sub subscription.

        :param project: the GCP project ID where the subscription exists
        :type project: str
        :param subscription: the Pub/Sub subscription name to pull from; do
            not include the ``projects/{project}/subscriptions/`` prefix.
        :type subscription: str
        :param max_messages: The maximum number of messages to return from
            the Pub/Sub API.
        :type max_messages: int
        :param return_immediately: If set, the Pub/Sub API will immediately
            return if no messages are available. Otherwise, the request will
            block for an undisclosed, but bounded period of time
        :type return_immediately: bool
        :return: A list of Pub/Sub ReceivedMessage objects each containing
            an ``ackId`` property and a ``message`` property, which includes
            the base64-encoded message content. An empty list is returned
            when the response carries no ``receivedMessages`` key. See
            https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull#ReceivedMessage
        :rtype: list
        :raises PubSubException: if the API request fails
        """
        service = self.get_conn()
        full_subscription = _format_subscription(project, subscription)
        body = {
            'maxMessages': max_messages,
            'returnImmediately': return_immediately
        }
        try:
            response = service.projects().subscriptions().pull(
                subscription=full_subscription,
                body=body).execute(num_retries=self.num_retries)
            return response.get('receivedMessages', [])
        except HttpError as e:
            raise PubSubException(
                'Error pulling messages from subscription {}'.format(
                    full_subscription), e)

    def acknowledge(self, project, subscription, ack_ids):
        """Acknowledges the messages identified by ``ack_ids`` on a
        Pub/Sub subscription.

        :param project: the GCP project ID where the subscription exists
        :type project: str
        :param subscription: the Pub/Sub subscription name the messages
            were pulled from; do not include the
            ``projects/{project}/subscriptions/`` prefix.
        :type subscription: str
        :param ack_ids: List of ReceivedMessage ackIds from a previous
            pull response
        :type ack_ids: list
        :raises PubSubException: if the API request fails
        """
        service = self.get_conn()
        full_subscription = _format_subscription(project, subscription)
        try:
            service.projects().subscriptions().acknowledge(
                subscription=full_subscription, body={'ackIds': ack_ids}
            ).execute(num_retries=self.num_retries)
        except HttpError as e:
            raise PubSubException(
                'Error acknowledging {} messages pulled from subscription {}'
                .format(len(ack_ids), full_subscription), e)