airflow.providers.apache.kafka.sensors.kafka

Module Contents

Classes

AwaitMessageSensor

An Airflow sensor that defers until a specific message is published to Kafka.

AwaitMessageTriggerFunctionSensor

Defer until a specific message is published to Kafka, trigger a registered function, then resume waiting.

Attributes

VALID_COMMIT_CADENCE

airflow.providers.apache.kafka.sensors.kafka.VALID_COMMIT_CADENCE[source]
class airflow.providers.apache.kafka.sensors.kafka.AwaitMessageSensor(topics, apply_function, kafka_config_id='kafka_default', apply_function_args=None, apply_function_kwargs=None, poll_timeout=1, poll_interval=5, xcom_push_key=None, **kwargs)[source]

Bases: airflow.models.BaseOperator

An Airflow sensor that defers until a specific message is published to Kafka.

The sensor creates a consumer that reads the Kafka log until it encounters a positive event.

The behavior of the consumer for this trigger is as follows:
  • poll the Kafka topics for a message

  • if no message is returned, sleep

  • process the message with the provided callable and commit the message offset

  • if the callable returns any data, raise a TriggerEvent with the return data

  • else continue to the next message

  • return the event (as the default xcom or a specific xcom key)
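The polling behavior described above can be sketched in plain Python. This is only an illustration, not the provider's trigger code: `FakeMessage`, `FakeConsumer`, `await_message`, and the `max_polls` guard are hypothetical names introduced here, and the sketch returns the callable's result directly instead of raising a TriggerEvent.

```python
import time
from typing import Any, Callable, Iterable, Optional


class FakeMessage:
    """Hypothetical stand-in for a Kafka message; only value() is modelled."""

    def __init__(self, payload: bytes) -> None:
        self._payload = payload

    def value(self) -> bytes:
        return self._payload


class FakeConsumer:
    """Hypothetical in-memory consumer: returns queued items, then None."""

    def __init__(self, messages: Iterable[Optional[FakeMessage]]) -> None:
        self._messages = list(messages)
        self.commits = 0

    def poll(self, timeout: float) -> Optional[FakeMessage]:
        # A real consumer would block for up to `timeout` seconds.
        return self._messages.pop(0) if self._messages else None

    def commit(self) -> None:
        self.commits += 1


def await_message(
    consumer: FakeConsumer,
    apply_function: Callable[[FakeMessage], Any],
    poll_timeout: float = 1.0,
    poll_interval: float = 0.0,
    max_polls: int = 100,  # assumption: keeps the sketch from looping forever
) -> Any:
    for _ in range(max_polls):
        message = consumer.poll(poll_timeout)
        if message is None:
            time.sleep(poll_interval)  # nothing to read yet, so sleep
            continue
        result = apply_function(message)  # process with the provided callable
        consumer.commit()  # commit the offset of the processed message
        if result:
            return result  # positive event: hand the data back
    return None


consumer = FakeConsumer([FakeMessage(b"skip"), None, FakeMessage(b"match")])
event = await_message(
    consumer,
    lambda m: m.value().decode() if m.value() == b"match" else None,
)
```

In this run the first message is processed and committed without producing an event, the empty poll leads to a sleep, and the third item ends the loop.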

Parameters
  • kafka_config_id (str) – The connection object to use, defaults to “kafka_default”

  • topics (Sequence[str]) – Topics (or topic regex) to read from

  • apply_function (str) – The function to apply to messages to determine whether an event occurred, given as a dot-notation string

  • apply_function_args (Sequence[Any] | None) – Arguments to be applied to the processing function, defaults to None

  • apply_function_kwargs (dict[Any, Any] | None) – Keyword arguments to be applied to the processing function, defaults to None

  • poll_timeout (float) – How long the Kafka consumer should wait for a message to arrive from the Kafka cluster, defaults to 1

  • poll_interval (float) – How long the Kafka consumer should sleep after reaching the end of the Kafka log, defaults to 5

  • xcom_push_key – The name of a key to push the returned message to, defaults to None
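As an illustration of what an apply_function might look like, here is a minimal sketch. It assumes the message object exposes a value() method returning the raw payload (as confluent-kafka messages do) and that the message is passed positionally. The function name `order_over_threshold`, the `threshold` keyword, the JSON payload shape, and the `FakeMessage` helper are all hypothetical.

```python
import json
from typing import Any, Optional


def order_over_threshold(message: Any, threshold: float = 100.0) -> Optional[dict]:
    """Return the decoded payload when its amount exceeds the threshold."""
    payload = json.loads(message.value())
    if payload.get("amount", 0) > threshold:
        return payload  # truthy return value signals a positive event
    return None  # falsy return value means "keep waiting"


class FakeMessage:
    """Hypothetical stand-in used to exercise the function without a broker."""

    def __init__(self, raw: bytes) -> None:
        self._raw = raw

    def value(self) -> bytes:
        return self._raw


hit = order_over_threshold(FakeMessage(b'{"amount": 250}'), threshold=200)
miss = order_over_threshold(FakeMessage(b'{"amount": 50}'), threshold=200)
```

With the sensor, such a function would be referenced by its import path as a string (for example a hypothetical "my_package.filters.order_over_threshold"), with apply_function_kwargs set to {"threshold": 200}.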

BLUE = '#ffefeb'[source]
ui_color[source]
template_fields = ('topics', 'apply_function', 'apply_function_args', 'apply_function_kwargs', 'kafka_config_id')[source]
execute(context)[source]

Derive when creating an operator.

Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

execute_complete(context, event=None)[source]
class airflow.providers.apache.kafka.sensors.kafka.AwaitMessageTriggerFunctionSensor(topics, apply_function, event_triggered_function, kafka_config_id='kafka_default', apply_function_args=None, apply_function_kwargs=None, poll_timeout=1, poll_interval=5, **kwargs)[source]

Bases: airflow.models.BaseOperator

Defer until a specific message is published to Kafka, trigger a registered function, then resume waiting.

The behavior of the consumer for this trigger is as follows:
  • poll the Kafka topics for a message

  • if no message is returned, sleep

  • process the message with the provided callable and commit the message offset

  • if the callable returns any data, raise a TriggerEvent with the return data

  • else continue to the next message

  • return the event (as the default xcom or a specific xcom key)

Parameters
  • kafka_config_id (str) – The connection object to use, defaults to “kafka_default”

  • topics (Sequence[str]) – Topics (or topic regex) to read from

  • apply_function (str) – The function to apply to messages to determine whether an event occurred, given as a dot-notation string

  • event_triggered_function (Callable) – The callable to trigger once the apply_function encounters a positive event

  • apply_function_args (Sequence[Any] | None) – Arguments to be applied to the processing function, defaults to None

  • apply_function_kwargs (dict[Any, Any] | None) – Keyword arguments to be applied to the processing function, defaults to None

  • poll_timeout (float) – How long the Kafka consumer should wait for a message to arrive from the Kafka cluster, defaults to 1

  • poll_interval (float) – How long the Kafka consumer should sleep after reaching the end of the Kafka log, defaults to 5
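A possible shape for an event_triggered_function is sketched below. It assumes the sensor calls the function with the event data as the first positional argument and the task context as keyword arguments; check the provider source for the exact call. The names `record_event` and `handled_events` and the sample `run_id` value are hypothetical.

```python
from typing import Any

# Hypothetical in-memory sink; a real callable might write to a database
# or call another service instead.
handled_events: list = []


def record_event(event: Any, **context: Any) -> None:
    """Store the event together with the run it was observed in."""
    run_id = context.get("run_id", "unknown")
    handled_events.append((run_id, event))


# Direct call that mimics the assumed invocation by the sensor.
record_event({"amount": 250}, run_id="manual__2024-01-01")
```

Because the sensor resumes waiting after each event, a callable like this should be quick and safe to run repeatedly.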

BLUE = '#ffefeb'[source]
ui_color[source]
template_fields = ('topics', 'apply_function', 'apply_function_args', 'apply_function_kwargs', 'kafka_config_id')[source]
execute(context, event=None)[source]

Derive when creating an operator.

Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

execute_complete(context, event=None)[source]
