杨 发布的文章

实例代码

func doPatch(
    ctx *gin.Context,
    addrs []string,
    request *graph_raw.SmartAmountListReq, txnCnt, rival *int64, level uint, sTime int64) ([]graph_raw.CapitalAddressEdge, error) {
    wg := &sync.WaitGroup{}
    mutex := sync.Mutex{}
    res := make([]graph_raw.CapitalAddressEdge, 0)
    limit := make(chan bool, 5)
    
    for i := 0; i < len(addrs); i += 800 {
        wg.Add(1)
        start := i
        go func() {
            defer wg.Done()
            limit <- true
            group := collections.SliceSubRange(addrs, start, 800)
            edgeList, _ := multiGetAddressData(ctx, group, request)
            <-limit
            mutex.Lock()
            defer mutex.Unlock()
            for _, v := range edgeList {
                *txnCnt += int64(v.Count)
            }
            *rival += int64(len(edgeList))
            slog.Infof(ctx, "req_level: %d, curr_level: %d, txnCnt : %d, rival: %d, stime: %d ", request.Level, level, *txnCnt, *rival, sTime)

            res = append(res, edgeList...)
        }()
        cTime := time.Now().Unix()
        timeout := int64(9 * 60)
        totalTime := cTime - sTime
        slog.Infof(ctx, "curr_time: %d, s_time: %d, total_time: %d", cTime, sTime, totalTime)
        if totalTime >= timeout {
            return res, serror.CustomErr("查询超时", serror.CustomErrCode)
        }
    }
    wg.Wait()
    return res, nil
}

这段代码使用gorouting、waitgroup、sync.Mutex、channel,来实现并发下安全控制,

func doPatch(
    ctx *gin.Context,
    addrs []string,
    request *graph_raw.SmartAmountListReq, txnCnt, rival *int64, level uint, sTime int64) ([]graph_raw.CapitalAddressEdge, error) {
    wg := &sync.WaitGroup{}
    mutex := sync.Mutex{}
    res := make([]graph_raw.CapitalAddressEdge, 0)
    limit := make(chan bool, 5)
    timeout := int64(9 * 60)
    var err error
    for i := 0; i < len(addrs); i += 800 {

        wg.Add(1)
        start := i
        go func(start int) {
            defer wg.Done()
            limit <- true
            if _, ok := ctx.Get("request_canceled"); ok {
                slog.Infof(ctx, "request_canceled: %v", request)
                err = serror.CustomErr("查询取消", serror.CustomErrCode)
                return
            }
            cTime := time.Now().Unix()
            totalTime := cTime - sTime
            slog.Infof(ctx, "curr_time: %d, s_time: %d, total_time: %d, i: %d, level: %d", cTime, sTime, totalTime, start, level)
            if totalTime >= timeout {
                slog.Warnf(ctx, "smart超时: %v, curr_time: %d, s_time: %d, total_time: %d, i: %d, level: %d, req_level: %d, txnCnt : %d, rival: %d", request.Address, cTime, sTime, totalTime, start, level, request.Level, *txnCnt, *rival)
                err = serror.CustomErr("查询超时", serror.CustomErrCode)
                return
            }

            group := collections.SliceSubRange(addrs, start, 800)
            edgeList, _ := multiGetAddressData(ctx, group, request)
            <-limit
            mutex.Lock()
            defer mutex.Unlock()
            for _, v := range edgeList {
                *txnCnt += int64(v.Count)
            }
            *rival += int64(len(edgeList))
            slog.Infof(ctx, "req_level: %d, curr_level: %d, txnCnt : %d, rival: %d, stime: %d ", request.Level, level, *txnCnt, *rival, sTime)

            res = append(res, edgeList...)
        }(start)

        if err != nil {
            return res, err
        }
    }
    wg.Wait()

    return res, err
}

git merge --squash xxx

合并并将所有提交放到暂存区,需手动提交,作用:将多个提交合为1个。

go build 是 Go 语言的一个编译打包工具,它可以将 Go 语言编写的源码文件编译并打包成一个可执行文件。
在执行 go build 的过程中,Go 编译器会进行几个步骤的操作:
解析源代码,生成 AST(抽象语法树)。
从 AST 生成 SSA(静态单分配)形式的字节码。
优化字节码。
生成机器代码,生成可执行文件。
如果你想在 go build 执行过程中进行某些操作,你可能需要使用 Go 的工具链,如 go/ast,go/token,go/parser,go/types 等包来操作 AST,或者使用 go/ssa 包来操作 SSA 形式的字节码。

用channel控制协程数量

在 Golang 中,可以使用各种方法来控制并发并限制 goroutine 的数量。以下是一种可能的实现方式,可以最多同时运行 50 个 goroutine:

package main

import (
    "sync"
)

func main() {
    maxConcurrency := 50
    taskCount := 100

    var wg sync.WaitGroup
    semaphore := make(chan struct{}, maxConcurrency)

    for i := 0; i < taskCount; i++ {
        wg.Add(1)
        go func(taskID int) {
            semaphore <- struct{}{} // 占用一个信号量,限制并发数量

            // 执行你的任务代码
            // ...

            <-semaphore // 释放信号量
            wg.Done()
        }(i)
    }

    wg.Wait()
}

