ACK有三种
0
意味着producer不等待broker同步完成的确认,继续发送下一条(批)信息
提供了最低的延迟。但是最弱的持久性,当服务器发生故障时,就很可能发生数据丢失。例如leader已经死亡,producer不知情,还会继续发送消息broker接收不到数据就会数据丢失
1
意味着producer要等待leader成功收到数据并得到确认,才发送下一条message。此选项提供了较好的持久性较低的延迟性。
Partition的Leader死亡,follwer尚未复制,数据就会丢失
-1
意味着producer得到follwer确认,才发送下一条数据
原理:
一个topic可以分成多个partition,一个partition可以在跨broker的节点上存放多副本(Leader & Follower)
我们知道一般情况下,一个Broker发生了问题或正常关闭,zookeeper会及时发现并将Leader移至其他Broker节点。
每个partition不再只有一个,而是有一个leader(红色)和多个replica(蓝色),生产者根据消息的topic和key值,确定了消息要发往哪个partition之后(假设是p1),会找到partition对应的leader(也就是broker2里的p1),然后将消息发给leader,leader负责消息的写入,并与其余的replica进行同步。
ACK = 0 时 发送一次 不论leader是否接收
ACK = 1 时,等待leader接收成功即可
ACK = -1 时 ,需等待leader将消息同步给follower
Kafka消息传递语义(Message Delivery Semantics)
对于消息传递语义,一般存在三种类型语义:
1.At most once - 消息传递过程中有可能丢失,丢失的消息也不会重新传递,其实就是保证消息不会重复发送或者重复消费
2.At least once - 消息在传递的过程中不可能会丢失,丢失的消息会重新传递,其实就是保证消息不会丢失,但是消息有可能重复发送或者重新被消费
3.Exactly once - 这个是大多数场景需要的语义,其实就是保证消息不会丢失,也不会重复被消费,消息只传递一次
那么怎么确定我们的使用场景是上面的哪种语意呢?
1.消息发送时的语义
2.消息消费时的语义
消息发送时的语义
现在假如有这么个场景:
当Producer发送一条消息给Kafka集群中的某个topic时,这个时候网络出问题了,那么Kafka会怎么处理呢?
在0.11.0.0之前版本的Kafka,因为不知道这条消息是否发送成功,会选择再一次的重发这条消息。这就符合At least once语义了,因为遇到网络错误的那条消息可能发送成功了,这个时候如果再重新发送的话,就会导致一条消息被重复发送两次了
在0.11.0.0版本开始,Kafka的Producer开始支持了幂等发送消息了,其实说白了,就是重复发送一条消息不会导致broker server中存储两条一样的消息了。这样的话Kafka在消息的发送的语义就可以达到Exactly once了
为了实现幂等发送消息,在0.11.0.0版本,Kafka给每一个Producer一个唯一的ID,以及给发送的每一条消息一个唯一的sequence number,利用这两个信息就可以在broker server段对消息进行去重了
消息消费时的语义
消费者消费消息的时候是根据topic + partition + offset进行消费的,当消费者进程一切正常的时候,这个消费之消费到的offset的信息可以存放在消费者进程中的内存中
但是,假如有这么一个场景:当一个消费者在消费某个topic的某个partition的时候,突然间这个消费者进程挂了,这个时候我们会再次启动这个消费者进程,我们怎么保证继续从挂了之前消费到的offset开始消费消息呢?
可以有两种方法来解决上的问题:
1.消费者可以读取消息,然后将offset保存在持久化系统中,然后消息进行处理。那么消费者进程有可能在保存offset之后,保存处理消息的结果之前挂掉了,在这种情况下,当消费者进程重新启动后,就会从已经存储的offset开始消费消息并处理消息了,挂掉之前的消费者进程处理消息的结果可能还没来得及保存,所以在这种情况下,有一些消息就可能没有得到处理了,那么,这种场景是符合At most once语义的,因为当消费者进程挂了之后,有些消息可能得不到处理
2.消费者可以读取消息,然后处理消息,将处理之后的结果进行保存,最后再保存消费到的消息offset了。那么消费者进程有可能在保存消息处理结果之后,保存消费到的消息offset之前挂了,这种情况下,当消费者进程重新启动的时候,则从上一次保存的消费的消息的offset开始处理消息了,这个时候就可能会导致重复处理消息了,因为在挂掉之前可能有部分数据处理成功了。那么这种场景符合At least once语义了,因为没有漏掉处理消息,但是有可能重复处理消息了
Kafka默认可以保证At least once的语义,也就是上面的第二种场景了。
如果你想实现At most once语义的话,那么你需要做的是:
1.首先要禁用掉Producer端的retry机制(消息重发),即将Producer的配置retries置为0(默认就是0)
2.在消费者进程中,先将消费到的消息的offset先保存在持久化系统中,然后再开始处理消息,并保存处理结果
那么在Kafka中怎么做到消费者消费消息时的Exactly once的语义呢?针对不同的场景,有下面三种方法可以做到:
A.可以在At least once语义的基础之上,在消费者进程中保存处理结果的时候采用幂等的保存方法,举个例子:比如将结果保存到MySQL的某张表中,这张表有一个字段是唯一的(你可以认为这个字段就是这张表的主键),那么在消费者进程保存处理结果的时候,如果这个唯一的字段的值已经存在了,则不会再保存这个结果了,这样就保证保存的结果不会重复了
B.如果我们协调好消费者消费的消息offset的保存和处理消息结果的保存之间的关系就可以达到Exactly once的语义,就是说,当消费的消息offset的保存成功以及处理消息结果的保存成功,则算成功,如果两者有一个失败的话,那么就需要回滚两个保存了(和事务有点像),我们可以用下面三种方法来协调这两者之间的关系:
a.two-phase commit 参考:https://www.cnblogs.com/sunddenly/articles/4072882.html
b.使用Kafka的事务,参考我的另一个文章:https://blog.csdn.net/weixin_42411818/article/details/99676962
c.将offset和处理结果保存在同一个地方,这样的话,要么都保存失败,要么都保存成功,这种方式是比较简单的,将Kafka中的数据导入到HDFS的Kafka Connector就是这样来实现的,参考:https://github.com/confluentinc/kafka-connect-hdfs/blob/d8f98e3cca7509d80d40982f6a962b90f183a6c3/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java#L99
做个总结
1.Kafka默认的语义是At least once,消费者进程消费消息并处理完后,会保存到Kafka的Broker Server上
2.如果你想实现At most once语义的话,那么你需要做:禁用掉Producer的消息重发机制以及先于保存处理结果,然后将offset保存到持久化系统中
3.如果我们协调好消费者消费的消息offset的保存和处理消息结果的保存之间的关系就可以达到Exactly once的语义
ack机制
在Producer向Kafka集群发送一条消息的时候,如果消息发送失败的话,并且我们设置了retries属性的话,Producer有重新发送消息的机制,这种机制主要是提高消息的发送成功率
如果我们想Producer发送的消息一定能发送成功,不会丢失的话,除了上面的消息重发机制,还有就是ack机制。
一个topic可以分成多个partition,每一个partition可以有多个副本,在多个副本中有一个Leader副本和若干个Follower副本,Leader副本提供这个分区消息的读写服务
如果Producer发送了一条消息,这个消息如果已经发送并存储在Leader副本中的日志文件中,那其实就可以说明这条消息已经发送成功了,但是如果这个Leader副本可能会挂掉,所以数据只存储在Leader副本也不能保证数据不会丢,所以有的时候Producer需要等到发送的消息都存储到ISR列表中的备份的时候才算成功,这个时候消息就不容易丢失了
根据上面的特点,Producer在发送消息的时候就可以指定acks的值了,acks值可以是[ 0, 1, -1, all],如果是flume是没有all的
1.当acks=0的时候,Producer发送消息到发送端的buffer中就直接返回了,至于这个消息有没有真的发送到Broker Server,Producer不关心,即使消息发送失败,上面说的retry机制也不起作用,所以在这种场景下,可能就会丢失消息了(类似UDP,只管发,不管对方有没有接收到消息)
2.当acks=1的时候,Producer发送的消息一定要存储到对应的分区的Leader副本日志文件中才算消息发送成功,要是失败的话,则会尝试retry。在这种场景下,只有当消息已经存储在Leader副本中,但是消息还没有被Follower副本同步的时候,如果Leader副本所在的broker server挂了,消息才会丢失
3.当acks=-1或者等于all的时候,Producer发送的消息一定要存储到对应的分区的所有的在ISR列表中的副本日志文件中才算消息发送成功,要是失败的话,则会尝试retry。这种场景下消息就很难丢失了,除非所有的副本所在的Broker Server都挂了
备注:
acks是英文单词acknowledgments的缩写,可以表示响应,即消息发送端发送消息给Kafka集群得到的响应,比如acks=all的时候,只有所有的ISR副本存储好消息之后才给客户端一个响应,表示消息存储成功
acks上面的三种取值可以根据你的业务需求来设置,消息的持久性是越来越强的,但是性能肯定越来越差的,上面的三种取值就是在消息持久性和性能两个方面做一个权衡(就像UDP和TCP,前者不管收不收到,只管发,这种性能肯定高,而TCP需要经过三次握手和四次挥手,以保证数据的传输,因此其性能肯定会比UDP差,所以,这个配置还是得看自己的业务场景)