Notebook - Compare Async and Sync Producers
# Topic topic_example_v1 has been created
from confluent_kafka.admin import AdminClient, NewTopic
# def load_config():
# """Load Kafka configuration."""
# return {
# 'bootstrap.servers': '{server}',
# 'security.protocol': '{}',
# 'sasl.mechanisms': '{}',
# 'sasl.username': '{api key}',
# 'sasl.password': '{api password}'
# }
## Recommended way of loading secrets from .env file
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
def load_config():
    """Assemble the Kafka client configuration from environment variables.

    Returns:
        dict: One entry per Kafka configuration key, in the order listed
        below. The value is whatever ``os.getenv`` returns for the matching
        environment variable, i.e. ``None`` when that variable is not set.
    """
    # (configuration key, environment variable that supplies its value)
    env_sources = (
        ('bootstrap.servers', 'BOOTSTRAP_SERVERS'),
        ('security.protocol', 'SECURITY_PROTOCOL'),
        ('sasl.mechanisms', 'SASL_MECHANISMS'),
        ('sasl.username', 'SASL_USERNAME'),
        ('sasl.password', 'SASL_PASSWORD'),
    )
    return {setting: os.getenv(env_var) for setting, env_var in env_sources}
# in python
config = load_config()
from confluent_kafka.admin import AdminClient, NewTopic
admin_client = AdminClient(config)
# List all topics
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html
topic_metadata = admin_client.list_topics(timeout=5)
list(topic_metadata.topics.keys())
['demo_3_producer_trial3_applicants', 'test_1_topic', 'demo_3_producer_trial2_soccer', 'demo_3_producer_trial3_evaluation', 'demo_3_producer_trial1', 'demo1_free_text', 'topic_example_v1']
# Create callback function
# Producer callback
def delivery_report(err, msg):
    """Print the outcome of one produce request.

    Args:
        err: The delivery error, or ``None`` when the message was delivered.
        msg: The produced message; its topic, partition and offset are
            printed on success.
    """
    if err is None:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
        return
    print(f"Message delivery failed: {err}")
# Define message to generate
import random
def produce_messages(producer, topic_name, num_messages=10):
    """Send randomly generated ride-request messages to a topic.

    Each message picks a random rider name and random coordinates, is
    produced with ``delivery_report`` as its callback, and is followed by a
    short ``poll`` so callbacks are served while producing. A final
    ``flush`` runs once after the loop.

    Args:
        producer: Object exposing ``produce``, ``poll`` and ``flush``.
        topic_name: Topic the messages are produced to.
        num_messages: How many messages to produce (default 10).
    """
    riders = ["Alice", "Bob", "Charlie"]
    for seq in range(num_messages):
        rider = random.choice(riders)
        latitude = random.uniform(-90, 90)
        longitude = random.uniform(-180, 180)
        producer.produce(
            topic_name,
            key=f"rider-{rider}-{seq}",  # Example key only; choose a meaningful key if needed.
            value=f"rider {rider} requests a car at ({latitude:.2f}, {longitude:.2f})",
            callback=delivery_report,
        )
        # Serve pending delivery callbacks so feedback is printed as we go.
        producer.poll(0.1)
    # Ensure all messages are sent before returning.
    producer.flush()
