Exactly Once语义与事务机制

实现Exactly Once的一种方法是让下游系统具有幂等处理特性,而在Kafka Stream中,Kafka Producer本身就是“下游”系统,因此如果能让Producer具有幂等处理特性,那就可以让Kafka Stream在一定程度上支持Exactly once语义。

为了实现Producer的幂等语义,Kafka引入了Producer ID(即PID)和Sequence Number。每个新的Producer在初始化的时候会被分配一个唯一的PID,该PID对用户完全透明而不会暴露给用户。



对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number。



类似地,Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每次Commit一条消息时将其对应序号递增。对于接收的每条消息,如果其序号比Broker维护的序号(即最后一次Commit的消息的序号)大一,则Broker会接受它,否则将其丢弃:



如果消息序号比Broker维护的序号大一以上,说明中间有数据尚未写入,也即乱序,此时Broker拒绝该消息,Producer抛出InvalidSequenceNumber
如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber
上述设计解决了0.11.0.0之前版本中的两个问题:



Broker保存消息后,发送ACK前宕机,Producer认为消息未发送成功并重试,造成数据重复
前一条消息发送失败,后一条消息发送成功,前一条消息重试后成功,造成数据乱序
事务性保证
上述幂等设计只能保证单个Producer对于同一个<Topic, Partition>的Exactly Once语义。



另外,它并不能保证写操作的原子性——即多个写操作,要么全部被Commit要么全部不被Commit。



更不能保证多个读写操作的的原子性。尤其对于Kafka Stream应用而言,典型的操作即是从某个Topic消费数据,经过一系列转换后写回另一个Topic,保证从源Topic的读取与向目标Topic的写入的原子性有助于从故障中恢复。



事务保证可使得应用程序将生产数据和消费数据当作一个原子单元来处理,要么全部成功,要么全部失败,即使该生产或消费跨多个<Topic, Partition>。



另外,有状态的应用也可以保证重启后从断点处继续处理,也即事务恢复。



为了实现这种效果,应用程序必须提供一个稳定的(重启后不变)唯一的ID,也即Transaction ID。Transactin ID与PID可能一一对应。区别在于Transaction ID由用户提供,而PID是内部的实现对用户透明。



另外,为了保证新的Producer启动后,旧的具有相同Transaction ID的Producer即失效,每次Producer通过Transaction ID拿到PID的同时,还会获取一个单调递增的epoch。由于旧的Producer的epoch比新Producer的epoch小,Kafka可以很容易识别出该Producer是老的Producer并拒绝其请求。



有了Transaction ID后,Kafka可保证:



跨Session的数据幂等发送。当具有相同Transaction ID的新的Producer实例被创建且工作时,旧的且拥有相同Transaction ID的Producer将不再工作。
跨Session的事务恢复。如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么Commit要么Abort,使得新实例从一个正常状态开始工作。
需要注意的是,上述的事务保证是从Producer的角度去考虑的。从Consumer的角度来看,该保证会相对弱一些。尤其是不能保证所有被某事务Commit过的所有消息都被一起消费,因为:



对于压缩的Topic而言,同一事务的某些消息可能被其它版本覆盖
事务包含的消息可能分布在多个Segment中(即使在同一个Partition内),当老的Segment被删除时,该事务的部分数据可能会丢失
Consumer在一个事务内可能通过seek方法访问任意Offset的消息,从而可能丢失部分消息
Consumer可能并不需要消费某一事务内的所有Partition,因此它将永远不会读取组成该事务的所有消息
事务机制原理
事务性消息传递
这一节所说的事务主要指原子性,也即Producer将多条消息作为一个事务批量发送,要么全部成功要么全部失败。



为了实现这一点,Kafka 0.11.0.0引入了一个服务器端的模块,名为Transaction Coordinator,用于管理Producer发送的消息的事务性。



该Transaction Coordinator维护Transaction Log,该log存于一个内部的Topic内。由于Topic数据具有持久性,因此事务的状态也具有持久性。



Producer并不直接读写Transaction Log,它与Transaction Coordinator通信,然后由Transaction Coordinator将该事务的状态插入相应的Transaction Log。



Transaction Log的设计与Offset Log用于保存Consumer的Offset类似。



事务中Offset的提交
许多基于Kafka的应用,尤其是Kafka Stream应用中同时包含Consumer和Producer,前者负责从Kafka中获取消息,后者负责将处理完的数据写回Kafka的其它Topic中。



为了实现该场景下的事务的原子性,Kafka需要保证对Consumer Offset的Commit与Producer对发送消息的Commit包含在同一个事务中。否则,如果在二者Commit中间发生异常,根据二者Commit的顺序可能会造成数据丢失和数据重复:



如果先Commit Producer发送数据的事务再Commit Consumer的Offset,即At Least Once语义,可能造成数据重复。
如果先Commit Consumer的Offset,再Commit Producer数据发送事务,即At Most Once语义,可能造成数据丢失。
用于事务特性的控制型消息
为了区分写入Partition的消息被Commit还是Abort,Kafka引入了一种特殊类型的消息,即Control Message。该类消息的Value内不包含任何应用相关的数据,并且不会暴露给应用程序。它只用于Broker与Client间的内部通信。



对于Producer端事务,Kafka以Control Message的形式引入一系列的Transaction Marker。Consumer即可通过该标记判定对应的消息被Commit了还是Abort了,然后结合该Consumer配置的隔离级别决定是否应该将该消息返回给应用程序。



事务处理样例代码
Producer<String, String> producer = new KafkaProducer<String, String>(props);



// 初始化事务,包括结束该Transaction ID对应的未完成的事务(如果有)
// 保证新的事务在一个正确的状态下启动
producer.initTransactions();



// 开始事务
producer.beginTransaction();



// 消费数据
ConsumerRecords<String, String> records = consumer.poll(100);



try{
// 发送数据
producer.send(new ProducerRecord<String, String>(“Topic”, “Key”, “Value”));



// 发送消费数据的Offset,将上述数据消费与数据发送纳入同一个Transaction内
producer.sendOffsetsToTransaction(offsets, "group1");

// 数据发送及Offset发送均成功的情况下,提交事务
producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// 数据发送或者Offset发送出现异常时,终止事务
producer.abortTransaction(); } finally {
// 关闭Producer和Consumer
producer.close();
consumer.close(); }


找到Transaction Coordinator
由于Transaction Coordinator是分配PID和管理事务的核心,因此Producer要做的第一件事情就是通过向任意一个Broker发送FindCoordinator请求找到Transaction Coordinator的位置。



