
airflow.providers.apache.kafka.sensors.kafka

Attributes

VALID_COMMIT_CADENCE

Classes

AwaitMessageSensor

An Airflow sensor that defers until a specific message is published to Kafka.

AwaitMessageTriggerFunctionSensor

Defer until a specific message is published to Kafka, trigger a registered function, then resume waiting.

Module Contents

airflow.providers.apache.kafka.sensors.kafka.VALID_COMMIT_CADENCE[source]
class airflow.providers.apache.kafka.sensors.kafka.AwaitMessageSensor(topics, apply_function, kafka_config_id='kafka_default', apply_function_args=None, apply_function_kwargs=None, poll_timeout=1, poll_interval=5, xcom_push_key=None, **kwargs)[source]

Bases: airflow.providers.apache.kafka.version_compat.BaseOperator

An Airflow sensor that defers until a specific message is published to Kafka.

The sensor creates a consumer that reads the Kafka log until it encounters a positive event.

The behavior of the consumer for this trigger is as follows:
  • poll the Kafka topics for a message
  • if no message returned, sleep
  • process the message with provided callable and commit the message offset
  • if callable returns any data, raise a TriggerEvent with the return data
  • else continue to next message
  • return event (as default xcom or specific xcom key)
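The polling steps above can be mimicked with a small in-memory simulation. This is only a sketch under stated assumptions, not the provider's implementation: `FakeMessage`, `FakeConsumer`, `await_message`, `is_error`, and the `max_idle_polls` bound are all hypothetical names introduced for illustration, and the real trigger runs asynchronously against a Kafka cluster.

```python
import time
from collections import deque


class FakeMessage:
    """Hypothetical stand-in for a Kafka message; only value() is modelled."""

    def __init__(self, payload: bytes):
        self._payload = payload

    def value(self) -> bytes:
        return self._payload


class FakeConsumer:
    """Hypothetical in-memory consumer used only for this illustration."""

    def __init__(self, payloads):
        self._queue = deque(FakeMessage(p) for p in payloads)
        self.committed = 0

    def poll(self, timeout: float):
        # Return the next message, or None when nothing is available.
        return self._queue.popleft() if self._queue else None

    def commit(self):
        # Count commits instead of talking to a broker.
        self.committed += 1


def await_message(consumer, apply_function, poll_timeout=1, poll_interval=5, max_idle_polls=3):
    """Poll until apply_function returns data, then return that data as the event."""
    idle = 0
    while idle < max_idle_polls:  # bound added so the sketch always terminates
        message = consumer.poll(poll_timeout)
        if message is None:
            idle += 1
            time.sleep(poll_interval)  # no message returned: sleep
            continue
        result = apply_function(message)  # process the message
        consumer.commit()  # commit the message offset
        if result:
            return result  # positive event: hand the data back
        # otherwise continue to the next message
    return None


def is_error(message):
    """Return the decoded text for error messages, None for everything else."""
    text = message.value().decode("utf-8")
    return text if text.startswith("ERROR") else None


consumer = FakeConsumer([b"ok", b"ok", b"ERROR disk full", b"ok"])
event = await_message(consumer, is_error, poll_interval=0)
print(event)
```

In this sketch the loop stops at the first message for which the callable returns data, so the trailing `b"ok"` payload is never read.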

Parameters:
  • kafka_config_id (str) – The connection object to use, defaults to “kafka_default”

  • topics (collections.abc.Sequence[str]) – Topics (or topic regex) to use for reading from

  • apply_function (str) – The function to apply to messages to determine if an event occurred, given as a dot-notation string.

  • apply_function_args (collections.abc.Sequence[Any] | None) – Arguments to be applied to the processing function, defaults to None

  • apply_function_kwargs (dict[Any, Any] | None) – Keyword arguments to be applied to the processing function, defaults to None

  • poll_timeout (float) – How long the Kafka consumer should wait for a message to arrive from the Kafka cluster, defaults to 1

  • poll_interval (float) – How long the Kafka consumer should sleep after reaching the end of the Kafka log, defaults to 5

  • xcom_push_key – The name of a key to push the returned message to, defaults to None
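As a sketch of how these parameters might fit together, the function below returns data only for a positive event and is referenced from the sensor by its dotted path. The module path `include.kafka_checks`, the topic `orders`, the `task_id`, the XCom key, and the threshold values are hypothetical. `FakeMessage` is a stand-in so the function can be exercised without a broker; it assumes messages expose a `value()` method returning bytes, as confluent-kafka messages do. It is also assumed that `apply_function_kwargs` are passed through to the function as keyword arguments.

```python
import json


def order_exceeds(message, threshold=100):
    """Return the order dict when its amount exceeds threshold, else None."""
    order = json.loads(message.value())
    if order.get("amount", 0) > threshold:
        return order
    return None


class FakeMessage:
    """Hypothetical stand-in for a Kafka message; only value() is modelled."""

    def __init__(self, payload: bytes):
        self._payload = payload

    def value(self) -> bytes:
        return self._payload


def build_sensor():
    """Hypothetical wiring; needs the Apache Kafka provider package installed."""
    # Imported lazily so the rest of this sketch runs without Airflow.
    from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor

    return AwaitMessageSensor(
        task_id="await_large_order",
        topics=["orders"],
        # Dotted path to order_exceeds, assuming it lives in include/kafka_checks.py.
        apply_function="include.kafka_checks.order_exceeds",
        apply_function_kwargs={"threshold": 500},
        kafka_config_id="kafka_default",
        poll_timeout=1,
        poll_interval=5,
        xcom_push_key="large_order",
    )


# Exercise the callable directly with fake messages.
small = FakeMessage(json.dumps({"id": 1, "amount": 20}).encode())
large = FakeMessage(json.dumps({"id": 2, "amount": 900}).encode())
print(order_exceeds(small, threshold=500))
print(order_exceeds(large, threshold=500))
```

Keeping the callable free of Airflow imports makes it easy to unit test on its own, since the sensor only needs its import path.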

BLUE = '#ffefeb'[source]
ui_color = '#ffefeb'[source]
template_fields = ('topics', 'apply_function', 'apply_function_args', 'apply_function_kwargs', 'kafka_config_id')[source]
topics[source]
apply_function[source]
apply_function_args = None[source]
apply_function_kwargs = None[source]
kafka_config_id = 'kafka_default'[source]
poll_timeout = 1[source]
poll_interval = 5[source]
xcom_push_key = None[source]
execute(context)[source]

Derive when creating an operator.

The main method to execute the task. The context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.

execute_complete(context, event=None)[source]
class airflow.providers.apache.kafka.sensors.kafka.AwaitMessageTriggerFunctionSensor(topics, apply_function, event_triggered_function, kafka_config_id='kafka_default', apply_function_args=None, apply_function_kwargs=None, poll_timeout=1, poll_interval=5, **kwargs)[source]

Bases: airflow.providers.apache.kafka.version_compat.BaseOperator

Defer until a specific message is published to Kafka, trigger a registered function, then resume waiting.

The behavior of the consumer for this trigger is as follows:
  • poll the Kafka topics for a message
  • if no message returned, sleep
  • process the message with provided callable and commit the message offset
  • if callable returns any data, raise a TriggerEvent with the return data
  • else continue to next message
  • return event (as default xcom or specific xcom key)

Parameters:
  • kafka_config_id (str) – The connection object to use, defaults to “kafka_default”

  • topics (collections.abc.Sequence[str]) – Topics (or topic regex) to use for reading from

  • apply_function (str) – The function to apply to messages to determine if an event occurred, given as a dot-notation string.

  • event_triggered_function (collections.abc.Callable) – The callable to trigger once the apply_function encounters a positive event.

  • apply_function_args (collections.abc.Sequence[Any] | None) – Arguments to be applied to the processing function, defaults to None

  • apply_function_kwargs (dict[Any, Any] | None) – Keyword arguments to be applied to the processing function, defaults to None

  • poll_timeout (float) – How long the Kafka consumer should wait for a message to arrive from the Kafka cluster, defaults to 1

  • poll_interval (float) – How long the Kafka consumer should sleep after reaching the end of the Kafka log, defaults to 5
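A minimal sketch of an `event_triggered_function`, which per the parameter list is passed as a callable, unlike `apply_function`, which is a dot-notation string. It assumes the function receives the data returned by `apply_function` as its first argument and that task context may arrive as keyword arguments; that call shape is an assumption, so the function accepts `**context` defensively. The names `handled` and `record_event` and the sample events are hypothetical.

```python
handled = []


def record_event(event, **context):
    """Store each positive event; a real function might call an API instead."""
    handled.append(event)


# Simulate the sensor resuming after each positive event: the function runs
# once per event, after which the sensor would go back to waiting.
for event in ({"id": 2, "amount": 900}, {"id": 7, "amount": 650}):
    record_event(event, ds="2025-01-01")  # "ds" stands in for a context entry

print(len(handled))
```

Because the sensor resumes waiting after each call, the function is best kept short and side-effect focused, with no assumption that it runs only once.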

BLUE = '#ffefeb'[source]
ui_color = '#ffefeb'[source]
template_fields = ('topics', 'apply_function', 'apply_function_args', 'apply_function_kwargs', 'kafka_config_id')[source]
topics[source]
apply_function[source]
apply_function_args = None[source]
apply_function_kwargs = None[source]
kafka_config_id = 'kafka_default'[source]
poll_timeout = 1[source]
poll_interval = 5[source]
event_triggered_function[source]
execute(context, event=None)[source]

Derive when creating an operator.

The main method to execute the task. The context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.

execute_complete(context, event=None)[source]
