您的当前位置:首页正文

golang控制goroutine数量以及获取处理结果

2024-12-01 来源:个人技术集锦

一、前言

      最近遇到批量刷新ES数据的需求,为了加快处理速度,那必须首选goroutine了,但是众所周知,goroutine的返回值和错误处理一直都让人难以捉摸,go出去简单,怎么监测go出去的结果是个问题。

1、goroutine的错误处理

      sync.ErrGroupsync.WaitGroup功能的基础上,增加了错误传递,以及在发生不可恢复的错误时取消整个goroutine集合,或者等待超时。

具体的大家可以百度学习下。

2、goroutine的处理结果

      目前使用goroutine一般采用的是 channelsync.WaitGroupcontext,来实现各个协程之间的流程控制和消息传递,首选是channel来获取处理结果,channel参考:。

除了channel,那么是否可以用并发安全的sync.Map来存储结果,在所有的goroutine执行完毕后,再统一获取处理结果呢?答案是可以的,sync.Map就可以完美实现。

二、控制goroutine数量以及获取处理结果

1、实战代码

以下是使用goroutine批量刷新ES数据,并获取处理结果的代码:

//定义要获取的返回值,成功数量,失败数量,失败id集合
succeededNums, failedNums:= 0, 0, 
errData:=""
var syncMap sync.Map
wg := sync.WaitGroup{}
//控制goroutine数量,保证同时只有10个goroutine
chan1 := make(chan struct{}, 10) 
for {
   //业务逻辑
   // ....
   //goroutine加速,chan1写满,则阻塞。等待之前的goroutine释放才能继续循环
   chan1 <- struct{}{}
   wg.Add(1)
   go func(ctx context.Context, targetIndexName string, esClient *elastic.Client) {
      defer func() {
         if err1 := recover(); err1 != nil { //产生了panic异常
            log.Errorf(ctx, "%s go panic! err:(+%v)", logPreFix, err1)
         }
         wg.Done() //每个goroutine执行完毕则释放
         return
      }()
      bulkRequest := esClient.Bulk()
      //ES使用bulk方法批量刷新数据
      bulkResByAssetInfo := new(EsBulkDataRes)
      bulkResByAssetInfo, err = BulkEsDataByAssetInfo(ctx, bulkRequest, targetIndexName)
      if err != nil {
         log.Errorf(ctx, "%s BulkEsDataByAssetInfo error (%+v) ,test:(+%v)", logPreFix, err, string_util.TransferToString(fileUnitList))
         return
      }
      //累加执行结果到sync.map,保证并发安全
      tempMap := make(map[string]interface{})
      tempMap["successNums"] = bulkResByAssetInfo.SucceededNums
      tempMap["failedNums"] = bulkResByAssetInfo.FailedNums
      tempMap["errData"] = bulkResByAssetInfo.ErrData
      //每次取循环的的最大id,作为syncMap的key
      syncMap.Store(test[nums-1].ID, tempMap)
      //执行完毕再释放channel
      <-chan1
   }(ctx, targetIndexName, esClient)
  
}
wg.Wait()
//刷新结束,写入通知,通知内容包括,遍历sync.map,获取返回值
syncMap.Range(func(key, value interface{}) bool {
   val := value.(map[string]interface{})
   succeededNums += val["successNums"].(int)
   failedNums += val["failedNums"].(int)
   errData += val["errData"].(string)
   return true
})

//打印结果
fmt.Println("成功数量:",succeededNums)
fmt.Println("失败数量:",failedNums)
fmt.Println("失败id:",errData)

2、syncMap的使用

1)写入处理结果到map2) 写入map到sync.Map中,注意key不要重复
(3)使用Range来循环sync.Map,获取处理结果,并累加

3、控制goroutine的数量

这块主要是通过设置channel的长度来实现的。

1)设定channel长度,循环开始每生成一个goroutine则写入一次channel
(2) channel写满则阻塞
(3)goroutine执行完毕,释放channel
(4for循环中继续写入channel,保证同时执行的goroutine只有10

三、sync.Map的缺点

1、需要对value做断言处理,这个是interface{}的特性决定的

2、大家都知道sync.Map适合读多写少的场景,博主这里因为是跑脚本,所以使用sync.Map也无伤大雅,大家要追求性能的话,可以看一下currentMap的实现,通过hash分桶,减小锁的粒度来提升性能。

end

显示全文