Kafka学习二

Kafka学习二

一个正常的生产逻辑具备的步骤:

     配置生产者参数及生产者实例

    构建待发送消息

    发送消息

    关闭生产者实例回收资源

 

需要设置的必要的三个参数:

    bootstrap.servers

    key.serializer

    value.serializer

 

消息发送通常有三种方式:

       发后既忘、同步、异步,从重载方法可以看到发送参数采用的返回值采用的是Future,同时异步采用了callbck来实现异步调用。

 

KafkaProducer通常会发生的两种类型的异常:

       可重试异常:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartionException、NotExceptionRepublicasException、NotCoordinatorException等

       对于可重试异常,如果配置了retries参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。配置参数方法:

props.put(ProducerConfig.RETRIES_CONFIG,10)

      不可重试异常:不可恢复,直接抛异常,需要处理解决,此时就需要一个好的异常处理了,而采用Future进行返回值,通常采用callback进行回调,要么成功,要么失败。

 

KafkaProducer的整体架构:

       前面我们说到,生产者消息在发送到Kafka之前需要经过一系列动作才能完成,那么具体的流程是怎样的呢?

       整个生产者由两个线程(主线程和Sender线程)协调运行。在主线程中,Kafka生产客户端会创建消息,消息可能经过拦截器、序列化器、分区器,这里用可能是因为拦截器和分区器不是必须的。

       由于消息不是一条一条发送到服务器端的,因此就需要对消息进行收集,收集的过程中会用到缓存,此时消息到消息收集器。接着采用Sender线程进行批量消息发送,这样做的好处是减少网络传输提高性能。而缓存大小可以在客户端通过参数进行调节。

       这个消息收集器采用的数据结构是双端队列结构,每个分区维护一个双端队列,队列内容是消息批次。因为我们知道消息采用的追加的形式。一个消息批次(ProducerBatch)包含多个消息(producerRecord)。

       在消息收集器内部还有一个BufferPool来实现ByteBuffer的复用,以便高效利用缓存。同时其只针对特定大小的ByteBuffer进行管理。我们可以调节batch.size参数来指定。

 

       Sender从消息收集器中获取缓存消息之后,会进一步将原本<分区,Deque<ProducerBatch>>的保存形式变成<Node,List<ProducerBatch>>的形式,接着进一步转成<Node,Request>的形式,而Request指Kafka的各种协议请求。这样就可以将Request请求发送到各个节点了。

       请求从Sender线程发往Kafka之前还会保存到InFlightReauests中,其保存对象格式Map<NodeId,Deque<Request>>,它的主要作用缓存了已经发出去但还没有收到响应的请求。

 

在发送之前需要知道的元信息:

      主题分区的数量、计算出的目标分区、broker的相关的端口、地址信息等。

       元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过Synchronized和final关键字来保障。

 

一些重要参数:

      acks:指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。分为三种:1(可靠和吞吐量的折中),0(可达到最大吞吐量),-1或all(消息达到最强可靠性)

     max.request.size:用来限制生产者客户端能发送消息的最大值,通常情况下,采用默认值即可,1048576b。

     retries:用于配置生产者重试的次数

     retry.backoff.ms:两次重试之间的时间间隔

     compression.type:消息采用的压缩方式,默认为none。当然我们可以采用gzip、snappy、lz4等方式。

     connections.max.idles.ms:在多久之后关闭限制连接,540000ms,9分钟

     linger.ms

     receive.buffer.bytes:设置Socket接收消息缓冲区的大小,32kb

     sender.buffer.bytes:设置Socket发送消息缓冲区的大小,默认128kb

     request.timeout.ms:配置producer等待请求响应的最长时间,默认30000毫秒,也即30秒。

以上是对生产者的学习。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值