杨 发布的文章

场景1:实现并发调用多个方法,有一个返回结果就都返回
思路:可以使用 select 语句结合通道来等待多个 goroutine 返回结果,一旦有 goroutine 返回结果,就立即处理该结果并结束程序。

/**
 * @desc
 * @date 2025/4/19
 * @user yangshuo
 */

package main

import (
    "fmt"
    "time"
)

func https(resultChan chan string) {
    // 模拟 HTTP 请求
    // t := time.NewTicker(time.Duration(rand.Intn(5)+1) * time.Second)

    time.Sleep(time.Duration(4+1) * time.Second)
    resultChan <- "http"
    println("1")
}

func redis(resultChan chan string) {
    // 模拟 HTTP 请求
    // t := time.NewTicker(time.Duration(rand.Intn(5)+1) * time.Second)
    // <-t.C
    time.Sleep(time.Duration(5+1) * time.Second)
    resultChan <- "redis"
    println("2")
}

func mysql(resultChan chan string) {
    // 模拟 HTTP 请求
    // t := time.NewTicker(time.Duration(rand.Intn(5)+1) * time.Second)
    // <-t.C
    time.Sleep(time.Duration(3+1) * time.Second)
    resultChan <- "mysql"
    println("3")
}

func main() {
    resultChan := make(chan string)
    defer close(resultChan)

    go https(resultChan)
    go mysql(resultChan)
    go redis(resultChan)

    for {
        select {
        case result := <-resultChan:
            fmt.Println("返回结果:", result)
            return
        }
    }

}

场景2:实现并发调用多个校验方法,有一个成功就返回成功,所有方法都失败就返回失败。
思路:与场景1不同的是,失败情况需要等待所有goroutine都执行完。

/**
 * @desc
 * @date 2025/4/20
 * @user yangshuo
 */

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func validateFunc1() bool {
    // 模拟验证逻辑
    // 设置随机种子
    return false
    rand.Seed(time.Now().UnixNano())

    // 生成随机数(0 或 1)
    randomNum := rand.Intn(2)

    // 根据随机数返回 true 或 false
    result := randomNum == 1
    fmt.Println(result)
    return result
}

func validateFunc2() bool {
    return false
    // 设置随机种子
    rand.Seed(time.Now().UnixNano())

    // 生成随机数(0 或 1)
    randomNum := rand.Intn(2)

    // 根据随机数返回 true 或 false
    result := randomNum == 1
    fmt.Println(result)
    return result
}

func validateFunc3() bool {
    return false
    // 模拟验证逻辑
    // 设置随机种子
    rand.Seed(time.Now().UnixNano())

    // 生成随机数(0 或 1)
    randomNum := rand.Intn(2)

    // 根据随机数返回 true 或 false
    result := randomNum == 1
    fmt.Println(result)
    return result
}

