kafka OffsetsForTimes

在server.properties中配置:



自动创建主题


auto.create.topics.enable=true


默认主题的分区数


num.partitions=8


默认分区副本


default.replication.factor=3

https://www.orchome.com/10428



1,proto: cannot parse invalid wire-format data



原因:kafka消息编码和解码方式不一样,编码用json,解码用proto



如果是json,可以用base64 -d 查看
https://blog.csdn.net/aashuii/article/details/117410309



Local: Errorneous state 原因:kafka 在seek之前没有订阅和Assign



https://github.com/confluentinc/confluent-kafka-dotnet/issues/1061



seek poll的方式拉取数据的时候,如果auto comit 是true只能第一次拉到,false才能重复;拉取
https://pkg.go.dev/github.com/confluentinc/confluent-kafka-go/kafka#section-readme
https://github.com/confluentinc/confluent-kafka-dotnet/issues/827



topic := “test-topic”
offsets, err := c.OffsetsForTimes([]kafka.TopicPartition, 100000000)

fmt.Println(offsets[0], err) // return latest offset



Timestamps are in milliseconds since the epoch, so try 1553000712000



https://github.com/confluentinc/confluent-kafka-go/issues/312



You can use Seek() with Subscribe() but you need to wait for the assignment to be assigned with Assign(), and without registering a AssignedPartitions handler you won’t really know when the implicit Assign() kicks in (or if you call Assign() yourself from the handler).



https://github.com/confluentinc/confluent-kafka-go/issues/121



https://github.com/confluentinc/confluent-kafka-go/issues/35



https://github.com/confluentinc/confluent-kafka-go/issues/14



https://github.com/confluentinc/confluent-kafka-go/issues/312



Assign(partitions []TopicPartition) (err error)
分配一组要使用的 partition
这个 api 会覆盖之前分配过的



Seek(partition TopicPartition, timeoutMs int) error
获取指定 partition 的 offset
如果timeoutMs不是 0,则调用将等待这么长时间以执行查找。如果超时到达,内部状态将未知,并且此函数返回 ErrTimedOut。
如果timeoutMs 为 0,它将发起查找,但立即返回,不报告任何错误(例如,异步)。
Seek()只能用于已经使用的分区(通过 Assign()或隐式使用通过自平衡订阅())。
要设置起始偏移量,最好使用 Assign()并为每个分区提供一个起始偏移量。



https://whiteccinn.github.io/2020/06/01/Golang/confluent-kafka-go/



https://stackoverflow.com/questions/72058964/kafka-consumer-offset-export-golang-sharma-or-confluent-kafka-go-lib



kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic xx –partition 0 –offset 10



ev := w.consumer.Poll(int(timeout)). 这个正常
as, err := w.consumer.Assignment() 这个是1000
本地测试正常



const _Ciconst_RD_KAFKA_OFFSET_STORED = -0x3e8



ev := w.consumer.Poll(int(timeout)) e.TopicPartition.Offset++ r.consumer.Assign([]kafka.TopicPartition{e.TopicPartition})



KafkaConsumer#seekToBeginning的使用没有前置条件;
KafkaConsumer#seek的使用有前置条件,就是将AUTO_OFFSET_RESET_CONFIG设置成earliest;



https://blog.csdn.net/a1240466196/article/details/113342100



实战的时候发现poll会提交偏移量,即使设置auto.commit 为false,也会提交,所以问了解决这个问题每次poll前都assign偏移量;



注意每个分支都需要提交
判断如果poll返回为null的时候也需要,需要注意的是要限制poll的最大次数为达到high water mark的时候就停止,否则会死循环



https://pkg.go.dev/github.com/confluentinc/confluent-kafka-go/kafka#section-readme



https://github.com/edenhill/librdkafka/blob/7ae0fbda54e09cbf73057ddd6f1df42603585d28/INTRODUCTION.md


Category kafka