在server.properties中配置:
auto.create.topics.enable=true
num.partitions=8
default.replication.factor=3
https://www.orchome.com/10428
1,proto: cannot parse invalid wire-format data
原因:kafka消息编码和解码方式不一样,编码用json,解码用proto
如果是json,可以用base64 -d 查看
https://blog.csdn.net/aashuii/article/details/117410309
Local: Errorneous state 原因:kafka 在seek之前没有订阅和Assign
https://github.com/confluentinc/confluent-kafka-dotnet/issues/1061
seek poll的方式拉取数据的时候,如果auto comit 是true只能第一次拉到,false才能重复;拉取
https://pkg.go.dev/github.com/confluentinc/confluent-kafka-go/kafka#section-readme
https://github.com/confluentinc/confluent-kafka-dotnet/issues/827
topic := “test-topic”
offsets, err := c.OffsetsForTimes([]kafka.TopicPartition, 100000000)
fmt.Println(offsets[0], err) // return latest offset
Timestamps are in milliseconds since the epoch, so try 1553000712000
https://github.com/confluentinc/confluent-kafka-go/issues/312
You can use Seek() with Subscribe() but you need to wait for the assignment to be assigned with Assign(), and without registering a AssignedPartitions handler you won’t really know when the implicit Assign() kicks in (or if you call Assign() yourself from the handler).
https://github.com/confluentinc/confluent-kafka-go/issues/121
https://github.com/confluentinc/confluent-kafka-go/issues/35
https://github.com/confluentinc/confluent-kafka-go/issues/14
https://github.com/confluentinc/confluent-kafka-go/issues/312
Assign(partitions []TopicPartition) (err error)
分配一组要使用的 partition
这个 api 会覆盖之前分配过的
Seek(partition TopicPartition, timeoutMs int) error
获取指定 partition 的 offset
如果timeoutMs不是 0,则调用将等待这么长时间以执行查找。如果超时到达,内部状态将未知,并且此函数返回 ErrTimedOut。
如果timeoutMs 为 0,它将发起查找,但立即返回,不报告任何错误(例如,异步)。
Seek()只能用于已经使用的分区(通过 Assign()或隐式使用通过自平衡订阅())。
要设置起始偏移量,最好使用 Assign()并为每个分区提供一个起始偏移量。
https://whiteccinn.github.io/2020/06/01/Golang/confluent-kafka-go/
https://stackoverflow.com/questions/72058964/kafka-consumer-offset-export-golang-sharma-or-confluent-kafka-go-lib
kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic xx –partition 0 –offset 10
ev := w.consumer.Poll(int(timeout)). 这个正常
as, err := w.consumer.Assignment() 这个是1000
本地测试正常
const _Ciconst_RD_KAFKA_OFFSET_STORED = -0x3e8
ev := w.consumer.Poll(int(timeout)) e.TopicPartition.Offset++ r.consumer.Assign([]kafka.TopicPartition{e.TopicPartition})
KafkaConsumer#seekToBeginning的使用没有前置条件;
KafkaConsumer#seek的使用有前置条件,就是将AUTO_OFFSET_RESET_CONFIG设置成earliest;
https://blog.csdn.net/a1240466196/article/details/113342100
实战的时候发现poll会提交偏移量,即使设置auto.commit 为false,也会提交,所以问了解决这个问题每次poll前都assign偏移量;
注意每个分支都需要提交
判断如果poll返回为null的时候也需要,需要注意的是要限制poll的最大次数为达到high water mark的时候就停止,否则会死循环
https://pkg.go.dev/github.com/confluentinc/confluent-kafka-go/kafka#section-readme
https://github.com/edenhill/librdkafka/blob/7ae0fbda54e09cbf73057ddd6f1df42603585d28/INTRODUCTION.md