日志
如果一个topic的名称为”my_topic”,它有2个partitions,那么日志将会保存在my_topic_0和my_topic_1两个目录中;日志文件中保存了一序列”log entries”(日志条目),每个log entry格式为”4个字节的数字N表示消息的长度” + “N个字节的消息内容”;每个日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每个partition在物理存储层面,有多个log file组成(称为segment).segment file的命名为”最小offset”.kafka.例如”00000000000.kafka”;其中”最小offset”表示此segment中起始消息的offset.
消息堆积是消费滞后(Lag)的一种表现形式,消息中间件服务端中所留存的消息与消费掉的消息之间的差值即为消息堆积量,也称之为消费滞后(Lag)量。对于Kafka而言,消息被发送至Topic中,而Topic又分成了多个分区(Partition),每一个Partition都有一个预写式的日志文件,虽然Partition可以继续细分为若干个段文件(Segment),但是对于上层应用来说可以将Partition看成最小的存储单元(一个由多个Segment文件拼接的“巨型文件”)。每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中。
LogStartOffset:表示一个Partition的起始位移,初始为0,虽然消息的增加以及日志清除策略的影响,这个值会阶段性的增大。
ConsumerOffset:消费位移,表示Partition的某个消费者消费到的位移位置。
HighWatermark:简称HW,代表消费端所能“观察”到的Partition的最高日志位移,HW大于等于ConsumerOffset的值。
LogEndOffset:简称LEO, 代表Partition的最高日志位移,其值对消费者不可见。比如在ISR(In-Sync-Replicas)副本数等于3的情况下(如下图所示),消息发送到Leader A之后会更新LEO的值,Follower B和Follower C也会实时拉取Leader A中的消息来更新自己,HW就表示A、B、C三者同时达到的日志位移,也就是A、B、C三者中LEO最小的那个值。由于B、C拉取A消息之间延时问题,所以HW必然不会一直与Leader的LEO相等,即LEO>=HW。
要计算Kafka中某个消费者的滞后量很简单,首先看看其消费了几个Topic,然后针对每个Topic来计算其中每个Partition的Lag
Lag=HW - ConsumerOffset
首先来说说ConsumerOffset,Kafka中有两处可以存储,一个是Zookeeper,而另一个是”__consumer_offsets这个内部topic中,前者是0.8.x版本中的使用方式,但是随着版本的迭代更新,现在越来越趋向于后者。就拿1.0.0版本来说,虽然默认是存储在”__consumer_offsets”中,但是保不齐用于就将其存储在了Zookeeper中了。这个问题倒也不难解决,针对两种方式都去拉取,然后哪个有值的取哪个。不过这里还有一个问题,对于消费位移来说,其一般不会实时的更新,而更多的是定时更新,这样可以提高整体的性能。那么这个定时的时间间隔就是ConsumerOffset的误差区间之一。
再来说说HW,其也是Kafka中Partition的一个状态。有可能你会察觉到在Kafka的JMX中可以看到“kafka.log:type=Log,name=LogEndOffset,topic=[topic_name],partition=[partition_num]”这样一个属性,但是这个值不是LEO而是HW。
https://blog.csdn.net/u013256816/article/details/79955578
https://www.pianshen.com/article/6905168315/
https://cwiki.apache.org/confluence/display/KAFKA/JMX+Reporters
Producer 的 offset 是通过 JMX 轮询获得的,Consumer 的 offset 是从 kafka 内的 __consumer_offsets 的 topic 中直接读取到的,很明显轮询获取 offset 比 直接从 topic 拿 offset 慢一点,也就可能会出现 Lag 计算后为负数的情况。
https://www.orchome.com/6788
kafka manager工具上有个指标lag,他是什么意思?正负代表什么意思?
先看单词意思:滞后,(时间上的)间隔;
正数:就是kafka数据积压了,往kafka进数据的速度,大于这个数据被消费的速度。a-b就是正数了。供大于求。
负数:就是有时候,我刚刚取了a还没来得及做减法呢,b已经查、超过a了,导致结果是负数,说明kafka的消费者干活很快,分分钟就处理完消费的数据,供小于求。
0:生产者和消费者速率基本相当,说明2者都工作正常。
https://blog.csdn.net/qq_27093465/article/details/115325758?utm_medium=distribute.pc_aggpage_search_result.none-task-blog-2~aggregatepage~first_rank_ecpm_v1~rank_v31_ecpm-1-115325758.pc_agg_new_rank&utm_term=kafka%E6%9F%A5%E7%9C%8B%E6%B6%88%E8%B4%B9%E7%BB%84lag%E5%80%BC%E4%B8%BA%E8%B4%9F%E6%95%B0&spm=1000.2123.3001.4430
https://blog.csdn.net/sand_clock/article/details/68486599
https://blog.csdn.net/sheep8521/article/details/89491372
在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。
每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。
这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。
segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件.
segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
在partition中如何通过offset查找message
例如读取offset=368776的message,需要通过下面2个步骤查找。
第一步查找segment file
上述图2为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。
当offset=368776时定位到00000000000000368769.index|log
第二步通过segment file查找message
通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。
offset存储方式
1、在kafka 0.9版本之后,kafka为了降低zookeeper的io读写,减少network data transfer,也自己实现了在kafka server上存储consumer,topic,partitions,offset信息将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。
2、将消费的 offset 存放在 Zookeeper 集群中。
3、将offset存放至第三方存储,如Redis, 为了严格实现不重复消费
https://www.cnblogs.com/ITtangtang/p/8027217.html
Kafka 的基本术语
消息:Kafka 中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。
批次:为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。
主题:消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。
分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序
https://www.cnblogs.com/cxuanBlog/p/11949238.html
https://juejin.cn/post/6938234746837139463
2.计算平均大小
平均大小=总消息大小/总offset
https://blog.csdn.net/qq_39002724/article/details/114577404
https://developer.huawei.com/consumer/cn/forum/topic/0202556784431820988?fid=23
Kafka 的使用场景
活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。
度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
流式处理:流式处理是有一个能够提供多种应用程序的领域。
限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。
https://segmentfault.com/a/1190000021138998
Zookeeper 协调控制#
管理broker与consumer的动态加入与离开。(Producer不需要管理,随便一台计算机都可以作为Producer向Kakfa Broker发消息)
触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一
个consumer group内的多个consumer的消费负载平衡。(因为一个comsumer消费一个或多个partition,一个partition只能被一个consumer消费)
维护消费关系及每个partition的消费信息。
https://www.cnblogs.com/hongdada/p/11642718.html
kafka里的lag单位是什么呢?
亲自测试后发现是条数,也就是消息的个数。