Kafka发送消息和读取消息

本文详细介绍了Kafka的生产者如何发送消息,包括配置、消息序列化、同步与异步发送,以及如何自定义分区策略。接着,讨论了消费者的配置、消息拉取、提交偏移量以及如何处理消费者组的重平衡。最后,讨论了避免消息丢失和重复消费的策略,以及如何优雅退出消费者。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

生产者发送消息到Kafka

我们先来看一个生产者-消费者最简单的实例.

public class DemoProducer {
	public static void main(String[] args) {
	    Properties props = new Properties();
	    props.put("bootstrap.servers", "192.168.124.20:9092");
	    props.put("acks", "all");
	    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
	    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
	    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
	
	    ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "greeting", "hello world! 520");
	    //只发不管结果
	    producer.send(record);
	    producer.close();
}

public class DemoConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.124.20:9092");
        props.setProperty("group.id", "test1");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        try {
            while (true) {
                //poll参数值, 指明线程如果没有数据时等待多长时间,0表示不等待立即返回
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());
                }
                consumer.commitSync();
            }
        }finally {
            consumer.close();
        }
    }
}

结果:
offset = 0, key = greeting, value = hello world! 520

下面我们来分析下生产者发送消息过程.

生产者的配置

生产者有很多配置, 这里只介绍几个最简单的配置.

(1) bootstrap.servers

kafka服务地址, 只需要配置集群中的一个broker就行, 但保险起来一般配置多个, 以防某个broker宕机.

(2) acks

acks控制多少个分区副本必须写入消息后生产者才能认为写入成功,这个参数对消息丢失可能性有很大影响

  • acks=0:生产者把消息发送到broker即认为成功,不等待broker的处理结果. 这种方式的吞吐最高,但也是最容易丢失消息的.
  • acks=1:生产者在该分区的主副本(leader)写入消息并返回成功后,认为消息发送成功. 这种方式能够一定程度避免消息丢失, 但如果在写入主副本后, 主副本还没有把消息同步到其他副本, 这时主副本所在broker宕机了, 那么该消息还是会丢失.
  • acks=all:生产者会等待所有副本成功写入该消息, 这种方式是最安全的, 能够保证消息不丢失, 但是延迟也是最大的.

(3) key.serializer和value.serializer

配置key和value的序列化类, Kafka自带了两种序列化类, 一种是字节序列化类, 一种是字符串序列化类.

org.apache.kafka.common.serialization.BytesSerializer
org.apache.kafka.common.serialization.StringSerializer

创建消息

上面的实例中, 我们创建了一个简单的String消息, 其中"my-topic"是主题名, "greeting"是key, "hello world! 520"是value.

ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "greeting", "hello world! 520");

在生产环境中, 我们一般会创建一个JSON消息, 然后以字节或字符串的形式发送到Kafka, 比如:

User user = new User();
user.setId(1);
user.setName("tyshawn");
user.setAge(18);
String msg = JSON.toJSONString(user);
//可以不设置key
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", msg);
producer.send(record);

发送消息

(1) 同步发送

在上面的实例中, 我们使用send()方法发送消息, 其实发送消息后会有一个返回值, send()方法的源码如下:

/**
 * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
 */
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}

我们可以把发送消息的代码块修改如下:

try {
    RecordMetadata recordMetadata = producer.send(record).get();
} catch (Exception e) {
    //记录日志
    e.printStackTrace();
}

注意, 这里使用了Future.get()来获取发送结果, 如果发送消息失败则会抛出异常,否则返回一个

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值