Notebook - Asynchronous Producer¶
In [1]:
Copied!
# Topic topic_example_v1 has been created
from confluent_kafka.admin import AdminClient, NewTopic
# def load_config():
# """Load Kafka configuration."""
# return {
# 'bootstrap.servers': '{server}',
# 'security.protocol': '{}',
# 'sasl.mechanisms': '{}',
# 'sasl.username': '{api key}',
# 'sasl.password': '{api password}'
# }
## Recommended way of loading secrets from .env file
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
def load_config():
"""Load Kafka configuration."""
return {
'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'),
'security.protocol': os.getenv('SECURITY_PROTOCOL'),
'sasl.mechanisms': os.getenv('SASL_MECHANISMS'),
'sasl.username': os.getenv('SASL_USERNAME'),
'sasl.password': os.getenv('SASL_PASSWORD')
}
# in python
config = load_config()
# Topic topic_example_v1 has been created from confluent_kafka.admin import AdminClient, NewTopic # def load_config(): # """Load Kafka configuration.""" # return { # 'bootstrap.servers': '{server}', # 'security.protocol': '{}', # 'sasl.mechanisms': '{}', # 'sasl.username': '{api key}', # 'sasl.password': '{api password}' # } ## Recommended way of loading secrets from .env file import os from dotenv import load_dotenv # Load environment variables load_dotenv() def load_config(): """Load Kafka configuration.""" return { 'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'), 'security.protocol': os.getenv('SECURITY_PROTOCOL'), 'sasl.mechanisms': os.getenv('SASL_MECHANISMS'), 'sasl.username': os.getenv('SASL_USERNAME'), 'sasl.password': os.getenv('SASL_PASSWORD') } # in python config = load_config()
In [2]:
Copied!
from confluent_kafka.admin import AdminClient, NewTopic
admin_client = AdminClient(config)
# List all topics
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html
topic_metadata = admin_client.list_topics(timeout=5)
list(topic_metadata.topics.keys())
from confluent_kafka.admin import AdminClient, NewTopic admin_client = AdminClient(config) # List all topics # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html topic_metadata = admin_client.list_topics(timeout=5) list(topic_metadata.topics.keys())
Out[2]:
['demo_3_producer_trial3_applicants', 'test_1_topic', 'demo_3_producer_trial2_soccer', 'demo_3_producer_trial3_evaluation', 'demo_3_producer_trial1', 'demo1_free_text', 'topic_example_v1']
In [3]:
Copied!
# Create callback function
# Producer callback
def delivery_report(err, msg):
"""Callback to report the result of the produce operation."""
if err is not None:
print(f"Message delivery failed: {err}")
else:
print(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
# Create callback function # Producer callback def delivery_report(err, msg): """Callback to report the result of the produce operation.""" if err is not None: print(f"Message delivery failed: {err}") else: print(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
In [4]:
Copied!
# Define message to generate
import random
def produce_messages(producer, topic_name, num_messages=10):
"""Produce messages with the given format."""
names = ["Alice", "Bob", "Charlie"]
for i in range(num_messages):
name = random.choice(names)
lat = random.uniform(-90, 90)
long = random.uniform(-180, 180)
message_key = f"rider-{name}-{i}" # Just as an example. Choose a meaningful key if needed.
message_value = f"rider {name} requests a car at ({lat:.2f}, {long:.2f})"
producer.produce(topic_name, key=message_key, value=message_value, callback=delivery_report)
producer.poll(0.1) # To trigger the delivery report callback for feedback
producer.flush() # Ensure all messages are sent
## Using producer.flush() before shutting down your producer ensures that all messages are delivered and that any callbacks associated with these messages are executed.
# Define message to generate import random def produce_messages(producer, topic_name, num_messages=10): """Produce messages with the given format.""" names = ["Alice", "Bob", "Charlie"] for i in range(num_messages): name = random.choice(names) lat = random.uniform(-90, 90) long = random.uniform(-180, 180) message_key = f"rider-{name}-{i}" # Just as an example. Choose a meaningful key if needed. message_value = f"rider {name} requests a car at ({lat:.2f}, {long:.2f})" producer.produce(topic_name, key=message_key, value=message_value, callback=delivery_report) producer.poll(0.1) # To trigger the delivery report callback for feedback producer.flush() # Ensure all messages are sent ## Using producer.flush() before shutting down your producer ensures that all messages are delivered and that any callbacks associated with these messages are executed.
In [5]:
Copied!
from confluent_kafka import Producer
## main: Producer
producer = Producer(config)
topic_name = "topic_example_v1"
produce_messages(producer, topic_name)
from confluent_kafka import Producer ## main: Producer producer = Producer(config) topic_name = "topic_example_v1" produce_messages(producer, topic_name)
Message delivered to topic_example_v1 [1] at offset 538 Message delivered to topic_example_v1 [1] at offset 539 Message delivered to topic_example_v1 [1] at offset 540 Message delivered to topic_example_v1 [1] at offset 541 Message delivered to topic_example_v1 [0] at offset 501 Message delivered to topic_example_v1 [0] at offset 502 Message delivered to topic_example_v1 [0] at offset 503 Message delivered to topic_example_v1 [0] at offset 504 Message delivered to topic_example_v1 [2] at offset 521 Message delivered to topic_example_v1 [2] at offset 522
In [ ]:
Copied!
Last update: 2023-10-22
Created: 2023-10-22
Created: 2023-10-22