func main() {
    // 定义验证函数
    validateFuncs := []func() bool{validateFunc1, validateFunc2, validateFunc3}
    // 使用 channel 通知是否有函数通过验证
    passed := make(chan bool)

    // 并发调用验证函数
    for _, vf := range validateFuncs {
        go func(fn func() bool) {
            if fn() {
                passed <- true
            }
        }(vf)
    }

    // 检查是否有函数通过验证
    for range validateFuncs {
        select {
        case <-passed:
            fmt.Println("Validation passed")
            return
    }

    fmt.Println("Validation failed")
}

需要注意的是:当所有 goroutines 都未通过验证时,由于没有 goroutine 向 passed 通道发送消息,select 语句会一直阻塞等待消息,从而导致程序发生死锁。为了避免这种情况,可以加上default分支。

// 检查是否有函数通过验证
    for range validateFuncs {
        select {
        case <-passed:
            fmt.Println("Validation passed")
            return
        default:

        }
    }

实例代码

func doPatch(
    ctx *gin.Context,
    addrs []string,
    request *graph_raw.SmartAmountListReq, txnCnt, rival *int64, level uint, sTime int64) ([]graph_raw.CapitalAddressEdge, error) {
    wg := &sync.WaitGroup{}
    mutex := sync.Mutex{}
    res := make([]graph_raw.CapitalAddressEdge, 0)
    limit := make(chan bool, 5)
    
    for i := 0; i < len(addrs); i += 800 {
        wg.Add(1)
        start := i
        go func() {
            defer wg.Done()
            limit <- true
            group := collections.SliceSubRange(addrs, start, 800)
            edgeList, _ := multiGetAddressData(ctx, group, request)
            <-limit
            mutex.Lock()
            defer mutex.Unlock()
            for _, v := range edgeList {
                *txnCnt += int64(v.Count)
            }
            *rival += int64(len(edgeList))
            slog.Infof(ctx, "req_level: %d, curr_level: %d, txnCnt : %d, rival: %d, stime: %d ", request.Level, level, *txnCnt, *rival, sTime)

            res = append(res, edgeList...)
        }()
        cTime := time.Now().Unix()
        timeout := int64(9 * 60)
        totalTime := cTime - sTime
        slog.Infof(ctx, "curr_time: %d, s_time: %d, total_time: %d", cTime, sTime, totalTime)
        if totalTime >= timeout {
            return res, serror.CustomErr("查询超时", serror.CustomErrCode)
        }
    }
    wg.Wait()
    return res, nil
}

这段代码使用gorouting、waitgroup、sync.Mutex、channel,来实现并发下安全控制,

func doPatch(
    ctx *gin.Context,
    addrs []string,
    request *graph_raw.SmartAmountListReq, txnCnt, rival *int64, level uint, sTime int64) ([]graph_raw.CapitalAddressEdge, error) {
    wg := &sync.WaitGroup{}
    mutex := sync.Mutex{}
    res := make([]graph_raw.CapitalAddressEdge, 0)
    limit := make(chan bool, 5)
    timeout := int64(9 * 60)
    var err error
    for i := 0; i < len(addrs); i += 800 {

        wg.Add(1)
        start := i
        go func(start int) {
            defer wg.Done()
            limit <- true
            if _, ok := ctx.Get("request_canceled"); ok {
                slog.Infof(ctx, "request_canceled: %v", request)
                err = serror.CustomErr("查询取消", serror.CustomErrCode)
                return
            }
            cTime := time.Now().Unix()
            totalTime := cTime - sTime
            slog.Infof(ctx, "curr_time: %d, s_time: %d, total_time: %d, i: %d, level: %d", cTime, sTime, totalTime, start, level)
            if totalTime >= timeout {
                slog.Warnf(ctx, "smart超时: %v, curr_time: %d, s_time: %d, total_time: %d, i: %d, level: %d, req_level: %d, txnCnt : %d, rival: %d", request.Address, cTime, sTime, totalTime, start, level, request.Level, *txnCnt, *rival)
                err = serror.CustomErr("查询超时", serror.CustomErrCode)
                return
            }

            group := collections.SliceSubRange(addrs, start, 800)
            edgeList, _ := multiGetAddressData(ctx, group, request)
            <-limit
            mutex.Lock()
            defer mutex.Unlock()
            for _, v := range edgeList {
                *txnCnt += int64(v.Count)
            }
            *rival += int64(len(edgeList))
            slog.Infof(ctx, "req_level: %d, curr_level: %d, txnCnt : %d, rival: %d, stime: %d ", request.Level, level, *txnCnt, *rival, sTime)

            res = append(res, edgeList...)
        }(start)

        if err != nil {
            return res, err
        }
    }
    wg.Wait()

    return res, err
}

git merge --squash xxx

合并并将所有提交放到暂存区,需手动提交,作用:将多个提交合为1个。

go build 是 Go 语言的一个编译打包工具,它可以将 Go 语言编写的源码文件编译并打包成一个可执行文件。
在执行 go build 的过程中,Go 编译器会进行几个步骤的操作:
解析源代码,生成 AST(抽象语法树)。
从 AST 生成 SSA(静态单分配)形式的字节码。
优化字节码。
生成机器代码,生成可执行文件。
如果你想在 go build 执行过程中进行某些操作,你可能需要使用 Go 的工具链,如 go/ast,go/token,go/parser,go/types 等包来操作 AST,或者使用 go/ssa 包来操作 SSA 形式的字节码。

用channel控制协程数量

在 Golang 中,可以使用各种方法来控制并发并限制 goroutine 的数量。以下是一种可能的实现方式,可以最多同时运行 50 个 goroutine:

package main

import (
    "sync"
)

func main() {
    maxConcurrency := 50
    taskCount := 100

    var wg sync.WaitGroup
    semaphore := make(chan struct{}, maxConcurrency)

    for i := 0; i < taskCount; i++ {
        wg.Add(1)
        go func(taskID int) {
            semaphore <- struct{}{} // 占用一个信号量,限制并发数量

            // 执行你的任务代码
            // ...

            <-semaphore // 释放信号量
            wg.Done()
        }(i)
    }

    wg.Wait()
}

在上面的示例中,使用了一个 sync.WaitGroup 来等待所有任务完成。通过创建一个有容量的 chan struct{},我们可以使用带缓冲的通道作为信号量来控制 goroutine 的数量。maxConcurrency 变量定义了最大并发数量。

在每个 goroutine 中,首先会占用一个信号量(通过将空结构体写入通道),这将限制并发数量。然后在任务完成后释放信号量(通过从通道读取一个值)。sync.WaitGroup 用于等待所有任务完成。

请注意,这只是一种示例实现方式,您可以根据实际需求和情况进行适当的调整和修改。确保在使用并发控制时遵循最佳实践,以避免竞态条件和其他并发问题。

Q1: elasticsearch不能以root身份启动

Q2:

[2023-11-14T11:34:19,978][WARN ][o.e.h.n.Netty4HttpServerTransport] [safeis-qitaihe-test] received plaintext http traffic on an https channel, closing connection Netty4HttpChannel{localAddress=/127.0.0.1:9200, remoteAddress=/127.0.0.1:33384}

解决方法:

es/config/es.yml
xpack.security.enabled: true => xpack.security.enabled: false

require: 必传
oneof: "oneof=left right",其中一个。
min:验证数值类型或字符串类型的值是否大于等于指定的最小值;
max:验证数值类型或字符串类型的值是否小于等于指定的最大值;
eq:验证两个值是否相等;
ne:验证两个值是否不相等;
len:验证字符串、数组或切片的长度是否等于指定的长度;
regex:验证字符串是否匹配指定的正则表达式;
email:验证字符串是否为有效的电子邮件地址;
url:验证字符串是否为有效的 URL 地址;
numeric:验证字符串是否只包含数字字符;
alpha:验证字符串是否只包含字母字符;
alphanum:验证字符串是否只包含字母和数字字符;
ascii:验证字符串是否只包含 ASCII 字符;
base64:验证字符串是否为有效的 Base64 编码;
file:验证上传的文件是否符合要求,例如文件类型、大小等。

这个报错是因为将 map[string]decimal.Decimal 定义为 decimal.Decimal 类型,而实际上在 map 中赋值的是未命名的浮点型数值 '0.0001'。Go 语言中,未命名的浮点数值默认是 float64 类型,而 decimal.Decimal 类型是不能直接将 float64 类型的值赋给它的。

要解决这个问题,可以将 map[string]decimal.Decimal 中的 float64 类型的值使用 decimal.NewFromFloat() 函数转换为 decimal.Decimal 类型的值。

例如:

var FilterJunkTrans = map[string]decimal.Decimal{

"USDT": decimal.NewFromFloat(1),
"DAI": decimal.NewFromFloat(1),
"USDC": decimal.NewFromFloat(1),
"TUSD": decimal.NewFromFloat(1),
"HUSD": decimal.NewFromFloat(1),
"BUSD": decimal.NewFromFloat(1),
"TRX": decimal.NewFromFloat(1),
"ETH": decimal.NewFromFloat(0.00001),
"BNB": decimal.NewFromFloat(0.0001),
"WBNB": decimal.NewFromFloat(0.0001),

}
这样就可以将 float64 类型的值转换为 decimal.Decimal 类型的值,解决报错问题。

比较运算
greaterThan

当在循环内开启 goroutine 时,一定要非常小心地处理循环变量,避免它们在 goroutine 之间发生竞争,这是常见的 Go 并发问题之一。在当前给出的代码中,开启 goroutine 时使用的 tmpChain 变量就是循环变量 chain 的一个副本,但是它的值只会在循环开始时被赋值,而不会在每次迭代时重新赋值。因此,在你的代码里,如果开启了一些长时间运行的 goroutine 并且在迭代 chainCurrencyAddrMap 之后执行了它们,它们将会看到那个过时的 tmpChain 变量,而不是它们应该看到的当前值。这将导致 goroutine 处理的信息混乱,导致错误。

为了修复这个问题,一种解决方法是将循环变量传递给 goroutine 的参数中,这将确保在 goroutine 中使用的始终是变量的当前值。可以通过将 tmpChain := chain 改为 使用函数参数传递来实现。修改后的代码如下:

func getAddrDistInfoPatch(c * gin.Context,

rawData *graph_raw.GraphRawData, 
chainCurrencyAddrMap map[string]map[string][]string,
column []string) (inFlowMap map[string]map[string]map[string]aModel.AddressInfo, err error) {

mu := &sync.Mutex{}
wg := &sync.WaitGroup{}

inFlowMap = make(map[string]map[string]map[string]aModel.AddressInfo)

for chain, currencyInfo := range chainCurrencyAddrMap {
    for currency, addrs := range currencyInfo {
        limit := make(chan bool, 50)
        for i := 0; i < len(addrs); i += 50 {
            start := i
            wg.Add(1)
            limit <- true
            go func(c string, cr string) {
                defer wg.Done()
                group := collections.SliceSubRange(addrs, start, 50)
                req := aModel.AddressInfoReq{
                    Direction: contract.LeftDirection,
                    Chain:     c,
                    Currency:  cr,
                    MinAmount: rawData.SessionInfo.MinAmount,
                    Column:    column,
                    Addr:      group,
                    BeginTime: rawData.SessionInfo.BeginTime,
                    EndTime:   rawData.SessionInfo.EndTime,
                }

                tmp := map[string]aModel.AddressInfo{}
                resp := safeis.Call(c, "address", "getDistCurrencyAddrInfo",
                    http.MethodPost, nil, req, nil)
                if resp.Code != serror.Success {
                    slog.Errorf(c, "call address/getDistCurrencyAddrInfo failed, resp:%+v", resp)
                    return
                }
                _ = resp.ToStruct(&tmp)
                <-limit

                mu.Lock()
                defer mu.Unlock()

                for k, v := range tmp {
                    if _, ok := inFlowMap[k]; !ok {
                        inFlowMap[k] = make(map[string]map[string]aModel.AddressInfo)
                    }
                    if _, ok := inFlowMap[k][chain]; !ok {
                        inFlowMap[k][chain] = make(map[string]aModel.AddressInfo)
                    }
                    inFlowMap[k][chain][currency] = v
                }
            }(chain, currency)
        }
    }
}

wg.Wait()
return inFlowMap, nil

}
在上述代码中,通过将 tmpChain 赋值替换为将 chain 和 currency 作为参数传递给匿名函数,解决了迭代过程中 goroutine 错误描述的问题。