Python操作Kafka例子

Kafka作为消息中间件是非常受欢迎的,对应Python库名为kafka-python

import json
import traceback
from kafka.errors import kafka_errors
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], 
                         key_serializer=lambda key: json.dumps(key).encode(),
                         value_serializer=lambda value: json.dumps(value).encode())

for i in range(3):
    msg = 'message ' + str(i)
    future = producer.send('kafka_topic', key='count', value=msg)  
    print(f"send {msg}")
    try:
        future.get(timeout=10)    # 看是否发送成功           
    except kafka_errors:
        traceback.print_exc()

 上段为Kafka生产者,下段对应Kafka消费者,KafkaConsumer会进入持续接收状态;

consumer = KafkaConsumer('kafka_topic', bootstrap_servers=['localhost:9092'], group_id='test')
for msg in consumer:
    print(f"receive, key: {json.loads(msg.key.decode())}, value: {json.loads(msg.value.decode())}")

### PythonKafka集成概述 为了使Python能够与Kafka进行交互,通常会借助`kafka-python`库。该库提供了用于连接至Kafka集群、发送和接收消息的功能[^1]。 ### 安装依赖包 在开始之前,需确保已安装`kafka-python`库。可以通过pip命令轻松完成此操作: ```bash pip install kafka-python ``` ### 生产者示例:向Kafka主题发送消息 下面展示了一个简单的生产者例子,展示了如何创建一个KafkaProducer对象并向指定的主题发送字符串形式的消息。 ```python from kafka import KafkaProducer import json producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') ) for i in range(10): producer.send('demo-topic', {'number': i}) producer.flush() producer.close() ``` 这段代码初始化了一个KafkaProducer实例,并指定了序列化器以便于传输JSON格式的数据。接着循环调用了send()函数十次,每次都将一个新的字典作为消息体发出;最后调用flush()确保所有未确认的消息都被立即发送出去,并关闭了生产者的连接。 ### 消费者示例:从Kafka主题接收消息 对于消费者而言,则是监听某个特定主题中的新消息到达事件。这里给出了一段基本的消费者实现方式: ```python from kafka import KafkaConsumer consumer = KafkaConsumer( 'demo-topic', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=True, group_id='my-group' ) try: for msg in consumer: print(f"Received message: {msg.value.decode('utf-8')}") finally: consumer.close() ``` 上述脚本构建了一个名为`my-group`的消费者组成员,它将持续不断地拉取来自`demo-topic`的新记录直到程序被手动终止为止。每当有新的消息到来时,便会触发一次迭代过程,在其中解析并显示每条接收到的信息内容[^2]。
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值