
How to get partial results with Java structured concurrency in a timeout scenario?

I need to send two requests in parallel with a combined timeout of 2 seconds. If one of the requests times out, I still want to see the partial result of the other one, where a partial result can be either a value or an exception, so I do not want to shut down on failure. I tried to solve it like this with structured concurrency:

import java.time.Instant;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.TimeoutException;

public class Main {
    public static void main(String[] myData) {
        try (var scope = new StructuredTaskScope<String>()) {

            var subtask1 = scope.fork(() -> {
                Thread.sleep(3000);
                return "slow";
            });
            var subtask2 = scope.fork(() -> {
                Thread.sleep(1000);
                return "fast";
            });

            try {
                scope.joinUntil(Instant.now().plusMillis(2000));
            } catch (TimeoutException te) {
                System.out.println("Timeout occurred");
            }

            switch (subtask1.state()) {
                case SUCCESS -> System.out.println("1s"+subtask1.get());
                case FAILED -> System.out.println("1f");
                case UNAVAILABLE -> System.out.println("1u");
                case null, default -> System.out.println("1w");
            }

            switch (subtask2.state()) {
                case SUCCESS -> System.out.println("2s"+subtask2.get());
                case FAILED -> System.out.println("2f");
                case UNAVAILABLE -> System.out.println("2u");
                case null, default -> System.out.println("2w");
            }

        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

This is the output:

Timeout occurred
1u
Exception in thread "main" java.lang.IllegalStateException: Owner did not join after forking subtasks
    at java.base/java.util.concurrent.StructuredTaskScope.newIllegalStateExceptionNoJoin(StructuredTaskScope.java:439)
    at java.base/java.util.concurrent.StructuredTaskScope.ensureJoinedIfOwner(StructuredTaskScope.java:477)
    at java.base/java.util.concurrent.StructuredTaskScope$SubtaskImpl.get(StructuredTaskScope.java:917)
    at org.example.Main.main(Main.java:34)

So the slow task's state is UNAVAILABLE. The fast task's state is SUCCESS, but calling get() on it throws IllegalStateException. What is the best way to capture the result that each subtask produces? Updating outer variables from the subtasks?

Mehmet Atakan Serin asked Nov 15 '25 15:11


1 Answer

I've been looking at the new API (StructuredTaskScope.open with a Joiner, as previewed in JDK 25), but I'm not sure this is possible in the way that Oracle intended. A custom StructuredTaskScope.Joiner can only tell the scope to stop waiting when a subtask completes, not when time runs out. The timeout itself is set through StructuredTaskScope.Configuration, but when it expires scope.join() throws an exception and you can't get the results from it.

The following hack still works, though. The first thing you need is a thread-safe Joiner. It has to be thread-safe because the results will be read in a way that Oracle did not intend.

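import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.StructuredTaskScope;

class MyJoiner implements StructuredTaskScope.Joiner<String, List<String>> {

    // Guarded by synchronized (results): onComplete may run on several subtask threads.
    private final List<String> results = new ArrayList<>();

    @Override
    public boolean onComplete(StructuredTaskScope.Subtask<? extends String> subtask) {
        // get() is only valid for a successfully completed subtask
        String result = subtask.get();
        synchronized (results) {
            results.add(result);
        }
        // don't cancel the scope
        return false;
    }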

    @Override
    public List<String> result() {
        synchronized (results) {
            return List.copyOf(results);
        }
    }
}

This is where the hack comes in: if a timeout occurs, we don't use the result of scope.join() but ask the Joiner for its results directly.

var joiner = new MyJoiner();
try (var scope = StructuredTaskScope.open(joiner, config -> config.withTimeout(Duration.ofSeconds(2)))) {
    scope.fork(() -> {
        Thread.sleep(3000);
        return "slow";
    });
    scope.fork(() -> {
        Thread.sleep(1000);
        return "fast";
    });

    List<String> result;
    try {
        result = scope.join();
    } catch (StructuredTaskScope.TimeoutException _) {
        result = joiner.result();
    }
    System.out.printf("Result: %s%n", result);
} catch (InterruptedException e) {
    // Restore the interrupt flag before rethrowing, as is common practice
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
}

In this example I store the subtask results, but you can store the entire subtask if you want. Whenever joiner.result() is called, either directly or through scope.join(), it contains only the tasks that have completed so far. Storing subtasks may be a bit safer, because my custom joiner fails if a task fails before the timeout: subtask.get() may only be called on successfully completed subtasks.
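One way to cover the failure case is to record an outcome for every completed subtask inside onComplete instead of calling get() unconditionally. Both get() and exception() are then invoked from the subtask's own thread, as in the joiner above. The following is only a minimal sketch under the same assumptions as the code above (JDK 25 with --enable-preview). The names PartialOutcomes, Outcome, OutcomeJoiner and collect are made up for illustration, and the failing third task and the shortened delays are added only to exercise the failure branch.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Joiner;
import java.util.concurrent.StructuredTaskScope.Subtask;

public class PartialOutcomes {

    // Hypothetical result type: a completed subtask either succeeded or failed.
    sealed interface Outcome {
        record Success(String value) implements Outcome {}
        record Failure(Throwable error) implements Outcome {}
    }

    // Hypothetical joiner that records successes and failures alike.
    static class OutcomeJoiner implements Joiner<String, List<Outcome>> {
        // Guarded by synchronized (outcomes): onComplete may run concurrently.
        private final List<Outcome> outcomes = new ArrayList<>();

        @Override
        public boolean onComplete(Subtask<? extends String> subtask) {
            Outcome outcome = switch (subtask.state()) {
                case SUCCESS -> new Outcome.Success(subtask.get());
                case FAILED -> new Outcome.Failure(subtask.exception());
                case UNAVAILABLE -> throw new IllegalStateException("subtask not completed");
            };
            synchronized (outcomes) {
                outcomes.add(outcome);
            }
            // never cancel the scope early; only the timeout ends the wait
            return false;
        }

        @Override
        public List<Outcome> result() {
            synchronized (outcomes) {
                return List.copyOf(outcomes);
            }
        }
    }

    // Forks three illustrative tasks and returns whatever completed in time.
    static List<Outcome> collect(Duration timeout) throws InterruptedException {
        var joiner = new OutcomeJoiner();
        // Declared as a Callable so the fork(Callable) overload is chosen explicitly.
        Callable<String> failing = () -> {
            throw new IllegalStateException("boom");
        };
        try (var scope = StructuredTaskScope.open(joiner, config -> config.withTimeout(timeout))) {
            scope.fork(() -> {
                Thread.sleep(3000);
                return "slow";
            });
            scope.fork(() -> {
                Thread.sleep(100);
                return "fast";
            });
            scope.fork(failing);
            try {
                return scope.join();
            } catch (StructuredTaskScope.TimeoutException _) {
                // Same hack as above: read the joiner directly after the timeout.
                return joiner.result();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (Outcome outcome : collect(Duration.ofMillis(500))) {
            System.out.println(outcome);
        }
    }
}
```

With these assumed delays, the returned list should hold a Failure for the failing task and a Success for "fast", while the slow task is cancelled by the timeout and never reaches onComplete. The caller can then pattern-match on Success and Failure to decide what to do with each partial result.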

Rob Spoor answered Nov 17 '25 07:11


