I have a gRPC benchmark test that uses a function to merge hundreds of goroutine channels into one channel using a for-select loop. The code looks like this:
    func (b *B) merge(
        ctx context.Context,
        nodes ...<-chan *pb.Node,
    ) chan *pb.Node {
        allNodes := make(chan *pb.Node)
        var wg sync.WaitGroup
        wg.Add(len(nodes))
        for _, n := range nodes {
            go func(n <-chan *pb.Node) {
                defer wg.Done()
                for {
                    select {
                    case <-ctx.Done():
                        return
                    case val, ok := <-n:
                        if ok {
                            allNodes <- val
                        }
                    }
                }
            }(n)
        }
        go func() {
            wg.Wait()
            close(allNodes)
        }()
        return allNodes
    }
When I monitor the process with the top command on Ubuntu 16.04, the 2-core server spins at more than 196% CPU usage.
Then I profiled the code with the pprof package. It shows that about 98% of the CPU time is spent in this function, and pprof's top command produces this result:
    flat  flat%   sum%        cum   cum%
   1640ms  5.78%  5.78%    27700ms 97.60%  B (*B).merge.func1
    5560ms 19.59% 25.37%    22130ms 77.98%  runtime.selectgo
     770ms  2.71% 28.08%    11190ms 39.43%  runtime.sellock
    2700ms  9.51% 37.60%    10430ms 36.75%  runtime.lock
    7710ms 27.17% 64.76%     7710ms 27.17%  runtime.procyield
     460ms  1.62% 66.38%     3850ms 13.57%  context.(*cancelCtx).Done
    1210ms  4.26% 70.65%     3350ms 11.80%  runtime.selunlock
    2700ms  9.51% 80.16%     2900ms 10.22%  sync.(*Mutex).Lock
    2110ms  7.43% 87.60%     2140ms  7.54%  runtime.unlock
     360ms  1.27% 88.87%      860ms  3.03%  runtime.typedmemclr
Can anyone suggest how to correctly merge a large number of channels? This for-select block seems to drive the CPU usage up, and underneath it uses procyield, which does not look like a very promising mechanism.
Is there any way to control the CPU usage of a process?
It seems most likely that the channels passed in the nodes parameter are being closed before the context is cancelled. Once a channel is closed, every receive on it returns immediately with ok set to false, so the select never blocks and your for loop becomes a tight loop that consumes all available CPU. Since a channel can't be reopened once closed, you can safely return from the goroutine once ok is false, which should resolve that issue:
    go func(n <-chan *pb.Node) {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            case val, ok := <-n:
                if !ok {
                    return
                }
                allNodes <- val
            }
        }
    }(n)
A receive on a closed channel never blocks; it returns the zero value immediately. See https://dave.cheney.net/2013/04/30/curious-channels
Set your channel variable to nil once the channel is closed:
  case val, ok := <-n:
    if ok {
      allNodes <- val
    } else {
      n = nil
    }
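A receive from a nil channel blocks forever, so the select then waits only on ctx.Done(). Note that with this approach the goroutine stays alive until the context is cancelled, so wg.Wait() does not return and allNodes is not closed before then.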