生产者发送消息到Kafka
我们先来看一个生产者-消费者最简单的实例.
public class DemoProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.124.20:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "greeting", "hello world! 520");
//只发不管结果
producer.send(record);
producer.close();
}
public class DemoConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.124.20:9092");
props.setProperty("group.id", "test1");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
try {
while (true) {
//poll参数值, 指明线程如果没有数据时等待多长时间,0表示不等待立即返回
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
}finally {
consumer.close();
}
}
}
结果:
offset = 0, key = greeting, value = hello world! 520
下面我们来分析下生产者发送消息过程.
生产者的配置
生产者有很多配置, 这里只介绍几个最简单的配置.
(1) bootstrap.servers
kafka服务地址, 只需要配置集群中的一个broker就行, 但保险起来一般配置多个, 以防某个broker宕机.
(2) acks
acks控制多少个分区副本必须写入消息后生产者才能认为写入成功,这个参数对消息丢失可能性有很大影响
- acks=0:生产者把消息发送到broker即认为成功,不等待broker的处理结果. 这种方式的吞吐最高,但也是最容易丢失消息的.
- acks=1:生产者在该分区的主副本(leader)写入消息并返回成功后,认为消息发送成功. 这种方式能够一定程度避免消息丢失, 但如果在写入主副本后, 主副本还没有把消息同步到其他副本, 这时主副本所在broker宕机了, 那么该消息还是会丢失.
- acks=all:生产者会等待所有副本成功写入该消息, 这种方式是最安全的, 能够保证消息不丢失, 但是延迟也是最大的.
(3) key.serializer和value.serializer
配置key和value的序列化类, Kafka自带了两种序列化类, 一种是字节序列化类, 一种是字符串序列化类.
org.apache.kafka.common.serialization.BytesSerializer
org.apache.kafka.common.serialization.StringSerializer
创建消息
上面的实例中, 我们创建了一个简单的String消息, 其中"my-topic"是主题名, "greeting"是key, "hello world! 520"是value.
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "greeting", "hello world! 520");
在生产环境中, 我们一般会创建一个JSON消息, 然后以字节或字符串的形式发送到Kafka, 比如:
User user = new User();
user.setId(1);
user.setName("tyshawn");
user.setAge(18);
String msg = JSON.toJSONString(user);
//可以不设置key
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", msg);
producer.send(record);
发送消息
(1) 同步发送
在上面的实例中, 我们使用send()方法发送消息, 其实发送消息后会有一个返回值, send()方法的源码如下:
/**
* Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
我们可以把发送消息的代码块修改如下:
try {
RecordMetadata recordMetadata = producer.send(record).get();
} catch (Exception e) {
//记录日志
e.printStackTrace();
}
注意, 这里使用了Future.get()来获取发送结果, 如果发送消息失败则会抛出异常,否则返回一个