Apache Kafka Sensors
AwaitMessageSensor
A sensor that defers until a specific message is published to a Kafka topic.
The sensor creates a consumer that reads messages from a Kafka topic until it finds a message fulfilling the criteria defined in the
apply_function parameter. If the apply_function returns any data, a TriggerEvent is raised and the AwaitMessageSensor completes successfully.
For parameter definitions, see AwaitMessageSensor.
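The consume-until-match behaviour described above can be illustrated with a small simulation that needs no Kafka broker. This is only a sketch of the idea, not the provider's implementation: `FakeMessage`, `await_first_match`, and `is_multiple_of_five` are hypothetical names introduced here, the real sensor polls a Kafka consumer from a deferred trigger rather than iterating over a list, and treating any truthy return value as "data" is an assumption.

```python
from typing import Any, Callable, Iterable, Optional


class FakeMessage:
    """Hypothetical stand-in for a Kafka message; only value() is modelled."""

    def __init__(self, payload: bytes) -> None:
        self._payload = payload

    def value(self) -> bytes:
        return self._payload


def await_first_match(
    messages: Iterable[FakeMessage],
    apply_function: Callable[[FakeMessage], Any],
) -> Optional[Any]:
    """Return the first truthy result of apply_function, or None if nothing matches."""
    for message in messages:
        result = apply_function(message)
        if result:  # assumption: "returns any data" is modelled as a truthy value
            return result
    return None


def is_multiple_of_five(message: FakeMessage) -> Optional[int]:
    """Example criterion: accept payloads that parse to a multiple of five."""
    number = int(message.value())
    return number if number % 5 == 0 else None


stream = [FakeMessage(b"3"), FakeMessage(b"10"), FakeMessage(b"15")]
first_match = await_first_match(stream, is_multiple_of_five)
```

In this simulation the loop stops at the first matching message and never inspects the rest of the stream, which mirrors the sensor completing as soon as apply_function returns data.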
Using the sensor
tests/system/apache/kafka/example_dag_hello_kafka.py
t5 = AwaitMessageSensor(
    kafka_config_id="t5",
    task_id="awaiting_message",
    topics=["test_1"],
    apply_function="example_dag_hello_kafka.await_function",
    xcom_push_key="retrieved_message",
)
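The apply_function argument in the example is a dotted path string, so the target has to be an importable, module-level function. A minimal sketch of what such a function might look like follows. It assumes JSON-encoded integer payloads and a message object whose value() method returns the raw bytes; `StubMessage` is a hypothetical stand-in used only so the snippet runs without a broker.

```python
import json


def await_function(message):
    """Return a description when the payload is divisible by 5, otherwise None."""
    payload = json.loads(message.value())
    if payload % 5 == 0:
        return f"Got the following message: {payload}"
    return None


class StubMessage:
    """Hypothetical stand-in for a Kafka message, for local experimentation."""

    def __init__(self, raw: bytes) -> None:
        self._raw = raw

    def value(self) -> bytes:
        return self._raw


matched = await_function(StubMessage(b"25"))  # criterion met: returns a string
skipped = await_function(StubMessage(b"7"))   # criterion not met: returns None
```

Returning None for non-matching messages keeps the sensor waiting, while any returned data ends the wait.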
Reference
For further information, see the Apache Kafka Consumer documentation.
AwaitMessageTriggerFunctionSensor
Similar to the AwaitMessageSensor above, this sensor defers until it consumes a message from a Kafka topic that fulfills the criteria
of its apply_function. Once a positive event is encountered, the AwaitMessageTriggerFunctionSensor triggers the callable provided
to event_triggered_function. Afterwards, the sensor defers again and continues to consume messages.
For parameter definitions, see AwaitMessageTriggerFunctionSensor.
Using the sensor
tests/system/apache/kafka/example_dag_event_listener.py
listen_for_message = AwaitMessageTriggerFunctionSensor(
    kafka_config_id="fizz_buzz_2",
    task_id="listen_for_message",
    topics=["fizz_buzz"],
    apply_function="example_dag_event_listener.await_function",
    event_triggered_function=wait_for_event,
)
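How the two callables might cooperate can be sketched without a broker. The snippet assumes that event_triggered_function receives whatever apply_function returned, possibly along with task context as keyword arguments (hence `**context`); check the parameter definitions before relying on that. `RawMessage` and `listen` are hypothetical helpers, and the function bodies are illustrative rather than copied from the example DAG.

```python
import json
from typing import Any, Callable, Iterable, List


class RawMessage:
    """Hypothetical stand-in for a Kafka message; only value() is modelled."""

    def __init__(self, raw: bytes) -> None:
        self._raw = raw

    def value(self) -> bytes:
        return self._raw


def await_function(message: RawMessage) -> Any:
    """Pass through values divisible by 3 or 5; ignore everything else."""
    val = json.loads(message.value())
    if val % 3 == 0 or val % 5 == 0:
        return val
    return None


def wait_for_event(event: int, **context: Any) -> str:
    """React to a positive event; extra keyword arguments are accepted and ignored."""
    if event % 15 == 0:
        return f"{event}: FIZZ BUZZ"
    if event % 3 == 0:
        return f"{event}: FIZZ"
    return f"{event}: BUZZ"


def listen(
    messages: Iterable[RawMessage],
    apply_function: Callable[[RawMessage], Any],
    event_triggered_function: Callable[..., Any],
) -> List[Any]:
    """Keep consuming after each positive event instead of stopping at the first."""
    outcomes = []
    for message in messages:
        event = apply_function(message)
        if event:
            outcomes.append(event_triggered_function(event))
    return outcomes


stream = [RawMessage(str(n).encode()) for n in range(1, 16)]
outcomes = listen(stream, await_function, wait_for_event)
```

Unlike the one-shot AwaitMessageSensor, the loop here does not exit on a match: every positive event invokes the callback and consumption continues, which is the behaviour the description attributes to this sensor.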
Reference
For further information, see the Apache Kafka Consumer documentation.