Usage Overview

This section provides a general overview of how to use Kafka Manager.

KafkaManager

import json

from kafka_manager.kafka_manager import KafkaManager

bootstrap_servers = ['localhost:9092']  # Replace with your Kafka broker addresses
topic_name = 'example_topic'  # Replace topic name with your choice
group_id = 'example_group'  # Replace consumer group ID with your choice


def message_handler(message):
    """
    This method is a callback function called by the consumer, which handles the received messages when a new message
    arrives.

    In production real-world application, the received message would be processed as follows:
    - Perform some business logic
    - Store the message in a database for further processing.
    - Message deserialization
    - etc.

    :param message: Message received from the consumer.
    """
    print(f'Received message: Partition={message.partition}, Offset={message.offset}, Value={message.value}')


def main():
    # 1. Create a KafkaManager instance
    kafka_manager = KafkaManager(bootstrap_servers=bootstrap_servers)

    # 2. For topic management connect to Kafka admin client
    if not kafka_manager.connect_admin_client():
        print('Failed to connect to Kafka admin client.')
        exit()

    # 3. Create a topic - (if it doesn't exist)
    try:
        # Define topic
        if not kafka_manager.create_topic(topic_name=topic_name, num_partitions=1, replication_factor=1):
            print(f'Failed to create topic "{topic_name}". Check that if the topic already exists.')
        # In production, please ensure the topic exists; don't exit the check and continue.
    except Exception as e:
        print(f'Error in creating Kafka topic: {e}')

    # 4. Start the Kafka producer
    if not kafka_manager.start_producer():
        print('Failed to start Kafka producer.')
        exit()

    # 5. Send Kafka message
    try:
        message_payload = json.dumps({
            "message_key": "message_value"
        })
        metadata = kafka_manager.send_message(topic=topic_name, value=message_payload)
        if metadata:
            print(f'Message sent successfully to Kafka topic: "{topic_name}"')
        else:
            print(f'Failed to send message to Kafka topic: "{topic_name}"')
    except Exception as e:
        print(f'Error in sending message to Kafka topic: {e}')

    # 6. Create a Kafka Consumer
    consumer = kafka_manager.create_consumer(topics=[topic_name], group_id=group_id, auto_offset_reset='earliest')
    if consumer is None:
        print('Failed to create consumer.')
        exit()

    # 7. Start the Kafka Consumer
    if not kafka_manager.start_consumer(consumer_id=group_id):
        print('Failed to start consumer.')
        exit()

    # 8. Consume Messages
    kafka_manager.consume_messages(consumer_id=group_id, message_handler=message_handler)

    # 9. Stop Kafka producers, Kafka consumers and Admin Client.
    kafka_manager.stop_producer()
    kafka_manager.stop_consumer(consumer_id=group_id)
    kafka_manager.close_admin_client()
    print('Stopped all Kafka producer, consumer and admin client.')


if __name__ == "__main__":
    main()

For more detailed examples, refer to the KafkaManager API documentation for more details.