Notebook - Consumer¶
Kafka Producer-Consumer Demo Overview¶
In this demo, we explore a fundamental pattern in event-driven architectures: producing and consuming messages with Kafka. We simulate a simple scenario in which users (riders) request cars, and each request is produced as a message to a Kafka topic. At the same time, a consumer reads these requests and processes them.
Key Components:
- Producer: Simulates user requests by producing random rider-name and location data to a Kafka topic.
- Consumer: Listens to the Kafka topic and processes the incoming user requests by printing them.
- Configuration Loader: Loads the Kafka configurations required for authentication and communication.
- Callbacks: Handle specific events, such as a successful message delivery or topic partition assignment.
Technical Highlights:
- Asynchronous programming with `asyncio` to handle concurrent operations.
- Use of the Confluent Kafka Python library for Kafka operations.
- Integration with `.env` files for environment variable management using `dotenv`.
List of Used files¶
Please run the standalone Python file from your own project directory.
The `.env` file is provided for reference.
Kafka Consumer-Producer Demo Setup and Execution¶
1. Virtual Environment Setup¶
Virtual environments allow for isolated spaces to manage dependencies. It's beneficial to use a virtual environment to avoid potential conflicts between package versions.
1.1. Create & Activate a Virtual Environment¶
Create: Navigate to your project directory and execute:
python -m venv venv
Activate:
For Linux & Mac:
source venv/bin/activate
For Windows (PowerShell):
.\venv\Scripts\Activate.ps1
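You can confirm from Python that the activated environment is the one actually in use. The same check works in a notebook cell after switching kernels. This minimal sketch compares `sys.prefix` with `sys.base_prefix`, which differ when the interpreter runs inside a virtual environment such as one created by `venv`. The helper name `in_virtualenv` is illustrative:

```python
import sys

def in_virtualenv() -> bool:
    """Return True when the running interpreter belongs to a virtual environment."""
    # A venv points sys.prefix at the environment directory, while
    # sys.base_prefix still points at the base Python installation.
    return sys.prefix != sys.base_prefix

print(f"Interpreter: {sys.executable}")
print(f"Inside a virtual environment: {in_virtualenv()}")
```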
1.2. Install Dependencies¶
With the virtual environment activated, install the necessary packages:
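pip install confluent_kafka python-dotenv
Note: `asyncio` is part of the Python standard library, so it does not need to be installed with pip. The `asyncio` package on PyPI is an outdated standalone release.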
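Pip package names do not always match the names you import. For example, `python-dotenv` is imported as `dotenv`. The following sketch uses only the standard library to report whether the demo's modules can be found in the active environment. The mapping and the `check_modules` name are illustrative:

```python
import importlib.util

# pip package name -> module name used in `import` statements
REQUIRED_MODULES = {
    "confluent_kafka": "confluent_kafka",
    "python-dotenv": "dotenv",
}

def check_modules(required=REQUIRED_MODULES):
    """Return {pip_name: bool} indicating whether each module is importable."""
    # find_spec() returns None when a top-level module cannot be located
    return {
        pip_name: importlib.util.find_spec(module) is not None
        for pip_name, module in required.items()
    }

for pip_name, found in check_modules().items():
    print(f"{pip_name}: {'installed' if found else 'MISSING'}")
```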
2. Kafka Cluster Setup and API Key Creation¶
To connect with the Kafka cluster, follow these steps:
2.1. Get Cluster Description¶
Retrieve details of the Kafka cluster:
confluent kafka cluster describe
2.2. Create an API Key¶
Generate an API key for your Kafka cluster:
confluent api-key create --resource {id}
Replace {id} with your cluster ID from the previous step.
2.3. Set up Environment Variables¶
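Use the cluster endpoint from step 2.1 and the API key and secret from step 2.2 to populate a `.env` file (CONSUMER_GROUP_ID can be any group name you choose):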
BOOTSTRAP_SERVERS={Endpoint}
SECURITY_PROTOCOL=SASL_SSL
SASL_MECHANISMS=PLAIN
SASL_USERNAME={API Key}
SASL_PASSWORD={API Secret}
CONSUMER_GROUP_ID=my_consumer_group
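`os.getenv()` returns `None` for a missing variable, so a typo in the `.env` file surfaces only later, as a confusing connection error. The following minimal sketch fails fast instead. It assumes the variable names listed above. The helper `build_kafka_config` is hypothetical and not part of the demo:

```python
import os

# Kafka client setting -> environment variable that supplies it (names from the .env above)
ENV_KEYS = {
    "bootstrap.servers": "BOOTSTRAP_SERVERS",
    "security.protocol": "SECURITY_PROTOCOL",
    "sasl.mechanisms": "SASL_MECHANISMS",
    "sasl.username": "SASL_USERNAME",
    "sasl.password": "SASL_PASSWORD",
}

def build_kafka_config(env=os.environ):
    """Hypothetical helper: build the client config, raising if any variable is unset or empty."""
    missing = [var for var in ENV_KEYS.values() if not env.get(var)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {setting: env[var] for setting, var in ENV_KEYS.items()}
```

Call `load_dotenv()` before `build_kafka_config()` so that the values from `.env` are present in `os.environ`. The returned dictionary has the same shape as the demo's `load_config()` output.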
3. Running the Demo¶
Once the setup is complete, run the demo script:
python3 demo_d_consumer_v2.py consumer_example_v2
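The contents of `demo_d_consumer_v2.py` are not shown here. The following is only a hedged sketch of how a standalone script of this kind could start. It parses the command-line argument and launches the coroutine with `asyncio.run()`, which is the script equivalent of the notebook's top-level `await`. The positional argument is assumed to be the topic name, and the default topic is borrowed from the notebook. `run_demo` is a hypothetical placeholder for the notebook's `produce_consume()`:

```python
import argparse
import asyncio

def parse_args(argv=None):
    """Parse the (assumed) topic-name argument; argv=None reads sys.argv."""
    parser = argparse.ArgumentParser(description="Kafka producer-consumer demo (sketch)")
    parser.add_argument(
        "topic",
        nargs="?",
        default="consumer_example_v2_classroom",
        help="Kafka topic to produce to and consume from",
    )
    return parser.parse_args(argv)

async def run_demo(topic):
    """Hypothetical placeholder: the real script would await produce_consume(topic)."""
    print(f"Starting producer and consumer on topic {topic!r}")
    await asyncio.sleep(0)  # Stand-in for the long-running tasks
    return topic

if __name__ == "__main__":
    # asyncio.run() creates an event loop, runs the coroutine, then closes the loop
    asyncio.run(run_demo(parse_args().topic))
```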
4. (Optional) Jupyter Notebook Setup¶
If using Jupyter Notebook or Jupyter Lab, it's crucial to set the appropriate Python kernel.
4.1. Install ipykernel¶
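pip install ipykernel
Then, with the virtual environment still activated, register it as a Jupyter kernel named venv so that it appears in the kernel list:
python -m ipykernel install --user --name=venv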
4.2. Set the Jupyter Kernel¶
- Start Jupyter Notebook or Jupyter Lab.
- Open the desired notebook.
- Select Kernel -> Change kernel -> venv.
Note: Asynchronous tasks in Jupyter might behave unexpectedly. It's advised to run the Kafka code as a standalone script.
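One concrete source of surprises is that the Jupyter kernel already runs an asyncio event loop. `asyncio.run()` raises `RuntimeError` when it is called while a loop is running in the same thread, which is why the demo cell below uses top-level `await` instead. The following small sketch checks which situation you are in. The helper name `event_loop_running` is illustrative:

```python
import asyncio

def event_loop_running() -> bool:
    """Return True if an asyncio event loop is running in the current thread."""
    try:
        asyncio.get_running_loop()  # Raises RuntimeError when no loop is running
    except RuntimeError:
        return False
    return True

if event_loop_running():
    # e.g. a Jupyter cell: asyncio.run() would raise RuntimeError here
    print("Event loop already running: use `await coro` in this cell.")
else:
    # e.g. a standalone script
    print("No running event loop: use asyncio.run(coro).")
```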
Create Topics¶
This part is the same as in the demos from Lec 2.
```python
from confluent_kafka.admin import AdminClient, NewTopic

## Recommended way of loading secrets from .env file
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

def load_config():
    """Load Kafka configuration."""
    return {
        'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'),
        'security.protocol': os.getenv('SECURITY_PROTOCOL'),
        'sasl.mechanisms': os.getenv('SASL_MECHANISMS'),
        'sasl.username': os.getenv('SASL_USERNAME'),
        'sasl.password': os.getenv('SASL_PASSWORD')
    }
##

def topic_exists(admin_client, topic_name):
    """Check topic existence."""
    return topic_name in set(admin_client.list_topics(timeout=5).topics.keys())

def create_topic(admin_client, topic_name, partitions=1, replication_factor=1, config=None):
    """Create topic if not existing."""
    if not topic_exists(admin_client, topic_name):
        new_topic = [
            NewTopic(
                topic_name,
                num_partitions=partitions,
                replication_factor=replication_factor,
                config=config or {})]  # Avoid a shared mutable default argument
        # create_topics() is asynchronous and returns {topic: future}
        created_topic = admin_client.create_topics(new_topic)
        for topic, f in created_topic.items():
            try:
                f.result()  # Block until the broker confirms or rejects the creation
                print(f"Topic {topic} created")
            except Exception as e:
                print(f"Failed to create topic {topic}: {e}")
    else:
        print(f"Topic {topic_name} already exists")

# Main execution
config = load_config()
admin_client = AdminClient(config)
topic_name = "consumer_example_v2_classroom"

# Create topic
topic_config = {'cleanup.policy': 'compact'}
create_topic(admin_client, topic_name, partitions=3, replication_factor=3, config=topic_config)

# List all topics
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html
topic_metadata = admin_client.list_topics(timeout=5)
list(topic_metadata.topics.keys())
```
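The topic above is created with `cleanup.policy` set to `compact`. Under log compaction, Kafka eventually retains only the most recent record for each key within a partition. This fits the demo, because every message is keyed by rider (`rider-Alice`, `rider-Bob`, `rider-Charlie`). A compacted topic also rejects records that have no key. The pure-Python sketch below only simulates the end state of compaction on an in-memory list; `compact` is an illustrative name, not a Kafka API. The real broker compacts in the background, segment by segment, and leaves the active segment alone, so superseded values can remain readable for some time:

```python
def compact(log):
    """Simulate the end state of log compaction for one partition.

    `log` is a list of (offset, key, value) tuples in offset order.
    Returns the latest record for each key, still in offset order.
    """
    latest = {}
    for offset, key, value in log:
        latest[key] = (offset, key, value)  # A later offset replaces an earlier one
    return sorted(latest.values())  # Tuples sort by offset first

log = [
    (0, "rider-Alice", "rider Alice requests a car at (10.00, 20.00)"),
    (1, "rider-Bob", "rider Bob requests a car at (5.00, 5.00)"),
    (2, "rider-Alice", "rider Alice requests a car at (11.00, 21.00)"),
]
for record in compact(log):
    print(record)
# (1, 'rider-Bob', 'rider Bob requests a car at (5.00, 5.00)')
# (2, 'rider-Alice', 'rider Alice requests a car at (11.00, 21.00)')
```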
Demo: Running Consumer¶
```python
import asyncio
import os
import random
from confluent_kafka import Consumer, Producer, OFFSET_BEGINNING

# Consumer Group ID for ensuring unique offset tracking
CONSUMER_GROUP_ID = os.getenv('CONSUMER_GROUP_ID', 'default-group-id')

async def consume(topic_name):
    """Asynchronously consume data from the specified Kafka Topic."""
    # Short delay before initiating the consumer
    await asyncio.sleep(2.5)
    # Configure consumer with Kafka settings and subscribe to the topic
    c = Consumer({
        **config,
        "group.id": CONSUMER_GROUP_ID,
        "auto.offset.reset": "earliest",
    })
    c.subscribe([topic_name], on_assign=on_assign)
    # Continuously poll for new messages in the topic
    while True:
        # poll() is a blocking call: it holds up the event loop for up to 1 s
        message = c.poll(1.0)
        if message is None:
            print("no message received by consumer")
        elif message.error() is not None:
            print(f"error from consumer {message.error()}")
        else:
            print(f"consumed message {message.key()}: {message.value()}")
        await asyncio.sleep(0.1)  # Brief pause to reduce CPU load and let the producer task run

def on_assign(consumer, partitions):
    """Callback executed when partitions are assigned. Sets partition offset to beginning."""
    # Every (re)assignment replays the partitions from the start, ignoring committed offsets
    for partition in partitions:
        partition.offset = OFFSET_BEGINNING
    consumer.assign(partitions)

def delivery_report(err, msg):
    """Callback function to report the result of a produce operation."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

async def produce(topic_name, config=config):
    """Asynchronously produce random person-location data into the specified Kafka Topic."""
    p = Producer(config)
    names = ["Alice", "Bob", "Charlie"]
    # Continuously produce messages to the topic
    while True:
        name = random.choice(names)
        lat = random.uniform(-90, 90)
        long = random.uniform(-180, 180)
        message_key = f"rider-{name}".encode("utf-8")
        message_value = f"rider {name} requests a car at ({lat:.2f}, {long:.2f})".encode("utf-8")
        p.produce(topic_name, key=message_key, value=message_value, callback=delivery_report)
        p.poll(0.1)  # Poll to allow callbacks to be executed
        await asyncio.sleep(0.1)  # Brief pause to reduce CPU load

async def produce_consume(topic_name):
    """Concurrently run producer and consumer tasks using asyncio."""
    t1 = asyncio.create_task(produce(topic_name))  # Task for producing messages
    t2 = asyncio.create_task(consume(topic_name))  # Task for consuming messages
    await t1  # Wait for producer task to complete (infinite loop in this case)
    await t2  # Wait for consumer task to complete (infinite loop in this case)

## main
topic_name = 'consumer_example_v2_classroom'
# Top-level await works in Jupyter; in a standalone script use
# asyncio.run(produce_consume(topic_name)) instead.
await produce_consume(topic_name)
```
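`Consumer.poll()` and `Producer.poll()` are blocking calls. Each poll therefore pauses the whole event loop, and the producer and consumer tasks above take turns rather than truly overlapping. One option, sketched below under stated assumptions, is to run the blocking call in a worker thread with `asyncio.to_thread()` (Python 3.9+). The sketch keeps polls sequential, one at a time. To keep it runnable without a Kafka cluster, it uses `FakeConsumer`, a hypothetical stand-in whose `poll()` just sleeps and returns a queued value. With the real client, you would pass your `Consumer` instance instead:

```python
import asyncio
import time

class FakeConsumer:
    """Hypothetical stand-in for confluent_kafka.Consumer (no broker needed)."""
    def __init__(self, messages):
        self._messages = list(messages)

    def poll(self, timeout):
        time.sleep(timeout)  # Simulates a blocking wait, like the real poll()
        return self._messages.pop(0) if self._messages else None

async def consume_in_thread(consumer, max_polls, timeout=0.05):
    """Poll `consumer` without blocking the event loop; return what was received."""
    received = []
    for _ in range(max_polls):
        # The blocking poll runs in a worker thread, so the loop stays free meanwhile
        message = await asyncio.to_thread(consumer.poll, timeout)
        if message is not None:
            received.append(message)
    return received

async def heartbeat(ticks, interval=0.01):
    """Another task that keeps running while the consumer waits in its thread."""
    count = 0
    for _ in range(ticks):
        await asyncio.sleep(interval)
        count += 1
    return count

async def main():
    consumer = FakeConsumer(["msg-1", "msg-2"])
    return await asyncio.gather(consume_in_thread(consumer, max_polls=3), heartbeat(5))

# In a Jupyter cell, use `received, beats = await main()` instead of asyncio.run()
received, beats = asyncio.run(main())
print(received, beats)  # ['msg-1', 'msg-2'] 5
```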
Note: Stopping the execution of a cell in Jupyter (such as by pressing the "Stop" button) will interrupt the kernel, which should ideally stop the execution of the code within the cell. However, there could be a few reasons why you're still seeing outputs:
- Asynchronous nature: Tasks started with `asyncio.create_task()` are scheduled on the kernel's own event loop, which keeps running after the interrupt. Stopping the cell can end its `await` while the producer and consumer tasks remain scheduled and keep printing.
- Kafka consumer lag: Depending on how you've configured Kafka and your consumer, there might be a lag between when messages are produced and when they are consumed. If the producer produced many messages quickly, the consumer might still be processing them after the producer has stopped.
- Jupyter kernel state: In some situations, the kernel might not stop tasks or threads that a library manages on its own. The Kafka client, for example, runs background threads inside librdkafka.
To ensure that everything is stopped:
- Manual stop: After pressing "Stop" in Jupyter, flush the producer (`flush()`) and close the consumer (`close()`) if you still hold references to them. In the code above, `p` and `c` are local variables inside the coroutines, so you can only reach them this way if you store them elsewhere, for example as globals.
- Restart the kernel: In the Jupyter Notebook toolbar, there's an option to restart the kernel. This completely resets the Python process running your notebook, so all tasks, threads, and processes related to it are stopped. It's a bit of a "nuclear option", but it's a surefire way to stop everything.
- Improve cleanup in code: Consider adding cleanup code that runs when the tasks stop, for instance flushing the producer, closing the consumer, and cancelling all outstanding asynchronous tasks.
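Here is a minimal sketch of such cleanup. It assumes `produce_consume()` is restructured so that it can be stopped. Each task is cancelled in a `finally` block, and each coroutine releases its resources in its own `finally`. `worker` is a hypothetical stand-in for `produce()`/`consume()`, and its comment marks where `p.flush()` or `c.close()` would go. The `run_for` timeout is an assumption that stands in for however you decide to stop the demo:

```python
import asyncio

cleaned_up = []  # Records which workers ran their cleanup (for demonstration)

async def worker(name):
    """Hypothetical stand-in for produce()/consume(): loops until cancelled."""
    try:
        while True:
            await asyncio.sleep(0.01)
    finally:
        # With Kafka, this is where p.flush() or c.close() would go
        cleaned_up.append(name)

async def produce_consume_with_cleanup(run_for=0.05):
    tasks = [asyncio.create_task(worker("producer")),
             asyncio.create_task(worker("consumer"))]
    try:
        # Stop after run_for seconds; on timeout, wait_for cancels the gather
        await asyncio.wait_for(asyncio.gather(*tasks), timeout=run_for)
    except asyncio.TimeoutError:
        pass
    finally:
        for task in tasks:
            task.cancel()  # Harmless if the task is already finished
        # Wait until cancellation completes so every finally block has run
        await asyncio.gather(*tasks, return_exceptions=True)

# In a Jupyter cell, use `await produce_consume_with_cleanup()` instead
asyncio.run(produce_consume_with_cleanup())
print(sorted(cleaned_up))  # ['consumer', 'producer']
```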
If running these tasks in Jupyter keeps causing issues, consider running long-lived or complex asynchronous code as a standalone Python script instead, especially code that talks to systems like Kafka.
Summary¶
The key points in the Kafka producer-consumer demo:
- The demo uses `asyncio` to run the producer and consumer tasks concurrently on a single event loop.
- The producer randomly generates rider names and locations and publishes them to a Kafka topic.
- The consumer listens to the same topic and prints any messages it receives.
- The `confluent_kafka` Python library handles communication with Kafka (connections, batching, and delivery). The demo itself encodes keys and values as UTF-8 bytes.
- Configuration such as the bootstrap servers and authentication is loaded from a `.env` file using `python-dotenv`, which keeps secrets out of the code.
- Callbacks are defined to handle events such as successful delivery or partition assignment.
- A consumer group is configured so that multiple consumers can share the topic's partitions. Kafka guarantees message ordering only within a partition.
- Topics are created programmatically if they don't already exist.
- The demo could be adapted to publish real data and have the consumer process it in some application-specific way.
Optional¶
In the context of an indefinite loop, using close() directly within the loop itself wouldn't make sense, because the loop is designed to run forever. However, it's still good practice to handle cases where the program might be interrupted (e.g., by a keyboard interrupt, SIGTERM, or other termination signals).
The key idea is to handle those interruptions gracefully. `flush()` delivers any messages still buffered in the producer to the Kafka broker. `close()` commits the consumer's final offsets (unless `enable.auto.commit` is set to `False`) and leaves the consumer group cleanly.
Here's how you can achieve that:
- Signal handling: Use Python's `signal` module to handle termination signals and shut down the producer and consumer gracefully.
- Global references: You may need to make the producer and consumer references global, or pass them around, so that you can close them from the signal-handling function.
Here's a simple example focusing on signal handling:
```python
import signal
import sys

producer = None
consumer = None

def shutdown(signalnum, frame):
    """Gracefully shut down on SIGINT or SIGTERM."""
    global producer, consumer
    # Compare with None: Producer defines __len__, so an empty producer is falsy
    if producer is not None:
        producer.flush()  # Deliver any messages still in the local buffer
        print("Flushed producer")
    if consumer is not None:
        consumer.close()  # Commit final offsets and leave the consumer group
        print("Closed consumer")
    sys.exit(0)

# Attach the signal handlers:
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

# Your existing asyncio code
# ...

async def produce(topic_name, config=config):
    global producer
    producer = Producer(config)
    # ... rest of the produce code ...

async def consume(topic_name):
    global consumer
    consumer = Consumer({
        **config,
        "group.id": CONSUMER_GROUP_ID,
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe([topic_name], on_assign=on_assign)
    # ... rest of the consume code ...
```
With the above implementation, if the program receives a SIGINT (Ctrl+C) or a SIGTERM, it will invoke the shutdown function, which will flush the producer and close the consumer.
This method ensures that even with an indefinite loop, there's a mechanism to handle unexpected interruptions gracefully.
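On Unix-like systems, an asyncio program can also register handlers directly on the event loop with `loop.add_signal_handler()`. The callback then runs inside the loop rather than at an arbitrary point in the code. The method is not available on Windows, where it raises `NotImplementedError`, so treat the following as an alternative sketch for a standalone script. `worker` is a hypothetical stand-in for the producer/consumer coroutines. The `self_signal_after` parameter exists only so the sketch can demonstrate itself by sending its own process a SIGTERM:

```python
import asyncio
import os
import signal

async def worker():
    """Hypothetical stand-in for produce()/consume()."""
    while True:
        await asyncio.sleep(0.01)

async def main(self_signal_after=None):
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    # Unix only: the callback runs inside the event loop, so setting an Event is safe
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)

    if self_signal_after is not None:
        # Demo only: send this process SIGTERM after a delay to exercise the handler
        loop.call_later(self_signal_after, os.kill, os.getpid(), signal.SIGTERM)

    task = asyncio.create_task(worker())
    await stop.wait()  # Wait until SIGINT or SIGTERM arrives
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    # With Kafka, producer.flush() and consumer.close() would go here
    print("Shut down cleanly")
    return "stopped"
```

In a script, you would call `asyncio.run(main())` and stop it with Ctrl+C or `kill`.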
Created: 2023-10-26