注意:只有应用程序为Producer配置了Transaction ID时才可使用事务特性,也才需要这一步。另外,由于事务性要求Producer开启幂等特性,因此通过将transactional.id设置为非空从而开启事务特性的同时也需要通过将enable.idempotence设置为true来开启幂等特性。



获取PID
找到Transaction Coordinator后,具有幂等特性的Producer必须发起InitPidRequest请求以获取PID。



注意:只要开启了幂等特性即必须执行该操作,而无须考虑该Producer是否开启了事务特性。



如果事务特性被开启
InitPidRequest会发送给Transaction Coordinator。如果Transaction Coordinator是第一次收到包含有该Transaction ID的InitPidRequest请求,它将会把该<TransactionID, PID>存入Transaction Log,如上图中步骤2.1所示。这样可保证该对应关系被持久化,从而保证即使Transaction Coordinator宕机该对应关系也不会丢失。



除了返回PID外,InitPidRequest还会执行如下任务:



增加该PID对应的epoch。具有相同PID但epoch小于该epoch的其它Producer(如果有)新开启的事务将被拒绝。
恢复(Commit或Abort)之前的Producer未完成的事务(如果有)。
注意:InitPidRequest的处理过程是同步阻塞的。一旦该调用正确返回,Producer即可开始新的事务。



另外,如果事务特性未开启,InitPidRequest可发送至任意Broker,并且会得到一个全新的唯一的PID。该Producer将只能使用幂等特性以及单一Session内的事务特性,而不能使用跨Session的事务特性。



开启事务
Kafka从0.11.0.0版本开始,提供beginTransaction()方法用于开启一个事务。调用该方法后,Producer本地会记录已经开启了事务,但Transaction Coordinator只有在Producer发送第一条消息后才认为事务已经开启。



Consume-Transform-Produce
这一阶段,包含了整个事务的数据处理过程,并且包含了多种请求。



AddPartitionsToTxnRequest
一个Producer可能会给多个<Topic, Partition>发送数据,给一个新的<Topic, Partition>发送数据前,它需要先向Transaction Coordinator发送AddPartitionsToTxnRequest。



Transaction Coordinator会将该<Transaction, Topic, Partition>存于Transaction Log内,并将其状态置为BEGIN,如上图中步骤4.1所示。有了该信息后,我们才可以在后续步骤中为每个Topic, Partition>设置COMMIT或者ABORT标记(如上图中步骤5.2所示)。



另外,如果该<Topic, Partition>为该事务中第一个<Topic, Partition>,Transaction Coordinator还会启动对该事务的计时(每个事务都有自己的超时时间)。



ProduceRequest
Producer通过一个或多个ProduceRequest发送一系列消息。除了应用数据外,该请求还包含了PID,epoch,和Sequence Number。该过程如上图中步骤4.2所示。



AddOffsetsToTxnRequest
为了提供事务性,Producer新增了sendOffsetsToTransaction方法,该方法将多组消息的发送和消费放入同一批处理内。



该方法先判断在当前事务中该方法是否已经被调用并传入了相同的Group ID。若是,直接跳到下一步;若不是,则向Transaction Coordinator发送AddOffsetsToTxnRequests请求,Transaction Coordinator将对应的所有<Topic, Partition>存于Transaction Log中,并将其状态记为BEGIN,如上图中步骤4.3所示。该方法会阻塞直到收到响应。



TxnOffsetCommitRequest
作为sendOffsetsToTransaction方法的一部分,在处理完AddOffsetsToTxnRequest后,Producer也会发送TxnOffsetCommit请求给Consumer Coordinator从而将本事务包含的与读操作相关的各<Topic, Partition>的Offset持久化到内部的__consumer_offsets中,如上图步骤4.4所示。



在此过程中,Consumer Coordinator会通过PID和对应的epoch来验证是否应该允许该Producer的该请求。



这里需要注意:



写入__consumer_offsets的Offset信息在当前事务Commit前对外是不可见的。也即在当前事务被Commit前,可认为该Offset尚未Commit,也即对应的消息尚未被完成处理。
Consumer Coordinator并不会立即更新缓存中相应<Topic, Partition>的Offset,因为此时这些更新操作尚未被COMMIT或ABORT。
Commit或Abort事务
一旦上述数据写入操作完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。



EndTxnRequest
commitTransaction方法使得Producer写入的数据对下游Consumer可见。abortTransaction方法通过Transaction Marker将Producer写入的数据标记为Aborted状态。下游的Consumer如果将isolation.level设置为READ_COMMITTED,则它读到被Abort的消息后直接将其丢弃而不会返回给客户程序,也即被Abort的消息对应用程序不可见。



无论是Commit还是Abort,Producer都会发送EndTxnRequest请求给Transaction Coordinator,并通过标志位标识是应该Commit还是Abort。



收到该请求后,Transaction Coordinator会进行如下操作



将PREPARE_COMMIT或PREPARE_ABORT消息写入Transaction Log,如上图中步骤5.1所示
通过WriteTxnMarker请求以Transaction Marker的形式将COMMIT或ABORT信息写入用户数据日志以及Offset Log中,如上图中步骤5.2所示
最后将COMPLETE_COMMIT或COMPLETE_ABORT信息写入Transaction Log中,如上图中步骤5.3所示
补充说明:对于commitTransaction方法,它会在发送EndTxnRequest之前先调用flush方法以确保所有发送出去的数据都得到相应的ACK。对于abortTransaction方法,在发送EndTxnRequest之前直接将当前Buffer中的事务性消息(如果有)全部丢弃,但必须等待所有被发送但尚未收到ACK的消息发送完成。



上述第二步是实现将一组读操作与写操作作为一个事务处理的关键。因为Producer写入的数据Topic以及记录Comsumer Offset的Topic会被写入相同的Transactin Marker,所以这一组读操作与写操作要么全部COMMIT要么全部ABORT。



WriteTxnMarkerRequest
上面提到的WriteTxnMarkerRequest由Transaction Coordinator发送给当前事务涉及到的每个<Topic, Partition>的Leader。收到该请求后,对应的Leader会将对应的COMMIT(PID)或者ABORT(PID)控制信息写入日志,如上图中步骤5.2所示。



该控制消息向Broker以及Consumer表明对应PID的消息被Commit了还是被Abort了。



这里要注意,如果事务也涉及到__consumer_offsets,即该事务中有消费数据的操作且将该消费的Offset存于__consumer_offsets中,Transaction Coordinator也需要向该内部Topic的各Partition的Leader发送WriteTxnMarkerRequest从而写入COMMIT(PID)或COMMIT(PID)控制信息。



