goroutine和channel学习
实例代码
// doPatch fetches edge data for addrs in batches of 800 addresses, with at
// most 5 batches in flight at a time, and returns all fetched edges.
//
// For every fetched edge, *txnCnt is increased by the edge's Count, and *rival
// is increased by the number of edges in each batch. level is only used for
// logging. sTime is the start time in Unix seconds; once 9 minutes have
// elapsed since sTime, no further batch is started and a "查询超时" error is
// returned together with the edges collected so far.
//
// The function always waits for the batches it has started before returning,
// so res, *txnCnt and *rival are never modified after the call returns.
func doPatch(
	ctx *gin.Context,
	addrs []string,
	request *graph_raw.SmartAmountListReq, txnCnt, rival *int64, level uint, sTime int64) ([]graph_raw.CapitalAddressEdge, error) {
	const (
		batchSize   = 800           // addresses per batch
		maxInFlight = 5             // concurrent batches
		timeout     = int64(9 * 60) // seconds since sTime
	)

	var (
		wg    sync.WaitGroup
		mutex sync.Mutex // guards res, *txnCnt and *rival
	)
	res := make([]graph_raw.CapitalAddressEdge, 0)
	limit := make(chan struct{}, maxInFlight)

	for i := 0; i < len(addrs); i += batchSize {
		// Take a slot before spawning: this blocks while maxInFlight batches
		// are running, which bounds the number of goroutines and makes the
		// timeout check below observe real elapsed time.
		limit <- struct{}{}

		cTime := time.Now().Unix()
		totalTime := cTime - sTime
		slog.Infof(ctx, "curr_time: %d, s_time: %d, total_time: %d", cTime, sTime, totalTime)
		if totalTime >= timeout {
			<-limit
			// Wait for in-flight batches so that nothing writes to res or to
			// the caller's counters after we return.
			wg.Wait()
			return res, serror.CustomErr("查询超时", serror.CustomErrCode)
		}

		wg.Add(1)
		go func(start int) {
			defer wg.Done()

			// presumably (offset, length) — TODO confirm against collections.SliceSubRange
			group := collections.SliceSubRange(addrs, start, batchSize)
			// NOTE(review): the second return value is discarded, as in the
			// original; if it is an error it should be propagated.
			edgeList, _ := multiGetAddressData(ctx, group, request)
			<-limit // release the slot as soon as the fetch is done

			mutex.Lock()
			defer mutex.Unlock()
			for _, v := range edgeList {
				*txnCnt += int64(v.Count)
			}
			*rival += int64(len(edgeList))
			slog.Infof(ctx, "req_level: %d, curr_level: %d, txnCnt : %d, rival: %d, stime: %d ", request.Level, level, *txnCnt, *rival, sTime)
			res = append(res, edgeList...)
		}(i)
	}
	wg.Wait()
	return res, nil
}
这段代码使用 goroutine、sync.WaitGroup、sync.Mutex 和 channel 来实现并发下的安全控制。下面的版本把超时检查移到了 goroutine 内部,并增加了请求取消的检查:
// doPatch fetches edge data for addrs in batches of 800 addresses, with at
// most 5 batches in flight at a time, and returns all fetched edges.
//
// For every fetched edge, *txnCnt is increased by the edge's Count, and *rival
// is increased by the number of edges in each batch. level is only used for
// logging. sTime is the start time in Unix seconds.
//
// Before each batch is started two stop conditions are checked:
//   - the "request_canceled" key is set on ctx  -> "查询取消" error
//   - 9 minutes or more have elapsed since sTime -> "查询超时" error
//
// When either condition holds, no further batch is started. The function then
// waits for the batches already running and returns the edges collected so
// far together with the error, so res, *txnCnt and *rival are never modified
// after the call returns.
func doPatch(
	ctx *gin.Context,
	addrs []string,
	request *graph_raw.SmartAmountListReq, txnCnt, rival *int64, level uint, sTime int64) ([]graph_raw.CapitalAddressEdge, error) {
	const (
		batchSize   = 800           // addresses per batch
		maxInFlight = 5             // concurrent batches
		timeout     = int64(9 * 60) // seconds since sTime
	)

	var (
		wg    sync.WaitGroup
		mutex sync.Mutex // guards res, *txnCnt and *rival
		err   error      // only written by this goroutine, never by workers
	)
	res := make([]graph_raw.CapitalAddressEdge, 0)
	limit := make(chan struct{}, maxInFlight)

	for i := 0; i < len(addrs); i += batchSize {
		// Take a slot before spawning: this blocks while maxInFlight batches
		// are running, so the checks below run at the moment the batch would
		// actually start, and the number of goroutines stays bounded.
		limit <- struct{}{}

		if _, ok := ctx.Get("request_canceled"); ok {
			slog.Infof(ctx, "request_canceled: %v", request)
			err = serror.CustomErr("查询取消", serror.CustomErrCode)
			<-limit
			break
		}

		cTime := time.Now().Unix()
		totalTime := cTime - sTime
		slog.Infof(ctx, "curr_time: %d, s_time: %d, total_time: %d, i: %d, level: %d", cTime, sTime, totalTime, i, level)
		if totalTime >= timeout {
			// Workers may still be updating the counters; read them under the lock.
			mutex.Lock()
			slog.Warnf(ctx, "smart超时: %v, curr_time: %d, s_time: %d, total_time: %d, i: %d, level: %d, req_level: %d, txnCnt : %d, rival: %d", request.Address, cTime, sTime, totalTime, i, level, request.Level, *txnCnt, *rival)
			mutex.Unlock()
			err = serror.CustomErr("查询超时", serror.CustomErrCode)
			<-limit
			break
		}

		wg.Add(1)
		go func(start int) {
			defer wg.Done()

			// presumably (offset, length) — TODO confirm against collections.SliceSubRange
			group := collections.SliceSubRange(addrs, start, batchSize)
			// NOTE(review): the second return value is discarded, as in the
			// original; if it is an error it should be propagated.
			edgeList, _ := multiGetAddressData(ctx, group, request)
			<-limit // release the slot as soon as the fetch is done

			mutex.Lock()
			defer mutex.Unlock()
			for _, v := range edgeList {
				*txnCnt += int64(v.Count)
			}
			*rival += int64(len(edgeList))
			slog.Infof(ctx, "req_level: %d, curr_level: %d, txnCnt : %d, rival: %d, stime: %d ", request.Level, level, *txnCnt, *rival, sTime)
			res = append(res, edgeList...)
		}(i)
	}

	// Always wait, including on the error path, so no worker touches res or
	// the caller's counters after we return.
	wg.Wait()
	return res, err
}