A single confluent-kafka-go producer instance uses about 22 MB of memory, so prefer a singleton; also, do not keep its default channel initialization size of 1,000,000.
https://github.com/confluentinc/confluent-kafka-go/blob/v1.5.2/kafka/producer.go#L510-L511
p.events = make(chan Event, eventsChanSize)
p.produceChannel = make(chan *Message, produceChannelSize)
An interface value occupies 16 bytes and a pointer occupies 8 bytes, so with the defaults the two channel buffers alone reserve roughly 1,000,000 × 16 B + 1,000,000 × 8 B ≈ 24 MB per producer, which matches the ~22 MB observed above.
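A minimal singleton sketch, assuming the v1.x import path and an illustrative broker address; the Go-specific ConfigMap keys go.events.channel.size and go.produce.channel.size override the 1,000,000 defaults shown above (10,000 here is just an example size):

package kafkautil

import (
    "sync"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

var (
    producerOnce sync.Once
    producer     *kafka.Producer
    producerErr  error
)

// Producer lazily creates one shared kafka.Producer for the whole process.
func Producer() (*kafka.Producer, error) {
    producerOnce.Do(func() {
        producer, producerErr = kafka.NewProducer(&kafka.ConfigMap{
            "bootstrap.servers":       "localhost:9092", // illustrative address
            "go.events.channel.size":  10000,            // default: 1000000
            "go.produce.channel.size": 10000,            // default: 1000000
        })
    })
    return producer, producerErr
}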
// kafkaServer, kafkaMessageTopic, kre and the enclosing retry loop come from the surrounding program.
kafkaMsgConsumer, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": kafkaServer,
    "group.id":          "logsnap",
    "auto.offset.reset": "latest",
})
if err != nil {
    log.Panicln("Error creating Kafka consumer:", err)
}
// kafkaMessageTopic is a *string (it is dereferenced below), so it must be
// dereferenced here too when building the []string of topics.
err = kafkaMsgConsumer.SubscribeTopics([]string{*kafkaMessageTopic}, nil)
if err != nil {
    log.Println("Cannot subscribe to Kafka topic", *kafkaMessageTopic)
    continue
}
firstLastOffset := []kafka.TopicPartition{
    kafka.TopicPartition{
        Topic:     kafkaMessageTopic,
        Partition: 0,
        // OffsetsForTimes() takes the target timestamp, in milliseconds, in the Offset field.
        Offset: kafka.Offset(kre.FirstError.DateTime.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))),
    },
    kafka.TopicPartition{
        Topic:     kafkaMessageTopic,
        Partition: 0,
        Offset:    kafka.Offset(kre.LastError.DateTime.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))),
    },
}
tofs, err := kafkaMsgConsumer.OffsetsForTimes(firstLastOffset, -1)
if err != nil {
    log.Println("Error in kafkaMsgConsumer.OffsetsForTimes():", err)
    bailoutRetryKre(kre)
    continue
}
offsetMin := tofs[0].Offset
offsetMax := tofs[1].Offset
log.Println("Error batch", kre.ErrorID, "for times:", firstLastOffset, "translated to offsets", tofs, "offsetMin:", offsetMin, "offsetMax:", offsetMax)
tp := kafka.TopicPartition{
    Topic:     kafkaMessageTopic,
    Partition: 0,
    Offset:    offsetMin,
}
if err := kafkaMsgConsumer.Seek(tp, 0); err != nil {
    log.Println("Error in kafkaMsgConsumer.Seek():", err)
}
// Then use ReadMessage() in a loop and stop looping when Offset > offsetMax
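A minimal sketch of that read loop, assuming the same consumer and offsetMax as above (the 10-second poll timeout and the break-on-error handling are illustrative choices, not part of the original code):

for {
    msg, err := kafkaMsgConsumer.ReadMessage(10 * time.Second)
    if err != nil {
        // ReadMessage returns an error on poll timeout as well as on real failures;
        // here we simply log and stop either way.
        log.Println("Error in kafkaMsgConsumer.ReadMessage():", err)
        break
    }
    if msg.TopicPartition.Offset > offsetMax {
        break // past the last error's offset, the batch is complete
    }
    // ... process msg.Value for this error batch ...
}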
https://github.com/confluentinc/confluent-kafka-go/issues/639
https://pkg.go.dev/github.com/confluentinc/confluent-kafka-go/kafka#section-readme