Kafka学习二
一个正常的生产逻辑具备的步骤:
配置生产者参数及生产者实例
构建待发送消息
发送消息
关闭生产者实例回收资源
需要设置的必要的三个参数:
bootstrap.servers
key.serializer
value.serializer
消息发送通常有三种方式:
发后既忘、同步、异步,从重载方法可以看到发送参数采用的返回值采用的是Future,同时异步采用了callbck来实现异步调用。
KafkaProducer通常会发生的两种类型的异常:
可重试异常:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartionException、NotExceptionRepublicasException、NotCoordinatorException等
对于可重试异常,如果配置了retries参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。配置参数方法:
props.put(ProducerConfig.RETRIES_CONFIG,10)
不可重试异常:不可恢复,直接抛异常,需要处理解决,此时就需要一个好的异常处理了,而采用Future进行返回值,通常采用callback进行回调,要么成功,要么失败。
KafkaProducer的整体架构:
前面我们说到,生产者消息在发送到Kafka之前需要经过一系列动作才能完成,那么具体的流程是怎样的呢?
整个生产者由两个线程(主线程和Sender线程)协调运行。在主线程中,Kafka生产客户端会创建消息,消息可能经过拦截器、序列化器、分区器,这里用可能是因为拦截器和分区器不是必须的。
由于消息不是一条一条发送到服务器端的,因此就需要对消息进行收集,收集的过程中会用到缓存,此时消息到消息收集器。接着采用Sender线程进行批量消息发送,这样做的好处是减少网络传输提高性能。而缓存大小可以在客户端通过参数进行调节。
这个消息收集器采用的数据结构是双端队列结构,每个分区维护一个双端队列,队列内容是消息批次。因为我们知道消息采用的追加的形式。一个消息批次(ProducerBatch)包含多个消息(producerRecord)。
在消息收集器内部还有一个BufferPool来实现ByteBuffer的复用,以便高效利用缓存。同时其只针对特定大小的ByteBuffer进行管理。我们可以调节batch.size参数来指定。
Sender从消息收集器中获取缓存消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式变成<Node,List<ProducerBatch>>的形式,接着进一步转成<Node,Request>的形式,而Request指Kafka的各种协议请求。这样就可以将Request请求发送到各个节点了。
请求从Sender线程发往Kafka之前还会保存到InFlightReauests中,其保存对象格式Map<NodeId,Deque<Request>>,它的主要作用缓存了已经发出去但还没有收到响应的请求。
在发送之前需要知道的元信息:
主题分区的数量、计算出的目标分区、broker的相关的端口、地址信息等。
元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过Synchronized和final关键字来保障。
一些重要参数:
acks:指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。分为三种:1(可靠和吞吐量的折中),0(可达到最大吞吐量),-1或all(消息达到最强可靠性)
max.request.size:用来限制生产者客户端能发送消息的最大值,通常情况下,采用默认值即可,1048576b。
retries:用于配置生产者重试的次数
retry.backoff.ms:两次重试之间的时间间隔
compression.type:消息采用的压缩方式,默认为none。当然我们可以采用gzip、snappy、lz4等方式。
connections.max.idles.ms:在多久之后关闭限制连接,540000ms,9分钟
linger.ms
receive.buffer.bytes:设置Socket接收消息缓冲区的大小,32kb
sender.buffer.bytes:设置Socket发送消息缓冲区的大小,默认128kb
request.timeout.ms:配置producer等待请求响应的最长时间,默认30000毫秒,也即30秒。
以上是对生产者的学习。