写入最终的COMPLETE_COMMIT或COMPLETE_ABORT消息
写完所有的Transaction Marker后,Transaction Coordinator会将最终的COMPLETE_COMMIT或COMPLETE_ABORT消息写入Transaction Log中以标明该事务结束,如上图中步骤5.3所示。



此时,Transaction Log中所有关于该事务的消息全部可以移除。当然,由于Kafka内数据是Append Only的,不可直接更新和删除,这里说的移除只是将其标记为null从而在Log Compact时不再保留。



另外,COMPLETE_COMMIT或COMPLETE_ABORT的写入并不需要得到所有Rreplica的ACK,因为如果该消息丢失,可以根据事务协议重发。



补充说明,如果参与该事务的某些<Topic, Partition>在被写入Transaction Marker前不可用,它对READ_COMMITTED的Consumer不可见,但不影响其它可用<Topic, Partition>的COMMIT或ABORT。在该<Topic, Partition>恢复可用后,Transaction Coordinator会重新根据PREPARE_COMMIT或PREPARE_ABORT向该<Topic, Partition>发送Transaction Marker。



总结
PID与Sequence Number的引入实现了写操作的幂等性
写操作的幂等性结合At Least Once语义实现了单一Session内的Exactly Once语义
Transaction Marker与PID提供了识别消息是否应该被读取的能力,从而实现了事务的隔离性
Offset的更新标记了消息是否被读取,从而将对读操作的事务处理转换成了对写(Offset)操作的事务处理
Kafka事务的本质是,将一组写操作(如果有)对应的消息与一组读操作(如果有)对应的Offset的更新进行同样的标记(即Transaction Marker)来实现事务中涉及的所有读写操作同时对外可见或同时对外不可见
Kafka只提供对Kafka本身的读写操作的事务性,不提供包含外部系统的事务性
异常处理
Exception处理
InvalidProducerEpoch
这是一种Fatal Error,它说明当前Producer是一个过期的实例,有Transaction ID相同但epoch更新的Producer实例被创建并使用。此时Producer会停止并抛出Exception。



InvalidPidMapping
Transaction Coordinator没有与该Transaction ID对应的PID。此时Producer会通过包含有Transaction ID的InitPidRequest请求创建一个新的PID。



NotCorrdinatorForGTransactionalId
该Transaction Coordinator不负责该当前事务。Producer会通过FindCoordinatorRequest请求重新寻找对应的Transaction Coordinator。



InvalidTxnRequest
违反了事务协议。正确的Client实现不应该出现这种Exception。如果该异常发生了,用户需要检查自己的客户端实现是否有问题。



CoordinatorNotAvailable
Transaction Coordinator仍在初始化中。Producer只需要重试即可。



DuplicateSequenceNumber
发送的消息的序号低于Broker预期。该异常说明该消息已经被成功处理过,Producer可以直接忽略该异常并处理下一条消息



InvalidSequenceNumber
这是一个Fatal Error,它说明发送的消息中的序号大于Broker预期。此时有两种可能



数据乱序。比如前面的消息发送失败后重试期间,新的消息被接收。正常情况下不应该出现该问题,因为当幂等发送启用时,max.inflight.requests.per.connection被强制设置为1,而acks被强制设置为all。故前面消息重试期间,后续消息不会被发送,也即不会发生乱序。并且只有ISR中所有Replica都ACK,Producer才会认为消息已经被发送,也即不存在Broker端数据丢失问题。
服务器由于日志被Truncate而造成数据丢失。此时应该停止Producer并将此Fatal Error报告给用户。
InvalidTransactionTimeout
InitPidRequest调用出现的Fatal Error。它表明Producer传入的timeout时间不在可接受范围内,应该停止Producer并报告给用户。



处理Transaction Coordinator失败
写PREPARE_COMMIT/PREPARE_ABORT前失败
Producer通过FindCoordinatorRequest找到新的Transaction Coordinator,并通过EndTxnRequest请求发起COMMIT或ABORT流程,新的Transaction Coordinator继续处理EndTxnRequest请求——写PREPARE_COMMIT或PREPARE_ABORT,写Transaction Marker,写COMPLETE_COMMIT或COMPLETE_ABORT。



写完PREPARE_COMMIT/PREPARE_ABORT后失败
此时旧的Transaction Coordinator可能已经成功写入部分Transaction Marker。新的Transaction Coordinator会重复这些操作,所以部分Partition中可能会存在重复的COMMIT或ABORT,但只要该Producer在此期间没有发起新的事务,这些重复的Transaction Marker就不是问题。



写完COMPLETE_COMMIT/ABORT后失败
旧的Transaction Coordinator可能已经写完了COMPLETE_COMMIT或COMPLETE_ABORT但在返回EndTxnRequest之前失败。该场景下,新的Transaction Coordinator会直接给Producer返回成功。



事务过期机制
事务超时
transaction.timeout.ms



终止过期事务
当Producer失败时,Transaction Coordinator必须能够主动的让某些进行中的事务过期。否则没有Producer的参与,Transaction Coordinator无法判断这些事务应该如何处理,这会造成:



如果这种进行中事务太多,会造成Transaction Coordinator需要维护大量的事务状态,大量占用内存
Transaction Log内也会存在大量数据,造成新的Transaction Coordinator启动缓慢
READ_COMMITTED的Consumer需要缓存大量的消息,造成不必要的内存浪费甚至是OOM
如果多个Transaction ID不同的Producer交叉写同一个Partition,当一个Producer的事务状态不更新时,READ_COMMITTED的Consumer为了保证顺序消费而被阻塞
为了避免上述问题,Transaction Coordinator会周期性遍历内存中的事务状态Map,并执行如下操作



如果状态是BEGIN并且其最后更新时间与当前时间差大于transaction.remove.expired.transaction.cleanup.interval.ms(默认值为1小时),则主动将其终止:1)未避免原Producer临时恢复与当前终止流程冲突,增加该Producer对应的PID的epoch,并确保将该更新的信息写入Transaction Log;2)以更新后的epoch回滚事务,从而使得该事务相关的所有Broker都更新其缓存的该PID的epoch从而拒绝旧Producer的写操作
如果状态是PREPARE_COMMIT,完成后续的COMMIT流程————向各<Topic, Partition>写入Transaction Marker,在Transaction Log内写入COMPLETE_COMMIT
如果状态是PREPARE_ABORT,完成后续ABORT流程
终止Transaction ID
某Transaction ID的Producer可能很长时间不再发送数据,Transaction Coordinator没必要再保存该Transaction ID与PID等的映射,否则可能会造成大量的资源浪费。因此需要有一个机制探测不再活跃的Transaction ID并将其信息删除。



