最近遇到批量刷新ES数据的需求,为了加快处理速度,那必须首选goroutine
了,但是众所周知,goroutine
的返回值和错误处理一直都让人难以捉摸,go
出去简单,怎么监测go
出去的结果是个问题。
sync.ErrGroup
在sync.WaitGroup
功能的基础上,增加了错误传递,以及在发生不可恢复的错误时取消整个goroutine
集合,或者等待超时。
具体的大家可以百度学习下。
目前使用goroutine
一般采用的是 channel
、 sync.WaitGroup
、context
,来实现各个协程之间的流程控制和消息传递,首选是channel
来获取处理结果,channel
参考:。
除了channel
,那么是否可以用并发安全的sync.Map
来存储结果,在所有的goroutine
执行完毕后,再统一获取处理结果呢?答案是可以的,sync.Map
就可以完美实现。
以下是使用goroutine
批量刷新ES数据,并获取处理结果的代码:
//定义要获取的返回值,成功数量,失败数量,失败id集合
succeededNums, failedNums:= 0, 0,
errData:=""
var syncMap sync.Map
wg := sync.WaitGroup{}
//控制goroutine数量,保证同时只有10个goroutine
chan1 := make(chan struct{}, 10)
for {
//业务逻辑
// ....
//goroutine加速,chan1写满,则阻塞。等待之前的goroutine释放才能继续循环
chan1 <- struct{}{}
wg.Add(1)
go func(ctx context.Context, targetIndexName string, esClient *elastic.Client) {
defer func() {
if err1 := recover(); err1 != nil { //产生了panic异常
log.Errorf(ctx, "%s go panic! err:(+%v)", logPreFix, err1)
}
wg.Done() //每个goroutine执行完毕则释放
return
}()
bulkRequest := esClient.Bulk()
//ES使用bulk方法批量刷新数据
bulkResByAssetInfo := new(EsBulkDataRes)
bulkResByAssetInfo, err = BulkEsDataByAssetInfo(ctx, bulkRequest, targetIndexName)
if err != nil {
log.Errorf(ctx, "%s BulkEsDataByAssetInfo error (%+v) ,test:(+%v)", logPreFix, err, string_util.TransferToString(fileUnitList))
return
}
//累加执行结果到sync.map,保证并发安全
tempMap := make(map[string]interface{})
tempMap["successNums"] = bulkResByAssetInfo.SucceededNums
tempMap["failedNums"] = bulkResByAssetInfo.FailedNums
tempMap["errData"] = bulkResByAssetInfo.ErrData
//每次取循环的的最大id,作为syncMap的key
syncMap.Store(test[nums-1].ID, tempMap)
//执行完毕再释放channel
<-chan1
}(ctx, targetIndexName, esClient)
}
wg.Wait()
//刷新结束,写入通知,通知内容包括,遍历sync.map,获取返回值
syncMap.Range(func(key, value interface{}) bool {
val := value.(map[string]interface{})
succeededNums += val["successNums"].(int)
failedNums += val["failedNums"].(int)
errData += val["errData"].(string)
return true
})
//打印结果
fmt.Println("成功数量:",succeededNums)
fmt.Println("失败数量:",failedNums)
fmt.Println("失败id:",errData)
(1)写入处理结果到map
(2) 写入map到sync.Map中,注意key不要重复
(3)使用Range来循环sync.Map,获取处理结果,并累加
这块主要是通过设置channel
的长度来实现的。
(1)设定channel长度,循环开始每生成一个goroutine则写入一次channel
(2) channel写满则阻塞
(3)goroutine执行完毕,释放channel
(4) for循环中继续写入channel,保证同时执行的goroutine只有10个
1、需要对value
做断言处理,这个是interface{}
的特性决定的
2、大家都知道sync.Map
适合读多写少的场景,博主这里因为是跑脚本,所以使用sync.Map
也无伤大雅,大家要追求性能的话,可以看一下currentMap
的实现,通过hash
分桶,减小锁的粒度来提升性能。
end