## Using producer.flush() before shutting down your producer ensures that all messages are delivered and that any callbacks associated with these messages are executed.
from confluent_kafka import Producer
## main: Producer
# Create the producer from the configuration built earlier in the notebook
producer = Producer(config)
# Topic that receives the generated messages
topic_name = "topic_example_v1"
# Produce 10 messages; delivery results are printed by the delivery_report callback
produce_messages(producer, topic_name,num_messages=10)
Message delivered to topic_example_v1 [2] at offset 526 Message delivered to topic_example_v1 [2] at offset 527 Message delivered to topic_example_v1 [0] at offset 507 Message delivered to topic_example_v1 [0] at offset 508 Message delivered to topic_example_v1 [0] at offset 509 Message delivered to topic_example_v1 [1] at offset 547 Message delivered to topic_example_v1 [1] at offset 548 Message delivered to topic_example_v1 [1] at offset 549 Message delivered to topic_example_v1 [1] at offset 550 Message delivered to topic_example_v1 [1] at offset 551
## synchronous producer
import random
def produce_messages_synchronously(producer, topic_name, num_messages=10):
    """Produce messages one at a time, waiting for each delivery result.

    After every ``produce`` call the producer is flushed (up to 10 seconds),
    so the loop does not continue until the current message has been
    acknowledged, has failed, or the timeout has expired.

    ``flush()`` only returns how many messages are still in the local
    queue. A message that the broker rejects is removed from the queue and
    reported through its delivery callback, so a callback is registered per
    message to distinguish a real delivery from a failed one.

    Args:
        producer: Object exposing ``produce(topic, key=, value=, callback=)``
            and ``flush(timeout=)``, e.g. ``confluent_kafka.Producer``.
        topic_name: Topic the messages are produced to.
        num_messages: How many messages to produce (default 10).

    Returns:
        None. One status line is printed per message.
    """
    names = ["Alice", "Bob", "Charlie"]
    for i in range(num_messages):
        name = random.choice(names)
        lat = random.uniform(-90, 90)
        long = random.uniform(-180, 180)
        message_key = f"rider-{name}-{i}"  # Example key
        message_value = f"rider {name} requests a car at ({lat:.2f}, {long:.2f})"

        # Errors reported for THIS message. The list is bound as a default
        # argument so a late callback from an earlier, timed-out message
        # cannot leak into a later iteration's result.
        delivery_errors = []

        def _on_delivery(err, msg, _errors=delivery_errors):
            if err is not None:
                _errors.append(err)

        producer.produce(
            topic_name,
            key=message_key,
            value=message_value,
            callback=_on_delivery,
        )

        # Wait for message delivery and handle the result
        remaining = producer.flush(timeout=10)  # Adjust the timeout as needed
        if remaining > 0:
            # Timeout expired with messages still queued.
            print(f"Failed to deliver {remaining} messages")
        elif delivery_errors:
            # Queue is empty, but the delivery report carried an error.
            print(f"Message delivery failed: {delivery_errors[0]}")
        else:
            print("All messages delivered successfully")
Summary
The producer.flush(timeout=10) method in the Confluent Kafka Python client is used to ensure that all outstanding/queued messages are delivered and all callbacks are invoked. The flush() method works by waiting until all the messages in the producer's local queue are delivered to the Kafka broker, or the provided timeout (in seconds) expires.
Here's a breakdown of what happens:
- The flush() method initiates the delivery of all the messages that are currently in the producer's local queue.
- It then waits for the acknowledgments from the broker for these messages.
- If the acknowledgments for all the messages are received before the provided timeout, flush() will return 0, indicating that no messages are left in the queue.
- If the provided timeout expires before acknowledgments for all messages are received, flush() will return a non-zero value indicating the number of messages that remain in the queue (i.e., the number of messages that haven't been successfully delivered).
In the synchronous producer code above, the flush() method is called with a timeout of 10 seconds after each message is produced. This means that after producing each message, the code will wait up to 10 seconds for the acknowledgment of that message from the Kafka broker. If the acknowledgment is received within that timeframe, flush() returns 0 and the if-else block reports success; otherwise the if-else block prints how many messages are still undelivered, and the code then moves to the next iteration of the loop (or exits the loop). Note that the return value of flush() only counts messages that are still queued: a message that the broker rejected is reported through its delivery callback, so a callback is needed to detect per-message delivery errors.
This use of flush() after producing each message gives the producer synchronous behavior: it won't proceed until it has confirmation that the current message has been delivered or the timeout is reached.
from confluent_kafka import Producer
# Main Execution
# A new producer is created here, rebinding the name used by the earlier cell
producer = Producer(config)
# Same topic as the asynchronous example
topic_name = "topic_example_v1"
# Produce 10 messages, flushing after each one; one status line is printed per message
produce_messages_synchronously(producer, topic_name, num_messages=10)
All messages delivered successfully All messages delivered successfully All messages delivered successfully All messages delivered successfully All messages delivered successfully All messages delivered successfully All messages delivered successfully All messages delivered successfully All messages delivered successfully All messages delivered successfully
Created: 2023-10-22