Transaction Coordinator会周期性遍历内存中的Transaction ID与PID映射,如果某Transaction ID没有对应的正在进行中的事务并且它对应的最后一个事务的结束时间与当前时间差大于transactional.id.expiration.ms(默认值是7天),则将其从内存中删除并在Transaction Log中将其对应的日志的值设置为null从而使得Log Compact可将其记录删除。



与其它系统事务机制对比
PostgreSQL MVCC
Kafka的事务机制与《MVCC PostgreSQL实现事务和多版本并发控制的精华》一文中介绍的PostgreSQL通过MVCC实现事务的机制非常类似,对于事务的回滚,并不需要删除已写入的数据,都是将写入数据的事务标记为Rollback/Abort从而在读数据时过滤该数据。



两阶段提交
Kafka的事务机制与《分布式事务(一)两阶段提交及JTA》一文中所介绍的两阶段提交机制看似相似,都分PREPARE阶段和最终COMMIT阶段,但又有很大不同。



Kafka事务机制中,PREPARE时即要指明是PREPARE_COMMIT还是PREPARE_ABORT,并且只须在Transaction Log中标记即可,无须其它组件参与。而两阶段提交的PREPARE需要发送给所有的分布式事务参与方,并且事务参与方需要尽可能准备好,并根据准备情况返回Prepared或Non-Prepared状态给事务管理器。
Kafka事务中,一但发起PREPARE_COMMIT或PREPARE_ABORT,则确定该事务最终的结果应该是被COMMIT或ABORT。而分布式事务中,PREPARE后由各事务参与方返回状态,只有所有参与方均返回Prepared状态才会真正执行COMMIT,否则执行ROLLBACK
Kafka事务机制中,某几个Partition在COMMIT或ABORT过程中变为不可用,只影响该Partition不影响其它Partition。两阶段提交中,若唯一收到COMMIT命令参与者Crash,其它事务参与方无法判断事务状态从而使得整个事务阻塞
Kafka事务机制引入事务超时机制,有效避免了挂起的事务影响其它事务的问题
Kafka事务机制中存在多个Transaction Coordinator实例,而分布式事务中只有一个事务管理器
Zookeeper
Zookeeper的原子广播协议与两阶段提交以及Kafka事务机制有相似之处,但又有各自的特点



Kafka事务可COMMIT也可ABORT。而Zookeeper原子广播协议只有COMMIT没有ABORT。当然,Zookeeper不COMMIT某消息也即等效于ABORT该消息的更新。
Kafka存在多个Transaction Coordinator实例,扩展性较好。而Zookeeper写操作只能在Leader节点进行,所以其写性能远低于读性能。
Kafka事务是COMMIT还是ABORT完全取决于Producer即客户端。而Zookeeper原子广播协议中某条消息是否被COMMIT取决于是否有一大半FOLLOWER ACK该消息。



创建一条记录,记录中一个要指定对应的topic和value,key和partition可选。 先序列化,然后按照topic和partition,放进对应的发送队列中。kafka produce都是批量请求,会积攒一批,然后一起发送,不是调send()就进行立刻进行网络发包。
如果partition没填,那么情况会是这样的:



key有填
按照key进行哈希,相同key去一个partition。(如果扩展了partition的数量那么就不能保证了)
key没填
round-robin来选partition
这些要发往同一个partition的请求按照配置,攒一波,然后由一个单独的线程一次性发过去。



API
有high level api,替我们把很多事情都干了,offset,路由啥都替我们干了,用以来很简单。
还有simple api,offset啥的都是要我们自己记录。



partition
当存在多副本的情况下,会尽量把多个副本,分配到不同的broker上。kafka会为partition选出一个leader,之后所有该partition的请求,实际操作的都是leader,然后再同步到其他的follower。当一个broker歇菜后,所有leader在该broker上的partition都会重新选举,选出一个leader。(这里不像分布式文件存储系统那样会自动进行复制保持副本数)



然后这里就涉及两个细节:怎么分配partition,怎么选leader。



关于partition的分配,还有leader的选举,总得有个执行者。在kafka中,这个执行者就叫controller。kafka使用zk在broker中选出一个controller,用于partition分配和leader选举。



partition的分配
将所有Broker(假设共n个Broker)和待分配的Partition排序
将第i个Partition分配到第(i mod n)个Broker上 (这个就是leader)
将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
leader容灾
controller会在Zookeeper的/brokers/ids节点上注册Watch,一旦有broker宕机,它就能知道。当broker宕机后,controller就会给受到影响的partition选出新leader。controller从zk的/brokers/topics/[topic]/partitions/[partition]/state中,读取对应partition的ISR(in-sync replica已同步的副本)列表,选一个出来做leader。
选出leader后,更新zk,然后发送LeaderAndISRRequest给受影响的broker,让它们改变知道这事。为什么这里不是使用zk通知,而是直接给broker发送rpc请求,我的理解可能是这样做zk有性能问题吧。



如果ISR列表是空,那么会根据配置,随便选一个replica做leader,或者干脆这个partition就是歇菜。如果ISR列表的有机器,但是也歇菜了,那么还可以等ISR的机器活过来。



多副本同步
这里的策略,服务端这边的处理是follower从leader批量拉取数据来同步。但是具体的可靠性,是由生产者来决定的。
生产者生产消息的时候,通过request.required.acks参数来设置数据的可靠性。



acks what happen
0 which means that the producer never waits for an acknowledgement from the broker.发过去就完事了,不关心broker是否处理成功,可能丢数据。
1 which means that the producer gets an acknowledgement after the leader replica has received the data. 当写Leader成功后就返回,其他的replica都是通过fetcher去同步的,所以kafka是异步写,主备切换可能丢数据。
-1 which means that the producer gets an acknowledgement after all in-sync replicas have received the data. 要等到isr里所有机器同步成功,才能返回成功,延时取决于最慢的机器。强一致,不会丢数据。
在acks=-1的时候,如果ISR少于min.insync.replicas指定的数目,那么就会返回不可用。



这里ISR列表中的机器是会变化的,根据配置replica.lag.time.max.ms,多久没同步,就会从ISR列表中剔除。以前还有根据落后多少条消息就踢出ISR,在1.0版本后就去掉了,因为这个值很难取,在高峰的时候很容易出现节点不断的进出ISR列表。



从ISA中选出leader后,follower会从把自己日志中上一个高水位后面的记录去掉,然后去和leader拿新的数据。因为新的leader选出来后,follower上面的数据,可能比新leader多,所以要截取。这里高水位的意思,对于partition和leader,就是所有ISR中都有的最新一条记录。消费者最多只能读到高水位;



从leader的角度来说高水位的更新会延迟一轮,例如写入了一条新消息,ISR中的broker都fetch到了,但是ISR中的broker只有在下一轮的fetch中才能告诉leader。



