Notebook - Setup & Creating Topics
Virtual Environment Setup
A virtual environment keeps the dependencies required by different projects separate. It gives each project its own Python environment, isolated from the global Python installation, so different projects can depend on different package versions without conflicts.
Setting up a Virtual Environment
Create a Virtual Environment: Navigate to your project directory in the terminal and run the following command to create a new virtual environment named venv:
python -m venv venv
Activate the Virtual Environment:
Linux & Mac:
source venv/bin/activate
Windows (PowerShell):
.\venv\Scripts\Activate.ps1
Install Required Packages:
After activating your virtual environment, you can install the required packages using pip. If the project provides a requirements.txt file, it lists the packages to install; here we need confluent_kafka and python-dotenv, which you can install with:
pip install confluent_kafka python-dotenv
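As a quick check that the install step worked, the following sketch looks up the installed version of each package using the standard library. The `REQUIRED` list and the `installed_versions` helper are illustrative names, not part of the original notebook.

```python
from importlib import metadata

# Distribution names as published on PyPI (assumed list for this notebook).
REQUIRED = ["confluent-kafka", "python-dotenv"]


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


for name, version in installed_versions(REQUIRED).items():
    print(f"{name}: {version or 'NOT INSTALLED'}")
```

If a package is reported as not installed, the most likely cause is that pip ran outside the activated virtual environment.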
Secrets
Get the description of the cluster:
confluent kafka cluster describe
Example output:
> confluent kafka cluster describe
+----------------------+--------------------------------------------------------+
| Current              | true                                                   |
| ID                   | lkc-v1x365                                             |
| Name                 | cluster_0                                              |
| Type                 | BASIC                                                  |
| Ingress Limit (MB/s) | 250                                                    |
| Egress Limit (MB/s)  | 750                                                    |
| Storage              | 5 TB                                                   |
| Provider             | gcp                                                    |
| Region               | us-west4                                               |
| Availability         | single-zone                                            |
| Status               | UP                                                     |
| Endpoint             | SASL_SSL://pkc-lzvrd.us-west4.gcp.confluent.cloud:9092 |
| REST Endpoint        | https://pkc-lzvrd.us-west4.gcp.confluent.cloud:443     |
| Topic Count          | 7                                                      |
+----------------------+--------------------------------------------------------+
Create an API key for the cluster, passing the cluster ID as the resource:
confluent api-key create --resource {id}
Example output:
> confluent api-key create --resource lkc-v1x365
It may take a couple of minutes for the API key to be ready.
Save the API key and secret. The secret is not retrievable later.
+------------+------------------------------------------------------------------+
| API Key    | 4REPMBKZ2PYXJ23S                                                 |
| API Secret | bosLhayIKi6G6BAEmh9mhtzAb9kgwB+D2zAuSpfrF962wWt6ZuXpFODjL7NDHFeT |
+------------+------------------------------------------------------------------+
Create a .env file in your project directory with the following content:
BOOTSTRAP_SERVERS={Endpoint}
SECURITY_PROTOCOL=SASL_SSL
SASL_MECHANISMS=PLAIN
SASL_USERNAME={API Key}
SASL_PASSWORD={API Secret}
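A missing or misspelled key in the .env file only shows up later as an opaque connection error, so it can help to check the settings up front. The sketch below is one way to do that. The `REQUIRED_KEYS` list and the `missing_settings` helper are hypothetical names introduced for illustration, and the check assumes that `load_dotenv()` has already copied the .env values into the process environment.

```python
import os

# Keys expected in the .env file described above.
REQUIRED_KEYS = [
    "BOOTSTRAP_SERVERS",
    "SECURITY_PROTOCOL",
    "SASL_MECHANISMS",
    "SASL_USERNAME",
    "SASL_PASSWORD",
]


def missing_settings(env):
    """Return the required keys that are absent or empty in the mapping `env`."""
    return [key for key in REQUIRED_KEYS if not env.get(key)]


# In the notebook, call load_dotenv() before this check.
missing = missing_settings(os.environ)
if missing:
    print("Missing settings:", ", ".join(missing))
else:
    print("All Kafka settings are present")
```

Passing the environment in as a mapping keeps the helper easy to test with a plain dict.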
(Optional) If you're using Jupyter, specify Python Kernel within Jupyter Notebook (ipynb)
If you're using Jupyter Notebook or Jupyter Lab, it's important to ensure that the notebook is using the correct Python kernel, especially if you have multiple Python environments or versions.
Install ipykernel: After activating your virtual environment, ensure you have ipykernel installed:
pip install ipykernel
Choose the Correct Kernel in Jupyter:
- Start Jupyter Notebook or Jupyter Lab.
- Open your .ipynb notebook.
- From the menu, go to Kernel -> Change kernel and select venv (or the name under which your environment's kernel is registered).
Now, your Jupyter notebook will use the Python version from your virtual environment and any packages you've installed in it.
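To confirm that the notebook really runs on the virtual environment's interpreter, a cell like the following minimal sketch can be run. The `in_virtualenv` helper is an illustrative name. It relies on `sys.prefix` differing from `sys.base_prefix` inside an environment created with `python -m venv`.

```python
import sys


def in_virtualenv():
    """Return True when the running interpreter belongs to a venv."""
    return sys.prefix != sys.base_prefix


# The interpreter path should point inside the project's venv directory.
print("Interpreter:", sys.executable)
print("Inside a virtual environment:", in_virtualenv())
```

If the printed path points at the system Python instead, the notebook is still using a different kernel.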
Create Topic
from confluent_kafka.admin import AdminClient, NewTopic
## Not recommended. Secrets are open to public.
# def load_config():
#     """Load Kafka configuration."""
#     return {
#         'bootstrap.servers': '{server}',
#         'security.protocol': '{}',
#         'sasl.mechanisms': '{}',
#         'sasl.username': '{api key}',
#         'sasl.password': '{api password}'
#     }
## Recommended way of loading secrets from .env file
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

def load_config():
    """Load Kafka configuration."""
    return {
        'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'),
        'security.protocol': os.getenv('SECURITY_PROTOCOL'),
        'sasl.mechanisms': os.getenv('SASL_MECHANISMS'),
        'sasl.username': os.getenv('SASL_USERNAME'),
        'sasl.password': os.getenv('SASL_PASSWORD')
    }
##
def topic_exists(admin_client, topic_name):
    """Check topic existence."""
    return topic_name in set(admin_client.list_topics(timeout=5).topics.keys())
def create_topic(admin_client, topic_name, partitions=1, replication_factor=1, config=None):
    """Create topic if not existing."""
    if not topic_exists(admin_client, topic_name):
        new_topic = [
            NewTopic(
                topic_name,
                num_partitions=partitions,
                replication_factor=replication_factor,
                # Use a fresh dict per call instead of a shared mutable default
                config=config or {})]
        # create_topics() is asynchronous and returns {topic name: future}
        created_topic = admin_client.create_topics(new_topic)
        for topic, f in created_topic.items():
            try:
                f.result()  # Block until creation finishes; raises on failure
                print(f"Topic {topic} created")
            except Exception as e:
                print(f"Failed to create topic {topic}: {e}")
    else:
        print(f"Topic {topic_name} already exists")
# Main execution
conf = load_config()
admin_client = AdminClient(conf)
topic_name = "topic_example_v1"
# Example topic config
topic_config = {'cleanup.policy': 'compact'}
# Setting 'cleanup.policy': 'compact' makes the topic retain at least the latest message
# for each key; older messages for the same key are eventually discarded by compaction.
create_topic(admin_client, topic_name, partitions=3, replication_factor=3, config=topic_config)
Topic topic_example_v1 already exists
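To illustrate what the compact cleanup policy means for the data in a topic, here is a small pure-Python simulation. It does not talk to Kafka. The `compact` function and the sample records are made up for illustration, and the model is simplified: real compaction runs in the background on the broker, so consumers may still see older records for a key until it has run.

```python
def compact(log):
    """Return the records a fully compacted log would retain.

    `log` is a list of (key, value) pairs in offset order. Only the last
    record for each key survives, and survivors keep their relative order.
    A value of None stands in for a tombstone and removes the key entirely.
    """
    latest_offset = {}
    for offset, (key, _value) in enumerate(log):
        latest_offset[key] = offset  # Later records overwrite earlier ones
    return [
        (key, value)
        for offset, (key, value) in enumerate(log)
        if latest_offset[key] == offset and value is not None
    ]


# Hypothetical records: two updates for user-1, a delete for user-2.
log = [
    ("user-1", "a"),
    ("user-2", "b"),
    ("user-1", "c"),
    ("user-2", None),
    ("user-3", "d"),
]
print(compact(log))  # [('user-1', 'c'), ('user-3', 'd')]
```

This is why compacted topics suit "latest state per key" data such as user profiles, and why producing keyed messages matters when this policy is set.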
# List all topics
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html
topic_metadata = admin_client.list_topics(timeout=5)
list(topic_metadata.topics.keys())
['demo_3_producer_trial3_applicants', 'test_1_topic', 'demo_3_producer_trial2_soccer', 'demo_3_producer_trial3_evaluation', 'demo_3_producer_trial1', 'demo1_free_text', 'topic_example_v1']
Created: 2023-10-22