 

How to parallelize a recursive function

Tags:

go

I am trying to parallelize a recursive problem in Go, and I am unsure what the best way to do this is.

I have a recursive function, which works like this:

func recFunc(input string) (result []string) {
    for _, subInput := range getSubInputs(input) {
        subOutput := recFunc(subInput)
        result = append(result, subOutput...)
    }
    result = append(result, getOutput(input)...)
    return result
}

func main() {
    output := recFunc("some_input")
    ...
}

So the function calls itself N times (where N eventually reaches 0 at the leaves), generates its own output, and returns everything in a single list.

Now I want to make this function run in parallel, but I am unsure what the cleanest way to do this is. My idea:

  • Have a "result" channel, to which all function calls send their result.
  • Collect the results in the main function.
  • Have a wait group, which determines when all results are collected.

The problem: I need to wait for the wait group and collect all results concurrently. I can start a separate go function for this, but how do I ever quit this separate go function?

func recFunc(input string, outputChannel chan []string, waitGroup *sync.WaitGroup) {
    defer waitGroup.Done()
    subInputs := getSubInputs(input)
    waitGroup.Add(len(subInputs))
    for _, subInput := range subInputs {
        go recFunc(subInput, outputChannel, waitGroup)
    }
    outputChannel <- getOutput(input)
}

func main() {
    outputChannel := make(chan []string)
    waitGroup := sync.WaitGroup{}

    waitGroup.Add(1)
    go recFunc("some_input", outputChannel, &waitGroup)

    result := []string{}
    go func() {
        // receives forever: how does this goroutine ever exit?
        for {
            nextResult := <-outputChannel
            result = append(result, nextResult...)
        }
    }()
    waitGroup.Wait()
}

Maybe there is a better way to do this? Or how can I ensure that the anonymous goroutine that collects the results exits when it is done?

Nathan asked Dec 06 '25 11:12


2 Answers

tl;dr;

  • recursive algorithms should have bounded limits on expensive resources (network connections, goroutines, stack space, etc.)
  • cancellation should be supported, so that expensive operations can be cleaned up quickly if a result is no longer needed
  • branch traversal should support error reporting; this lets errors bubble up the stack and lets partial results be returned without failing the entire recursive traversal.

For asynchronous results, whether recursive or not, channels are recommended. Also, for long-running jobs with many goroutines, provide a method for cancellation (context.Context) to aid with clean-up.

Since recursion can lead to exponential consumption of resources, it's important to put limits in place (see bounded parallelism).

Below is a design pattern I use a lot for asynchronous tasks:

  • always support taking a context.Context for cancellation
  • take the number of workers needed for the task
  • return a chan of results and a chan of error (which delivers exactly one value: an error or nil)

var (
    workers = 10
    ctx     = context.TODO() // use request context here - otherwise context.Background()
    input   = "abc"
)

resultC, errC := recJob(ctx, workers, input) // returns results & `error` channels

// asynchronous results - so read that channel first in the event of partial results ...
for r := range resultC {
    fmt.Println(r)
}

// ... then check for any errors
if err := <-errC; err != nil {
    log.Fatal(err)
}

Recursion:

Since recursion fans out quickly, one needs a consistent way to keep a finite pool of workers busy, and also to ensure that when a worker frees up, it quickly picks up work from other (over-worked) workers.

Rather than create a manager layer, employ a cooperative peer system of workers:

  • each worker shares a single inputs channel
  • before recursing on inputs (sub-inputs), check whether any other worker is idle
    • if so, delegate to that worker
    • if not, current worker continues recursing that branch

With this algorithm, the finite pool of workers quickly becomes saturated with work. Any worker that finishes its branch early will quickly be delegated a sub-branch by another worker. Eventually all workers run out of sub-branches, at which point every worker is idle (blocked) and the recursion task can finish up.

Some careful coordination is needed to achieve this. Allowing the workers to write to the input channel helps with this peer coordination via delegation. A "recursion depth" WaitGroup is used to track when all branches have been exhausted across all workers.

(To include context support and error chaining, I updated your getSubInputs function to take a ctx and return an optional error.)

func recFunc(ctx context.Context, input string, in chan string, out chan<- string, rwg *sync.WaitGroup) error {

    defer rwg.Done() // decrement recursion count when a depth of recursion has completed

    subInputs, err := getSubInputs(ctx, input)
    if err != nil {
        return err
    }

    for _, subInput := range subInputs { // subInputs is a []string: skip the index
        rwg.Add(1) // about to recurse (or delegate recursion)

        select {
        case in <- subInput:
            // delegated - to another goroutine

        case <-ctx.Done():
            // context canceled...

            // but first we need to undo the earlier `rwg.Add(1)`
            // as this work item was never delegated or handled by this worker
            rwg.Done()
            return ctx.Err()

        default:
            // no one is available to delegate to, so this worker recurses on this item itself
            err = recFunc(ctx, subInput, in, out, rwg)
            if err != nil {
                return err
            }
        }

        select {
        case <-ctx.Done():
            // always check context when doing anything potentially blocking (in this case writing to `out`)
            // context canceled
            return ctx.Err()

        case out <- subInput:
        }
    }

    return nil
}

Connecting the Pieces:

recJob creates:

  • input & output channels - shared by all workers
  • "recursion" WaitGroup detects when all workers are idle
    • "output" channel can then safely be closed
  • error channel for all workers
  • it kicks off the recursion workload by writing the initial input to the input channel

func recJob(ctx context.Context, workers int, input string) (resultsC <-chan string, errC <-chan error) {

    // RW channels
    out := make(chan string)
    eC := make(chan error, 1)

    // R-only channels returned to caller
    resultsC, errC = out, eC

    // create workers + waitgroup logic
    go func() {

        var err error // error that will be returned to the caller via the error channel

        defer func() {
            close(out)
            eC <- err
            close(eC)
        }()

        var wg sync.WaitGroup
        wg.Add(1)
        in := make(chan string) // input channel: shared by all workers (to read from and also to write to when they need to delegate)

        workerErrC := createWorkers(ctx, workers, in, out, &wg)

        // get the ball rolling, pass input job to one of the workers
        // Note: must be done *after* workers are created - otherwise deadlock
        in <- input

        errCount := 0

        // wait for all worker error codes to return
        for err2 := range workerErrC {
            if err2 != nil {
                log.Println("worker error:", err2)
                errCount++
            }
        }

        // all workers have completed
        if errCount > 0 {
            err = fmt.Errorf("PARTIAL RESULT: %d of %d workers encountered errors", errCount, workers)
            return
        }

        log.Printf("All %d workers have FINISHED\n", workers)
    }()

    return
}

Finally, create the workers:

func createWorkers(ctx context.Context, workers int, in chan string, out chan<- string, rwg *sync.WaitGroup) (errC <-chan error) {

    eC := make(chan error) // RW-version
    errC = eC              // RO-version (returned to caller)

    // track the completeness of the workers - so we know when to wrap up
    var wg sync.WaitGroup
    wg.Add(workers)

    for i := 0; i < workers; i++ {
        i := i
        go func() {
            defer wg.Done()

            var err error

            // ensure the current worker's return code gets returned
            // via the common workers' error-channel
            defer func() {
                if err != nil {
                    log.Printf("worker #%3d ERRORED: %s\n", i+1, err)
                } else {
                    log.Printf("worker #%3d FINISHED.\n", i+1)
                }
                eC <- err
            }()

            log.Printf("worker #%3d STARTED successfully\n", i+1)

            // worker scans for input
            for input := range in {

                err = recFunc(ctx, input, in, out, rwg)
                if err != nil {
                    log.Printf("worker #%3d recFunc ERROR: %s\n", i+1, err)
                    return
                }
            }

        }()
    }

    go func() {
        rwg.Wait() // wait for all recursion to finish
        close(in)  // safe to close input channel as all workers are blocked (i.e. no new inputs)
        wg.Wait()  // now wait for all workers to return
        close(eC)  // finally, signal to caller we're truly done by closing workers' error-channel
    }()

    return
}
colm.anseo answered Dec 09 '25 01:12


I can start a separate go function for this, but how do I ever quit this separate go function?

You can range over the output channel in the separate goroutine. In that case, the goroutine will exit cleanly once the channel is closed.

go func() {
    for nextResult := range outputChannel {
        result = append(result, nextResult...)
    }
}()

So what remains is to make sure the channel is closed only after all the goroutines spawned by the recursive calls have exited.

For that, you can use a waitgroup shared across all the goroutines and wait on it in your main function, as you are already doing. Once the wait is over, close outputChannel so that the collecting goroutine also exits cleanly.

func recFunc(input string, outputChannel chan []string, wg *sync.WaitGroup) {
    defer wg.Done()
    for _, subInput := range getSubInputs(input) {
        wg.Add(1) // count the child before it starts, not inside it
        go recFunc(subInput, outputChannel, wg)
    }
    outputChannel <- getOutput(input)
}

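func main() {
    outputChannel := make(chan []string)
    waitGroup := sync.WaitGroup{}

    waitGroup.Add(1)
    go recFunc("some_input", outputChannel, &waitGroup)

    result := []string{}
    done := make(chan struct{}) // closed once the collector has read everything
    go func() {
        defer close(done)
        for nextResult := range outputChannel {
            result = append(result, nextResult...)
        }
    }()
    waitGroup.Wait()
    close(outputChannel) // no more sends: lets the collector's range loop end
    <-done               // wait for the collector before using result
}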

PS: If you want to have bounded parallelism to limit the exponential growth, check this out

rustedGeek answered Dec 09 '25 01:12


