airflow.providers.apache.kafka.sensors.kafka

Attributes

VALID_COMMIT_CADENCE

Classes

AwaitMessageSensor

An Airflow sensor that defers until a specific message is published to Kafka.

AwaitMessageTriggerFunctionSensor

Defer until a specific message is published to Kafka, trigger a registered function, then resume waiting.

Module Contents

airflow.providers.apache.kafka.sensors.kafka.VALID_COMMIT_CADENCE
class airflow.providers.apache.kafka.sensors.kafka.AwaitMessageSensor(topics, apply_function, kafka_config_id='kafka_default', apply_function_args=None, apply_function_kwargs=None, poll_timeout=1, poll_interval=5, xcom_push_key=None, commit_offset=True, **kwargs)

Bases: airflow.providers.common.compat.sdk.BaseSensorOperator

An Airflow sensor that defers until a specific message is published to Kafka.

The sensor creates a consumer that reads the Kafka log until it encounters a positive event.

The behavior of the consumer for this trigger is as follows:

  • poll the Kafka topics for a message
  • if no message is returned, sleep for poll_interval
  • process the message with the provided callable
  • if commit_offset is True (default), commit the message offset after processing
  • if the callable returns any data, emit a TriggerEvent with the returned data
  • otherwise, continue to the next message
  • return the event (as the default XCom, or under xcom_push_key if one is set)
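To make the control flow concrete, here is a minimal pure-Python sketch of the loop described above. It is not the provider's implementation: FakeMessage, FakeConsumer, await_message, and the max_empty_polls and sleep parameters are hypothetical stand-ins so the example runs without a Kafka broker or Airflow. The real sensor defers to a trigger and keeps polling until an event arrives or the task times out.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a Kafka message; only value() and offset() are modelled."""
    payload: bytes
    position: int

    def value(self) -> bytes:
        return self.payload

    def offset(self) -> int:
        return self.position


@dataclass
class FakeConsumer:
    """Hypothetical in-memory consumer that replays a fixed list of messages."""
    messages: list
    committed: list = field(default_factory=list)

    def poll(self, timeout: float) -> Optional[FakeMessage]:
        # Return the next message, or None once the in-memory "log" is exhausted.
        return self.messages.pop(0) if self.messages else None

    def commit(self, message: FakeMessage) -> None:
        self.committed.append(message.offset())


def await_message(
    consumer,
    apply_function: Callable[[Any], Any],
    poll_timeout: float = 1,
    poll_interval: float = 5,
    commit_offset: bool = True,
    max_empty_polls: int = 3,  # hypothetical: lets the sketch terminate
    sleep: Callable[[float], None] = lambda seconds: None,  # no-op; pass time.sleep for real delays
) -> Any:
    """Sketch of the poll / process / commit loop described above."""
    empty_polls = 0
    while empty_polls < max_empty_polls:
        message = consumer.poll(poll_timeout)
        if message is None:
            # End of the log: sleep for poll_interval, then poll again.
            empty_polls += 1
            sleep(poll_interval)
            continue
        result = apply_function(message)
        if commit_offset:
            # Offsets are committed after processing, whether or not an event fired.
            consumer.commit(message)
        if result:
            # A truthy return value is the event; the sensor would push it to XCom.
            return result
    return None
```

For example, with payloads b"1", b"10", b"3" and a callable that returns the integer value once it reaches 10, the sketch returns 10 after committing offsets 0 and 1; with commit_offset=False nothing is committed, leaving that to downstream tasks.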

Parameters:
  • kafka_config_id (str) – The ID of the Kafka connection to use, defaults to “kafka_default”

  • topics (collections.abc.Sequence[str]) – Topics (or a topic regex) to read from

  • apply_function (str | None) – The function to apply to each message to determine whether an event occurred, given as a dotted-path string.

  • apply_function_args (collections.abc.Sequence[Any] | None) – Positional arguments passed to the processing function, defaults to None

  • apply_function_kwargs (dict[Any, Any] | None) – Keyword arguments passed to the processing function, defaults to None

  • poll_timeout (float) – How long the Kafka consumer should wait for a message to arrive from the Kafka cluster, defaults to 1

  • poll_interval (float) – How long the Kafka consumer should sleep after reaching the end of the Kafka log, defaults to 5

  • xcom_push_key (str | None) – The name of the XCom key to push the returned message to, defaults to None

  • commit_offset (bool) – Whether to commit the message offset after processing. If False, the offset is not committed by the sensor, allowing downstream tasks to commit it manually (e.g., after successful processing). Defaults to True.

  • soft_fail – Set to True to mark the task as SKIPPED on failure

  • timeout – Time elapsed before the task times out and fails (in seconds)

  • poke_interval – This parameter is inherited but not used in this deferrable implementation

  • mode – This parameter is inherited but not used in this deferrable implementation
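Because apply_function is given as a dotted-path string, it has to live in an importable module. The function below, await_threshold, is hypothetical, as is the module path my_dags.kafka_utils in the commented wiring. It assumes the sensor calls it with each consumed message, whose value() returns the raw payload bytes, plus any apply_function_kwargs as keyword arguments. A truthy return value counts as the event; None means keep consuming.

```python
import json
from typing import Any, Optional


def await_threshold(message, threshold: int = 10) -> Optional[Any]:
    """Hypothetical apply_function: fire when a JSON integer payload reaches `threshold`."""
    raw = message.value()  # raw payload bytes; may be None (e.g. a tombstone record)
    if raw is None:
        return None
    try:
        value = json.loads(raw)  # json.loads accepts bytes as well as str
    except json.JSONDecodeError:
        return None  # ignore payloads that are not valid JSON
    if isinstance(value, int) and value >= threshold:
        return {"value": value}  # truthy result: treated as the event
    return None  # falsy result: keep consuming


# Hypothetical wiring inside a DAG (module path and task_id are illustrative):
# AwaitMessageSensor(
#     task_id="await_threshold",
#     topics=["events"],
#     apply_function="my_dags.kafka_utils.await_threshold",
#     apply_function_kwargs={"threshold": 10},
# )
```

Passing the threshold through apply_function_kwargs rather than positionally keeps the call unambiguous regardless of how positional arguments are ordered relative to the message.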

BLUE = '#ffefeb'
ui_color = '#ffefeb'
template_fields = ('topics', 'apply_function', 'apply_function_args', 'apply_function_kwargs', 'kafka_config_id',...
topics
apply_function
apply_function_args = None
apply_function_kwargs = None
kafka_config_id = 'kafka_default'
poll_timeout = 1
poll_interval = 5
xcom_push_key = None
commit_offset = True
execute(context)

Override this method when deriving a new operator.

The main method to execute the task. Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.

execute_complete(context, event=None)
class airflow.providers.apache.kafka.sensors.kafka.AwaitMessageTriggerFunctionSensor(topics, apply_function, event_triggered_function, kafka_config_id='kafka_default', apply_function_args=None, apply_function_kwargs=None, poll_timeout=1, poll_interval=5, **kwargs)

Bases: airflow.providers.common.compat.sdk.BaseSensorOperator

Defer until a specific message is published to Kafka, trigger a registered function, then resume waiting.

The behavior of the consumer for this trigger is as follows:

  • poll the Kafka topics for a message
  • if no message is returned, sleep for poll_interval
  • process the message with the provided callable and commit the message offset
  • if the callable returns any data, emit a TriggerEvent with the returned data
  • otherwise, continue to the next message
  • pass the event to event_triggered_function, then resume waiting for the next message
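Unlike AwaitMessageSensor, a positive event here does not end the task. A minimal sketch of that difference, assuming event_triggered_function is called with the event as its first argument and the task context as keyword arguments. trigger_function_loop is a hypothetical helper, and a plain iterable stands in for polling Kafka (the real sensor re-defers and waits indefinitely, up to its timeout).

```python
from typing import Any, Callable, Iterable, List


def trigger_function_loop(
    messages: Iterable[Any],
    apply_function: Callable[[Any], Any],
    event_triggered_function: Callable[..., Any],
    context: dict,
) -> List[Any]:
    """Hand every truthy apply_function result to event_triggered_function,
    then keep consuming instead of finishing."""
    fired = []
    for message in messages:
        event = apply_function(message)
        if event:
            # Invoke the registered function, then resume with the next message.
            fired.append(event_triggered_function(event, **context))
    return fired
```

With messages [1, 15, 4, 30] and an apply_function that returns multiples of 15, the registered function runs twice (for 15 and 30) and the loop carries on after each call.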

Parameters:
  • kafka_config_id (str) – The ID of the Kafka connection to use, defaults to “kafka_default”

  • topics (collections.abc.Sequence[str]) – Topics (or a topic regex) to read from

  • apply_function (str | None) – The function to apply to each message to determine whether an event occurred, given as a dotted-path string.

  • event_triggered_function (collections.abc.Callable) – The callable to invoke each time apply_function encounters a positive event.

  • apply_function_args (collections.abc.Sequence[Any] | None) – Positional arguments passed to the processing function, defaults to None

  • apply_function_kwargs (dict[Any, Any] | None) – Keyword arguments passed to the processing function, defaults to None

  • poll_timeout (float) – How long the Kafka consumer should wait for a message to arrive from the Kafka cluster, defaults to 1

  • poll_interval (float) – How long the Kafka consumer should sleep after reaching the end of the Kafka log, defaults to 5

  • soft_fail – Set to True to mark the task as SKIPPED on failure

  • timeout – Time elapsed before the task times out and fails (in seconds)

  • poke_interval – This parameter is inherited but not used in this deferrable implementation

  • mode – This parameter is inherited but not used in this deferrable implementation
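Note that event_triggered_function is typed as a callable, whereas apply_function is a dotted-path string. The handler below, on_kafka_event, is hypothetical; it assumes it receives the apply_function result as its first argument and the task context (for example run_id) as keyword arguments. The commented wiring reuses an illustrative module path.

```python
import logging
from typing import Any

log = logging.getLogger(__name__)


def on_kafka_event(event: Any, **context: Any) -> str:
    """Hypothetical event_triggered_function: log the event and the run that saw it."""
    run_id = context.get("run_id", "unknown")
    summary = f"run {run_id}: received {event!r}"
    log.info("%s", summary)  # the sensor resumes waiting after this returns
    return summary


# Hypothetical wiring: the callable itself is passed, not a dotted-path string.
# AwaitMessageTriggerFunctionSensor(
#     task_id="listen_for_events",
#     topics=["events"],
#     apply_function="my_dags.kafka_utils.await_threshold",
#     event_triggered_function=on_kafka_event,
# )
```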

BLUE = '#ffefeb'
ui_color = '#ffefeb'
template_fields = ('topics', 'apply_function', 'apply_function_args', 'apply_function_kwargs', 'kafka_config_id')
topics
apply_function
apply_function_args = None
apply_function_kwargs = None
kafka_config_id = 'kafka_default'
poll_timeout = 1
poll_interval = 5
event_triggered_function
execute(context, event=None)

Override this method when deriving a new operator.

The main method to execute the task. Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.

execute_complete(context, event=None)
