airflow.providers.google.cloud.operators.pubsub

This module contains Google PubSub operators.

Module Contents

Classes

PubSubCreateTopicOperator

Create a PubSub topic.

PubSubCreateSubscriptionOperator

Create a PubSub subscription.

PubSubDeleteTopicOperator

Delete a PubSub topic.

PubSubDeleteSubscriptionOperator

Delete a PubSub subscription.

PubSubPublishMessageOperator

Publish messages to a PubSub topic.

PubSubPullOperator

Pulls messages from a PubSub subscription and passes them through XCom.

class airflow.providers.google.cloud.operators.pubsub.PubSubCreateTopicOperator(*, topic, project_id=None, fail_if_exists=False, gcp_conn_id='google_cloud_default', delegate_to=None, labels=None, message_storage_policy=None, kms_key_name=None, retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[source]

Bases: airflow.models.BaseOperator

Create a PubSub topic.

See also

For more information on how to use this operator, take a look at the guide: Creating a PubSub topic

By default, if the topic already exists, this operator will not cause the DAG to fail.

with DAG('successful DAG') as dag:
    (
        PubSubCreateTopicOperator(project_id='my-project', topic='my_new_topic')
        >> PubSubCreateTopicOperator(project_id='my-project', topic='my_new_topic')
    )

The operator can be configured to fail if the topic already exists.

with DAG('failing DAG') as dag:
    (
        PubSubCreateTopicOperator(project_id='my-project', topic='my_new_topic')
        >> PubSubCreateTopicOperator(
            project_id='my-project',
            topic='my_new_topic',
            fail_if_exists=True,
        )
    )

Both project_id and topic are templated so you can use Jinja templating in their values.

Parameters
  • project_id (Optional[str]) -- Optional, the Google Cloud project ID where the topic will be created. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • topic (str) -- the topic to create. Do not include the full topic path. In other words, instead of projects/{project}/topics/{topic}, provide only {topic}. (templated)

  • gcp_conn_id (str) -- The connection ID to use connecting to Google Cloud.

  • delegate_to (Optional[str]) -- The account to impersonate using domain-wide delegation of authority, if any. For this to work, the service account making the request must have domain-wide delegation enabled.

  • labels (Optional[Dict[str, str]]) -- Client-assigned labels; see https://cloud.google.com/pubsub/docs/labels

  • message_storage_policy (Union[Dict, google.cloud.pubsub_v1.types.MessageStoragePolicy]) -- Policy constraining the set of Google Cloud regions where messages published to the topic may be stored. If not present, then no constraints are in effect. Union[Dict, google.cloud.pubsub_v1.types.MessageStoragePolicy]

  • kms_key_name (Optional[str]) -- The resource name of the Cloud KMS CryptoKey to be used to protect access to messages published on this topic. The expected format is projects/*/locations/*/keyRings/*/cryptoKeys/*.

  • retry (Union[google.api_core.retry.Retry, google.api_core.gapic_v1.method._MethodDefault]) -- (Optional) A retry object used to retry requests. If None is specified, requests will not be retried.

  • timeout (Optional[float]) -- (Optional) The amount of time, in seconds, to wait for the request to complete. Note that if retry is specified, the timeout applies to each individual attempt.

  • metadata (Sequence[Tuple[str, str]]) -- (Optional) Additional metadata that is provided to the method.

  • impersonation_chain (Optional[Union[str, Sequence[str]]]) -- Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated).

template_fields :Sequence[str] = ['project_id', 'topic', 'impersonation_chain'][source]
ui_color = #0273d4[source]
execute(self, context)[source]

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

class airflow.providers.google.cloud.operators.pubsub.PubSubCreateSubscriptionOperator(*, topic, project_id=None, subscription=None, subscription_project_id=None, ack_deadline_secs=10, fail_if_exists=False, gcp_conn_id='google_cloud_default', delegate_to=None, push_config=None, retain_acked_messages=None, message_retention_duration=None, labels=None, enable_message_ordering=False, expiration_policy=None, filter_=None, dead_letter_policy=None, retry_policy=None, retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[source]

Bases: airflow.models.BaseOperator

Create a PubSub subscription.

See also

For more information on how to use this operator, take a look at the guide: Creating a PubSub subscription

By default, the subscription will be created in project_id. If subscription_project_id is specified and the Google Cloud credentials allow, the Subscription can be created in a different project from its topic.

By default, if the subscription already exists, this operator will not cause the DAG to fail. However, the topic must exist in the project.

with DAG('successful DAG') as dag:
    (
        PubSubCreateSubscriptionOperator(
            project_id='my-project',
            topic='my-topic',
            subscription='my-subscription'
        )
        >> PubSubCreateSubscriptionOperator(
            project_id='my-project',
            topic='my-topic',
            subscription='my-subscription',
        )
    )

The operator can be configured to fail if the subscription already exists.

with DAG('failing DAG') as dag:
    (
        PubSubCreateSubscriptionOperator(
            project_id='my-project',
            topic='my-topic',
            subscription='my-subscription',
        )
        >> PubSubCreateSubscriptionOperator(
            project_id='my-project',
            topic='my-topic',
            subscription='my-subscription',
            fail_if_exists=True,
        )
    )

Finally, subscription is not required. If not passed, the operator will generated a universally unique identifier for the subscription's name.

with DAG('DAG') as dag:
    PubSubCreateSubscriptionOperator(project_id='my-project', topic='my-topic')

project_id, topic, subscription, subscription_project_id and impersonation_chain are templated so you can use Jinja templating in their values.

Parameters
  • project_id (Optional[str]) -- Optional, the Google Cloud project ID where the topic exists. If set to None or missing, the default project_id from the Google Cloud connection is used.

  • topic (str) -- the topic to create. Do not include the full topic path. In other words, instead of projects/{project}/topics/{topic}, provide only {topic}. (templated)

  • subscription (Optional[str]) -- the Pub/Sub subscription name. If empty, a random name will be generated using the uuid module

  • subscription_project_id (Optional[str]) -- the Google Cloud project ID where the subscription will be created. If empty, topic_project will be used.

  • ack_deadline_secs (int) -- Number of seconds that a subscriber has to acknowledge each message pulled from the subscription

  • gcp_conn_id (str) -- The connection ID to use connecting to Google Cloud.

  • delegate_to (Optional[str]) -- The account to impersonate using domain-wide delegation of authority, if any. For this to work, the service account making the request must have domain-wide delegation enabled.

  • push_config (Optional[Union[Dict, google.cloud.pubsub_v1.types.PushConfig]]) -- If push delivery is used with this subscription, this field is used to configure it. An empty pushConfig signifies that the subscriber will pull and ack messages using API methods.

  • retain_acked_messages (Optional[bool]) -- Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the message_retention_duration window. This must be true if you would like to Seek to a timestamp.

  • message_retention_duration (Optional[Union[Dict, google.cloud.pubsub_v1.types.Duration]]) -- How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If retain_acked_messages is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a Seek can be done. Defaults to 7 days. Cannot be more than 7 days or less than 10 minutes.

  • labels (Optional[Dict[str, str]]) -- Client-assigned labels; see https://cloud.google.com/pubsub/docs/labels

  • enable_message_ordering (bool) -- If true, messages published with the same ordering_key in PubsubMessage will be delivered to the subscribers in the order in which they are received by the Pub/Sub system. Otherwise, they may be delivered in any order.

  • expiration_policy (Optional[Union[Dict, google.cloud.pubsub_v1.types.ExpirationPolicy]]) -- A policy that specifies the conditions for this subscription’s expiration. A subscription is considered active as long as any connected subscriber is successfully consuming messages from the subscription or is issuing operations on the subscription. If expiration_policy is not set, a default policy with ttl of 31 days will be used. The minimum allowed value for expiration_policy.ttl is 1 day.

  • filter -- An expression written in the Cloud Pub/Sub filter language. If non-empty, then only PubsubMessages whose attributes field matches the filter are delivered on this subscription. If empty, then no messages are filtered out.

  • dead_letter_policy (Optional[Union[Dict, google.cloud.pubsub_v1.types.DeadLetterPolicy]]) -- A policy that specifies the conditions for dead lettering messages in this subscription. If dead_letter_policy is not set, dead lettering is disabled.

  • retry_policy (Optional[Union[Dict, google.cloud.pubsub_v1.types.RetryPolicy]]) -- A policy that specifies how Pub/Sub retries message delivery for this subscription. If not set, the default retry policy is applied. This generally implies that messages will be retried as soon as possible for healthy subscribers. RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded events for a given message.

  • retry (Union[google.api_core.retry.Retry, google.api_core.gapic_v1.method._MethodDefault]) -- (Optional) A retry object used to retry requests. If None is specified, requests will not be retried.

  • timeout (Optional[float]) -- (Optional) The amount of time, in seconds, to wait for the request to complete. Note that if retry is specified, the timeout applies to each individual attempt.

  • metadata (Sequence[Tuple[str, str]]) -- (Optional) Additional metadata that is provided to the method.

  • impersonation_chain (Optional[Union[str, Sequence[str]]]) -- Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated).

template_fields :Sequence[str] = ['project_id', 'topic', 'subscription', 'subscription_project_id', 'impersonation_chain'][source]
ui_color = #0273d4[source]
execute(self, context)[source]

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

class airflow.providers.google.cloud.operators.pubsub.PubSubDeleteTopicOperator(*, topic, project_id=None, fail_if_not_exists=False, gcp_conn_id='google_cloud_default', delegate_to=None, retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[source]

Bases: airflow.models.BaseOperator

Delete a PubSub topic.

See also

For more information on how to use this operator, take a look at the guide: Deleting a PubSub topic

By default, if the topic does not exist, this operator will not cause the DAG to fail.

with DAG('successful DAG') as dag:
    PubSubDeleteTopicOperator(project_id='my-project', topic='non_existing_topic')

The operator can be configured to fail if the topic does not exist.

with DAG('failing DAG') as dag:
    PubSubDeleteTopicOperator(
        project_id='my-project', topic='non_existing_topic', fail_if_not_exists=True,
    )

Both project_id and topic are templated so you can use Jinja templating in their values.

Parameters
  • project_id (Optional[str]) -- Optional, the Google Cloud project ID in which to work (templated). If set to None or missing, the default project_id from the Google Cloud connection is used.

  • topic (str) -- the topic to delete. Do not include the full topic path. In other words, instead of projects/{project}/topics/{topic}, provide only {topic}. (templated)

  • fail_if_not_exists (bool) -- If True and the topic does not exist, fail the task

  • gcp_conn_id (str) -- The connection ID to use connecting to Google Cloud.

  • delegate_to (Optional[str]) -- The account to impersonate using domain-wide delegation of authority, if any. For this to work, the service account making the request must have domain-wide delegation enabled.

  • retry (Union[google.api_core.retry.Retry, google.api_core.gapic_v1.method._MethodDefault]) -- (Optional) A retry object used to retry requests. If None is specified, requests will not be retried.

  • timeout (Optional[float]) -- (Optional) The amount of time, in seconds, to wait for the request to complete. Note that if retry is specified, the timeout applies to each individual attempt.

  • metadata (Sequence[Tuple[str, str]]) -- (Optional) Additional metadata that is provided to the method.

  • impersonation_chain (Optional[Union[str, Sequence[str]]]) -- Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated).

template_fields :Sequence[str] = ['project_id', 'topic', 'impersonation_chain'][source]
ui_color = #cb4335[source]
execute(self, context)[source]

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

class airflow.providers.google.cloud.operators.pubsub.PubSubDeleteSubscriptionOperator(*, subscription, project_id=None, fail_if_not_exists=False, gcp_conn_id='google_cloud_default', delegate_to=None, retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[source]

Bases: airflow.models.BaseOperator

Delete a PubSub subscription.

See also

For more information on how to use this operator, take a look at the guide: Deleting a PubSub subscription

By default, if the subscription does not exist, this operator will not cause the DAG to fail.

with DAG('successful DAG') as dag:
    PubSubDeleteSubscriptionOperator(project_id='my-project', subscription='non-existing')

The operator can be configured to fail if the subscription already exists.

with DAG('failing DAG') as dag:
    PubSubDeleteSubscriptionOperator(
        project_id='my-project', subscription='non-existing', fail_if_not_exists=True,
    )

project_id, and subscription are templated so you can use Jinja templating in their values.

Parameters
  • project_id (Optional[str]) -- Optional, the Google Cloud project ID in which to work (templated). If set to None or missing, the default project_id from the Google Cloud connection is used.

  • subscription (str) -- the subscription to delete. Do not include the full subscription path. In other words, instead of projects/{project}/subscription/{subscription}, provide only {subscription}. (templated)

  • fail_if_not_exists (bool) -- If True and the subscription does not exist, fail the task

  • gcp_conn_id (str) -- The connection ID to use connecting to Google Cloud.

  • delegate_to (Optional[str]) -- The account to impersonate using domain-wide delegation of authority, if any. For this to work, the service account making the request must have domain-wide delegation enabled.

  • retry (Union[google.api_core.retry.Retry, google.api_core.gapic_v1.method._MethodDefault]) -- (Optional) A retry object used to retry requests. If None is specified, requests will not be retried.

  • timeout (Optional[float]) -- (Optional) The amount of time, in seconds, to wait for the request to complete. Note that if retry is specified, the timeout applies to each individual attempt.

  • metadata (Sequence[Tuple[str, str]]) -- (Optional) Additional metadata that is provided to the method.

  • impersonation_chain (Optional[Union[str, Sequence[str]]]) -- Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated).

template_fields :Sequence[str] = ['project_id', 'subscription', 'impersonation_chain'][source]
ui_color = #cb4335[source]
execute(self, context)[source]

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

class airflow.providers.google.cloud.operators.pubsub.PubSubPublishMessageOperator(*, topic, messages, project_id=None, gcp_conn_id='google_cloud_default', delegate_to=None, impersonation_chain=None, **kwargs)[source]

Bases: airflow.models.BaseOperator

Publish messages to a PubSub topic.

See also

For more information on how to use this operator, take a look at the guide: Publishing PubSub messages

Each Task publishes all provided messages to the same topic in a single Google Cloud project. If the topic does not exist, this task will fail.

m1 = {'data': b'Hello, World!',
      'attributes': {'type': 'greeting'}
     }
m2 = {'data': b'Knock, knock'}
m3 = {'attributes': {'foo': ''}}

t1 = PubSubPublishMessageOperator(
    project_id='my-project',
    topic='my_topic',
    messages=[m1, m2, m3],
    create_topic=True,
    dag=dag,
)

project_id, topic, and messages are templated so you can use Jinja templating in their values.

Parameters
  • project_id (Optional[str]) -- Optional, the Google Cloud project ID in which to work (templated). If set to None or missing, the default project_id from the Google Cloud connection is used.

  • topic (str) -- the topic to which to publish. Do not include the full topic path. In other words, instead of projects/{project}/topics/{topic}, provide only {topic}. (templated)

  • messages (List) -- a list of messages to be published to the topic. Each message is a dict with one or more of the following keys-value mappings: * 'data': a bytestring (utf-8 encoded) * 'attributes': {'key1': 'value1', ...} Each message must contain at least a non-empty 'data' value or an attribute dict with at least one key (templated). See https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage

  • gcp_conn_id (str) -- The connection ID to use connecting to Google Cloud.

  • delegate_to (Optional[str]) -- The account to impersonate using domain-wide delegation of authority, if any. For this to work, the service account making the request must have domain-wide delegation enabled.

  • impersonation_chain (Optional[Union[str, Sequence[str]]]) -- Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated).

template_fields :Sequence[str] = ['project_id', 'topic', 'messages', 'impersonation_chain'][source]
ui_color = #0273d4[source]
execute(self, context)[source]

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

class airflow.providers.google.cloud.operators.pubsub.PubSubPullOperator(*, project_id, subscription, max_messages=5, ack_messages=False, messages_callback=None, gcp_conn_id='google_cloud_default', delegate_to=None, impersonation_chain=None, **kwargs)[source]

Bases: airflow.models.BaseOperator

Pulls messages from a PubSub subscription and passes them through XCom. If the queue is empty, returns empty list - never waits for messages. If you do need to wait, please use airflow.providers.google.cloud.sensors.PubSubPullSensor instead.

See also

For more information on how to use this operator and the PubSubPullSensor, take a look at the guide: Pulling messages from a PubSub subscription

This operator will pull up to max_messages messages from the specified PubSub subscription. When the subscription returns messages, the messages will be returned immediately from the operator and passed through XCom for downstream tasks.

If ack_messages is set to True, messages will be immediately acknowledged before being returned, otherwise, downstream tasks will be responsible for acknowledging them.

project_id `` and ``subscription are templated so you can use Jinja templating in their values.

Parameters
  • project_id (str) -- the Google Cloud project ID for the subscription (templated)

  • subscription (str) -- the Pub/Sub subscription name. Do not include the full subscription path.

  • max_messages (int) -- The maximum number of messages to retrieve per PubSub pull request

  • ack_messages (bool) -- If True, each message will be acknowledged immediately rather than by any downstream tasks

  • gcp_conn_id (str) -- The connection ID to use connecting to Google Cloud.

  • delegate_to (Optional[str]) -- The account to impersonate using domain-wide delegation of authority, if any. For this to work, the service account making the request must have domain-wide delegation enabled.

  • messages_callback (Optional[Callable[[List[google.cloud.pubsub_v1.types.ReceivedMessage], airflow.utils.context.Context], Any]]) -- (Optional) Callback to process received messages. It's return value will be saved to XCom. If you are pulling large messages, you probably want to provide a custom callback. If not provided, the default implementation will convert ReceivedMessage objects into JSON-serializable dicts using google.protobuf.json_format.MessageToDict function.

  • impersonation_chain (Optional[Union[str, Sequence[str]]]) -- Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated).

template_fields :Sequence[str] = ['project_id', 'subscription', 'impersonation_chain'][source]
execute(self, context)[source]

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

Was this entry helpful?