实例代码

func doPatch(
    ctx *gin.Context,
    addrs []string,
    request *graph_raw.SmartAmountListReq, txnCnt, rival *int64, level uint, sTime int64) ([]graph_raw.CapitalAddressEdge, error) {
    wg := &sync.WaitGroup{}
    mutex := sync.Mutex{}
    res := make([]graph_raw.CapitalAddressEdge, 0)
    limit := make(chan bool, 5)
    
    for i := 0; i < len(addrs); i += 800 {
        wg.Add(1)
        start := i
        go func() {
            defer wg.Done()
            limit <- true
            group := collections.SliceSubRange(addrs, start, 800)
            edgeList, _ := multiGetAddressData(ctx, group, request)
            <-limit
            mutex.Lock()
            defer mutex.Unlock()
            for _, v := range edgeList {
                *txnCnt += int64(v.Count)
            }
            *rival += int64(len(edgeList))
            slog.Infof(ctx, "req_level: %d, curr_level: %d, txnCnt : %d, rival: %d, stime: %d ", request.Level, level, *txnCnt, *rival, sTime)

            res = append(res, edgeList...)
        }()
        cTime := time.Now().Unix()
        timeout := int64(9 * 60)
        totalTime := cTime - sTime
        slog.Infof(ctx, "curr_time: %d, s_time: %d, total_time: %d", cTime, sTime, totalTime)
        if totalTime >= timeout {
            return res, serror.CustomErr("查询超时", serror.CustomErrCode)
        }
    }
    wg.Wait()
    return res, nil
}

这段代码使用gorouting、waitgroup、sync.Mutex、channel,来实现并发下安全控制,

func doPatch(
    ctx *gin.Context,
    addrs []string,
    request *graph_raw.SmartAmountListReq, txnCnt, rival *int64, level uint, sTime int64) ([]graph_raw.CapitalAddressEdge, error) {
    wg := &sync.WaitGroup{}
    mutex := sync.Mutex{}
    res := make([]graph_raw.CapitalAddressEdge, 0)
    limit := make(chan bool, 5)
    timeout := int64(9 * 60)
    var err error
    for i := 0; i < len(addrs); i += 800 {

        wg.Add(1)
        start := i
        go func(start int) {
            defer wg.Done()
            limit <- true
            if _, ok := ctx.Get("request_canceled"); ok {
                slog.Infof(ctx, "request_canceled: %v", request)
                err = serror.CustomErr("查询取消", serror.CustomErrCode)
                return
            }
            cTime := time.Now().Unix()
            totalTime := cTime - sTime
            slog.Infof(ctx, "curr_time: %d, s_time: %d, total_time: %d, i: %d, level: %d", cTime, sTime, totalTime, start, level)
            if totalTime >= timeout {
                slog.Warnf(ctx, "smart超时: %v, curr_time: %d, s_time: %d, total_time: %d, i: %d, level: %d, req_level: %d, txnCnt : %d, rival: %d", request.Address, cTime, sTime, totalTime, start, level, request.Level, *txnCnt, *rival)
                err = serror.CustomErr("查询超时", serror.CustomErrCode)
                return
            }

            group := collections.SliceSubRange(addrs, start, 800)
            edgeList, _ := multiGetAddressData(ctx, group, request)
            <-limit
            mutex.Lock()
            defer mutex.Unlock()
            for _, v := range edgeList {
                *txnCnt += int64(v.Count)
            }
            *rival += int64(len(edgeList))
            slog.Infof(ctx, "req_level: %d, curr_level: %d, txnCnt : %d, rival: %d, stime: %d ", request.Level, level, *txnCnt, *rival, sTime)

            res = append(res, edgeList...)
        }(start)

        if err != nil {
            return res, err
        }
    }
    wg.Wait()

    return res, err
}

标签: none

添加新评论