订阅topic是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个partition。换句话来说,就是一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。因此,如果消费组内的消费者如果比partition多的话,那么就会有个别消费者一直空闲。



offset的保存
一个消费组消费partition,需要保存offset记录消费到哪,以前保存在zk中,由于zk的写性能不好,以前的解决方法都是consumer每隔一分钟上报一次。这里zk的性能严重影响了消费的速度,而且很容易出现重复消费。
在0.10版本后,kafka把这个offset的保存,从zk总剥离,保存在一个名叫__consumeroffsets topic的topic中。写进消息的key由groupid、topic、partition组成,value是偏移量offset。topic配置的清理策略是compact。总是保留最新的key,其余删掉。一般情况下,每个key的offset都是缓存在内存中,查询的时候不用遍历partition,如果没有缓存,第一次就会遍历partition建立缓存,然后查询返回。



确定consumer group位移信息写入__consumers_offsets的哪个partition,具体计算公式:



__consumers_offsets partition =
Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)

//groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。
思考:
如果正在跑的服务,修改了offsets.topic.num.partitions,那么offset的保存是不是就乱套了?



分配partition–reblance
生产过程中broker要分配partition,消费过程这里,也要分配partition给消费者。类似broker中选了一个controller出来,消费也要从broker中选一个coordinator,用于分配partition。
下面从顶向下,分别阐述一下



怎么选coordinator。
交互流程。
reblance的流程。
选coordinator
看offset保存在那个partition
该partition leader所在的broker就是被选定的coordinator
这里我们可以看到,consumer group的coordinator,和保存consumer group offset的partition leader是同一台机器。



交互流程
把coordinator选出来之后,就是要分配了
整个流程是这样的:



consumer启动、或者coordinator宕机了,consumer会任意请求一个broker,发送ConsumerMetadataRequest请求,broker会按照上面说的方法,选出这个consumer对应coordinator的地址。
consumer 发送heartbeat请求给coordinator,返回IllegalGeneration的话,就说明consumer的信息是旧的了,需要重新加入进来,进行reblance。返回成功,那么consumer就从上次分配的partition中继续执行。
reblance流程
consumer给coordinator发送JoinGroupRequest请求。
这时其他consumer发heartbeat请求过来时,coordinator会告诉他们,要reblance了。
其他consumer发送JoinGroupRequest请求。
所有记录在册的consumer都发了JoinGroupRequest请求之后,coordinator就会在这里consumer中随便选一个leader。然后回JoinGroupRespone,这会告诉consumer你是follower还是leader,对于leader,还会把follower的信息带给它,让它根据这些信息去分配partition
5、consumer向coordinator发送SyncGroupRequest,其中leader的SyncGroupRequest会包含分配的情况。
6、coordinator回包,把分配的情况告诉consumer,包括leader。



当partition或者消费者的数量发生变化时,都得进行reblance。
列举一下会reblance的情况:



增加partition
增加消费者
消费者主动关闭
消费者宕机了
coordinator自己也宕机了
消息投递语义
kafka支持3种消息投递语义
At most once:最多一次,消息可能会丢失,但不会重复
At least once:最少一次,消息不会丢失,可能会重复
Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11中实现,仅限于下游也是kafka)



在业务中,常常都是使用At least once的模型,如果需要可重入的话,往往是业务自己实现。



At least once
先获取数据,再进行业务处理,业务处理成功后commit offset。
1、生产者生产消息异常,消息是否成功写入不确定,重做,可能写入重复的消息
2、消费者处理消息,业务处理成功后,更新offset失败,消费者重启的话,会重复消费



At most once
先获取数据,再commit offset,最后进行业务处理。
1、生产者生产消息异常,不管,生产下一个消息,消息就丢了
2、消费者处理消息,先更新offset,再做业务处理,做业务处理失败,消费者重启,消息就丢了



Exactly once
思路是这样的,首先要保证消息不丢,再去保证不重复。所以盯着At least once的原因来搞。 首先想出来的:



生产者重做导致重复写入消息—-生产保证幂等性
消费者重复消费—消灭重复消费,或者业务接口保证幂等性重复消费也没问题
由于业务接口是否幂等,不是kafka能保证的,所以kafka这里提供的exactly once是有限制的,消费者的下游也必须是kafka。所以一下讨论的,没特殊说明,消费者的下游系统都是kafka(注:使用kafka conector,它对部分系统做了适配,实现了exactly once)。



生产者幂等性好做,没啥问题。



解决重复消费有两个方法:



下游系统保证幂等性,重复消费也不会导致多条记录。
把commit offset和业务处理绑定成一个事务。
本来exactly once实现第1点就ok了。



但是在一些使用场景下,我们的数据源可能是多个topic,处理后输出到多个topic,这时我们会希望输出时要么全部成功,要么全部失败。这就需要实现事务性。既然要做事务,那么干脆把重复消费的问题从根源上解决,把commit offset和输出到其他topic绑定成一个事务。



生产幂等性
思路是这样的,为每个producer分配一个pid,作为该producer的唯一标识。producer会为每一个<topic,partition>维护一个单调递增的seq。类似的,broker也会为每个<pid,topic,partition>记录下最新的seq。当req_seq == broker_seq+1时,broker才会接受该消息。因为:



消息的seq比broker的seq大超过时,说明中间有数据还没写入,即乱序了。
消息的seq不比broker的seq小,那么说明该消息已被保存。



解决重复生产
事务性/原子性广播
场景是这样的:



先从多个源topic中获取数据。
做业务处理,写到下游的多个目的topic。
更新多个源topic的offset。
其中第2、3点作为一个事务,要么全成功,要么全失败。这里得益与offset实际上是用特殊的topic去保存,这两点都归一为写多个topic的事务性处理。



基本思路是这样的:
引入tid(transaction id),和pid不同,这个id是应用程序提供的,用于标识事务,和producer是谁并没关系。就是任何producer都可以使用这个tid去做事务,这样进行到一半就死掉的事务,可以由另一个producer去恢复。
同时为了记录事务的状态,类似对offset的处理,引入transaction coordinator用于记录transaction log。在集群中会有多个transaction coordinator,每个tid对应唯一一个transaction coordinator。
注:transaction log删除策略是compact,已完成的事务会标记成null,compact后不保留。



做事务时,先标记开启事务,写入数据,全部成功就在transaction log中记录为prepare commit状态,否则写入prepare abort的状态。之后再去给每个相关的partition写入一条marker(commit或者abort)消息,标记这个事务的message可以被读取或已经废弃。成功后在transaction log记录下commit/abort状态,至此事务结束。



数据流:



