Apache Kafka Sensors

AwaitMessageSensor

A sensor that defers until a specific message is published to a Kafka topic. The sensor creates a consumer that reads messages from the topic until one satisfies the criteria defined by the apply_function parameter. If apply_function returns any data, a TriggerEvent is raised and the AwaitMessageSensor completes successfully.

For parameter definitions, see AwaitMessageSensor.

Using the sensor

tests/system/providers/apache/kafka/example_dag_hello_kafka.py

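from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor

# Defer until apply_function returns data for a message on "test_1".
# apply_function is passed as a dotted import path string.
t5 = AwaitMessageSensor(
    kafka_config_id="t5",
    task_id="awaiting_message",
    topics=["test_1"],
    apply_function="example_dag_hello_kafka.await_function",
    xcom_push_key="retrieved_message",
)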
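The apply_function referenced above decides which message ends the wait. The following is a minimal sketch of what such a function could look like, not the function shipped with the example DAG. It assumes the message object exposes a value() method returning the raw payload, and uses a hypothetical FakeMessage class so the logic can be run without a broker.

```python
import json


class FakeMessage:
    """Hypothetical stand-in for a Kafka message; assumes the real object exposes value()."""

    def __init__(self, payload: bytes):
        self._payload = payload

    def value(self) -> bytes:
        return self._payload


def await_function(message):
    """Return data for a matching message, or None so the sensor keeps waiting."""
    number = json.loads(message.value())
    if number % 5 == 0:
        # Any returned data marks the message as a match.
        return f"Got the following message: {number}"
    return None


matched = await_function(FakeMessage(b"10"))
skipped = await_function(FakeMessage(b"7"))
```

Returning nothing for non-matching messages is what keeps the sensor deferred; only a message that yields data completes it.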

Reference

For further information, see the Apache Kafka Consumer documentation.

AwaitMessageTriggerFunctionSensor

Similar to the AwaitMessageSensor above, this sensor defers until it consumes a message from a Kafka topic that fulfills the criteria of its apply_function. When such a message is found, the AwaitMessageTriggerFunctionSensor calls the callable provided to event_triggered_function. The sensor then defers again and continues consuming messages.

For parameter definitions, see AwaitMessageTriggerFunctionSensor.

Using the sensor

tests/system/providers/apache/kafka/example_dag_event_listener.py

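from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor

# wait_for_event is a callable defined elsewhere in the example file;
# it is invoked each time apply_function returns data for a message.
listen_for_message = AwaitMessageTriggerFunctionSensor(
    kafka_config_id="fizz_buzz_2",
    task_id="listen_for_message",
    topics=["fizz_buzz"],
    apply_function="example_dag_event_listener.await_function",
    event_triggered_function=wait_for_event,
)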
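The filter-then-react cycle described above can be illustrated with a plain-Python simulation. This is a sketch under stated assumptions, not the provider's implementation: the listen helper, the sample messages, and both callbacks are hypothetical, and it assumes the event passed to event_triggered_function is whatever apply_function returned.

```python
from typing import Any, Callable, Iterable, List, Optional


def apply_function(value: int) -> Optional[int]:
    """Hypothetical filter: return the value when it is a multiple of 3."""
    return value if value % 3 == 0 else None


handled: List[int] = []


def on_event(event: Any, **context: Any) -> None:
    """Hypothetical event_triggered_function: record each matching event."""
    handled.append(event)


def listen(
    messages: Iterable[int],
    apply_fn: Callable[[int], Optional[int]],
    event_fn: Callable[..., None],
) -> None:
    """Simulate the cycle: filter each message, react on a match, keep consuming."""
    for message in messages:
        event = apply_fn(message)
        if event:  # assumption: any truthy return value counts as a match
            event_fn(event)
        # Unlike AwaitMessageSensor, the loop does not stop after a match.


listen(range(1, 10), apply_function, on_event)
```

After the call, handled holds every matching value rather than only the first, which is the behavioral difference from AwaitMessageSensor.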

Reference

For further information, see the Apache Kafka Consumer documentation.
