channel实现消息的批量处理

当消息量特别大时,使用kafka之类的message queue自然是首选,但更多的时候,我们想用更加轻量的方案来解决这个问题。



下面来详细分析一下技术需求,这个方案需要实现以下几点:



消息聚合后处理(最大条数为BatchSize)
延迟处理(延迟时间为LingerTime)
自定义错误处理
并发处理



带buffer的channel相当于一个FIFO的队列
多个常驻的goroutine来提高并发
goroutine之间是并行的,但每个goroutine内是串行的,所以对batch操作是不用加锁的。





var (
eventQueue = make(chan interface{}, 4)
batchSize = 8
workers = 2
lingerTime = 14 * time.Millisecond
batchProcessor = func(batch []interface{}) error {
fmt.Printf(“%+v \n”, batch)
return nil
}
errHandler = func(err error, batch []interface{}) {
fmt.Println(“some error happens”)
}
)



for i := 0; i < workers; i++ {
go func() {
var batch []interface{}
lingerTimer := time.NewTimer(0)
if !lingerTimer.Stop() {
<-lingerTimer.C
}
defer lingerTimer.Stop()



    for {
select {
case msg := <-eventQueue:
batch = append(batch, msg)
if len(batch) != batchSize {
if len(batch) == 1 {
lingerTimer.Reset(lingerTime)
}
break
}

if err := batchProcessor(batch); err != nil {
errHandler(err, batch)
}

if !lingerTimer.Stop() {
<-lingerTimer.C
}

batch = make([]interface{}, 0)
case <-lingerTimer.C:
if err := batchProcessor(batch); err != nil {
errHandler(err, batch)
}

batch = make([]interface{}, 0)
}
}
}() }


for i := 0; i < 100; i++ {
eventQueue <- i
time.Sleep(1 * time.Millisecond)
}



https://www.jianshu.com/p/02f6647c3912


Category golang