kafka 通过时间获取偏移量


confluent-kafka-go producter一个实例占用内存22M,所以尽量使用单例,另外不要用它默认的初始化channel参数1000000



https://github.com/confluentinc/confluent-kafka-go/blob/v1.5.2/kafka/producer.go#L510-L511



p.events = make(chan Event, eventsChanSize)
p.produceChannel = make(chan *Message, produceChannelSize)


interface 占用16字节
指针占用8字节


kafkaMsgConsumer, err := kafka.NewConsumer(&kafka.ConfigMap{
“bootstrap.servers”: kafkaServer,
“group.id”: “logsnap”,
“auto.offset.reset”: “latest”,
})
if err != nil {
log.Panicln(“Error creating Kafka consumer:”, err)
}
err = kafkaMsgConsumer.SubscribeTopics([]string{
kafkaMessageTopic}, nil)
if err != nil {
log.Println(“Cannot subscribe to Kafka topic”, *kafkaMessageTopic)
continue
}



firstLastOffset := []kafka.TopicPartition{
kafka.TopicPartition{
Topic: kafkaMessageTopic,
Partition: 0,
Offset: kafka.Offset(kre.FirstError.DateTime.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))),
},
kafka.TopicPartition{
Topic: kafkaMessageTopic,
Partition: 0,
Offset: kafka.Offset(kre.LastError.DateTime.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))),
},
}



tofs, err := kafkaMsgConsumer.OffsetsForTimes(firstLastOffset, -1)
if err != nil {
log.Println(“Error in kafkaMsgConsumer.OffsetsForTimes():”, err)
bailoutRetryKre(kre)
continue
}



offsetMin := tofs[0].Offset
offsetMax := tofs[1].Offset



log.Println(“Error batch”, kre.ErrorID, “for times:”, firstLastOffset, “translated to offsets”, tofs, “offsetMin:”, offsetMin, “offsetMax:”, offsetMax)



tp := kafka.TopicPartition{
Topic: kafkaMessageTopic,
Partition: 0,
Offset: offsetMin,
}
kafkaMsgConsumer.Seek(tp, 0)



// Then use ReadMessage() in a loop and stop looping when Offset > offsetMax



https://github.com/confluentinc/confluent-kafka-go/issues/639



https://pkg.go.dev/github.com/confluentinc/confluent-kafka-go/kafka#section-readme



Category golang