Kafka Transactions Data Flow.png
首先使用tid请求任意一个broker(代码中写的是负载最小的broker),找到对应的transaction coordinator。



请求transaction coordinator获取到对应的pid,和pid对应的epoch,这个epoch用于防止僵死进程复活导致消息错乱,当消息的epoch比当前维护的epoch小时,拒绝掉。tid和pid有一一对应的关系,这样对于同一个tid会返回相同的pid。



client先请求transaction coordinator记录<topic,partition>的事务状态,初始状态是BEGIN,如果是该事务中第一个到达的<topic,partition>,同时会对事务进行计时;client输出数据到相关的partition中;client再请求transaction coordinator记录offset的<topic,partition>事务状态;client发送offset commit到对应offset partition。
client发送commit请求,transaction coordinator记录prepare commit/abort,然后发送marker给相关的partition。全部成功后,记录commit/abort的状态,最后这个记录不需要等待其他replica的ack,因为prepare不丢就能保证最终的正确性了。
这里prepare的状态主要是用于事务恢复,例如给相关的partition发送控制消息,没发完就宕机了,备机起来后,producer发送请求获取pid时,会把未完成的事务接着完成。



当partition中写入commit的marker后,相关的消息就可被读取。所以kafka事务在prepare commit到commit这个时间段内,消息是逐渐可见的,而不是同一时刻可见。



详细细节可看:https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-TransactionalGuarantees



消费事务
前面都是从生产的角度看待事务。还需要从消费的角度去考虑一些问题。
消费时,partition中会存在一些消息处于未commit状态,即业务方应该看不到的消息,需要过滤这些消息不让业务看到,kafka选择在消费者进程中进行过来,而不是在broker中过滤,主要考虑的还是性能。kafka高性能的一个关键点是zero copy,如果需要在broker中过滤,那么势必需要读取消息内容到内存,就会失去zero copy的特性。



文件组织
kafka的数据,实际上是以文件的形式存储在文件系统的。topic下有partition,partition下有segment,segment是实际的一个个文件,topic和partition都是抽象概念。



在目录/${topicName}-{$partitionid}/下,存储着实际的log文件(即segment),还有对应的索引文件。



每个segment文件大小相等,文件名以这个segment中最小的offset命名,文件扩展名是.log;segment对应的索引的文件名字一样,扩展名是.index。有两个index文件,一个是offset index用于按offset去查message,一个是time index用于按照时间去查,其实这里可以优化合到一起,下面只说offset index。总体的组织是这样的:



kafka 文件组织.png
为了减少索引文件的大小,降低空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每一个message都记录下具体位置,而是每隔一定的字节数,再建立一条索引。 索引包含两部分,分别是baseOffset,还有position。



baseOffset:意思是这条索引对应segment文件中的第几条message。这样做方便使用数值压缩算法来节省空间。例如kafka使用的是varint。



position:在segment中的绝对位置。



查找offset对应的记录时,会先用二分法,找出对应的offset在哪个segment中,然后使用索引,在定位出offset在segment中的大概位置,再遍历查找message。



常用配置项
broker配置
配置项 作用
broker.id broker的唯一标识
auto.create.topics.auto 设置成true,就是遇到没有的topic自动创建topic。
log.dirs log的目录数,目录里面放partition,当生成新的partition时,会挑目录里partition数最少的目录放。
topic配置
配置项 作用
num.partitions 新建一个topic,会有几个partition。
log.retention.ms 对应的还有minutes,hours的单位。日志保留时间,因为删除是文件维度而不是消息维度,看的是日志文件的mtime。
log.retention.bytes partion最大的容量,超过就清理老的。注意这个是partion维度,就是说如果你的topic有8个partition,配置1G,那么平均分配下,topic理论最大值8G。
log.segment.bytes 一个segment的大小。超过了就滚动。
log.segment.ms 一个segment的打开时间,超过了就滚动。
message.max.bytes message最大多大
关于日志清理,默认当前正在写的日志,是怎么也不会清理掉的。
还有0.10之前的版本,时间看的是日志文件的mtime,但这个指是不准确的,有可能文件被touch一下,mtime就变了。因此在0.10版本开始,改为使用该文件最新一条消息的时间来判断。
按大小清理这里也要注意,Kafka在定时任务中尝试比较当前日志量总大小是否超过阈值至少一个日志段的大小。如果超过但是没超过一个日志段,那么就不会删除。
消息语义概述
在分布式系统中,构成系统的任何节点都是被定义为可以彼此独立失败的。比如在 Kafka中,broker可能会crash,在producer推送数据至topic的过程中也可能会遇到网络问题。根据producer处理此类故障所采取的提交策略类型,我们可以获得不同的语义:



at-least-once:如果producer收到来自Kafka broker的确认(ack)或者acks = all,则表示该消息已经写入到Kafka。但如果producer ack超时或收到错误,则可能会重试发送消息,客户端会认为该消息未写入Kafka。如果broker在发送Ack之前失败,但在消息成功写入Kafka之后,此重试将导致该消息被写入两次,因此消息会被不止一次地传递给最终consumer,这种策略可能导致重复的工作和不正确的结果。
at-most-once:如果在ack超时或返回错误时producer不重试,则该消息可能最终不会写入Kafka,因此不会传递给consumer。在大多数情况下,这样做是为了避免重复的可能性,业务上必须接收数据传递可能的丢失。
exactly-once:即使producer重试发送消息,消息也会保证最多一次地传递给最终consumer。该语义是最理想的,但也难以实现,这是因为它需要消息系统本身与生产和消费消息的应用程序进行协作。例如如果在消费消息成功后,将Kafka consumer的偏移量rollback,我们将会再次从该偏移量开始接收消息。这表明消息传递系统和客户端应用程序必须配合调整才能实现excactly-once。
必须处理的常见灾难场景
为了清楚描述实现 exactly-once delivery语义的挑战,我们来看一个简单的例子。
假设有某个单进程producer应用在发送”Hello Kafka”到某个单partition topic(topic_name=EoS),有一个运行在其他节点的单实例consumer从topic里拉数据并进行打印。理想情况下如果没有任何灾难发生的话,”Hello Kafka”将会被exactly-once传递,consumer获取消息进行消费并提交commit到Kafka去完成这一次消息处理。即使在这之后consumer挂了或者被重启,也不会再收到这条消息。
然而生产环境错综复杂,灾难场景是无法避免的:



