https://github.com/Shopify/sarama
Kafka 消费的过程似乎大致上可以分为 ConsumerGroup 的 Rebalance 协议和 Partition 消费协议两部分
每个 ConsumerGroup 有五种状态:
Empty:无任何活跃 Consumer 存在;
Stable:已完成 Rebalance,可供稳定消费;
PreparingRebalance:情况发生变化,有新成员加入或旧成员心跳丢失,需要重新 Balance,要求所有成员重新加入 Group;
CompletingRebalance:所有成员均已入组,各成员等待分配计划;
Dead:Group 生命周期结束,可能因为 session 过期,或者 Group 迁移到其他 Group Coordinator;
ConsumerGroup 的这几个状态大致上都是服务于 Rebalance 流程的:
一个新的 Consumer 想要加入 ConsumerGroup,发起 JoinGroup 请求;
Kafka 将 ConsumerGroup 设置为 PreparingRebalancing 状态,递增 ConsumerGroup 的 generation 值,通过心跳的响应通知现所有成员退出,等待它们重新加入;
选举其中一位成员成为 ConsumerGroup 的 leader;
为所有的成员发送 Join 成功的元信息;
Follower 向 Coordinator 发送同步请求申请同步 Partition 分配关系;
Leader 按 Balance 策略生成 Partition 的分配关系,向 Coordinator 发送 SyncGroup 请求发送分配关系;
Coordinator 回复所有成员各自的 Partition 分配关系;
二阶段 Balance
为什么还要在这里进行一次 Group Leader 的选举呢?
《Kafka Client-side Assignment Proposal》这篇讲到过去的 Kafka 直接通过 Coordinator 服务端来直接管理 Partition 与 Consumer 的分配,客户端向 Coordinator 发送 JoinGroup 请求即可拿到自己分配得到的 Partition 列表。这一机制足够简单,但有一些场景会对分配策略有特殊的需求,需要自定义分配策略时服务端分配便不够灵活,比如:
Co-partitioning:比如对两个 Topic 做 join,需要将两个 Topic 按相映射的 Partition 来分配给同一个 Consumer;
Sticky Partitioning:对于有状态的 Consumer,希望重启后仍能恢复原有的 Partition 关系而不要 rebalance;
Redundant partitioning:比如搜索引擎需要建多份冗余的索引,希望能使单个 Partition 被多个 Consumer 所消费;
Metadata-based assignment:比如让 Consumer Group 做到 rack aware,消费来自本机架的 Partition;
为了支持这类高级的分配策略甚至未知的分配策略,Kafka 将分配策略下放给了客户端。使新版的协议增加了一个 Group Leader 的选举环节,由作为 Group Leader 的提供 Balance 策略同步给 Coordinator,再使 Coordinator 仲裁过的 Claim 列表同步给其他客户端。
分配 Partition 和同步分配结果两个阶段需所有成员参与协调,对应 PreparingRebalance 和 CompletingBalance 两个状态。
https://zhuanlan.zhihu.com/p/109574627
https://zhuanlan.zhihu.com/p/110114004
重要的生产者参数
在这里我打算介绍一部分我个人认为比较重要的生产者参数。
MaxMessageBytes int
复制代码
这个参数影响了一条消息的最大字节数,默认是1000000。但是注意,这个参数必须要小于broker中的 message.max.bytes。
RequiredAcks RequiredAcks
复制代码
这个参数影响了消息需要被多少broker写入之后才返回。取值可以是0、1、-1,分别代表了不需要等待broker确认才返回、需要分区的leader确认后才返回、以及需要分区的所有副本确认后返回。
Partitioner PartitionerConstructor
复制代码
这个是分区器。Sarama默认提供了几种分区器,如果不指定默认使用Hash分区器。
Retry
复制代码
这个参数代表了重试的次数,以及重试的时间,主要发生在一些可重试的错误中。
Flush
复制代码
用于设置将消息打包发送,简单来讲就是每次发送消息到broker的时候,不是生产一条消息就发送一条消息,而是等消息累积到一定的程度了,再打包发送。所以里面含有两个参数。一个是多少条消息触发打包发送,一个是累计的消息大小到了多少,然后发送。
2.4 幂等生产者
在聊幂等生产者之前,我们先来看看生产者中另外一个很重要的参数:
MaxOpenRequests int
复制代码
这个参数代表了允许没有收到acks而可以同时发送的最大batch数。
Idempotent bool
复制代码
用于幂等生产者,当这一项设置为true的时候,生产者将保证生产的消息一定是有序且精确一次的。
为什么会需要这个选项呢?
当MaxOpenRequests这个参数配置大于1的时候,代表了允许有多个请求发送了还没有收到回应。假设此时的重试次数也设置为了大于1,当同时发送了2个请求,如果第一个请求发送到broker中,broker写入失败了,但是第二个请求写入成功了,那么客户端将重新发送第一个消息的请求,这个时候会造成乱序。
又比如当第一个请求返回acks的时候,因为网络原因,客户端没有收到,所以客户端进行了重发,这个时候就会造成消息的重复。
所以,幂等生产者就是为了保证消息发送到broker中是有序且不重复的。
消息的有序可以通过MaxOpenRequests设置为1来保证,这个时候每个消息必须收到了acks才能发送下一条,所以一定是有序的,但是不能够保证不重复。
而且当MaxOpenRequests设置为1的时候,吞吐量不高。
注意,当启动幂等生产者的时候,Retry次数必须要大于0,ack必须为all。
在Java客户端中,允许MaxOpenRequests小于等于5。
https://juejin.cn/post/6866316565348876296
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
https://chrzaszcz.dev/2019/06/kafka-rebalancing/
https://www.lixueduan.com/post/kafka/06-sarama-producer/