Redis Message Queue¶
Redis Queue Provider¶
Implemented by RedisPubSubMessageQueueProvider
The Redis Queue Provider is a BaseMessageQueueProvider that uses Redis as the underlying message queue system.
It allows you to send and receive messages using Redis channels in your Airflow workflows through the common message queue interface of MessageQueueTrigger.
It uses redis+pubsub as the scheme for identifying Redis queues. For parameter definitions, see AwaitMessageTrigger.

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher

trigger = MessageQueueTrigger(
    scheme="redis+pubsub",
    # Additional Redis AwaitMessageTrigger parameters as needed
    channels=["my_channel"],
    redis_conn_id="redis_default",
)

asset = Asset("redis_queue_asset", watchers=[AssetWatcher(name="redis_watcher", trigger=trigger)])
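To make the idea of identifying a queue provider by its scheme more concrete, here is a minimal, self-contained sketch. It does not import Airflow: the `Provider` class, the `PROVIDERS` list and `resolve_provider` are hypothetical names used for illustration only, and this is not the provider package's actual implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Provider:
    """Hypothetical stand-in for a message queue provider (not Airflow's class)."""

    scheme: str
    trigger_name: str


# Hypothetical registry; only the "redis+pubsub" entry comes from this page.
PROVIDERS = [
    Provider(scheme="redis+pubsub", trigger_name="AwaitMessageTrigger"),
]


def resolve_provider(scheme: str) -> Provider:
    """Return the first registered provider whose scheme matches exactly."""
    for provider in PROVIDERS:
        if provider.scheme == scheme:
            return provider
    raise ValueError(f"No provider registered for scheme {scheme!r}")


provider = resolve_provider("redis+pubsub")
print(provider.trigger_name)
```

In this sketch an unknown scheme raises a `ValueError`, which is one plausible way to surface a misconfigured trigger early.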
Redis Message Queue Trigger¶
Implemented by AwaitMessageTrigger
Inherited from MessageQueueTrigger
Wait for a message in a queue¶
Below is an example of how you can configure an Airflow Dag to be triggered by a message in Redis.
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher

# Define a trigger that listens to an external message queue (Redis in this case)
trigger = MessageQueueTrigger(scheme="redis+pubsub", channels=["test"])

# Define an asset that watches for messages on the queue
asset = Asset("redis_queue_asset_1", watchers=[AssetWatcher(name="redis_watcher_1", trigger=trigger)])

with DAG(dag_id="example_redis_watcher_1", schedule=[asset]) as dag:
    EmptyOperator(task_id="task_1")
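To try the example, something has to publish a message on the watched channel. The helper below is a minimal sketch of such a publisher. The function name `publish_event` and the JSON payload format are assumptions made for illustration; the only requirement on `client` is that it exposes a `publish(channel, message)` method, as redis-py's `Redis` client does.

```python
import json


def publish_event(client, channel: str, payload: dict) -> int:
    """Publish a JSON-encoded payload on a Redis channel.

    `client` is assumed to expose redis-py's `publish(channel, message)`,
    which returns the number of subscribers that received the message.
    """
    return client.publish(channel, json.dumps(payload))
```

With redis-py installed and a Redis server reachable, a call such as `publish_event(redis.Redis(host="localhost", port=6379), "test", {"status": "ready"})` would send one message on the `test` channel used above (the host and port are assumptions). Redis pub/sub does not store messages, so a message published while no subscriber is listening is lost.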
How it works¶
1. Redis Message Queue Trigger: The AwaitMessageTrigger listens for messages from Redis channel(s).
2. Asset and Watcher: The Asset abstracts the external entity, the Redis queue in this example. The AssetWatcher associates a trigger with a name. This name helps you identify which trigger is associated with which asset.
3. Event-Driven DAG: Instead of running on a fixed schedule, the DAG executes when the asset receives an update (e.g., a new message in the queue).
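The three steps above can be sketched without Redis or Airflow using an in-memory simulation. Everything in this block is hypothetical and exists only to show the flow of control: `InMemoryPubSub` stands in for Redis channels, `await_message` for the trigger, and `run_watcher` for the asset watcher that reacts to the trigger's event. It is not how the provider is implemented.

```python
import asyncio


class InMemoryPubSub:
    """Hypothetical in-memory stand-in for Redis pub/sub channels."""

    def __init__(self) -> None:
        self._subscribers = {}  # channel name -> list of subscriber queues

    def subscribe(self, channels: list) -> asyncio.Queue:
        """Register one queue that receives messages from all given channels."""
        queue = asyncio.Queue()
        for channel in channels:
            self._subscribers.setdefault(channel, []).append(queue)
        return queue

    def publish(self, channel: str, message: str) -> int:
        """Deliver to current subscribers only; return how many received it."""
        queues = self._subscribers.get(channel, [])
        for queue in queues:
            queue.put_nowait((channel, message))
        return len(queues)


async def await_message(pubsub: InMemoryPubSub, channels: list) -> dict:
    """Step 1: wait until any watched channel receives a message."""
    queue = pubsub.subscribe(channels)
    channel, message = await queue.get()
    return {"channel": channel, "data": message}


async def run_watcher(pubsub, asset_name, channels, on_asset_event):
    """Steps 2 and 3: tie the trigger to an asset, then react to its event."""
    event = await await_message(pubsub, channels)
    return on_asset_event(asset_name, event)


async def main() -> str:
    pubsub = InMemoryPubSub()
    watcher = asyncio.create_task(
        run_watcher(
            pubsub,
            "redis_queue_asset_1",
            ["test"],
            lambda asset, event: f"run Dag for {asset}: {event['data']}",
        )
    )
    await asyncio.sleep(0)  # let the watcher subscribe before publishing
    pubsub.publish("test", "hello")
    return await watcher


result = asyncio.run(main())
print(result)
```

The `asyncio.sleep(0)` call matters in this sketch: as with real pub/sub, a message published before the watcher has subscribed would never be delivered.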
For details on how to use the trigger, refer to the documentation of the Messaging Trigger.