以下文章来源于石臻臻的杂货铺 ,作者石彦祖
CSDN博客之星Top5、Kafka Contributor、LogiKM PMC、华为云MVP。进滴滴技术交流群, 不同技术专家轮流值班, 本号分享Java/大数据/中间件等领域干货和视频
在阅读本文之前,希望你可以思考下面几个问题,带着问题去阅读文章会获得更好的效果。
Kafka 为了提高 Producer 客户端的发送吞吐量和提高性能,选择了将消息暂时缓存起来,等到满足一定的条件再进行批量发送。这样可以减少网络请求,提高吞吐量。
而缓存这个消息的就是 RecordAccumulator 类。
上图表示的就是消息缓存的模型,生产的消息就是暂时存放在这个里面。
每条消息,我们按照 TopicPartition 维度把他们放在不同的Deque<ProducerBatch> 队列里面。TopicPartition 相同,会在相同Deque<ProducerBatch> 的里面;
ProducerBatch 表示同一个批次的消息。消息真正发送到 Broker 端的时候都是按照批次来发送的,这个批次可能包含一条或者多条消息;
如果没有找到消息对应的 ProducerBatch 队列,则创建一个队列;
找到 ProducerBatch 队尾的 Batch,发现 Batch 还可以塞下这条消息,则将消息直接塞到这个 Batch 中;
找到 ProducerBatch 队尾的 Batch。发现 Batch 中剩余内存不足以塞下这条消息,则会创建新的 Batch;
当消息发送成功之后,Batch 会被释放掉。
那么创建 ProducerBatch 的时候,应该分配多少的内存呢?
先说结论:当消息预估内存大于 batch.size 的时候,则按照消息预估内存创建。否则按照 batch.size 的大小创建(默认16K)。
我们来看一段代码,这段代码就是在创建 ProducerBatch 的时候预估内存的大小。
RecordAccumulator#append
// 找到 batch.size 和这条消息在batch中的总内存大小的最大值int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));// 申请内存buffer = free.allocate(size, maxTimeToBlock);
DefaultRecordBatch#estimateBatchSizeUpperBound
预估需要的 Batch 大小是一个预估值,因为没有考虑压缩算法的额外开销。
/*** 使用给定的键和值获取只有一条记录的批次大小的上限。* 这只是一个估计值,因为它没有考虑使用的压缩算法的额外开销。**/static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);}
也就是说创建一个 ProducerBatch 最少就要 83B。
比如我发送一条消息 " 1 " , 预估得到的大小是 86B,跟 batch.size(默认 16384)相比取最大值。那么申请内存的时候取最大值 16384 。
我们都知道 RecordAccumulator 里面的缓存大小是一开始定义好的,由 buffer.memory 控制,默认 33554432(32M)。
当生产的速度大于发送速度的时候,就可能出现 Producer 写入阻塞。
而且频繁的创建和释放 ProducerBatch 会导致频繁 GC,所以 Kafka 中有个缓存池的概念。这个缓存池会被重复使用,但是只有固定(batch.size)的大小才能够使用缓存池。
注意:以下 16K 指的是 batch.size 的默认值。
1) 创建 Batch 的时候,会去缓存池中获取队首的一块内存 ByteBuffer 使用。
2) 消息发送完成释放 Batch,则会把这个 ByteBuffer 放到缓存池的队尾中并且调用ByteBuffer.clear 清空数据,以便下次重复使用。
1) 创建 Batch 的时候, 去非缓存池中的内存获取一部分内存用于创建 Batch。
注意:这里说的获取内存给 Batch,其实就是让非缓存池 nonPooledAvailableMemory 减少 16K 的内存,然后Batch正常创建就行了。不要误以为好像真的发生了内存的转移。
2) 消息发送完成,释放 Batch。则会把这个 ByteBuffer 放到缓存池的队尾,并且调用ByteBuffer.clear 清空数据以便下次重复使用。
1) 创建 Batch 的时候,去非缓存池(nonPooledAvailableMemory)内存获取一部分内存用于创建 Batch。
注意:这里说的获取内存给 Batch,其实就是让非缓存池(nonPooledAvailableMemory)减少对应的内存,然后 Batch 正常创建就行了。不要误以为好像真的发生了内存的转移。
2) 消息发送完成,释放Batch。纯粹的是在非缓存池(nonPooledAvailableMemory)中加上刚刚释放的 Batch 内存大小。当然,这个 Batch 会被 GC 掉。
1) 先尝试将缓存池中的内存一个一个释放到非缓存池中,直到非缓存池中的内存够用来创建 Batch 了。
2) 创建 Batch 的时候,去非缓存池(nonPooledAvailableMemory)内存获取一部分内存用于创建 Batch。
注意:这里说的获取内存给 Batch,其实就是让非缓存池(nonPooledAvailableMemory)减少对应的内存,然后 Batch 正常创建就行了。不要误以为好像真的发生了内存的转移。
3) 消息发送完成,释放 Batch。纯粹的是在非缓存池(nonPooledAvailableMemory)中加上刚刚释放的 Batch 内存大小。当然这个Batch会被 GC 掉。
例如,下面我们需要创建 48K 的 batch。因为超过了 16K,所以需要在非缓存池中分配内存。但是非缓存池中当前可用内存为 0 分配不了,这个时候就会尝试去缓存池里面释放一部分内存到非缓存池。
释放第一个 ByteBuffer(16K)不够则继续释放第二个,直到释放了 3 个之后总共 48K。发现内存这时候够了,再去创建 Batch。
注意:这里我们涉及到的非缓存池中的内存分配,仅仅指内存数字的增加和减少。
当 Broker 挂掉了,Producer 会提示下面的警告⚠️。但是,发送消息过程中
这个消息体还是可以写入到消息缓存中的。也仅仅是写到到缓存中而已。
WARN [Producer clientId=console-producer] Connection to node 0 (/172.23.164.192:9090) could not be established. Broker may not be available那么会创建新的 ProducerBatch。
触发创建 ProducerBatch 的那条消息预估大小大于 batch.size ,则以预估内存创建。否则以 batch.size 创建。
还有一个问题供大家思考:
当消息还存储在缓存中的时候,假如 Producer 客户端挂掉了,消息是不是就丢失了?
- EOF -
看完本文有收获?请转发分享给更多人
关注「ImportNew」,提升Java技能
点赞和在看就是最大的支持❤️