在上面的示例中,使用了一个 sync.WaitGroup 来等待所有任务完成。通过创建一个有容量的 chan struct{},我们可以使用带缓冲的通道作为信号量来控制 goroutine 的数量。maxConcurrency 变量定义了最大并发数量。

在每个 goroutine 中,首先会占用一个信号量(通过将空结构体写入通道),这将限制并发数量。然后在任务完成后释放信号量(通过从通道读取一个值)。sync.WaitGroup 用于等待所有任务完成。

请注意,这只是一种示例实现方式,您可以根据实际需求和情况进行适当的调整和修改。确保在使用并发控制时遵循最佳实践,以避免竞态条件和其他并发问题。

Q1: elasticsearch不能以root身份启动

Q2:

[2023-11-14T11:34:19,978][WARN ][o.e.h.n.Netty4HttpServerTransport] [safeis-qitaihe-test] received plaintext http traffic on an https channel, closing connection Netty4HttpChannel{localAddress=/127.0.0.1:9200, remoteAddress=/127.0.0.1:33384}

解决方法:

es/config/es.yml
xpack.security.enabled: true => xpack.security.enabled: false

require: 必传
oneof: "oneof=left right",其中一个。
min:验证数值类型或字符串类型的值是否大于等于指定的最小值;
max:验证数值类型或字符串类型的值是否小于等于指定的最大值;
eq:验证两个值是否相等;
ne:验证两个值是否不相等;
len:验证字符串、数组或切片的长度是否等于指定的长度;
regex:验证字符串是否匹配指定的正则表达式;
email:验证字符串是否为有效的电子邮件地址;
url:验证字符串是否为有效的 URL 地址;
numeric:验证字符串是否只包含数字字符;
alpha:验证字符串是否只包含字母字符;
alphanum:验证字符串是否只包含字母和数字字符;
ascii:验证字符串是否只包含 ASCII 字符;
base64:验证字符串是否为有效的 Base64 编码;
file:验证上传的文件是否符合要求,例如文件类型、大小等。

这个报错是因为将 map[string]decimal.Decimal 定义为 decimal.Decimal 类型,而实际上在 map 中赋值的是未命名的浮点型数值 '0.0001'。Go 语言中,未命名的浮点数值默认是 float64 类型,而 decimal.Decimal 类型是不能直接将 float64 类型的值赋给它的。

要解决这个问题,可以将 map[string]decimal.Decimal 中的 float64 类型的值使用 decimal.NewFromFloat() 函数转换为 decimal.Decimal 类型的值。

例如:

var FilterJunkTrans = map[string]decimal.Decimal{

"USDT": decimal.NewFromFloat(1),
"DAI": decimal.NewFromFloat(1),
"USDC": decimal.NewFromFloat(1),
"TUSD": decimal.NewFromFloat(1),
"HUSD": decimal.NewFromFloat(1),
"BUSD": decimal.NewFromFloat(1),
"TRX": decimal.NewFromFloat(1),
"ETH": decimal.NewFromFloat(0.00001),
"BNB": decimal.NewFromFloat(0.0001),
"WBNB": decimal.NewFromFloat(0.0001),

}
这样就可以将 float64 类型的值转换为 decimal.Decimal 类型的值,解决报错问题。

比较运算
greaterThan

当在循环内开启 goroutine 时,一定要非常小心地处理循环变量,避免它们在 goroutine 之间发生竞争,这是常见的 Go 并发问题之一。在当前给出的代码中,开启 goroutine 时使用的 tmpChain 变量就是循环变量 chain 的一个副本,但是它的值只会在循环开始时被赋值,而不会在每次迭代时重新赋值。因此,在你的代码里,如果开启了一些长时间运行的 goroutine 并且在迭代 chainCurrencyAddrMap 之后执行了它们,它们将会看到那个过时的 tmpChain 变量,而不是它们应该看到的当前值。这将导致 goroutine 处理的信息混乱,导致错误。

为了修复这个问题,一种解决方法是将循环变量传递给 goroutine 的参数中,这将确保在 goroutine 中使用的始终是变量的当前值。可以通过将 tmpChain := chain 改为 使用函数参数传递来实现。修改后的代码如下:

