airflow.providers.google.cloud.operators.pubsub
This module contains Google PubSub operators.
Module Contents
Classes
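PubSubCreateTopicOperator | Create a PubSub topic.
PubSubCreateSubscriptionOperator | Create a PubSub subscription.
PubSubDeleteTopicOperator | Delete a PubSub topic.
PubSubDeleteSubscriptionOperator | Delete a PubSub subscription.
PubSubPublishMessageOperator | Publish messages to a PubSub topic.
PubSubPullOperator | Pulls messages from a PubSub subscription and passes them through XCom.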
- class airflow.providers.google.cloud.operators.pubsub.PubSubCreateTopicOperator(*, topic, project_id=PROVIDE_PROJECT_ID, fail_if_exists=False, gcp_conn_id='google_cloud_default', labels=None, message_storage_policy=None, kms_key_name=None, schema_settings=None, message_retention_duration=None, retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)
Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
Create a PubSub topic.
See also
For more information on how to use this operator, take a look at the guide: Creating a PubSub topic
By default, if the topic already exists, this operator will not cause the DAG to fail.
with DAG("successful_dag") as dag:
    create_topic = PubSubCreateTopicOperator(
        task_id="create_topic", project_id="my-project", topic="my_new_topic"
    )
    create_topic_again = PubSubCreateTopicOperator(
        task_id="create_topic_again", project_id="my-project", topic="my_new_topic"
    )
    create_topic >> create_topic_again
The operator can be configured to fail if the topic already exists.
with DAG("failing_dag") as dag:
    create_topic = PubSubCreateTopicOperator(
        task_id="create_topic", project_id="my-project", topic="my_new_topic"
    )
    create_topic_again = PubSubCreateTopicOperator(
        task_id="create_topic_again",
        project_id="my-project",
        topic="my_new_topic",
        fail_if_exists=True,
    )
    create_topic >> create_topic_again
Both project_id and topic are templated so you can use Jinja templating in their values.
- Parameters
project_id (str) – Optional, the Google Cloud project ID where the topic will be created. If set to None or missing, the default project_id from the Google Cloud connection is used.
topic (str) – the topic to create. Do not include the full topic path. In other words, instead of projects/{project}/topics/{topic}, provide only {topic}. (templated)
gcp_conn_id (str) – The connection ID to use when connecting to Google Cloud.
labels (dict[str, str] | None) – Client-assigned labels; see https://cloud.google.com/pubsub/docs/labels
message_storage_policy (dict | google.cloud.pubsub_v1.types.MessageStoragePolicy) – Policy constraining the set of Google Cloud regions where messages published to the topic may be stored. If not present, then no constraints are in effect.
kms_key_name (str | None) – The resource name of the Cloud KMS CryptoKey to be used to protect access to messages published on this topic. The expected format is projects/*/locations/*/keyRings/*/cryptoKeys/*.
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (Optional) A retry object used to retry requests. If None is specified, requests will not be retried.
timeout (float | None) – (Optional) The amount of time, in seconds, to wait for the request to complete. Note that if retry is specified, the timeout applies to each individual attempt.
metadata (collections.abc.Sequence[tuple[str, str]]) – (Optional) Additional metadata that is provided to the method.
impersonation_chain (str | collections.abc.Sequence[str] | None) – Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated).
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'impersonation_chain')
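The topic parameter above takes the short topic name rather than the full resource path. The following is a minimal sketch of that convention in plain Python; the helper name topic_path is hypothetical and is not part of the provider.

```python
def topic_path(project_id: str, topic: str) -> str:
    """Build the full Pub/Sub topic path from a project ID and a short topic name."""
    if "/" in topic:
        # A slash suggests the caller passed a full path instead of the short name.
        raise ValueError(f"expected a short topic name, got {topic!r}")
    return f"projects/{project_id}/topics/{topic}"


# The operator is given only "my_new_topic"; the full path is derived from it.
print(topic_path("my-project", "my_new_topic"))
```

Passing a value such as projects/my-project/topics/my_new_topic to this helper raises ValueError, which mirrors the advice to provide only {topic}.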
- class airflow.providers.google.cloud.operators.pubsub.PubSubCreateSubscriptionOperator(*, topic, project_id=PROVIDE_PROJECT_ID, subscription=None, subscription_project_id=None, ack_deadline_secs=10, fail_if_exists=False, gcp_conn_id='google_cloud_default', push_config=None, retain_acked_messages=None, message_retention_duration=None, labels=None, enable_message_ordering=False, expiration_policy=None, filter_=None, dead_letter_policy=None, retry_policy=None, retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)
Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
Create a PubSub subscription.
See also
For more information on how to use this operator, take a look at the guide: Creating a PubSub subscription
By default, the subscription will be created in project_id. If subscription_project_id is specified and the Google Cloud credentials allow, the subscription can be created in a different project from its topic.
By default, if the subscription already exists, this operator will not cause the DAG to fail. However, the topic must exist in the project.
with DAG("successful_dag") as dag:
    create_subscription = PubSubCreateSubscriptionOperator(
        task_id="create_subscription",
        project_id="my-project",
        topic="my-topic",
        subscription="my-subscription",
    )
    create_subscription_again = PubSubCreateSubscriptionOperator(
        task_id="create_subscription_again",
        project_id="my-project",
        topic="my-topic",
        subscription="my-subscription",
    )
    create_subscription >> create_subscription_again
The operator can be configured to fail if the subscription already exists.
with DAG("failing_dag") as dag:
    create_subscription = PubSubCreateSubscriptionOperator(
        task_id="create_subscription",
        project_id="my-project",
        topic="my-topic",
        subscription="my-subscription",
    )
    create_subscription_again = PubSubCreateSubscriptionOperator(
        task_id="create_subscription_again",
        project_id="my-project",
        topic="my-topic",
        subscription="my-subscription",
        fail_if_exists=True,
    )
    create_subscription >> create_subscription_again
Finally, subscription is not required. If not passed, the operator will generate a universally unique identifier for the subscription's name.
with DAG("DAG") as dag:
    PubSubCreateSubscriptionOperator(
        task_id="create_subscription", project_id="my-project", topic="my-topic"
    )
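The two defaults described above (a generated subscription name, and the topic's project used when no subscription project is given) can be sketched in plain Python. This is an illustration only: the helper name resolve_subscription and the "sub-" prefix are assumptions, not the provider's documented behavior.

```python
from __future__ import annotations

from uuid import uuid4


def resolve_subscription(
    project_id: str,
    subscription: str | None = None,
    subscription_project_id: str | None = None,
) -> tuple[str, str]:
    """Return the (project, subscription name) pair that would be used."""
    # Assumption: a missing name is replaced by a UUID-based one.
    name = subscription or f"sub-{uuid4()}"
    # A missing subscription project falls back to the topic's project.
    project = subscription_project_id or project_id
    return project, name


print(resolve_subscription("my-project"))
print(resolve_subscription("my-project", "my-subscription", "other-project"))
```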
project_id, topic, subscription, subscription_project_id and impersonation_chain are templated so you can use Jinja templating in their values.
- Parameters
project_id (str) – Optional, the Google Cloud project ID where the topic exists. If set to None or missing, the default project_id from the Google Cloud connection is used.
topic (str) – the topic to subscribe to. Do not include the full topic path. In other words, instead of projects/{project}/topics/{topic}, provide only {topic}. (templated)
subscription (str | None) – the Pub/Sub subscription name. If empty, a random name will be generated using the uuid module.
subscription_project_id (str | None) – the Google Cloud project ID where the subscription will be created. If empty, project_id will be used.
ack_deadline_secs (int) – Number of seconds that a subscriber has to acknowledge each message pulled from the subscription.
gcp_conn_id (str) – The connection ID to use when connecting to Google Cloud.
push_config (dict | google.cloud.pubsub_v1.types.PushConfig | None) – If push delivery is used with this subscription, this field is used to configure it. An empty pushConfig signifies that the subscriber will pull and ack messages using API methods.
retain_acked_messages (bool | None) – Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the message_retention_duration window. This must be true if you would like to Seek to a timestamp.
message_retention_duration (dict | google.cloud.pubsub_v1.types.Duration | None) – How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If retain_acked_messages is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a Seek can be done. Defaults to 7 days. Cannot be more than 7 days or less than 10 minutes.
labels (dict[str, str] | None) – Client-assigned labels; see https://cloud.google.com/pubsub/docs/labels
enable_message_ordering (bool) – If true, messages published with the same ordering_key in PubsubMessage will be delivered to the subscribers in the order in which they are received by the Pub/Sub system. Otherwise, they may be delivered in any order.
expiration_policy (dict | google.cloud.pubsub_v1.types.ExpirationPolicy | None) – A policy that specifies the conditions for this subscription’s expiration. A subscription is considered active as long as any connected subscriber is successfully consuming messages from the subscription or is issuing operations on the subscription. If expiration_policy is not set, a default policy with ttl of 31 days will be used. The minimum allowed value for expiration_policy.ttl is 1 day.
filter_ (str | None) – An expression written in the Cloud Pub/Sub filter language. If non-empty, then only PubsubMessages whose attributes field matches the filter are delivered on this subscription. If empty, then no messages are filtered out.
dead_letter_policy (dict | google.cloud.pubsub_v1.types.DeadLetterPolicy | None) – A policy that specifies the conditions for dead lettering messages in this subscription. If dead_letter_policy is not set, dead lettering is disabled.
retry_policy (dict | google.cloud.pubsub_v1.types.RetryPolicy | None) – A policy that specifies how Pub/Sub retries message delivery for this subscription. If not set, the default retry policy is applied. This generally implies that messages will be retried as soon as possible for healthy subscribers. RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded events for a given message.
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (Optional) A retry object used to retry requests. If None is specified, requests will not be retried.
timeout (float | None) – (Optional) The amount of time, in seconds, to wait for the request to complete. Note that if retry is specified, the timeout applies to each individual attempt.
metadata (collections.abc.Sequence[tuple[str, str]]) – (Optional) Additional metadata that is provided to the method.
impersonation_chain (str | collections.abc.Sequence[str] | None) – Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated).
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'subscription', 'subscription_project_id', 'impersonation_chain')
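The message_retention_duration parameter accepts a dict, and its documented bounds are 10 minutes to 7 days. The sketch below builds such a dict from a timedelta and checks the bounds first. It assumes the dict form uses the protobuf Duration field name seconds; the helper name retention_duration is hypothetical.

```python
from datetime import timedelta

MIN_RETENTION = timedelta(minutes=10)
MAX_RETENTION = timedelta(days=7)


def retention_duration(delta: timedelta) -> dict:
    """Convert a timedelta into a Duration-style dict after checking the bounds."""
    if not MIN_RETENTION <= delta <= MAX_RETENTION:
        raise ValueError(
            f"retention must be between {MIN_RETENTION} and {MAX_RETENTION}, got {delta}"
        )
    # Assumption: the dict mirrors the protobuf Duration message ("seconds" field).
    return {"seconds": int(delta.total_seconds())}


print(retention_duration(timedelta(days=1)))
```

The resulting dict could then be passed as message_retention_duration when constructing the operator.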
- class airflow.providers.google.cloud.operators.pubsub.PubSubDeleteTopicOperator(*, topic, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, gcp_conn_id='google_cloud_default', retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)
Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
Delete a PubSub topic.
See also
For more information on how to use this operator, take a look at the guide: Deleting a PubSub topic
By default, if the topic does not exist, this operator will not cause the DAG to fail.
with DAG("successful_dag") as dag:
    PubSubDeleteTopicOperator(
        task_id="delete_topic", project_id="my-project", topic="non_existing_topic"
    )
The operator can be configured to fail if the topic does not exist.
with DAG("failing_dag") as dag:
    PubSubDeleteTopicOperator(
        task_id="delete_topic",
        project_id="my-project",
        topic="non_existing_topic",
        fail_if_not_exists=True,
    )
Both project_id and topic are templated so you can use Jinja templating in their values.
- Parameters
project_id (str) – Optional, the Google Cloud project ID in which to work (templated). If set to None or missing, the default project_id from the Google Cloud connection is used.
topic (str) – the topic to delete. Do not include the full topic path. In other words, instead of projects/{project}/topics/{topic}, provide only {topic}. (templated)
fail_if_not_exists (bool) – If True and the topic does not exist, fail the task.
gcp_conn_id (str) – The connection ID to use when connecting to Google Cloud.
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (Optional) A retry object used to retry requests. If None is specified, requests will not be retried.
timeout (float | None) – (Optional) The amount of time, in seconds, to wait for the request to complete. Note that if retry is specified, the timeout applies to each individual attempt.
metadata (collections.abc.Sequence[tuple[str, str]]) – (Optional) Additional metadata that is provided to the method.
impersonation_chain (str | collections.abc.Sequence[str] | None) – Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated).
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'impersonation_chain')
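The effect of fail_if_not_exists can be illustrated with a small in-memory stand-in. This sketch does not call Pub/Sub: the set of topic names, the TopicNotFound exception, and the delete_topic function are all hypothetical, used only to show the tolerant versus strict delete behavior described above.

```python
from __future__ import annotations


class TopicNotFound(Exception):
    """Stand-in for the 'not found' error a real client would raise."""


def delete_topic(existing: set[str], topic: str, fail_if_not_exists: bool = False) -> bool:
    """Delete a topic from the in-memory registry; return True if it was removed."""
    if topic not in existing:
        if fail_if_not_exists:
            # Strict mode: a missing topic is an error and the task would fail.
            raise TopicNotFound(topic)
        # Default mode: a missing topic is silently tolerated.
        return False
    existing.remove(topic)
    return True


topics = {"my_topic"}
print(delete_topic(topics, "my_topic"))            # topic existed and was removed
print(delete_topic(topics, "non_existing_topic"))  # tolerated by default
```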
- class airflow.providers.google.cloud.operators.pubsub.PubSubDeleteSubscriptionOperator(*, subscription, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, gcp_conn_id='google_cloud_default', retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)
Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
Delete a PubSub subscription.
See also
For more information on how to use this operator, take a look at the guide: Deleting a PubSub subscription
By default, if the subscription does not exist, this operator will not cause the DAG to fail.
with DAG("successful_dag") as dag:
    PubSubDeleteSubscriptionOperator(
        task_id="delete_subscription", project_id="my-project", subscription="non-existing"
    )
The operator can be configured to fail if the subscription does not exist.
with DAG("failing_dag") as dag:
    PubSubDeleteSubscriptionOperator(
        task_id="delete_subscription",
        project_id="my-project",
        subscription="non-existing",
        fail_if_not_exists=True,
    )
project_id and subscription are templated so you can use Jinja templating in their values.
- Parameters
project_id (str) – Optional, the Google Cloud project ID in which to work (templated). If set to None or missing, the default project_id from the Google Cloud connection is used.
subscription (str) – the subscription to delete. Do not include the full subscription path. In other words, instead of projects/{project}/subscriptions/{subscription}, provide only {subscription}. (templated)
fail_if_not_exists (bool) – If True and the subscription does not exist, fail the task.
gcp_conn_id (str) – The connection ID to use when connecting to Google Cloud.
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (Optional) A retry object used to retry requests. If None is specified, requests will not be retried.
timeout (float | None) – (Optional) The amount of time, in seconds, to wait for the request to complete. Note that if retry is specified, the timeout applies to each individual attempt.
metadata (collections.abc.Sequence[tuple[str, str]]) – (Optional) Additional metadata that is provided to the method.
impersonation_chain (str | collections.abc.Sequence[str] | None) – Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated).
- template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')
- class airflow.providers.google.cloud.operators.pubsub.PubSubPublishMessageOperator(*, topic, messages, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', enable_message_ordering=False, impersonation_chain=None, **kwargs)
Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
Publish messages to a PubSub topic.
See also
For more information on how to use this operator, take a look at the guide: Publishing PubSub messages
Each Task publishes all provided messages to the same topic in a single Google Cloud project. If the topic does not exist, this task will fail.
m1 = {"data": b"Hello, World!", "attributes": {"type": "greeting"}}
m2 = {"data": b"Knock, knock"}
m3 = {"attributes": {"foo": ""}}
m4 = {"data": b"Who's there?", "attributes": {"ordering_key": "knock_knock"}}

t1 = PubSubPublishMessageOperator(
    task_id="publish_messages",
    project_id="my-project",
    topic="my_topic",
    messages=[m1, m2, m3],
    dag=dag,
)
t2 = PubSubPublishMessageOperator(
    task_id="publish_ordered_message",
    project_id="my-project",
    topic="my_topic",
    messages=[m4],
    enable_message_ordering=True,
    dag=dag,
)
project_id, topic, and messages are templated so you can use Jinja templating in their values.
- Parameters
project_id (str) – Optional, the Google Cloud project ID in which to work (templated). If set to None or missing, the default project_id from the Google Cloud connection is used.
topic (str) – the topic to which to publish. Do not include the full topic path. In other words, instead of projects/{project}/topics/{topic}, provide only {topic}. (templated)
messages (list) – a list of messages to be published to the topic. Each message is a dict with one or more of the following key-value mappings:
* 'data': a bytestring (utf-8 encoded)
* 'attributes': {'key1': 'value1', …}
Each message must contain at least a non-empty 'data' value or an attribute dict with at least one key (templated). See https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
gcp_conn_id (str) – The connection ID to use when connecting to Google Cloud.
enable_message_ordering (bool) – If true, messages published with the same ordering_key in PubsubMessage will be delivered to the subscribers in the order in which they are received by the Pub/Sub system. Otherwise, they may be delivered in any order. Default is False.
impersonation_chain (str | collections.abc.Sequence[str] | None) – Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated).
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'messages', 'enable_message_ordering', 'impersonation_chain')
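The rule for the messages parameter (each message needs a non-empty 'data' value or at least one attribute) can be checked before a DAG is deployed. The following is a minimal sketch in plain Python; validate_messages is a hypothetical helper and not part of the provider, and the bytes check reflects the description of 'data' as a bytestring.

```python
from __future__ import annotations


def validate_messages(messages: list[dict]) -> None:
    """Raise if any message has neither non-empty data nor at least one attribute."""
    for index, message in enumerate(messages):
        data = message.get("data")
        if data is not None and not isinstance(data, bytes):
            raise TypeError(f"message {index}: 'data' must be bytes, got {type(data).__name__}")
        has_data = bool(data)
        has_attributes = bool(message.get("attributes"))
        if not (has_data or has_attributes):
            raise ValueError(f"message {index}: needs non-empty 'data' or at least one attribute")


m1 = {"data": b"Hello, World!", "attributes": {"type": "greeting"}}
m2 = {"data": b"Knock, knock"}
m3 = {"attributes": {"foo": ""}}
validate_messages([m1, m2, m3])  # passes silently
```

Note that m3 is accepted because its attribute dict has one key, even though the value is an empty string.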
- class airflow.providers.google.cloud.operators.pubsub.PubSubPullOperator(*, project_id, subscription, max_messages=5, ack_messages=False, messages_callback=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)
Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
Pulls messages from a PubSub subscription and passes them through XCom.
If the queue is empty, the operator returns an empty list and never waits for messages. If you do need to wait, please use airflow.providers.google.cloud.sensors.pubsub.PubSubPullSensor instead.
See also
For more information on how to use this operator and the PubSubPullSensor, take a look at the guide: Pulling messages from a PubSub subscription
This operator will pull up to max_messages messages from the specified PubSub subscription. When the subscription returns messages, the messages will be returned immediately from the operator and passed through XCom for downstream tasks.
If ack_messages is set to True, messages will be immediately acknowledged before being returned; otherwise, downstream tasks will be responsible for acknowledging them.
project_id and subscription are templated so you can use Jinja templating in their values.
- Parameters
project_id (str) – the Google Cloud project ID for the subscription (templated)
subscription (str) – the Pub/Sub subscription name. Do not include the full subscription path.
max_messages (int) – The maximum number of messages to retrieve per PubSub pull request
ack_messages (bool) – If True, each message will be acknowledged immediately rather than by any downstream tasks
gcp_conn_id (str) – The connection ID to use when connecting to Google Cloud.
messages_callback (Callable[[list[google.cloud.pubsub_v1.types.ReceivedMessage], airflow.utils.context.Context], Any] | None) – (Optional) Callback to process received messages. Its return value will be saved to XCom. If you are pulling large messages, you probably want to provide a custom callback. If not provided, the default implementation will convert ReceivedMessage objects into JSON-serializable dicts using google.protobuf.json_format.MessageToDict function.
impersonation_chain (str | collections.abc.Sequence[str] | None) – Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated).
- template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')
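A custom messages_callback receives the pulled messages and the task context, and its return value is what ends up in XCom. The sketch below shows one possible callback that keeps only a few small fields. The function name summarize_messages is hypothetical, and the SimpleNamespace object is a stand-in for a ReceivedMessage (assumed to expose ack_id, message.data and message.attributes) so the example runs without a Pub/Sub connection.

```python
from types import SimpleNamespace


def summarize_messages(pulled_messages, context):
    """Reduce each received message to a small JSON-serializable dict."""
    return [
        {
            "ack_id": received.ack_id,
            "text": received.message.data.decode("utf-8"),
            "attributes": dict(received.message.attributes),
        }
        for received in pulled_messages
    ]


# Stand-in object shaped like a ReceivedMessage, for local experimentation only.
fake_message = SimpleNamespace(
    ack_id="ack-1",
    message=SimpleNamespace(data=b"Hello, World!", attributes={"type": "greeting"}),
)
print(summarize_messages([fake_message], context={}))
```

In a DAG, such a function would be passed as messages_callback=summarize_messages when constructing the operator.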