Notebook - Asynchronous Producer w/ Serialization¶
Serialization with JSON:
- The serialization in
RiderRequestnow produces a formatted string as per your new requirements. - In the
produce_messages()function, I've created an instance ofRiderRequestand then called theserialize()method on it to generate the desired message value. - You had commas at the end of your lat and long assignments, which would make them tuples. I've corrected that mistake.
The dataclass decorator plays a couple roles in this producer code that uses serialization. It provides a structured data model that serializes cleanly. The serialization happens outside of the class, so dataclasses are not required. But they make the code and data model clearer.
- It defines the RiderRequest data model in a structured way:
The fields (name, lat, long) are explicitly declared with types.
Using a class lets you encapsulate serialization logic in the serialize() method.
The dataclass decorator autogenerates init, repr, etc.
- It allows creating RiderRequest objects easily:
Can instantiate with RiderRequest(name, lat, long)
Cleaner than using a dictionary or tuple
- It integrates well with serialization:
Libraries like Avro/Protobuf work well with classes/structs
The data is organized logically for serialization
serialize() method has access to fields directly
In [13]:
Copied!
# Topic topic_example_v1 has been created
from confluent_kafka.admin import AdminClient, NewTopic
# def load_config():
# """Load Kafka configuration."""
# return {
# 'bootstrap.servers': '{server}',
# 'security.protocol': '{}',
# 'sasl.mechanisms': '{}',
# 'sasl.username': '{api key}',
# 'sasl.password': '{api password}'
# }
## Recommended way of loading secrets from .env file
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
def load_config():
"""Load Kafka configuration."""
return {
'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'),
'security.protocol': os.getenv('SECURITY_PROTOCOL'),
'sasl.mechanisms': os.getenv('SASL_MECHANISMS'),
'sasl.username': os.getenv('SASL_USERNAME'),
'sasl.password': os.getenv('SASL_PASSWORD')
}
# in python
config = load_config()
# Topic topic_example_v1 has been created from confluent_kafka.admin import AdminClient, NewTopic # def load_config(): # """Load Kafka configuration.""" # return { # 'bootstrap.servers': '{server}', # 'security.protocol': '{}', # 'sasl.mechanisms': '{}', # 'sasl.username': '{api key}', # 'sasl.password': '{api password}' # } ## Recommended way of loading secrets from .env file import os from dotenv import load_dotenv # Load environment variables load_dotenv() def load_config(): """Load Kafka configuration.""" return { 'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'), 'security.protocol': os.getenv('SECURITY_PROTOCOL'), 'sasl.mechanisms': os.getenv('SASL_MECHANISMS'), 'sasl.username': os.getenv('SASL_USERNAME'), 'sasl.password': os.getenv('SASL_PASSWORD') } # in python config = load_config()
In [14]:
Copied!
from confluent_kafka.admin import AdminClient, NewTopic
admin_client = AdminClient(config)
# List all topics
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html
topic_metadata = admin_client.list_topics(timeout=5)
list(topic_metadata.topics.keys())
from confluent_kafka.admin import AdminClient, NewTopic admin_client = AdminClient(config) # List all topics # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html topic_metadata = admin_client.list_topics(timeout=5) list(topic_metadata.topics.keys())
Out[14]:
['demo_3_producer_trial3_applicants', 'test_1_topic', 'demo_3_producer_trial2_soccer', 'demo_3_producer_trial3_evaluation', 'demo_3_producer_trial1', 'demo1_free_text', 'topic_example_v1']
In [22]:
Copied!
# Create callback function
# Producer callback
def delivery_report(err, msg):
"""Callback to report the result of the produce operation."""
if err is not None:
print(f"Message delivery failed: {err}")
else:
print(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
# Create callback function # Producer callback def delivery_report(err, msg): """Callback to report the result of the produce operation.""" if err is not None: print(f"Message delivery failed: {err}") else: print(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
In [31]:
Copied!
from dataclasses import dataclass
import json
@dataclass
class RiderRequest:
name: str
lat: float
long: float
def serialize(self) -> str:
"""Serializes the object in the desired string format"""
return f"rider {self.name} requests a car at ({self.lat:.2f}, {self.long:.2f})"
def produce_messages(producer, topic_name, num_messages=10):
names = ["Alice", "Bob", "Charlie"]
for i in range(num_messages):
name = random.choice(names)
lat = random.uniform(-90, 90)
long = random.uniform(-180, 180)
message_key = f"rider-{name}-{i}" # Just as an example. Choose a meaningful key if needed.
## serialization of the message_value
request = RiderRequest(name=name, lat=lat, long=long)
serialized_message = request.serialize()
print(serialized_message)
producer.produce(topic_name, key=message_key, value=serialized_message, callback=delivery_report)
producer.poll(0.1)
producer.flush()
from dataclasses import dataclass import json @dataclass class RiderRequest: name: str lat: float long: float def serialize(self) -> str: """Serializes the object in the desired string format""" return f"rider {self.name} requests a car at ({self.lat:.2f}, {self.long:.2f})" def produce_messages(producer, topic_name, num_messages=10): names = ["Alice", "Bob", "Charlie"] for i in range(num_messages): name = random.choice(names) lat = random.uniform(-90, 90) long = random.uniform(-180, 180) message_key = f"rider-{name}-{i}" # Just as an example. Choose a meaningful key if needed. ## serialization of the message_value request = RiderRequest(name=name, lat=lat, long=long) serialized_message = request.serialize() print(serialized_message) producer.produce(topic_name, key=message_key, value=serialized_message, callback=delivery_report) producer.poll(0.1) producer.flush()
In [ ]:
Copied!
# # Define message to generate
# import random
# def produce_messages(producer, topic_name, num_messages=10):
# """Produce messages with the given format."""
# names = ["Alice", "Bob", "Charlie"]
# for i in range(num_messages):
# name = random.choice(names)
# lat = random.uniform(-90, 90)
# long = random.uniform(-180, 180)
# message_key = f"rider-{name}-{i}" # Just as an example. Choose a meaningful key if needed.
# message_value = f"rider {name} requests a car at ({lat:.2f}, {long:.2f})"
# producer.produce(topic_name, key=message_key, value=message_value, callback=delivery_report)
# producer.poll(0.1) # To trigger the delivery report callback for feedback
# producer.flush() # Ensure all messages are sent
# # Define message to generate # import random # def produce_messages(producer, topic_name, num_messages=10): # """Produce messages with the given format.""" # names = ["Alice", "Bob", "Charlie"] # for i in range(num_messages): # name = random.choice(names) # lat = random.uniform(-90, 90) # long = random.uniform(-180, 180) # message_key = f"rider-{name}-{i}" # Just as an example. Choose a meaningful key if needed. # message_value = f"rider {name} requests a car at ({lat:.2f}, {long:.2f})" # producer.produce(topic_name, key=message_key, value=message_value, callback=delivery_report) # producer.poll(0.1) # To trigger the delivery report callback for feedback # producer.flush() # Ensure all messages are sent
In [32]:
Copied!
from confluent_kafka import Producer
## main: Producer
producer = Producer(config)
topic_name = "topic_example_v1"
produce_messages(producer, topic_name)
from confluent_kafka import Producer ## main: Producer producer = Producer(config) topic_name = "topic_example_v1" produce_messages(producer, topic_name)
rider Bob requests a car at (-27.66, 130.09) rider Charlie requests a car at (65.48, -90.80) rider Alice requests a car at (72.15, -110.09) rider Alice requests a car at (32.27, -95.44) rider Alice requests a car at (-39.97, -168.07) rider Alice requests a car at (-33.27, -85.01) rider Charlie requests a car at (-19.57, 100.28) rider Bob requests a car at (-11.50, 141.31) Message delivered to topic_example_v1 [2] at offset 541 Message delivered to topic_example_v1 [2] at offset 542 rider Alice requests a car at (38.43, 40.31) Message delivered to topic_example_v1 [0] at offset 520 Message delivered to topic_example_v1 [0] at offset 521 Message delivered to topic_example_v1 [0] at offset 522 rider Bob requests a car at (0.37, 30.56) Message delivered to topic_example_v1 [2] at offset 543 Message delivered to topic_example_v1 [1] at offset 569 Message delivered to topic_example_v1 [1] at offset 570 Message delivered to topic_example_v1 [1] at offset 571 Message delivered to topic_example_v1 [1] at offset 572
In [ ]:
Copied!
Last update: 2023-10-22
Created: 2023-10-22
Created: 2023-10-22