Module Contents



An operator that consumes messages from one or more Kafka topics and processes them.



class airflow.providers.apache.kafka.operators.consume.ConsumeFromTopicOperator(topics, kafka_config_id='kafka_default', apply_function=None, apply_function_batch=None, apply_function_args=None, apply_function_kwargs=None, commit_cadence='end_of_operator', max_messages=None, max_batch_size=1000, poll_timeout=60, **kwargs)[source]

Bases: airflow.models.BaseOperator

An operator that consumes messages from one or more Kafka topics and processes them.

The operator creates a Kafka consumer that reads a batch of messages from the cluster and processes them using the user-supplied callable. The consumer continues to read in batches until it reaches the end of the log or the maximum number of messages has been read.
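The read-process-repeat loop described above can be sketched in plain Python. This is a simplified model under stated assumptions, not the provider's implementation: `FakeConsumer` and `consume_in_batches` are hypothetical names, and the in-memory consumer only imitates the `consume()`, `commit()` and `close()` calls a real Kafka consumer would offer. The parameter names mirror the operator's signature.

```python
from typing import Any, Callable, List, Optional


class FakeConsumer:
    """Hypothetical in-memory stand-in for a Kafka consumer (illustration only)."""

    def __init__(self, messages: List[Any]) -> None:
        self._messages = list(messages)
        self.commits = 0
        self.closed = False

    def consume(self, num_messages: int, timeout: float) -> List[Any]:
        # Hand back up to num_messages; an empty list models "end of the log".
        batch = self._messages[:num_messages]
        self._messages = self._messages[num_messages:]
        return batch

    def commit(self) -> None:
        self.commits += 1

    def close(self) -> None:
        self.closed = True


def consume_in_batches(
    consumer: FakeConsumer,
    apply_function: Callable[[Any], Any],
    max_messages: Optional[int] = None,
    max_batch_size: int = 1000,
    poll_timeout: float = 60,
    commit_cadence: str = "end_of_operator",
) -> int:
    """Read batches until the log is exhausted or max_messages is reached."""
    processed = 0
    while max_messages is None or processed < max_messages:
        # Never ask for more than the remaining message budget.
        if max_messages is None:
            batch_size = max_batch_size
        else:
            batch_size = min(max_batch_size, max_messages - processed)
        batch = consumer.consume(num_messages=batch_size, timeout=poll_timeout)
        if not batch:
            break  # nothing arrived within the timeout: treat as end of log
        for message in batch:
            apply_function(message)
        processed += len(batch)
        if commit_cadence == "end_of_batch":
            consumer.commit()
    if commit_cadence == "end_of_operator":
        consumer.commit()
    # With commit_cadence == "never" the consumer is closed without committing.
    consumer.close()
    return processed
```

In this model, calling `consume_in_batches(FakeConsumer(list(range(10))), print, max_messages=7, max_batch_size=3)` reads batches of 3, 3 and 1 messages and commits once at the end.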

  • kafka_config_id (str) – The connection object to use, defaults to “kafka_default”

  • topics (str | Sequence[str]) – A list of topics or regex patterns the consumer should subscribe to.

  • apply_function (Callable[Ellipsis, Any] | str | None) – The function that should be applied to fetched messages one at a time. Can also be given as a string made up of the name of the dag file executing the function and the function name, delimited by a period (.).

  • apply_function_batch (Callable[Ellipsis, Any] | str | None) – The function that should be applied to a batch of fetched messages. Cannot be used together with apply_function. Intended for transactional workloads where an expensive task might be called before or after operations on the messages are performed.

  • apply_function_args (Sequence[Any] | None) – Additional arguments that should be applied to the callable, defaults to None

  • apply_function_kwargs (dict[Any, Any] | None) – Additional keyword arguments that should be applied to the callable, defaults to None

  • commit_cadence (str | None) – When the consumer should commit offsets (“never”, “end_of_batch”, “end_of_operator”), defaults to “end_of_operator”. If end_of_operator, commit() is called based on the max_messages arg: the commit is made once the apply_function has processed all of the messages the operator reads. If end_of_batch, commit() is called based on the max_batch_size arg: a commit is made after each batch, once the apply_function has processed every message in that batch. If never, close() is called without calling commit().

  • max_messages (int | None) – The maximum total number of messages the operator should read from Kafka, defaults to None, which means reading to the end of the topic.

  • max_batch_size (int) – The maximum number of messages a consumer should read when polling, defaults to 1000

  • poll_timeout (float) – How long, in seconds, the Kafka consumer should wait before determining that no more messages are available, defaults to 60
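As an illustration of how these parameters fit together, here is a minimal sketch of a per-message callable, a batch callable, and an operator definition that uses one of them. Several things are assumptions rather than facts from this page: the messages are assumed to expose `key()` and `value()` accessors returning bytes (as the underlying Kafka client's message objects do), and `StubMessage`, `handle_message`, `handle_batch`, `build_consume_task`, the task id and the topic name are all hypothetical. The handlers return a value only to make them easy to check in isolation.

```python
import json
from typing import Any, List


class StubMessage:
    """Hypothetical stand-in for a consumed message, used only for local testing."""

    def __init__(self, key: bytes, value: bytes) -> None:
        self._key = key
        self._value = value

    def key(self) -> bytes:
        return self._key

    def value(self) -> bytes:
        return self._value


def handle_message(message: Any, prefix: str = "") -> str:
    """Candidate apply_function: called with one message at a time."""
    key = json.loads(message.key())
    value = json.loads(message.value())
    return f"{prefix}{key}: {value}"


def handle_batch(messages: List[Any], prefix: str = "") -> List[str]:
    """Candidate apply_function_batch: called with a whole batch of messages."""
    return [handle_message(m, prefix=prefix) for m in messages]


def build_consume_task():
    """Build the operator; the import is deferred so the handlers run without Airflow."""
    from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

    return ConsumeFromTopicOperator(
        task_id="consume_orders",  # hypothetical task id
        topics=["orders"],  # hypothetical topic name
        kafka_config_id="kafka_default",
        apply_function=handle_message,  # use apply_function_batch instead, not both
        apply_function_kwargs={"prefix": "consumed: "},
        commit_cadence="end_of_batch",  # commit after every processed batch
        max_messages=100,  # stop after 100 messages in total
        max_batch_size=20,  # read at most 20 messages per poll
        poll_timeout=10,
    )
```

The handlers can be exercised locally with `handle_message(StubMessage(b'"k1"', b'{"a": 1}'))` before they are wired into a DAG.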

See also

For more information on how to use this operator, take a look at the guide: ConsumeFromTopicOperator

BLUE = '#ffefeb'[source]
template_fields = ('topics', 'apply_function', 'apply_function_args', 'apply_function_kwargs', 'kafka_config_id')[source]

Derive when creating an operator.

Context is the same dictionary that is used when rendering Jinja templates.

Refer to get_template_context for more context.