func getAddrDistInfoPatch(c * gin.Context,

rawData *graph_raw.GraphRawData, 
chainCurrencyAddrMap map[string]map[string][]string,
column []string) (inFlowMap map[string]map[string]map[string]aModel.AddressInfo, err error) {

mu := &sync.Mutex{}
wg := &sync.WaitGroup{}

inFlowMap = make(map[string]map[string]map[string]aModel.AddressInfo)

for chain, currencyInfo := range chainCurrencyAddrMap {
    for currency, addrs := range currencyInfo {
        limit := make(chan bool, 50)
        for i := 0; i < len(addrs); i += 50 {
            start := i
            wg.Add(1)
            limit <- true
            go func(c string, cr string) {
                defer wg.Done()
                group := collections.SliceSubRange(addrs, start, 50)
                req := aModel.AddressInfoReq{
                    Direction: contract.LeftDirection,
                    Chain:     c,
                    Currency:  cr,
                    MinAmount: rawData.SessionInfo.MinAmount,
                    Column:    column,
                    Addr:      group,
                    BeginTime: rawData.SessionInfo.BeginTime,
                    EndTime:   rawData.SessionInfo.EndTime,
                }

                tmp := map[string]aModel.AddressInfo{}
                resp := safeis.Call(c, "address", "getDistCurrencyAddrInfo",
                    http.MethodPost, nil, req, nil)
                if resp.Code != serror.Success {
                    slog.Errorf(c, "call address/getDistCurrencyAddrInfo failed, resp:%+v", resp)
                    return
                }
                _ = resp.ToStruct(&tmp)
                <-limit

                mu.Lock()
                defer mu.Unlock()

                for k, v := range tmp {
                    if _, ok := inFlowMap[k]; !ok {
                        inFlowMap[k] = make(map[string]map[string]aModel.AddressInfo)
                    }
                    if _, ok := inFlowMap[k][chain]; !ok {
                        inFlowMap[k][chain] = make(map[string]aModel.AddressInfo)
                    }
                    inFlowMap[k][chain][currency] = v
                }
            }(chain, currency)
        }
    }
}

wg.Wait()
return inFlowMap, nil

}
在上述代码中,通过将 tmpChain 赋值替换为将 chain 和 currency 作为参数传递给匿名函数,解决了迭代过程中 goroutine 错误描述的问题。

在Linux系统中,抢占式调度是通过内核的调度器来实现的,它在内核中的实现是一种基于时间片轮转(Round-Robin)和优先级的调度策略。Linux 2.6内核版本之后,引入了完全抢占式内核模型,这意味着即使在内核中运行的代码,也可以在各种情况下被中断或抢占。以下是Linux如何进行抢占式调度的大体步骤:

1.在Linux系统中,每个进程都有一个优先级,通常从0到139。进程的优先级可以通过nice或renice命令来调整。

2.Linux内核具有多个运行队列,每个队列都包含具有相同优先级的进程。这可以确保高优先级进程拥有更好的CPU运行时间,并且不会一直阻塞在等待队列中。

3.内核启动后,将由特殊进程init启动的第一个进程(通常是/bin/bash)添加到运行队列中。

4.运行队列被周期性地遍历,内核为每个队列中的进程分配一个时间片。

5.当进程的时间片耗尽时,内核将终止该进程的运行,并将其从当前队列中删除。

6.如果在一个队列中运行的进程需要等待I/O或其他资源(例如磁盘、网络等),那么该进程将被放入相应的等待队列中。当等待的资源变为可用时,内核将重新将该进程添加到适当的运行队列中。

7.如果有挂起的高优先级进程,内核通过抢占机制将该进程插入到当前进程的时间片中,并为其提供CPU时间片来运行。此时,内核将挂起原来运行的进程,并继续运行更高优先级的进程。

通过上述步骤,Linux内核实现了一种抢占式调度的机制,它可以在实时响应和非实时任务之间找到平衡点,并按照优先级给予适当的CPU时间。

锁机制

在抢占式调度中,为了确保多个进程之间的协调运行,通常需要对一些共享资源进行加锁。锁的机制可以防止出现一些并发问题,比如竞争条件、死锁等等,保护进程的完整性。

在Linux内核中,抢占式调度使用了自旋锁和读写锁(spinlock和rwlock)来确保临界区的互斥,而不会出现睡眠锁(sleep lock)的情况,以免影响响应时间。自旋锁允许一个持有锁的进程能够自旋在一个循环中,等待其他进程放弃锁,而不是进入等待队列或挂起状态。自旋锁不能用于保护长时间运行的临界区,否则会导致缺乏可调度性(优先级反转问题,优先级比自旋锁持有者高的进程被阻塞在自旋锁上)。而读写锁则允许许多进程同时读取资源,但只允许一个进程对资源进行写入操作。

因此,在实现抢占式调度时,锁的使用是必须的。在Linux内核中,锁机制被集成在调度器中,用于保护多个进程之间的协调运行,以确保实时响应任务的正确性和及时性。

区块链目前有多少种虚拟机

区块链目前有多种虚拟机,其中最流行和常用的是以太坊虚拟机(Ethereum Virtual Machine,EVM),其他主要的区块链虚拟机包括NEO虚拟机(NEO Virtual Machine,NeoVM)、EOS虚拟机(EOS Virtual Machine,EVM)和Cardano虚拟机等。不同的区块链平台选择不同的虚拟机,以适应其应用场景和开发要求。

波场用的是什么虚拟机

波场(TRON)使用的是TRON虚拟机(TVM,TRON Virtual Machine)。TVM是TRON生态系统的核心组件,它提供了一种基于Solidity语言的智能合约执行环境,与以太坊虚拟机(EVM)类似,但在性能和资源消耗方面有所优化。TVM的出现,可以帮助开发者更方便地部署智能合约,并且可以支持高性能、高并发的区块链应用场景。