Broker失败:Kafka,作为一个高可用、持久化系统,保证每条消息被持久化并且冗余多份(假设是n份),所以理论上Kafka可以容忍n-1台broker宕机。Kafka的备份机制保证了一旦消息被成功写入leader replica,将会把数据同步到其他所有replica。
Producer到Broker的RPC失败:Kafka的durability特性是基于producer从broker收到的ack的,而没有收到ack并不代表请求肯定失败。Broker可能会在消息被写入之后返回ack之前宕机,同时也可能会在消息被写入topic之前宕机。因为producer没有任何途径可以得知失败的真实原因,而只会尝试重试。在一些场景下,下游consumer会收到若干的重复数据。
客户端也可能会失败:Exactly-once delivery也必须考虑客户端失败的情况。但是我们如何去区分客户端是真的挂了(永久性宕机)还是说只是暂时丢失心跳?追求正确性的话,broker应该丢弃由zombie producer发送的消息。 consumer也是如此,一旦新的客户端实例已经启动,它必须能够从失败实例的任何状态中恢复,并从安全点(safe checkpoint)开始处理,这意味着消费的偏移量必须始终与生成的输出保持同步。
Apache Kafka的exactly-once语义
在0.11.x版本之前,Apache Kafka支持at-least-once delivery语义以及partition内部的顺序delivery,如前所述这在某些场景下可能会导致数据重复消费。而Kafka 0.11.x支持exactly-once语义,不会导致该情况发生,其中主要包括三个内部逻辑的改造:



幂等:partition内部的exactly-once顺序语义
幂等操作,是指可以执行多次,而不会产生与仅执行一次不同结果的操作,Producer的send操作现在是幂等的。在任何导致producer重试的情况下,相同的消息,如果被producer发送多次,也只会被写入Kafka一次。要打开此功能,并让所有partition获得exactly-once delivery、无数据丢失和in-order语义,需要修改broker的配置:enable.idempotence = true。
这个功能如何工作?它的工作方式类似于TCP:发送到Kafka的每批消息将包含一个序列号,该序列号用于重复数据的删除。与TCP不同,TCP只能在transient in-memory中提供保证。序列号将被持久化存储topic中,因此即使leader replica失败,接管的任何其他broker也将能感知到消息是否重复。这种机制的开销相当低:它只是在每批消息中添加了几个额外字段。正如本文稍后将会看到的,该功能仅仅在非幂等producer上增加了可忽略的性能开销。



事务:跨partition的原子性写操作
第二点,Kafka现在支持使用新事务API原子性的对跨partition进行写操作,该API允许producer发送批量消息到多个partition。该功能同样支持在同一个事务中提交消费者offsets,因此真正意义上实现了end-to-end的exactly-once delivery语义。以下是一段示例代码:



producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch(ProducerFencedException e) {
producer.close();
} catch(KafkaException e) {
producer.abortTransaction();
}
该代码片段描述了如何使用新的producer事务API原子性的发送消息至多个partition。值得注意的是,某个Kafka topic partition内部的消息可能是事务完整提交后的消息,也可能是事务执行过程中的部分消息。
而从consumer的角度来看,有两种策略去读取事务写入的消息,通过”isolation.level”来进行配置:



read_committed:可以同时读取事务执行过程中的部分写入数据和已经完整提交的事务写入数据;
read_uncommitted:完全不等待事务提交,按照offsets order去读取消息,也就是兼容0.11.x版本前Kafka的语义;
我们必须通过配置consumer端的配置isolation.level,来正确使用事务API,通过使用 new Producer API并且对一些unique ID设置transaction.id(该配置属于producer端),该unique ID用于提供事务状态的连续性。



Exactly-once 流处理
基于幂等和原子性,通过Streams API实现exactly-once流处理成为可能。如果要在流应用中实现相关语义,只需要配置 processing.guarantee=exactly_once,这会影响所有的流处理环境中的语义,包括将处理作业和由加工作业创建的所有物理状态同时写回到Kafka的操作。



这就是为什么Kafka Streams API提供的exactly-once保证是迄今为止任何流处理系统中的最强实现的原因。 它为以Kafka作为数据源的流处理应用程序提供端对端的exactly-once保证,Streams应用程序将任何Kafka的物化状态在最终环节写回到Kafka。 仅依靠外部数据系统实现物化状态的流处理系统仅支持对exactly-once的较弱保证。 即使他们使用Kafka作为流处理来源,在需要从故障中恢复的情况下,也只能rollback他们的Kafka消费者offset以重新消费并处理消息,而不能回滚关联状态,当更新不是幂等的时候会导致结果不正确。



我来解释下这段话的细节。 流处理系统的关键问题是我的流处理应用程序是否获得正确的答案,即使其中一个实例在处理过程中崩溃,恢复失败实例时的关键是把状态恢复到与崩溃前相同。



流处理可以看成是一个关于Kafka topic的读写操作集合, 消费者从Kafka topic读取消息,其他一些处理逻辑转换消息或修改cpu维护的状态,同时生产者将消息写入另一个Kafka topic。 Exactly-once流处理就是保证读写数据有且只有一次的一种能力。,在这种情况下,获得正确结果意味着不丢失任何输入消息或产生任何重复的输出,而这就是用户所期望的。



除了我们迄今为止讨论的简单灾难场景之外,还有许多其他故障情况需要考虑:



流处理器可能会从多个source topic获取输入,并且跨多个source topic的数据排序不是确定的,因此多次运行可能会产生不同的结果;
同样,流处理器可能产生多个dest topic的输出。如果生产者无法跨多个topic执行原子写入,如果对某些(但不是全部)分区的写入失败,则producer的输出可能不正确;
流处理器可以使用Streams API提供的managed state facilities去聚合或join多个输入的数据。如果流处理器的一个实例失败,那么需要能够回滚该流处理器实例的物化状态。在重新启动实例时,还需要能够恢复处理并重新创建其状态。
流处理器可以查找外部数据库或者调通服务来丰富信息。基于外部服务的流处理器基本上来说是非确定性的:如果外部服务在流处理器的两次运行之间改变其内部状态,则会导致下游的结果出错。但是,如果处理正确,则不会导致完全不正确的结果,而仅仅会导致流处理器的输出是期望输出的子集。
特别是当与非确定性操作和应用程序计算的持久状态的更改相结合时,如果实例失败或者重新启动,可能导致数据重复甚至是计算结果错误。



“流处理保证确定性操作exactly-once的正确方法是:保证读取写入操作的输出在任何非灾难场景下一致。”



针对非确定性操作的exactly-one流处理
Exactly-once流处理对确定性操作是有意义的,但是当处理逻辑本身存在不确定的逻辑时呢?假设有这样一个场景,流处理器用于计算满足条件的流入的事件数量,条件由外部服务动态决定。从根本上来说这种操作本质上是非决定性的,因为外部服务指定的条件是不确定的,这可能会导致下游数据流得到不同的结果。那么,对这样的非确定性操作来说,正确的策略又是什么呢?



