KafkaManager

Kafka Manager Module

The Kafka Manager module provides a high-level abstraction over the kafka-python library to simplify interaction with Apache Kafka. It provides functionality for managing Kafka producers, consumers, topics, and admin client operations, hiding the complexity of using kafka-python directly.

class kafka_manager.kafka_manager.KafkaManager(bootstrap_servers)

Bases: object

A utility class for managing Kafka producers, consumers, and topics. It provides a higher-level interface to Kafka: it defines methods for managing producers and consumers and supports administrative operations such as topic creation and deletion. Producer and consumer lifecycles are delegated to the KafkaProducerClient and KafkaConsumerClient classes, whose methods this class invokes.

property admin_client

Returns the Kafka admin client.

close()

This method closes all Kafka connections (producers, consumers, and the admin client). Call it when you are done with the manager so that all producers, consumers, and the admin client are stopped properly and their resources are released.
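Because close() must run even when sending fails, one option is to wrap the manager in contextlib.closing, which calls close() when the with block exits. The sketch below uses a hypothetical FakeKafkaManager (not part of this module) in place of KafkaManager so that it runs without a broker; publish_event is also a hypothetical helper, and the dict the fake returns only stands in for real record metadata.

```python
from contextlib import closing


class FakeKafkaManager:
    """Hypothetical stand-in for KafkaManager that records lifecycle calls."""

    def __init__(self, bootstrap_servers, producer_ok=True):
        self.bootstrap_servers = bootstrap_servers
        self.producer_ok = producer_ok
        self.calls = []

    def start_producer(self):
        self.calls.append("start_producer")
        return self.producer_ok

    def send_message(self, topic, value):
        self.calls.append(("send_message", topic))
        # Stand-in for record metadata; the real method returns None on failure.
        return {"topic": topic, "partition": 0, "offset": 0}

    def close(self):
        self.calls.append("close")


def publish_event(manager, topic, event):
    """Hypothetical helper: start the producer, send one event, always close."""
    # closing() calls manager.close() when the block exits, even on an exception.
    with closing(manager):
        if not manager.start_producer():
            raise RuntimeError("producer failed to start")
        metadata = manager.send_message(topic, event)
        if metadata is None:
            raise RuntimeError(f"failed to send event to {topic!r}")
        return metadata
```

With a real broker you would pass a KafkaManager instance instead of the fake; the design choice is simply that close() is tied to the with block rather than to the success path.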

close_admin_client()

This method closes the Kafka admin client connection.

Returns:

It returns True if the Kafka admin client was closed successfully; otherwise, it returns False.

connect_admin_client()

This method creates the Kafka admin client and connects it to the Kafka broker(s), so that administrative operations can be performed on the Kafka cluster.

Returns:

It returns True if the connection to the Kafka admin client was successful; otherwise, it returns False.

consume_messages(consumer_id, message_handler)

This method consumes messages with the consumer registered under the given consumer_id and passes each received message to message_handler.

Parameters:
  • consumer_id – The ID of the consumer client to consume messages with.

  • message_handler – A function that is called with each received message.
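A message_handler is an ordinary function that receives one message object per call. In this sketch, Record is a hypothetical stand-in for the record type that kafka-python delivers (its ConsumerRecord exposes the same topic, partition, offset, key, and value attributes), and handle_order is an illustrative handler that assumes each payload is a dict with an "id" field.

```python
from collections import namedtuple

# Hypothetical stand-in for a consumed record; kafka-python's ConsumerRecord
# has these attributes (plus others such as timestamp and headers).
Record = namedtuple("Record", ["topic", "partition", "offset", "key", "value"])

processed = []


def handle_order(message):
    """Example message_handler, called once per received message."""
    # value is assumed to be already deserialized from JSON into a dict.
    order = message.value
    processed.append((message.topic, message.partition, message.offset, order["id"]))
```

Against a running manager, the call would look like manager.consume_messages(consumer_id, handle_order), where consumer_id is the ID the consumer was registered under.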

property consumers

Returns the dictionary of managed Kafka consumer clients.

create_consumer(topics: list = None, group_id: str = None, auto_offset_reset: str = 'latest', **kwargs)

This method creates a new KafkaConsumerClient instance with the given configuration, establishes the connection to the Kafka broker(s), and registers the new consumer in the consumers dictionary under the group_id key.

Parameters:
  • topics – A list of Kafka topics to subscribe to.

  • group_id – The consumer group ID. Defaults to None.

  • auto_offset_reset

    Determines where the consumer starts reading when there is no committed offset for its group, or when the committed offset is no longer valid. Defaults to ‘latest’.

    • ‘earliest’: Start from the oldest message still available in the partition.

    • ‘latest’: Start from the end of the partition, so only messages produced after the consumer starts are read.

  • kwargs – Additional keyword arguments passed directly to the KafkaConsumer constructor, allowing further customization of the consumer (e.g., security settings).

Returns:

It returns the new KafkaConsumerClient instance.
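The offset-reset rule can be modelled as a small function for a single partition. resolve_start_offset is hypothetical and simplified (it ignores details such as how offsets get committed); it only shows when auto_offset_reset is consulted and which position each value selects.

```python
def resolve_start_offset(committed, log_start, log_end, auto_offset_reset="latest"):
    """Simplified model of where a consumer begins reading one partition.

    committed: the group's committed offset, or None if there is none.
    log_start: the oldest offset still retained in the partition.
    log_end: the offset the next produced message will receive.
    """
    # A valid committed offset always wins; auto_offset_reset is not consulted.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    if auto_offset_reset == "earliest":
        return log_start  # replay everything still retained
    if auto_offset_reset == "latest":
        return log_end  # read only messages produced from now on
    raise ValueError(f"unsupported auto_offset_reset: {auto_offset_reset!r}")
```

For a partition retaining offsets 100 to 249, a new group starts at 100 with ‘earliest’ and at 250 with ‘latest’, while a group that committed offset 180 resumes at 180 either way.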

create_topic(topic_name: str = None, num_partitions: int = 1, replication_factor: int = 1)

This method creates a new topic with the given topic_name. Because this is an administrative operation, it is performed through the admin client.

Parameters:
  • topic_name – Name of the new topic.

  • num_partitions – Number of partitions of the new topic. Defaults to 1.

  • replication_factor – Replication factor of the new topic. Defaults to 1.

Returns:

It returns True if the topic was created successfully; otherwise, it returns False.
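A sketch of the return-a-boolean pattern that create_topic describes, assuming an admin object with a create_topics method that takes a list of topic specifications. With kafka-python you would pass kafka.admin.NewTopic objects to KafkaAdminClient.create_topics; here TopicSpec, InMemoryAdmin, and try_create_topic are hypothetical stand-ins so the example runs without a cluster, and the broad except clause is a simplification.

```python
from dataclasses import dataclass


@dataclass
class TopicSpec:
    """Hypothetical stand-in for kafka.admin.NewTopic (same three core fields)."""
    name: str
    num_partitions: int = 1
    replication_factor: int = 1


class InMemoryAdmin:
    """Hypothetical in-memory admin client used to exercise the sketch."""

    def __init__(self):
        self.topics = {}

    def create_topics(self, new_topics):
        for spec in new_topics:
            if spec.name in self.topics:
                raise ValueError(f"topic {spec.name!r} already exists")
            self.topics[spec.name] = spec


def try_create_topic(admin_client, topic_name, num_partitions=1, replication_factor=1):
    """Create a topic and report success as a bool instead of raising."""
    if not topic_name:
        return False  # topic_name defaults to None in the documented signature
    spec = TopicSpec(topic_name, num_partitions, replication_factor)
    try:
        admin_client.create_topics([spec])
    except Exception:  # e.g. the topic already exists or the broker is unreachable
        return False
    return True
```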

delete_topic(topic_name)

This method deletes the topic with the given topic_name. Because this is an administrative operation, it is performed through the admin client.

Parameters:

topic_name – Name of the topic to delete.

Returns:

It returns True if the topic was deleted successfully; otherwise, it returns False.

is_producer_running()

This method checks if the managed Kafka producer client is running.

Returns:

It returns True if the managed Kafka producer client is running and False otherwise.

property producer_client

Returns the Kafka producer client.

send_message(topic, value)

This method sends a message to the specified Kafka topic using the managed Kafka producer client.

Parameters:
  • topic – The name of the Kafka topic to send the message to.

  • value – The message payload; the producer client serializes it to JSON before sending.

Returns:

If the message was sent successfully, it returns the record metadata; otherwise, it returns None.

start_consumer(consumer_id)

This method starts the KafkaConsumerClient instance registered under the given consumer_id.

Parameters:

consumer_id – The ID of the consumer client to start.

Returns:

It returns True if the consumer started successfully, or False if the consumer ID is not found.

start_producer()

This method starts the Kafka producer client managed by this instance and establishes the connection to the Kafka broker(s).

Returns:

It returns True if the Kafka producer started successfully; otherwise, it returns False.

stop_all_consumers()

This method stops all KafkaConsumerClient instances.

stop_consumer(consumer_id)

This method stops the KafkaConsumerClient instance registered under the given consumer_id.

Parameters:

consumer_id – The ID of the consumer client to stop.

Returns:

It returns True if the consumer stopped successfully, or False if the consumer ID is not found.

stop_producer()

This method stops the managed Kafka producer client.

Returns:

It returns True if the Kafka producer stopped successfully; otherwise, it returns False.

KafkaProducerClient

KafkaProducerClient Module

The Kafka Producer Client module encapsulates the KafkaProducer API from the kafka-python library. It provides methods to start, stop, and flush the producer and to send messages to Kafka topics.

class kafka_manager.kafka_producer_client.KafkaProducerClient(bootstrap_servers)

Bases: object

A class for managing the lifecycle of a Kafka producer. It encapsulates the connection and disconnection logic so that the producer can be managed and reused within an application. Message values are serialized to JSON before they are sent, which makes them easy to read once deserialized.

flush()

This method flushes the producer, forcing it to send any buffered messages that have not yet been transmitted to the Kafka brokers. It blocks until all outstanding messages have been sent or the timeout is reached.

To prevent data loss, call this method before stopping or closing the Kafka producer. If the producer is terminated before all messages are flushed, some messages may never be delivered to Kafka.

Returns:

This method does not return a value; it only flushes the producer.

Raises:

An exception if flushing fails because of network issues, broker unavailability, or any other problem that prevents the producer from sending messages to the brokers. The caller should be prepared to handle this exception.
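The flush-before-close rule can be enforced with try/finally, so that the producer is closed even when flushing raises. shutdown_producer and RecordingProducer are hypothetical; the sketch assumes an object with flush(timeout=...) and close() methods, which kafka-python's KafkaProducer provides.

```python
def shutdown_producer(producer, timeout=10):
    """Hypothetical helper: flush buffered messages, then always close."""
    try:
        # Blocks until buffered messages are sent or the timeout expires; may raise.
        producer.flush(timeout=timeout)
    finally:
        # Runs whether or not flush() raised, so resources are always released.
        producer.close()


class RecordingProducer:
    """Hypothetical fake producer that records calls (no broker needed)."""

    def __init__(self, fail_flush=False):
        self.fail_flush = fail_flush
        self.calls = []

    def flush(self, timeout=None):
        self.calls.append("flush")
        if self.fail_flush:
            raise TimeoutError("brokers unavailable")

    def close(self, timeout=None):
        self.calls.append("close")
```

The flush exception is not swallowed: it still reaches the caller after close() has run, which matches the expectation that callers handle flush failures themselves.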

is_producer_running()

This method checks if the Kafka producer is running.

Returns:

If the Kafka producer is running and connected, it returns True. Otherwise, it returns False.

send_message(topic: str, value: dict = None)

This method serializes value to JSON and sends it to the Kafka topic with the given name using the configured Kafka producer.

Parameters:
  • topic – The topic name to send the message to.

  • value – The message payload, a JSON-serializable dict, to send to the topic.

Returns:

If the message was sent successfully, it returns metadata about the delivered message. It returns None if the producer is not running or an error occurred during sending.

start()

This method configures a KafkaProducer instance and connects it to the Kafka broker(s) specified in bootstrap_servers.

The producer is configured to serialize message values to UTF-8 encoded JSON.

Returns:

It returns True if the producer started and connected successfully; otherwise, it returns False.
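The JSON serialization described for start() can be expressed as a value serializer. serialize_value is a hypothetical helper; kafka-python's KafkaProducer accepts such a callable through its value_serializer argument, but whether KafkaProducerClient is configured exactly this way is an assumption.

```python
import json


def serialize_value(value):
    """Hypothetical value_serializer: encode a message value as UTF-8 JSON bytes."""
    return json.dumps(value).encode("utf-8")
```

For example, KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=serialize_value) would apply it to every value passed to send(); the broker address is only illustrative.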

stop()

This method gracefully closes the Kafka producer connection. Call it once the Kafka producer has processed all pending messages, so that its resources are released.

Returns:

If the Kafka producer was stopped successfully or already stopped, it returns True. Otherwise, if an error occurs while stopping the Kafka producer, it returns False.

KafkaConsumerClient

KafkaConsumerClient Module

The Kafka Consumer Client module encapsulates the KafkaConsumer API from the kafka-python library. It provides methods to create, start, and stop a consumer and to consume messages from Kafka topics.

class kafka_manager.kafka_consumer_client.KafkaConsumerClient(bootstrap_servers, topics, group_id=None, auto_offset_reset='latest', **kwargs)

Bases: object

A class for managing the lifecycle of a Kafka consumer. It encapsulates the connection and disconnection logic and lets an application consume messages from Kafka. Message values are deserialized from JSON so that their content can be read, and errors are handled at each step.

consume(message_handler)

This method enters a loop that polls the Kafka topic for new messages and calls message_handler for each received message, passing the message object as an argument. It keeps consuming messages until the stop() method is called or an exception occurs.

Parameters:

message_handler – A function that takes a KafkaConsumer message object as its argument and is called for each received message. The message object has attributes such as topic, partition, offset, key, and value.
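The polling loop can be sketched as follows, assuming a consumer whose poll(timeout_ms=...) returns a dict mapping partitions to lists of records, as kafka-python's KafkaConsumer.poll does. consume_loop, the threading.Event used to signal a stop, and the ScriptedConsumer fake are hypothetical; the actual consume() implementation may differ.

```python
import threading


def consume_loop(consumer, message_handler, stop_event, poll_timeout_ms=500):
    """Hypothetical loop: poll until stop_event is set, handling each record."""
    while not stop_event.is_set():
        # poll() returns {partition: [records]}; an empty dict if nothing arrived.
        batches = consumer.poll(timeout_ms=poll_timeout_ms)
        for records in batches.values():
            for record in records:
                message_handler(record)


class ScriptedConsumer:
    """Hypothetical fake: returns prepared batches, then signals the loop to stop."""

    def __init__(self, batches, stop_event):
        self._batches = list(batches)
        self._stop_event = stop_event

    def poll(self, timeout_ms=0):
        if self._batches:
            return self._batches.pop(0)
        self._stop_event.set()  # stands in for another thread calling stop()
        return {}
```

Using a timeout on poll() keeps each iteration short, so a stop request is noticed within roughly one poll interval instead of blocking indefinitely.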

property consumer

Returns the Kafka Consumer object.

is_running()

This method checks whether the Kafka consumer is currently running.

Returns:

It returns True if the consumer has been started and is currently running; otherwise, it returns False.

start()

This method configures a KafkaConsumer instance, connects it to the Kafka broker(s), and subscribes it to the specified topic(s).

The consumer is configured to deserialize message values from UTF-8 encoded JSON.

Returns:

It returns True if the consumer started and connected successfully; otherwise, it returns False.
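The matching deserializer decodes UTF-8 bytes and parses the JSON. deserialize_value is a hypothetical helper suitable for kafka-python's value_deserializer argument; returning None for records without a value (for example, tombstones on compacted topics) is an assumption about how such records should be handled.

```python
import json


def deserialize_value(raw):
    """Hypothetical value_deserializer: decode UTF-8 JSON bytes into Python objects."""
    if raw is None:
        return None  # records with no value (e.g. tombstones) stay None
    return json.loads(raw.decode("utf-8"))
```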

stop()

This method gracefully stops the Kafka consumer, unsubscribes from the topic(s), and closes the connection. Call it once the consumer has finished consuming messages to release its resources. Calling it also ends a running consume() loop.

Returns:

It returns True if the consumer stopped successfully or was already stopped; it returns False if an error occurred while unsubscribing.