Kafka是一个分布式的基于发布、订阅的消息系统,具有着高吞吐、高容错、高可靠以及高性能等特性,主要用于应用解耦、流量削峰、异步消息等场景。
为了让大家更加深入的了解Kafka内部实现原理,文中将会从主题与日志开始介绍消息的存储、删除以及检索,然后介绍其副本机制的实现原理,最后介绍生产与消费的实现原理以便更合理的应用于实际业务。( 另外,本文较长,建议分享后慢慢阅读 : )
快速数据持久化,实现了O(1)时间复杂度的数据持久化能力。
高吞吐,能在普通的服务器上达到10W每秒的吞吐速率。
高可靠,消息持久化以及副本系统的机制保证了消息的可靠性,消息可以多次消费。
高扩展,与其他分布式系统一样,所有组件均支持分布式、自动实现负载均衡,可以快速便捷的扩容系统。
离线与实时处理能力并存,提供了在线与离线的消息处理能力。
正是因其具有这些的优秀特性而广泛用于应用解耦、流量削峰、异步消息等场景,比如消息中间件、日志聚合、流处理等等。
本文将从以下几个方面去介绍kafka:
第一章简单介绍下kafka作为分布式的消息发布与订阅系统所具备的特征与优势
第二章节介绍kafka系统的主题与日志,了解消息如何存放、如何检索以及如何删除
第三章节介绍kafka副本机制以了解kafka内部如何实现消息的高可靠
第四章节将会从消息的生产端去介绍消息的分区算法以及幂等特性的具体实现
第五章节将从消息的消费端去了解消费组、消费位移以及重平衡机制具体实现
最后章节简单总结下本文
每个主题又可以划分成多个分区,每个分区存储不同的消息。当消息添加至分区时,会为其分配一个位移offset(从0开始递增),并保证分区上唯一,消息在分区上的顺序由offset保证,即同一个分区内的消息是有序的,如下图所示
同一个主题的不同分区会分配在不同的节点上(broker),分区时保证Kafka集群具有水平扩展的基础。
以主题nginx_access_log为例,分区数为3,如上图所示。分区在逻辑上对应一个日志(Log),物理上对应的是一个文件夹。
drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx_access_log-0/
drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx_access_log-1/
drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx_access_log-2/
(左右滑动查看完整代码)
消息写入分区时,实际上是将消息写入分区所在的文件夹中。日志又分成多个分片(Segment),每个分片由日志文件与索引文件组成,每个分片大小是有限的(在kafka集群的配置文件log.segment.bytes配置,默认为1073741824byte,即1GB),当分片大小超过限制则会重新创建一个新的分片,外界消息的写入只会写入最新的一个分片(顺序IO)。
-rw-r–r– 1 root root 1835920 10月 11 19:18 00000000000000000000.index
-rw-r–r– 1 root root 1073741684 10月 11 19:18 00000000000000000000.log
-rw-r–r– 1 root root 2737884 10月 11 19:18 00000000000000000000.timeindex
-rw-r–r– 1 root root 1828296 10月 11 19:30 00000000000003257573.index
-rw-r–r– 1 root root 1073741513 10月 11 19:30 00000000000003257573.log
-rw-r–r– 1 root root 2725512 10月 11 19:30 00000000000003257573.timeindex
-rw-r–r– 1 root root 1834744 10月 11 19:42 00000000000006506251.index
-rw-r–r– 1 root root 1073741771 10月 11 19:42 00000000000006506251.log
-rw-r–r– 1 root root 2736072 10月 11 19:42 00000000000006506251.timeindex
-rw-r–r– 1 root root 1832152 10月 11 19:54 00000000000009751854.index
-rw-r–r– 1 root root 1073740984 10月 11 19:54 00000000000009751854.log
-rw-r–r– 1 root root 2731572 10月 11 19:54 00000000000009751854.timeindex
-rw-r–r– 1 root root 1808792 10月 11 20:06 00000000000012999310.index
-rw-r–r– 1 root root 1073741584 10月 11 20:06 00000000000012999310.log
-rw-r–r– 1 root root 10 10月 11 19:54 00000000000012999310.snapshot
-rw-r–r– 1 root root 2694564 10月 11 20:06 00000000000012999310.timeindex
-rw-r–r– 1 root root 10485760 10月 11 20:09 00000000000016260431.index
-rw-r–r– 1 root root 278255892 10月 11 20:09 00000000000016260431.log
-rw-r–r– 1 root root 10 10月 11 20:06 00000000000016260431.snapshot
-rw-r–r– 1 root root 10485756 10月 11 20:09 00000000000016260431.timeindex
-rw-r–r– 1 root root 8 10月 11 19:03 leader-epoch-checkpoint
(左右滑动查看完整代码)
一个分片包含多个不同后缀的日志文件,分片中的第一个消息的offset将作为该分片的基准偏移量,偏移量固定长度为20,不够前面补齐0,然后将其作为索引文件以及日志文件的文件名,如00000000000003257573.index、00000000000003257573.log、00000000000003257573.timeindex、相同文件名的文件组成一个分片(忽略后缀名),除了.index、.timeindex、 .log后缀的日志文件外其他日志文件,对应含义如下:
文件类型 作用
.index 偏移量索引文件,记录<相对位移,起始地址>映射关系,其中相对位移表示该分片的第一个消息,从1开始计算,起始地址表示对应相对位移消息在分片.log文件的起始地址
.timeindex 时间戳索引文件,记录<时间戳,相对位移>映射关系
.log 日志文件,存储消息的详细信息
.snaphot 快照文件
.deleted 分片文件删除时会先将该分片的所有文件加上.delete后缀,然后有delete-file任务延迟删除这些文件(file.delete.delay.ms可以设置延时删除的的时间)
.cleaned 日志清理时临时文件
.swap Log Compaction 之后的临时文件
.leader-epoch-checkpoint
(左右滑动查看完整表格)
2.2 日志索引
首先介绍下.index文件,这里以文件00000000000003257573.index为例,首先我们可以通过以下命令查看该索引文件的内容,可以看到输出结构为<offset,position>,实际上索引文件中保存的并不是offset而是相对位移,比如第一条消息的相对位移则为0,格式化输出时加上了基准偏移量。
如上图所示,<114,17413>表示该分片相对位移为114的消息,其位移为3257573+114,即3257687,position表示对应offset在.log文件的物理地址,通过.index索引文件则可以获取对应offset所在的物理地址。
索引采用稀疏索引的方式构建,并不保证分片中的每个消息都在索引文件有映射关系(.timeindex索引也是类似),主要是为了节省磁盘空间、内存空间,因为索引文件最终会映射到内存中。
bin/kafka-dump-log.sh –files /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.index |head -n 10
Dumping /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.index
offset: 3257687 position: 17413
offset: 3257743 position: 33770
offset: 3257799 position: 50127
offset: 3257818 position: 66484
offset: 3257819 position: 72074
offset: 3257871 position: 87281
offset: 3257884 position: 91444
offset: 3257896 position: 95884
offset: 3257917 position: 100845
$ bin/kafka-dump-log.sh –files /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.index |tail -n 10
offset: 6506124 position: 1073698512
offset: 6506137 position: 1073702918
offset: 6506150 position: 1073707263
offset: 6506162 position: 1073711499
offset: 6506176 position: 1073716197
offset: 6506188 position: 1073720433
offset: 6506205 position: 1073725654
offset: 6506217 position: 1073730060
offset: 6506229 position: 1073734174
offset: 6506243 position: 1073738288
(左右滑动查看完整代码)
比如查看offset为6506155的消息:首先根据offset找到对应的分片,65061所对应的分片为00000000000003257573,然后通过二分法在00000000000003257573.index文件中找到不大于6506155的最大索引值,得到<offset: 6506150, position: 1073707263>,然后从00000000000003257573.log的1073707263位置开始顺序扫描找到offset为650155的消息
Kafka从0.10.0.0版本起,为分片日志文件中新增了一个.timeindex的索引文件,可以根据时间戳定位消息。同样我们可以通过脚本kafka-dump-log.sh查看时间索引的文件内容。
bin/kafka-dump-log.sh –files /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.timeindex |head -n 10
Dumping /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.timeindex
timestamp: 1570792689308 offset: 3257685
timestamp: 1570792689324 offset: 3257742
timestamp: 1570792689345 offset: 3257795
timestamp: 1570792689348 offset: 3257813
timestamp: 1570792689357 offset: 3257867
timestamp: 1570792689361 offset: 3257881
timestamp: 1570792689364 offset: 3257896
timestamp: 1570792689368 offset: 3257915
timestamp: 1570792689369 offset: 3257927
bin/kafka-dump-log.sh –files /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.timeindex |tail -n 10
Dumping /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.timeindex
timestamp: 1570793423474 offset: 6506136
timestamp: 1570793423477 offset: 6506150
timestamp: 1570793423481 offset: 6506159
timestamp: 1570793423485 offset: 6506176
timestamp: 1570793423489 offset: 6506188
timestamp: 1570793423493 offset: 6506204
timestamp: 1570793423496 offset: 6506214
timestamp: 1570793423500 offset: 6506228
timestamp: 1570793423503 offset: 6506240
timestamp: 1570793423505 offset: 6506248
(左右滑动查看完整代码)
比如我想查看时间戳1570793423501开始的消息:
1.首先定位分片,将1570793423501与每个分片的最大时间戳进行对比(最大时间戳取时间索引文件的最后一条记录时间,如果时间为0则取该日志分段的最近修改时间),直到找到大于或等于1570793423501的日志分段,因此会定位到时间索引文件00000000000003257573.timeindex,其最大时间戳为1570793423505;
2.通过二分法找到大于或等于1570793423501的最大索引项,即<timestamp: 1570793423503 offset: 6506240>(6506240为offset,相对位移为3247667);
3.根据相对位移3247667去索引文件中找到不大于该相对位移的最大索引值<3248656,1073734174>;
4.从日志文件00000000000003257573.log的1073734174位置处开始扫描,查找不小于1570793423501的数据。
2.3 日志删除
与其他消息中间件不同的是,Kafka集群中的消息不会因为消费与否而删除,跟日志一样消息最终会落盘,并提供对应的策略周期性(通过参数log.retention.check.interval.ms来设置,默认为5分钟)执行删除或者压缩操作(broker配置文件log.cleanup.policy参数如果为“delete”则执行删除操作,如果为“compact”则执行压缩操作,默认为“delete”)。
2.3.1 基于时间的日志删除
参数 默认值 说明
log.retention.hours 168 日志保留时间(小时)
log.retention.minutes 无 日志保留时间(分钟),优先级大于小时
log.retention.ms 无 日志保留时间(毫秒),优先级大于分钟
(左右滑动查看完整表格)
当消息在集群保留时间超过设定阈值(log.retention.hours,默认为168小时,即七天),则需要进行删除。这里会根据分片日志的最大时间戳来判断该分片的时间是否满足删除条件,最大时间戳首先会选取时间戳索引文件中的最后一条索引记录,如果对应的时间戳值大于0则取该值,否则为最近一次修改时间。
这里不直接选取最后修改时间的原因是避免分片日志的文件被无意篡改而导致其时间不准。
如果恰好该分区下的所有日志分片均已过期,那么会先生成一个新的日志分片作为新消息的写入文件,然后再执行删除参数。
2.3.2 基于空间的日志删除
参数 默认值 说明
log.retention.bytes 1073741824(即1G),默认未开启,即无穷大 日志文件总大小,并不是指单个分片的大小
log.segment.bytes 1073741824(即1G) 单个日志分片大小
(左右滑动查看完整表格)
首先会计算待删除的日志大小diff(totalSize-log.rentention.bytes),然后从最旧的一个分片开始查看可以执行删除操作的文件集合(如果diff-segment.size>=0,则满足删除条件),最后执行删除操作。
2.3.3 基于日志起始偏移量的日志删除
一般情况下,日志文件的起始偏移量(logStartOffset)会等于第一个日志分段的baseOffset,但是其值会因为删除消息请求而增长,logStartOffset的值实际上是日志集合中的最小消息,而小于这个值的消息都会被清理掉。如上图所示,我们假设logStartOffset=7421048,日志删除流程如下:
从最旧的日志分片开始遍历,判断其下一个分片的baseOffset是否小于或等于logStartOffset值,如果满足,则需要删除,因此第一个分片会被删除。
分片二的下一个分片baseOffset=6506251<7421048,所以分片二也需要删除。
分片三的下一个分片baseOffset=9751854>7421048,所以分片三不会被删除。
2.4 日志压缩
前面提到当broker配置文件log.cleanup.policy参数值设置为“compact”时,则会执行压缩操作,这里的压缩跟普通意义的压缩不一样,这里的压缩是指将相同key的消息只保留最后一个版本的value值,如下图所示,压缩之前offset是连续递增,压缩之后offset递增可能不连续,只保留5条消息记录。
Kafka日志目录下cleaner-offset-checkpoint文件,用来记录每个主题的每个分区中已经清理的偏移量,通过这个偏移量可以将分区中的日志文件分成两个部分:clean表示已经压缩过;dirty表示还未进行压缩,如下图所示(active segment不会参与日志的压缩操作,因为会有新的数据写入该文件)。
-rw-r–r– 1 root root 4 10月 11 19:02 cleaner-offset-checkpoint
drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx_access_log-0/
drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx_access_log-1/
drwxr-xr-x 2 root root 4096 10月 11 20:07 nginx_access_log-2/
-rw-r–r– 1 root root 0 9月 18 09:50 .lock
-rw-r–r– 1 root root 4 10月 16 11:19 log-start-offset-checkpoint
-rw-r–r– 1 root root 54 9月 18 09:50 meta.properties
-rw-r–r– 1 root root 1518 10月 16 11:19 recovery-point-offset-checkpoint
-rw-r–r– 1 root root 1518 10月 16 11:19 replication-offset-checkpoint
#cat cleaner-offset-checkpoint
nginx_access_log 0 5033168
nginx_access_log 1 5033166
nginx_access_log 2 5033168
(左右滑动查看完整代码)
日志压缩时会根据dirty部分数据占日志文件的比例(cleanableRatio)来判断优先压缩的日志,然后为dirty部分的数据建立key与offset映射关系(保存对应key的最大offset)存入SkimpyoffsetMap中,然后复制segment分段中的数据,只保留SkimpyoffsetMap中记录的消息。
压缩之后的相关日志文件大小会减少,为了避免出现过小的日志文件与索引文件,压缩时会对所有的segment进行分组(一个组的分片大小不会超过设置的log.segment.bytes值大小),同一个分组的多个分片日志压缩之后变成一个分片。
如上图所示,所有消息都还没压缩前clean checkpoint值为0,表示该分区的数据还没进行压缩,第一次压缩后,之前每个分片的日志文件大小都有所减少,同时会移动clean checkpoint的位置到这一次压缩结束的offset值。第二次压缩时,会将前两个分片{0.5GB,0.4GB}组成一个分组,{0.7GB,0.2GB}组成一个分组进行压缩,以此类推。
如上图所示,日志压缩的主要流程如下:
计算deleteHorizonMs值:当某个消息的value值为空时,该消息会被保留一段时间,超时之后会在下一次的得日志压缩中被删除,所以这里会计算deleteHorizonMs,根据该值确定可以删除value值为空的日志分片。(deleteHorizonMs = clean部分的最后一个分片的lastModifiedTime - deleteRetionMs,deleteRetionMs通过配置文件log.cleaner.delete.retention.ms配置,默认为24小时)。
确定压缩dirty部分的offset范围[firstDirtyOffset,endOffset):其中firstDirtyOffset表示dirty的起始位移,一般会等于clear checkpoint值,firstUncleanableOffset表示不能清理的最小位移,一般会等于活跃分片的baseOffset,然后从firstDirtyOffset位置开始遍历日志分片,并填充key与offset的映射关系至SkimpyoffsetMap中,当该map被填充满或到达上限firstUncleanableOffset时,就可以确定日志压缩上限endOffset。
将(logStartOffset,endOffset)中的日志分片进行分组,然后按照分组的方式进行压缩。
每个分区的副本集合中会有一个副本被选举为主副本(leader),其他为从副本,所有的读写请求由主副本对外提供,从副本负责将主副本的数据同步到自己所属分区,如果主副本所在分区宕机,则会重新选举出新的主副本对外提供服务。
3.1 ISR集合
ISR(In-Sync Replica)集合,表示目前可以用的副本集合,每个分区中的leader副本会维护此分区的ISR集合。这里的可用是指从副本的消息量与主副本的消息量相差不大,加入至ISR集合中的副本必须满足以下几个条件:
副本所在节点需要与ZooKeeper维持心跳。
从副本的最后一条消息的offset需要与主副本的最后一条消息offset差值不超过设定阈值(replica.lag.max.messages)或者副本的LEO落后于主副本的LEO时长不大于设定阈值(replica.lag.time.max.ms),官方推荐使用后者判断,并在新版本kafka0.10.0移除了replica.lag.max.messages参数。
如果从副本不满足以上的任意条件,则会将其提出ISR集合,当其再次满足以上条件之后又会被重新加入集合中。ISR的引入主要是解决同步副本与异步复制两种方案各自的缺陷(同步副本中如果有个副本宕机或者超时就会拖慢该副本组的整体性能;如果仅仅使用异步副本,当所有的副本消息均远落后于主副本时,一旦主副本宕机重新选举,那么就会存在消息丢失情况)
3.2 HW&LEO
HW(High Watermark)是一个比较特殊的offset标记,消费端消费时只能拉取到小于HW的消息而HW及之后的消息对于消费者来说是不可见的,该值由主副本管理,当ISR集合中的全部从副本都拉取到HW指定消息之后,主副本会将HW值+1,即指向下一个offset位移,这样可以保证HW之前消息的可靠性。
LEO(Log End Offset)表示当前副本最新消息的下一个offset,所有副本都存在这样一个标记,如果是主副本,当生产端往其追加消息时,会将其值+1。当从副本从主副本成功拉取到消息时,其值也会增加。
3.2.1 从副本更新LEO与HW
从副本的数据是来自主副本,通过向主副本发送fetch请求获取数据,从副本的LEO值会保存在两个地方,一个是自身所在的节点),一个是主副本所在节点,自身节点保存LEO主要是为了更新自身的HW值,主副本保存从副本的LEO也是为了更新其HW。
当从副本每写入一条新消息就会增加其自身的LEO,主副本收到从副本的fetch请求,会先从自身的日志中读取对应数据,在数据返回给从副本之前会先去更新其保存的从副本LEO值。一旦从副本数据写入完成,就会尝试更新自己的HW值,比较LEO与fetch响应中主副本的返回HW,取最小值作为新的HW值。
3.2.2 主副本更新LEO与HW
主副本有日志写入时就会更新其自身的LEO值,与从副本类似。而主副本的HW值是分区的HW值,决定分区数据对应消费端的可见性,以下四种情况,主副本会尝试更新其HW值:
副本成为主副本:当某个副本成为主副本时,kafka会尝试更新分区的HW值。
broker出现奔溃导致副本被踢出ISR集合:如果有broker节点奔溃则会看是否影响对应分区,然后会去检查分区的HW值是否需要更新。
生成端往主副本写入消息时:消息写入会增加其LEO值,此时会查看是否需要修改HW值。
主副本接受到从副本的fetch请求时:主副本在处理从副本的fetch请求时会尝试更新分区HW值。
前面是去尝试更新HW,但是不一定会更新,主副本上保存着从副本的LEO值与自身的LEO值,这里会比较所有满足条件的副本LEO值,并选择最小的LEO值最为分区的HW值,其中满足条件的副本是指满足以下两个条件之一:
副本在ISR集合中
副本的LEO落后于主副本的LEO时长不大于设定阈值(replica.lag.time.max.ms,默认为10s)
3.3 数据丢失场景
前面提到如果仅仅依赖HW来进行日志截断以及水位的判断会存在问题,如上图所示,假定存在两个副本A、副本B,最开始A为主副本,B为从副本,且参数min.insync.replicas=1,即ISR只有一个副本时也会返回成功:
初始情况为主副本A已经写入了两条消息,对应HW=1,LEO=2,LEOB=1,从副本B写入了一条消息,对应HW=1,LEO=1。
此时从副本B向主副本A发起fetchOffset=1请求,主副本收到请求之后更新LEOB=1,表示副本B已经收到了消息0,然后尝试更新HW值,min(LEO,LEOB)=1,即不需要更新,然后将消息1以及当前分区HW=1返回给从副本B,从副本B收到响应之后写入日志并更新LEO=2,然后更新其HW=1,虽然已经写入了两条消息,但是HW值需要在下一轮的请求才会更新为2。
此时从副本B重启,重启之后会根据HW值进行日志截断,即消息1会被删除。
从副本B向主副本A发送fetchOffset=1请求,如果此时主副本A没有什么异常,则跟第二步骤一样没有什么问题,假设此时主副本也宕机了,那么从副本B会变成主副本。
当副本A恢复之后会变成从副本并根据HW值进行日志截断,即把消息1丢失,此时消息1就永久丢失了。
3.4 数据不一致场景
如图所示,假定存在两个副本A、副本B,最开始A为主副本,B为从副本,且参数min.insync.replicas=1,即ISR只有一个副本时也会返回成功:
初始状态为主副本A已经写入了两条消息对应HW=1,LEO=2,LEOB=1,从副本B也同步了两条消息,对应HW=1,LEO=2。
此时从副本B向主副本发送fetchOffset=2请求,主副本A在收到请求后更新分区HW=2并将该值返回给从副本B,如果此时从副本B宕机则会导致HW值写入失败。
我们假设此时主副本A也宕机了,从副本B先恢复并成为主副本,此时会发生日志截断,只保留消息0,然后对外提供服务,假设外部写入了一个消息1(这个消息与之前的消息1不一样,用不同的颜色标识不同消息)。
等副本A起来之后会变成从副本,不会发生日志截断,因为HW=2,但是对应位移1的消息其实是不一致的
3.5 leader epoch机制
HW值被用于衡量副本备份成功与否以及出现失败情况时候的日志截断依据可能会导致数据丢失与数据不一致情况,因此在新版的Kafka(0.11.0.0)引入了leader epoch概念。
leader epoch表示一个键值对<epoch, offset>,其中epoch表示leader主副本的版本号,从0开始编码,当leader每变更一次就会+1,offset表示该epoch版本的主副本写入第一条消息的位置。
比如<0,0>表示第一个主副本从位移0开始写入消息,<1,100>表示第二个主副本版本号为1并从位移100开始写入消息,主副本会将该信息保存在缓存中并定期写入到checkpoint文件中,每次发生主副本切换都会去从缓存中查询该信息,下面简单介绍下leader epoch的工作原理:
每条消息会都包含一个4字节的leader epoch number值
每个log目录都会创建一个leader epoch sequence文件用来存放主副本版本号以及开始位移。
当一个副本成为主副本之后,会在leader epoch sequence文件末尾添加一条新的记录,然后每条新的消息就会变成新的leader epoch值。
当某个副本宕机重启之后,会进行以下操作:
从leader epoch sequence文件中恢复所有的leader epoch。
向分区主副本发送LeaderEpoch请求,请求包含了从副本的leader epoch sequence文件中的最新leader epoch值。
主副本返回从副本对应LeaderEpoch的lastOffset,返回的lastOffset分为两种情况,一种是返回比从副本请求中leader epoch版本大1的开始位移,另外一种是与请求中的leader epoch相等则直接返回当前主副本的LEO值。
如果从副本的leader epoch开始位移大于从leader中返回的lastOffset,那么会将从副本的leader epoch sequence值保持跟主副本一致。
从副本截断本地消息到主副本返回的LastOffset所在位移处。
从副本开始从主副本开始拉取数据。
在获取数据时,如果从副本发现消息中的leader epoch值比自身的最新leader epoch值大,则会将该leader epoch 值写到leader epoch sequence文件,然后继续同步文件。
下面看下leader epoch机制如何避免前面提到的两种异常场景
3.5.1 数据丢失场景解决
如图所示,当从副本B重启之后向主副本A发送offsetsForLeaderEpochRequest,epoch主从副本相等,则A返回当前的LEO=2,从副本B中没有任何大于2的位移,因此不需要截断。
当从副本B向主副本A发送fetchoffset=2请求时,A宕机,所以从副本B成为主副本,并更新epoch值为<epoch=1, offset=2>,HW值更新为2。
当A恢复之后成为从副本,并向B发送fetcheOffset=2请求,B返回HW=2,则从副本A更新HW=2。
主副本B接受外界的写请求,从副本A向主副本A不断发起数据同步请求。
从上可以看出引入leader epoch值之后避免了前面提到的数据丢失情况,但是这里需要注意的是如果在上面的第一步,从副本B起来之后向主副本A发送offsetsForLeaderEpochRequest请求失败,即主副本A同时也宕机了,那么消息1就会丢失,具体可见下面数据不一致场景中有提到。
3.5.2 数据不一致场景解决
从副本B恢复之后向主副本A发送offsetsForLeaderEpochRequest请求,由于主副本也宕机了,因此副本B将变成主副本并将消息1截断,此时接受到新消息1的写入。
副本A恢复之后变成从副本并向主副本A发送offsetsForLeaderEpochRequest请求,请求的epoch值小于主副本B,因此主副本B会返回epoch=1时的开始位移,即lastoffset=1,因此从副本A会截断消息1。
从副本A从主副本B拉取消息,并更新epoch值<epoch=1, offset=1>。
可以看出epoch的引入避免的数据不一致,但是两个副本均宕机,则还是存在数据丢失的场景,前面的所有讨论都是建立在min.insync.replicas=1的前提下,因此需要在数据的可靠性与速度方面做权衡。
如果消息发送时指定了消息所属分区,则会直接发往指定分区。
如果没有指定消息分区,但是设置了消息的key,则会根据key的哈希值选择分区。
如果前两者均不满足,则会采用轮询的方式选择分区。
4.2 ack参数设置及意义
生产端往kafka集群发送消息时,可以通过request.required.acks参数来设置数据的可靠性级别
1:默认为1,表示在ISR中的leader副本成功接收到数据并确认后再发送下一条消息,如果主节点宕机则可能出现数据丢失场景,详细分析可参考前面提到的副本章节。
0:表示生产端不需要等待节点的确认就可以继续发送下一批数据,这种情况下数据传输效率最高,但是数据的可靠性最低。
-1:表示生产端需要等待ISR中的所有副本节点都收到数据之后才算消息写入成功,可靠性最高,但是性能最低,如果服务端的min.insync.replicas值设置为1,那么在这种情况下允许ISR集合只有一个副本,因此也会存在数据丢失的情况。
4.3 幂等特性
所谓的幂等性,是指一次或者多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外),通俗一点的理解就是同一个操作任意执行多次产生的影响或效果与一次执行影响相同,幂等的关键在于服务端能否识别出请求是否重复,然后过滤掉这些重复请求,通常情况下需要以下信息来实现幂等特性:
唯一标识:判断某个请求是否重复,需要有一个唯一性标识,然后服务端就能根据这个唯一标识来判断是否为重复请求。
记录已经处理过的请求:服务端需要记录已经处理过的请求,然后根据唯一标识来判断是否是重复请求,如果已经处理过,则直接拒绝或者不做任何操作返回成功。
kafka中Producer端的幂等性是指当发送同一条消息时,消息在集群中只会被持久化一次,其幂等是在以下条件中才成立:
只能保证生产端在单个会话内的幂等,如果生产端因为某些原因意外挂掉然后重启,此时是没办法保证幂等的,因为这时没办法获取到之前的状态信息,即无法做到垮会话级别的幂等。
幂等性不能垮多个主题分区,只能保证单个分区内的幂等,涉及到多个消息分区时,中间的状态并没有同步。
如果要支持垮会话或者垮多个消息分区的情况,则需要使用kafka的事务性来实现。
为了实现生成端的幂等语义,引入了Producer ID(PID)与Sequence Number的概念:
Producer ID(PID):每个生产者在初始化时都会分配一个唯一的PID,PID的分配对于用户来说是透明的。
Sequence Number(序列号):对于给定的PID而言,序列号从0开始单调递增,每个主题分区均会产生一个独立序列号,生产者在发送消息时会给每条消息添加一个序列号。broker端缓存了已经提交消息的序列号,只有比缓存分区中最后提交消息的序列号大1的消息才会被接受,其他会被拒绝。
4.3.1 生产端消息发送流程
下面简单介绍下支持幂等的消息发送端工作流程
生产端通过Kafkaproducer会将数据添加到RecordAccumulator中,数据添加时会判断是否需要新建一个ProducerBatch。
生产端后台启动发送线程,会判断当前的PID是否需要重置,重置的原因是因为某些消息分区的batch重试多次仍然失败最后因为超时而被移除,这个时候序列号无法连续,导致后续消息无法发送,因此会重置PID,并将相关缓存信息清空,这个时候消息会丢失。
发送线程判断是否需要新申请PID,如果需要则会阻塞直到获取到PID信息。
发送线程在调用sendProducerData()方法发送数据时,会进行以下判断:
判断主题分区是否可以继续发送、PID是否有效、如果是重试batch需要判断之前的batch是否发送完成,如果没有发送完成则会跳过当前主题分区的消息发送,直到前面的batch发送完成。
如果对应ProducerBatch没有分配对应的PID与序列号信息,则会在这里进行设置。
4.3.2 服务端消息接受流程
服务端(broker)在收到生产端发送的数据写请求之后,会进行一些判断来决定是否可以写入数据,这里也主要介绍关于幂等相关的操作流程。
如果请求设置了幂等特性,则会检查是否对ClusterResource有IdempotentWrite权限,如果没有,则会返回错误CLUSTER_AUTHORIZATION_FAILED。
检查是否有PID信息。
根据batch的序列号检查该batch是否重复,服务端会缓存每个PID对应主题分区的最近5个batch信息,如果有重复,则直接返回写入成功,但是不会执行真正的数据写入操作。
如果有PID且非重复batch,则进行以下操作:
判断该PID是否已经存在缓存中。
如果不存在则判断序列号是否是从0开始,如果是则表示为新的PID,在缓存中记录PID的信息(包括PID、epoch以及序列号信息),然后执行数据写入操作;如果不存在但是序列号不是从0开始,则直接返回错误,表示PID在服务端以及过期或者PID写的数据已经过期。
如果PID存在,则会检查PID的epoch版本是否与服务端一致,如果不一致且序列号不是从0开始,则返回错误。如果epoch不一致但是序列号是从0开始,则可以正常写入。
如果epoch版本一致,则会查询缓存中最近一次序列号是否连续,不连续则会返回错误,否则正常写入。
5.1 消费组
多个消费者可以组成一个消费组,每个消费者只属于一个消费组。消费组订阅主题的每个分区只会分配给该消费组中的某个消费者处理,不同的消费组之间彼此隔离无依赖。同一个消息只会被消费组中的一个消费者消费,如果想要让同一个消息被多个消费者消费,那么每个消费者需要属于不同的消费组,且对应消费组中只有该一个消费者,消费组的引入可以实现消费的“独占”或“广播”效果。
消费组下可以有多个消费者,个数支持动态变化。
消费组订阅主题下的每个分区只会分配给消费组中的一个消费者。
group.id标识消费组,相同则属于同一消费组。
不同消费组之间相互隔离互不影响。
如图所示,消费组1中包含两个消费者,其中消费者1分配消费分区0,消费者2分配消费分区1与分区2。此外消费组的引入还支持消费者的水平扩展及故障转移,比如从上图我们可以看出消费者2的消费能力不足,相对消费者1来说消费进度比较落后,我们可以往消费组里面增加一个消费者以提高其整体的消费能力,如下图所示。
假设消费者1所在机器出现宕机,消费组会发送重平衡,假设将分区0分配给消费者2进行消费,如下图所示。同个消费组中消费者的个数不是越多越好,最大不能超过主题对应的分区数,如果超过则会出现超过的消费者分配不到分区的情况,因为分区一旦分配给消费者就不会再变动,除非组内消费者个数出现变动而发生重平衡。
5.2 消费位移
5.2.1 消费位移主题
Kafka 0.9开始将消费端的位移信息保存在集群的内部主题(__consumer_offsets)中,该主题默认为50个分区,每条日志项的格式都是:<TopicPartition, OffsetAndMetadata>,其key为主题分区主要存放主题、分区以及消费组信息,value为OffsetAndMetadata对象主要包括位移、位移提交时间、自定义元数据等信息。
只有消费组往kafka中提交位移才会往这个主题中写入数据,如果消费端将消费位移信息保存在外部存储,则不会有消费位移信息,下面可以通过kafka-console-consumer.sh脚本查看主题消费位移信息。
(左右滑动查看完整代码)
5.2.2 消费位移自动提交
消费端可以通过设置参数enable.auto.commit来控制是自动提交还是手动,如果值为true则表示自动提交,在消费端的后台会定时的提交消费位移信息,时间间隔由auto.commit.interval.ms(默认为5秒)。
但是如果设置为自动提交会存在以下几个问题:
可能存在重复的位移数据提交到消费位移主题中,因为每隔5秒会往主题中写入一条消息,不管是否有新的消费记录,这样就会产生大量的同key消息,其实只需要一条,因此需要依赖前面提到日志压缩策略来清理数据。
重复消费,假设位移提交的时间间隔为5秒,那么在5秒内如果发生了rebalance,则所有的消费者会从上一次提交的位移处开始消费,那么期间消费的数据则会再次被消费。
5.2.3 消费位移手动提交
手动提交需要将enable.auto.commit值设置为false,然后由业务消费端来控制消费进度,手动提交又分为以下三种类型:
同步手动提交位移:如果调用的是同步提交方法commitSync(),则会将poll拉取的最新位移提交到kafka集群,提交成功前会一直等待提交成功。
异步手动提交位移:调用异步提交方法commitAsync(),在调用该方法之后会立刻返回,不会阻塞,然后可以通过回调函数执行相关的异常处理逻辑。
指定提交位移:指定位移提交也分为异步跟同步,传参为Map<TopicPartition, OffsetAndMetadata>,其中key为消息分区,value为位移对象。
5.3 分组协调者
分组协调者(Group Coordinator)是一个服务,kafka集群中的每个节点在启动时都会启动这样一个服务,该服务主要是用来存储消费分组相关的元数据信息,每个消费组均会选择一个协调者来负责组内各个分区的消费位移信息存储,选择的主要步骤如下:
首选确定消费组的位移信息存入哪个分区:前面提到默认的__consumer_offsets主题分区数为50,通过以下算法可以计算出对应消费组的位移信息应该存入哪个分区partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)其中groupId为消费组的id,这个由消费端指定,groupMetadataTopicPartitionCount为主题分区数。
根据partition寻找该分区的leader所对应的节点broker,该broker的Coordinator即为该消费组的Coordinator。
5.4 重平衡机制
5.4.1 重平衡发生场景
以下几种场景均会触发重平衡操作:
新的消费者加入到消费组中。
消费者被动下线。比如消费者长时间的GC、网络延迟导致消费者长时间未向Group Coordinator发送心跳请求,均会认为该消费者已经下线并踢出。
消费者主动退出消费组。
消费组订阅的任意一个主题分区数出现变化。
消费者取消某个主题的订阅。
5.4.2 重平衡操作流程
重平衡的实现可以分为以下几个阶段:
查找Group Coordinator:消费者会从kafka集群中选择一个负载最小的节点发送GroupCoorinatorRequest请求,并处理返回响应GroupCoordinatorResponse。其中请求参数中包含消费组的id,响应中包含Coordinator所在节点id、host以及端口号信息。
Join group:当消费者拿到协调者的信息之后会往协调者发送加入消费组的请求JoinGroupRequest,当所有的消费者都发送该请求之后,协调者会从中选择一个消费者作为leader角色,然后将组内成员信息、订阅等信息发给消费者(响应格式JoinGroupResponse见下表),leader负责消费方案的分配。
JoinGroupRequest请求数据格式
名称 类型 说明
group_id String 消费者id
seesion_timeout int 协调者超过session_timeout指定的时间没有收到心跳消息,则认为该消费者下线
member_id String 协调者分配给消费者的id
protocol_type String 消费组实现的协议,默认为sonsumer
group_protocols List 包含此消费者支持的全部PartitionAssignor类型
protocol_name String PartitionAssignor类型
protocol_metadata byte[] 针对不同PartitionAssignor类型序列化后的消费者订阅信息,包含用户自定义数据userData
(左右滑动查看完整表格)
JoinGroupResponse响应数据格式式
名称 类型 说明
error_code short 错误码
generation_id int 协调者分配的年代信息
group_protocol String 协调者选择的PartitionAssignor类型
leader_id String Leader的member_id
member_id String 协调者分配给消费者的id
members Map集合 消费组中全部的消费者订阅信息
member_metadata byte[] 对应消费者的订阅信息
(左右滑动查看完整表格)
Synchronizing Group State阶段:当leader消费者完成消费方案的分配后会发送SyncGroupRequest请求给协调者,其他非leader节点也会发送该请求,只是请求参数为空,然后协调者将分配结果作为响应SyncGroupResponse发给各个消费者,请求及相应的数据格式如下表所示:
SyncGroupRequest请求数据格式
名称 类型 说明
group_id String 消费组的id
generation_id int 消费组保存的年代信息
member_id String 协调者分配的消费者id
member_assignment byte[] 分区分配结果
(左右滑动查看完整表格)
SyncGroupResponse响应数据格式
名称 类型 说明
error_code short 错误码
member_assignment byte[] 分配给当前消费者的分区
(左右滑动查看完整表格)
5.4.3 分区分配策略
Kafka提供了三个分区分配策略:RangeAssignor、RoundRobinAssignor以及StickyAssignor,下面简单介绍下各个算法的实现。
RangeAssignor:kafka默认会采用此策略进行分区分配,主要流程如下
假设一个消费组中存在两个消费者{C0,C1},该消费组订阅了三个主题{T1,T2,T3},每个主题分别存在三个分区,一共就有9个分区{TP1,TP2,…,TP9}。通过以上算法我们可以得到D=4,R=1,那么消费组C0将消费的分区为{TP1,TP2,TP3,TP4,TP5},C1将消费分区{TP6,TP7,TP8,TP9}。这里存在一个问题,如果不能均分,那么前面的几个消费者将会多消费一个分区。
将所有订阅主题下的分区进行排序得到集合TP={TP0,Tp1,…,TPN+1}。
对消费组中的所有消费者根据名字进行字典排序得到集合CG={C0,C1,…,CM+1}。
计算D=N/M,R=N%M。
消费者Ci获取消费分区起始位置=D*i+min(i,R),Ci获取的分区总数=D+(if (i+1>R)0 else 1)。
RoundRobinAssignor:使用该策略需要满足以下两个条件:1) 消费组中的所有消费者应该订阅主题相同;2) 同一个消费组的所有消费者在实例化时给每个主题指定相同的流数。
对所有主题的所有分区根据主题+分区得到的哈希值进行排序。
对所有消费者按字典排序。
通过轮询的方式将分区分配给消费者。
StickyAssignor:该分配方式在0.11版本开始引入,主要是保证以下特性:1) 尽可能的保证分配均衡;2) 当重新分配时,保留尽可能多的现有分配。其中第一条的优先级要大于第二条。
最后介绍了消费端的相关原理,消费组机制实现了消费端的消息隔离,既有广播也有独占的场景支持,而重平衡机制则保证的消费端的健壮性与扩展性。
参考文献
二、工作队列Work Queue
三、发布/订阅 Publish/Subscribe
四、路由Routing
五、Topic类型的exchange
六、rabbitmq部分封装代码及装备工作
一、单发单收
在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区。
单发单收模式下:一发一收
发送端只需要创建队列,然后向队列发送消息。
接收端也需要创建队列,因为如果接收端先启动,没有此队列就会报错,虽然发送端和接收端都创建此队列,但rabbitmq还是很智能的,它只会创建一次。
需要注意的地方:
1.发送端和接收端都需要创建同名队列
2.接收端指定从这个同名队列中接收消息
发送端
package main
import (
“RabbitMQ”
“time”
)
func main(){
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
send_mq := rabbitMQ.New(“amqp://user:password@ip:port/”,”hello”)
for{
time.Sleep(1)
send_mq.Send(“Hello World!”)
}
}
接收端
package main
import (
rabbitMQ “RabbitMQ”
“log”
)
func main(){
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello")
for{
//接收消息时,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
}
}
二、工作队列Work Queue
工作队列和单发单收模式比起来,接收端可以有多个,接收端多了以后就会出现数据分配问题,发过来的数据到底该被哪个接收端接收,所以有两种模式:
公平分发:每个接收端接收消息的概率是相等的,发送端会循环依次给每个接收端发送消息,图一是公平分发。
公平派遣:保证接收端在处理完某个任务,并发送确认信息后,RabbitMQ才会向它推送新的消息,在此之间若是有新的消息话,将会被推送到其它接收端,若所有的接收端都在处理任务,那么就会等待,图二为公平派遣。
图一:
图二:
公平分发模式下的发送端和接收端
发送端
package main
import (
“RabbitMQ”
“strconv”
“strings”
“time”
)
func main(){
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
send_mq := rabbitMQ.New(“amqp://user:password@ip:port/”,”hello”)
i := 0
for{
time.Sleep(1)
greetings := []string{“Helloworld!”,strconv.Itoa(i)}
send_mq.Send(strings.Join( greetings, “ “))
i = i+1
}
}
接收端1
package main
import (
rabbitMQ “RabbitMQ”
“log”
)
func main(){
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello")
for{
//接收消息时,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie 1 Received a message: %s", d.Body)
}
}()
}
}
接收端2
package main
import (
rabbitMQ “RabbitMQ”
“log”
)
func main(){
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello")
for{
//接收消息时,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie 1 Received a message: %s", d.Body)
}
}()
}
}
公平派遣模式下的发送端和接收端
公平派遣模式下发送端与公平分发相同,接收端只需要加一端配置代码
我们可以将预取计数设置为1。这告诉RabbitMQ一次不要给工人一个以上的消息。换句话说,在处理并确认上一条消息之前,不要将新消息发送给工作人员。而是将其分派给不忙的下一个工作程序。
//配置队列参数
func (q *RabbitMQ)Qos(){
e := q.channel.Qos(1,0,false)
failOnError(e,”无法设置QoS”)
}
接收端
package main
import (
rabbitMQ “RabbitMQ”
“log”
)
func main(){
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
receive_mq := rabbitMQ.New(“amqp://user:password@ip:port/”,”hello”)
//配置公平派遣
receive_mq.Qos()
for{
//接收消息时,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf(“recevie 2 Received a message: %s”, d.Body)
}
}()
}
}
官方在这里介绍了出现以下两种问题的解决办法:
1.当接收者挂掉的时候,我们将丢失发送给接收端还没有处理的消息。
2.当rabbitmq服务器挂了,我们怎么保证我们的消息不丢失。
具体参考:https://www.rabbitmq.com/tutorials/tutorial-two-go.html
三、发布/订阅 Publish/Subscribe
发布订阅模式下多了一个概念:exchange,如何理解这个exchange,exchange的作用就是类似路由器,发送端发送消息需要带有routing key 就是路由键,服务器会根据路由键将消息从交换器路由到队列上去,所以发送端和接收端之间有了中介。
exchange有多个种类:direct,fanout,topic,header(非路由键匹配,功能和direct类似,很少用)。
首先介绍exchange下的fanout exchange,它会将发到这个exchange的消息广播到关注此exchange的所有接收端上。
广播模式下(1:N):
发送端连接到rabbitmq后,创建exchange,需要指定交换机的名字和类型,fanout为广播,然后向此exchange发送消息,其它就不用管了。
接收端的执行流程在程序备注中。
注意:广播模式下的exchange是发送端是不需要带路由键的哦。
package main
import (
“RabbitMQ”
“strconv”
“strings”
“time”
)
func main(){
ch := rabbitMQ.Connect(“amqp://user:password@ip:port/”)
rabbitMQ.NewExchange(“amqp://user:password@ip:port/”,”exchange1”,”fanout”)
i := 0
for{
time.Sleep(1)
greetings := []string{“Helloworld!”,strconv.Itoa(i)}
ch.Publish(“exchange1”,strings.Join( greetings, “ “),””)
i = i+1
}
}
接收端1
package main
import (
rabbitMQ “RabbitMQ”
“log”
)
func main(){
// 1.接收者,首先创建自己队列
// 2.创建交换机
// 3.将自己绑定到交换机上
// 4.接收交换机上发过来的消息
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello1")
//2
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange1","fanout")
//3
// 队列绑定到exchange
receive_mq.Bind("exchange1","")
//4
for{
//接收消息时,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie1 Received a message: %s", d.Body)
}
}()
} } 接收端2
package main
import (
rabbitMQ “RabbitMQ”
“log”
)
func main(){
// 1.接收者,首先创建自己队列
// 2.创建交换机
// 3.将自己绑定到交换机上
// 4.接收交换机上发过来的消息
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2")
//2
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange1","fanout")
//3
// 队列绑定到exchange
receive_mq.Bind("exchange1","")
//4
for{
//接收消息时,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie2 Received a message: %s", d.Body)
}
}()
} }
四、路由Routing
路由模式其实就是全值匹配模式(direct),发送端发送消息需要带有路由键,就是下面发送端程序的routing key1,是一个字符串,发送端发给exchange,路由模式下的exchange会匹配这个路由键,如下面这个图,发送者发送时带有orange此路由键时,这条消息只会被转发给Q1队列,如果路由键没有匹配上的怎么办?,全值匹配,没有匹配到,那么所有接收者都接收不到消息,消息只会发送给匹配的队列,接收端的路由键是绑定exchange的时候用的。
注意:接收队列可以绑定多个路由键到exchange上,比如下面,当发送路由键为black,green,会被Q2接收。
发送端
package main
import (
“RabbitMQ”
“strconv”
“strings”
“time”
)
func main(){
ch := rabbitMQ.Connect(“amqp://user:password@ip:port/”)
rabbitMQ.NewExchange(“amqp://user:password@ip:port/”,”exchange”,”direct”)
i := 0
for{
time.Sleep(1)
greetings := []string{“Helloworld!”,strconv.Itoa(i)}
if i%2 ==1 {
//如果是奇数
ch.Publish(“exchange”,strings.Join( greetings, “ “),”routing key1”)
} else{
ch.Publish(“exchange”,strings.Join( greetings, “ “),”routing key2”)
}
i = i+1
}
}
接收端1
package main
import (
rabbitMQ “RabbitMQ”
“log”
)
func main(){
// 1.接收者,首先自己队列
// 2.创建交换机
// 3.将自己绑定到交换机上
// 4.接收交换机上发过来的消息
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2")
//2
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","direct")
//3
receive_mq.Bind("exchange","routing key1")
//4
for{
//接收消息时,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie1 Received a message: %s", d.Body)
}
}()
} } 接收端2
package main
import (
rabbitMQ “RabbitMQ”
“log”
)
func main(){
// 1.接收者,首先自己队列
// 2.创建交换机
// 3.将自己绑定到交换机上
// 4.接收交换机上发过来的消息
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2")
//2
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","direct")
//3
receive_mq.Bind("exchange","routing key2")
//4
for{
//接收消息时,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie2 Received a message: %s", d.Body)
}
}()
} }
五、Topic类型的exchange
前面的direct是全值匹配,那么topic就可以部分匹配,又可以全值匹配,比direct更加灵活。
消息发送到topic类型的exchange上时不能随意指定routing_key(一定是指由一系列由点号连接单词的字符串,单词可以是任意的,但一般都会与消息或多或少的有些关联)。Routing key的长度不能超过255个字节。
Binding key也一定要是同样的方式。Topic类型的exchange就像一个直接的交换:一个由生产者指定了确定routing key的消息将会被推送给所有Binding key能与之匹配的消费者。然而这种绑定有两种特殊的情况:
*(星号):可以(只能)匹配一个单词
#(井号):可以匹配多个单词(或者零个)
下边来举个例子:
在这个例子中,我们将会发送一些描述动物的消息。Routing key的第一个单词是描述速度的,第二个单词是描述颜色的,第三个是描述物种的:“
这里我们创建三个Binding:Binding key为”.orange.”的Q1,和binding key为”..rabbit”和”lazy.#”的Q2。
这些binding可以总结为:
Q1对所有橘色的(orange)的动物感兴趣;
Q2希望能拿到所有兔子的(rabbit)信息,还有比较懒惰的(lazy.#)动物信息。
一条以” quick.orange.rabbit”为routing key的消息将会推送到Q1和Q2两个queue上,routing key为“lazy.orange.elephant”的消息同样会被推送到Q1和Q2上。但如果routing key为”quick.orange.fox”的话,消息只会被推送到Q1上;routing key为”lazy.brown.fox”的消息会被推送到Q2上,routing key为”lazy.pink.rabbit”的消息也会被推送到Q2上,但同一条消息只会被推送到Q2上一次。
如果在发送消息时所指定的exchange和routing key在消费者端没有对应的exchange和binding key与之绑定的话,那么这条消息将会被丢弃掉。例如:”orange”和”quick.orange.male.rabbit”。但是routing为”lazy.orange.male.rabbit”的消息,将会被推到Q2上。
Topic类型的exchange:
Topic类型的exchange是很强大的,也可以实现其它类型的exchange。
当一个队列被绑定为binding key为”#”时,它将会接收所有的消息,此时和fanout类型的exchange很像。
当binding key不包含”*”和”#”时,这时候就很像direct类型的exchange。
发送端
package main
import (
“RabbitMQ”
“time”
)
func main(){
ch := rabbitMQ.Connect(“amqp://user:password@ip/”)
rabbitMQ.NewExchange(“amqp://user:password@ip/”,”exchange”,”topic”)
for{
time.Sleep(1)
ch.Publish(“exchange”,”hello world”,”lazy.brown.fox”)
}
}
接收端
package main
import (
rabbitMQ “RabbitMQ”
“log”
)
func main(){
// 1.接收者,首先自己队列
// 2.创建交换机
// 3.将自己绑定到交换机上
// 4.接收交换机上发过来的消息
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello1")
//2
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","topic")
//3
receive_mq.Bind("exchange","*.orange.*")
//4
for{
//接收消息时,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie1 Received a message: %s", d.Body)
}
}()
} } 接收端2
package main
import (
rabbitMQ “RabbitMQ”
“log”
)
func main(){
// 1.接收者,首先自己队列
// 2.创建交换机
// 3.将自己绑定到交换机上
// 4.接收交换机上发过来的消息
//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字
//1
receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2")
//2
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","topic")
//3
receive_mq.Bind("exchange","*.*.rabbit")
receive_mq.Bind("exchange","lazy.#")
//4
for{
//接收消息时,指定
msgs := receive_mq .Consume()
go func() {
for d := range msgs {
log.Printf("recevie2 Received a message: %s", d.Body)
}
}()
} }
六、rabbitmq部分封装代码及准备工作
目录参考:
准备工作:
1.我们再创建go项目时,首先指定gopath目录,然后在目录下创建bin、src、pkg目录。
2.下载github.com/streadway/amqp包,会自动添加到项目的pkg目录下。
go get github.com/streadway/amqp
3.在rabbitmq服务器上创建用户,指定管理员,并赋予访问权限。
4.rabbitmq封装
package rabbitMQ
import (
“encoding/json”
“github.com/streadway/amqp”
“log”
)
//声明队列类型
type RabbitMQ struct {
channel *amqp.Channel
Name string
exchange string
}
//连接服务器
func Connect(s string) * RabbitMQ{
//连接rabbitmq
conn,e := amqp.Dial(s)
failOnError(e,”连接Rabbitmq服务器失败!”)
ch ,e :=conn.Channel()
failOnError(e,”无法打开频道!”)
mq := new(RabbitMQ)
mq.channel =ch
return mq
}
//初始化单个消息队列
//第一个参数:rabbitmq服务器的链接,第二个参数:队列名字
func New(s string,name string) * RabbitMQ{
//连接rabbitmq
conn,e := amqp.Dial(s)
failOnError(e,”连接Rabbitmq服务器失败!”)
ch ,e :=conn.Channel()
failOnError(e,”无法打开频道!”)
q,e := ch.QueueDeclare(
name,//队列名
false,//是否开启持久化
true,//不使用时删除
false, //排他
false, //不等待
nil, //参数
)
failOnError(e,”初始化队列失败!”)
mq := new(RabbitMQ)
mq.channel =ch
mq.Name =q.Name
return mq }
//批量初始化消息队列
//第一个参数:rabbitmq服务器的链接,第二个参数:队列名字列表
//配置队列参数
func (q *RabbitMQ)Qos(){
e := q.channel.Qos(1,0,false)
failOnError(e,”无法设置QoS”)
}
//配置交换机参数
//初始化交换机
//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
func NewExchange(s string,name string,typename string){
//连接rabbitmq
conn,e := amqp.Dial(s)
failOnError(e,”连接Rabbitmq服务器失败!”)
ch ,e :=conn.Channel()
failOnError(e,”无法打开频道!”)
e = ch.ExchangeDeclare(
name, // name
typename, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(e,”初始化交换机失败!”)
}
//删除交换机
func (q *RabbitMQ)ExchangeDelete(exchange string){
e := q.channel.ExchangeDelete(exchange,false,true)
failOnError(e,”绑定队列失败!”)
}
//绑定消息队列到哪个exchange
func (q *RabbitMQ)Bind(exchange string,key string){
e := q.channel.QueueBind(
q.Name,
key,
exchange,
false,
nil,
)
failOnError(e,”绑定队列失败!”)
q.exchange = exchange
}
//向消息队列发送消息
//Send方法可以往某个消息队列发送消息
func (q *RabbitMQ) Send(body interface{}){
str,e := json.Marshal(body)
failOnError(e,”消息序列化失败!”)
e = q.channel.Publish(
“”,//交换
q.Name,//路由键:当前队列的名字
false, //必填
false, //立即
amqp.Publishing{
ReplyTo:q.Name,
Body:[]byte(str),
})
msg := "向队列:"+q.Name+"发送消息失败!"
failOnError(e,msg) }
//向exchange发送消息
//Publish方法可以往某个exchange发送消息
func (q *RabbitMQ) Publish(exchange string,body interface{},key string) {
str,e := json.Marshal(body)
failOnError(e,”消息序列化失败!”)
e = q.channel.Publish(
exchange,
key,
false,
false,
amqp.Publishing{ReplyTo:q.Name,
Body:[]byte(str)},
)
failOnError(e,”向路由发送消息失败!”)
}
//接收某个消息队列的消息
func (q * RabbitMQ) Consume() <-chan amqp.Delivery{
c,e :=q.channel.Consume(
q.Name,//指定从哪个队列中接收消息
“”,
true,
false,
false,
false,
nil,
)
failOnError(e,”接收消息失败!”)
return c
}
//关闭队列连接
func (q *RabbitMQ) Close() {
q.channel.Close()
}
//错误处理函数
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf(“%s: %s”, msg, err)
}
}
消息队列
1
2
消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
消息队列的使用场景
1
2
3
从上面的描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用 MQ 呢?
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、
发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。
以上是用于业务解耦的情况,其它常见场景包括最终一致性、广播、错峰流控等等。
RabbitMQ 特点
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
生产者与消费者
RabbitMQ是为了应用程序与应用程序之间的通信
RabbitMQ是消息投递服务,RabbitMQ在应用程序之间扮演路由器的角色
应用程序可以发送信息,发送消息给RabbitMQ(代理服务器)
应用程序可以接收信息,从RabbitMQ(代理服务器)接收信息
当一个应用程序连接到RabbitMQ时,它只有一个身份,要么是生产者,或者消费者。
生产者创建消息,然后发布到代理服务器RabbitMQ,消息包含两部分:有效载荷与标签。
消费者连接到代理服务器,并订阅到队列上,我们可以把消息队列想象成一个具体邮箱,每当消息到达特定的邮箱时,RabbitMQ会将其发送给其中一个订阅/监听的消费者。
注意:消费者接收消息里面只有有效载荷,没有消息的标签,RabbitMQ不会告诉你是谁是消息的生产者,就好像你拿着一封信,上面并没有署名。
整个消息传递的过程:生产者创建消息,消费者接收消息,两者是可以切换的。
RabbitMQ 内部结构
· Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
· Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
· Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
· Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
· Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
· Connection
网络连接,比如一个TCP连接。
· Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
· Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
· Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
· Broker
表示消息队列服务器实体。
应用程序与代理服务器的通信机制
AMQP:
AMQP,高级消息队列协议,以解决众多的消息队列需求和拓扑结构问题
而RabbitMQ是唯一实现了AMQP标准的代理服务器
信道:
生产者和消费者都需要连接到RabbitMQ,才能消费或者发布消息。
应用程序与RabbitMQ之间建立一条TCP连接,在TCP连接上就可以创建一条AMOP信道,信道是建立在TCP连接内的虚拟连接。
每一条信道都由一个唯一标识的ID(AMQP库会帮你记住ID),发布消息,订阅队列、接收消息都是在信道里完成的。
信道存在的意义:
我们知道建立和销毁TCP会话是很昂贵的开销,比如现在发送者大量的发送消息给RabbitMQ,如果现在每条信息发送都是一个线程来发送信息,难道每个线程发送信息都需要与RabbitMQ建立一条TCP连接吗?
这里面就有了一种办法:在现成的TCP连接上建立信道,信道与信道之间相互独立,不会给操作系统的TCP栈造成额外负担,在一条TCP连接上创建信道是没有限制的,所以这里使用信道可以创建并行的传输层,不会受到TCP层的连接限制。
AMQP消息路由:队列、交换器和绑定
1
AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
AMQP 的消息路由过程
队列
队列:消息最终到达队列中并等待消费。
消费者通过两种方式从队列中接收消息:
(1)
AMQP的basic.consume命令订阅。
将信道置为接收模式,直到取消队列的订阅为止,订阅了队列,消费者(或者拒绝)最近接收的消息后,就能从队列中自动接收下一条消息。如果消费者处理队列消息,并且需要消息一到达队列就自动接收的话,使用basic.consume,说白了就是持续接收信息。
(2)
AMQP的basic.get命令订阅
只想从队列中获得单条消息而不是持续的订阅。总结:basic.get命令会订阅队列,获得单条消息,然后取消订阅,所以消费者应该使用basic.consume来实现高吞吐量
如果只有一个消费者订阅了队列,消息会立即发送给这个订阅者。
如果没有消费者订阅队列,消息会在队列里等待,一旦有消费者订阅了对了,那么队列上的消息就会发送给消费者
如果有多个消费者订阅到同一个队列上,消息是如何分发的?循环依次发送给消费者,一次只发送给一个消费者。
交换器和绑定
消息实际上投递到的是交换机,然后根据确定的规则,RabbitMQ将会决定消息该投递到哪个队列,这些规则被称为路由键,队列通过路由键绑定到交换器。
当你发消息到代理服务器时,即便路由键是空的,RabbitMQ也会将其和使用的路由键进行匹配。如果路由的消息不匹配任何绑定模式,消息将会进入黑洞。
交换机在队列与消息中间起到了中间层的作用,有了交换机我们可以实现更灵活的功能,RabbitMQ中有三种常用的交换机类型:
direct: 如果路由键匹配,消息就投递到对应的队列
fanout:投递消息给所有绑定在当前交换机上面的队列
topic:允许实现有趣的消息通信场景,使得5不同源头的消息能够达到同一个队列。topic队列名称有两个特殊的关键字。
o* 可以替换一个单词
o# 可以替换所有的单词
可以理解,direct为1v1, fanout为1v所有,topic比较灵活,可以1v任意。
(交换机)Exchange 类型
1
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
direct
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
fanout
1
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
topic
1
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。
虚拟主机与隔离
每一个vhost本质都是一个mini版的Rabbitmq服务器,拥有自己的队列、交换器和绑定,还拥有自己的权限机制。
Vhost与Rabbit就像虚拟机与物理服务器一样。
它们通过在各自实例间提供逻辑分离,允许你为不同的应用程序安全保密运行数据。
这很有用,它能将同一个Rabbit的众多客户区分开来,又可以避免队列和交换器的命名冲突。否则你可能不得不运行多个Rabbit,这将带来更多管理问题,相反,你可以只运行一个Rabbit,然后按需启动或关闭vhost。
当你在Rabbit里创建一个用户,用户通常会被指派给至少一个vhost,并且只能访问被指派vhost内的队列、交换器和绑定。Vhost之间是相互隔离的。
你将无法将vhost banana_tree上的交换器绑定到vhostoak_tree中的队列去。这确保了安全性与可移植性。
注意:当你在RabbitMQ集群上创建vhost,整个集群上都会创建该vhost,vhost不仅消除了基础架构中每一层运行一个RabbitMQ服务器的需要,同样避免了为每一层创建不同的集群。
建vhost:rabbitmqctl add_vhost [vhost_name]
删除vhost:rabbitmqctl delete_vhost [vhost_name]
查看服务器运行的vhost:rabbitmqctl list_vhost