“对于非确定性操作来说,正确的处理方式是确保读取写入流处理操作的输出属于预期输出的子集,该集合应该可以由非确定性输入得到的预期值组合得到。”



因此,对于我们的示例流处理器,假设当前计数为31,输入事件值为2,故障时正确输出只能是31或者33其中一个:如果输入事件被外部条件指定需要丢弃那么就是31 ,反之则为33。



Kafka的exactly-once保证真的起作用了吗?
为了回答这个关于Kafka exactly-once保证的问题,让我们来看看正确性(也就是我们如何设计,构建和测试这个功能)和性能。



精妙的设计和review过程
正确性和性能都从坚实的设计开始。 大约三年前,我们开始在LinkedIn上进行设计和原型开发工作。 我们在Confluent上寻求一个优雅的方式来将幂等和事务的功能性要求融合成一个整体的封装。 我们写了一个60+页的设计文档,概述了设计的各个方面:从高级消息流到每个数据结构和RPC的细节实现细节。 经过9个月的广泛公众监督,设计也从社区的不断反馈中大大得到改善。 例如,基于开源讨论,我们用更智能的服务器端过滤替代消费者端缓存以进行事务读取,从而避免了潜在的性能开销。 同时,我们也改进了事务与compacted topic,并增加了相应的安全机制。



最终我们机智地得到了一个极简设计,在很大程度上也依赖于强大的Kafka原型:



事务日志是一个Kafka topic,享受到了与生俱来的durability;
Broker内部新增了事务协调线程(用于管理每个生产者的事务状态),自然地利用了Kafka自有的选举算法来处理failover;
对于使用了Kafka Streams API构建的流处理应用程序,我们会将数据透明地fold起来合并成原子性操作以事务的形式写入多个分区,从而为读取写入操作提供exactly-once保证;
这种足够简单、专注于细节的设计,实施效果非常好。



迭代的开发过程
我们在开发该功能时,会确保每一个pull request经过广泛的审查。这意味着在几个月的时间内一些pull request经历过几十次迭代,审查过程中发现了之前设计上没有考虑到的无数边界问题。



我们编写了超过15,000个测试用例,包括分布式测试,运行时的故障测试。该流程揭示了各个方面的问题,从测试工具中的基本编码错误到深奥的NTP同步问题。其中的一个子集是分布式混沌测试,我们为多个事务客户端提供了一个完整的Kafka集群,通过事务产生消息,同时读取这些消息,并在过程中强行终止客户端或服务器,以确保数据既不丢失也不重复。



因此经过良好测试,高质量代码库的简单而坚固的设计构成了我们解决方案的基石。



好消息:Kafka 还是非常快!
在设计此功能时,一个重点是性能的保证:由于exactly-once设计带来的性能开销,我们淘汰了许多更简单的设计选型。经过多番思考,我们采用的设计尽可能地使每个事务的开销最小(每个分区约1次写入,尽可能少的写入记录至中心事务日志)。对于耗时100ms的1KB消息和事务写入,与配置为at-least-once并且保序交付(acks = all,max.in.flight.requests.per.connection = 1)的生产者的吞吐量相比吞吐量仅下降3%;与at-most-once并且无排序保证(acks = 1,max.in.flight.requests.per.connection = 5)的生产者的吞吐量相比下降20%。



具体的测试benchmark可以看这里。



除了确保新功能的低性能开销之外,我们也不希望在没有使用exactly-once功能的应用程序中看到性能有意外损耗。为了确保这一点,我们不仅在Kafka消息头中添加了一些新的字段来实现exactly-once功能,而且还重新设计了Kafka消息格式,在网络传输和磁盘存储时,更有效地压缩消息。特别是,我们将一大堆常见的元数据转移到批量头文件中,并将可变长度编码引入批次中的每个记录。通过这种批量优化,整体信息的size显著减小。例如,一批7条记录、每条10个字节的批量消息,使用新的格式将减少35%的体量,这使得生产者吞吐量提高了20%,处理小消息时提高了50%的消费者吞吐量。任何Kafka 0.11用户都可以使用此性能提升,即使没有使用任何exactly-once功能。



我们还着眼于优化Streams API中的exactly-once流处理的开销。 以100ms作为提交间隔的情况下(保证端到端延迟较低的一个值),我们看到吞吐量下降了15%至30%(损耗百分比取决于消息大小,前者为1KB的消息大小,后者为100字节)。 但是,对于>=1KB的消息,30秒的提交间隔是没有任何吞吐性能损耗的。 在下一个版本中,我们计划引入推测性执行机制:即使我们使用较大的提交间隔,我们也可以保持端到端的延迟较低,最终我们期望将事务的开销降至零。



总而言之,通过从根本上重新调整我们的一些核心数据结构,我们在较小的性能损耗下实现了幂等和事务功能使得Kafka在大部分场景下依然很快。



主题是承载真实数据的逻辑容器,主题之下分为若干个分区,Kafka的消息组织方式为三级结构:主题、分区、消息
主题下的每条消息只会保存在某个分区中,而不会在多个分区中被保存多份
分区的作用是提供负载均衡的能力,实现系统的高伸缩性
不同的分区能够被放置在不同的机器节点上,而数据读写操作的粒度也是分区
每个机器节点都能独立地执行各自分区的读写请求处理,还可以通过添加新的机器节点来增加整体系统的吞吐量
分区在不同的分布式系统有不同的叫法,但分区的思想都是类似的
Kafka – Partition
MongoDB、Elasticsearch – Shard
HBase – Region
分区策略
分区策略:决定生产者将消息发送到哪个分区的算法,Kafka提供了默认的分区策略,也支持自定义的分区策略
自定义的分区策略,需要显式地配置生产者端的参数partitioner.class
实现接口:org.apache.kafka.clients.producer.Partitioner
消息数据:topic、key、keyBytes、value、valueBytes
集群数据:cluster



轮询策略是Kafka Java生产者的默认分区策略
轮询策略的负载均衡表现非常优秀,总能保证消息最大限度地被平均分配到所有分区上,默认情况下它是最合理的分区策略



Kafka允许为每条消息定义消息键,简称为Key
Key可以是一个有明确业务含义的字符串:客户代码、部门编号、业务ID、用来表征消息的元数据等
一旦消息被定义了Key,可以保证同一个Key的所有消息都进入到相同的分区里
由于每个分区下的消息处理都是顺序的,所以这个策略被称为按消息键保序策略
Kafka Java生产者的默认分区策略
如果指定了Key,采用按消息键保序策略
如果没有指定Key,采用轮询策略


Category storage