I have a function that should make at max N number of goroutines, then each goroutine will read from the jobs channel and to some calculations. However the caveat is if the calculations take more than X amount of time, end that calculation and move on to the next one.
func doStuff(){
    rules := []string{
        "a",
        "b",
        "c",
        "d",
        "e",
        "f",
        "g",
    }
    var (
        jobs    = make(chan []string, len(rules))
        res     = make(chan bool, len(rules))
        matches []string
    )
    w := func(jobs <-chan []string, results chan<- bool) {
        for j := range jobs {
            k, id := j[0], j[1]
            if id == "c" || id == "e" {
                time.Sleep(time.Second * 5)
            }
            m := match(k, id)
            res <- m
        }
    }
    N := 2
    for i := 0; i < N; i++ {
        go w(jobs, res)
    }
    for _, rl := range rules {
        jobs <- []string{"a", rl}
    }
    close(jobs)
    for i := 0; i < len(rules); i++ {
        select {
        case match := <-res:
            matches = append(matches, match)
        case <-time.After(time.Second):
        }
    }
    fmt.Println(matches)
}
The expected result is:
[a, b, d, f, g]
But what I'm getting is:
[a, b, d]
It seems reading from the results channel ends before one of the goroutines can fully finish due to the sleep. So I added a context with deadline, but now it hangs indefinitely:
    w := func(jobs <-chan []string, results chan<- string) {
        for j := range jobs {
            ctx, c := context.WithDeadline(context.Background(), time.Now().Add(time.Second*2))
            defer c()
            k, id := j[0], j[1]
            if id == "c" || id == "e" {
                time.Sleep(time.Second * 5)
            }
            m := match(k, id)
            select {
            case res <- m:
            case <-ctx.Done():
                fmt.Println("Canceled by timeout")
                continue
            }
        }
    }
I've read other questions regarding completely killing off a goroutine if something times out, but couldn't find anything on skipping if something times out.
I made a package for use cases just like this one. Please see this repository: github.com/MicahParks/ctxerrgroup.
Here's a full example of how your code would look using the package and streaming the results. The streaming approach is more memory efficient. The original approach held all of the results in memory before printing them at the end.
package main
import (
    "context"
    "log"
    "time"
    "github.com/MicahParks/ctxerrgroup"
)
func main() {
    // The number of worker goroutines to use.
    workers := uint(2)
    // Create an error handler that logs all errors.
    //
    // The original work item didn't return an error, so this is not required.
    var errorHandler ctxerrgroup.ErrorHandler
    errorHandler = func(_ ctxerrgroup.Group, err error) {
        log.Printf("A job in the worker pool failed.\nError: %s", err.Error())
    }
    // Create the group of workers.
    group := ctxerrgroup.New(workers, errorHandler)
    // Create the question specific assets.
    rules := []string{
        "a",
        "b",
        "c",
        "d",
        "e",
        "f",
        "g",
    }
    results := make(chan bool)
    // Create a parent timeout.
    timeout := time.Second
    parentTimeout, parentCancel := context.WithTimeout(context.Background(), timeout)
    defer parentCancel()
    // Iterate through all the rules to use.
    for _, rule := range rules {
        // Create a child context for this specific work item.
        ctx, cancel := context.WithCancel(parentTimeout)
        // Create and add the work item.
        group.AddWorkItem(ctx, cancel, func(workCtx context.Context) (err error) {
            // Deliberately shadow the rule so the next iteration doesn't take over.
            rule := rule
            // Do the work using the workCtx.
            results <- match(workCtx, "a", rule)
            return nil
        })
    }
    // Launch a goroutine that will close the results channel when everyone is finished.
    go func() {
        group.Wait()
        close(results)
    }()
    // Print the matches as the happen. This will not hang.
    for result := range results {
        log.Println(result)
    }
    // Wait for the group to finish.
    //
    // This is not required, but doesn't hurt as group.Wait is idempotent. It's here in case you remove the goroutine
    // waiting and closing the channel above.
    group.Wait()
}
// match is a function from the original question. It now accepts and properly uses the context argument.
func match(ctx context.Context, key, id string) bool {
    panic("implement me")
}
                        If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With