I have a scenario where I need to send two parallel requests that share a combined timeout of 2 seconds. If one of the requests times out, I would still like to see the partial result of the other request, where a partial result can be either a value or an exception. That means I do not want the scope to shut down on failures. I tried to solve it like this with structured concurrency (StructuredTaskScope):
import java.time.Instant;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.TimeoutException;

public class Main {
    public static void main(String[] myData) {
        try (var scope = new StructuredTaskScope<String>()) {
            var subtask1 = scope.fork(() -> {
                Thread.sleep(3000);
                return "slow";
            });
            var subtask2 = scope.fork(() -> {
                Thread.sleep(1000);
                return "fast";
            });
            try {
                scope.joinUntil(Instant.now().plusMillis(2000));
            } catch (TimeoutException te) {
                System.out.println("Timeout occurred");
            }
            switch (subtask1.state()) {
                case SUCCESS -> System.out.println("1s" + subtask1.get());
                case FAILED -> System.out.println("1f");
                case UNAVAILABLE -> System.out.println("1u");
                case null, default -> System.out.println("1w");
            }
            switch (subtask2.state()) {
                case SUCCESS -> System.out.println("2s" + subtask2.get());
                case FAILED -> System.out.println("2f");
                case UNAVAILABLE -> System.out.println("2u");
                case null, default -> System.out.println("2w");
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
So this is the output:
Timeout occurred
1u
Exception in thread "main" java.lang.IllegalStateException: Owner did not join after forking subtasks
    at java.base/java.util.concurrent.StructuredTaskScope.newIllegalStateExceptionNoJoin(StructuredTaskScope.java:439)
    at java.base/java.util.concurrent.StructuredTaskScope.ensureJoinedIfOwner(StructuredTaskScope.java:477)
    at java.base/java.util.concurrent.StructuredTaskScope$SubtaskImpl.get(StructuredTaskScope.java:917)
    at org.example.Main.main(Main.java:34)
So the slow task's state is UNAVAILABLE. The fast task's state is SUCCESS, but I cannot call get() on it: it throws IllegalStateException. What is the best way to capture the result that each subtask produces? Should I update outer variables from inside the subtasks?
I've been looking at the new API, but I'm not sure this is possible in the way that Oracle intended. You'd need a custom StructuredTaskScope.Joiner, but a Joiner can only tell the scope to stop waiting after a task has completed. You can configure a timeout through StructuredTaskScope.Configuration, but then scope.join() throws an exception when the timeout expires and you can't get the results from it.
The following hack still works, though. The first thing you need is a thread-safe Joiner. It has to be thread-safe because the results will not be used in the way that Oracle intended.
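import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.StructuredTaskScope;

class MyJoiner implements StructuredTaskScope.Joiner<String, List<String>> {
    // Guarded by synchronized blocks: onComplete runs on the subtask threads
    private final List<String> results = new ArrayList<>();

    @Override
    public boolean onComplete(StructuredTaskScope.Subtask<? extends String> subtask) {
        // get() throws IllegalStateException if the subtask failed
        String result = subtask.get();
        synchronized (results) {
            results.add(result);
        }
        // don't cancel the scope
        return false;
    }

    @Override
    public List<String> result() {
        // Return an immutable snapshot of the results collected so far
        synchronized (results) {
            return List.copyOf(results);
        }
    }
}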
The hack comes in when using it: if a timeout occurs, we don't use the result of scope.join() but ask the Joiner for its results instead.
// Needs java.time.Duration, java.util.List and java.util.concurrent.StructuredTaskScope
var joiner = new MyJoiner();
try (var scope = StructuredTaskScope.open(joiner, config -> config.withTimeout(Duration.ofSeconds(2)))) {
    scope.fork(() -> {
        Thread.sleep(3000);
        return "slow";
    });
    scope.fork(() -> {
        Thread.sleep(1000);
        return "fast";
    });
    List<String> result;
    try {
        result = scope.join();
    } catch (StructuredTaskScope.TimeoutException _) {
        // Timed out: fall back to whatever the joiner collected so far
        result = joiner.result();
    }
    System.out.printf("Result: %s%n", result);
} catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can see it
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
}
In this example I store the subtask results, but you can store the entire subtask if you want. Whenever joiner.result() is called, either directly or through scope.join(), it will only contain tasks that have completed by then. Storing the subtasks themselves may be a bit safer, because my custom joiner will fail if a task fails before the timeout: subtask.get() may only be called on successfully completed subtasks.
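If the preview API is not an option, the same "combined timeout, keep partial results" behavior can be approximated with the stable ExecutorService.invokeAll. To be clear, this is a different technique from the Joiner hack above, not a variation of it. The sketch below is only an illustration: the class name PartialResults, the methods runAll and describe, and the outcome strings are all made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original answer.
class PartialResults {

    // Runs all tasks in parallel under one shared timeout and returns one
    // outcome string per task, in the order the tasks were given.
    static List<String> runAll(List<Callable<String>> tasks, long timeoutMillis) {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            // invokeAll blocks until every task is done or the timeout expires;
            // tasks that have not completed by then are cancelled.
            List<Future<String>> futures =
                    executor.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            List<String> outcomes = new ArrayList<>();
            for (Future<String> future : futures) {
                outcomes.add(describe(future));
            }
            return outcomes;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            executor.shutdownNow();
        }
    }

    // Maps a finished or cancelled future to a result, a failure, or a timeout.
    static String describe(Future<String> future) {
        if (future.isCancelled()) {
            return "timed out";
        }
        try {
            return "success: " + future.get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        List<Callable<String>> tasks = List.of(
                () -> { Thread.sleep(3000); return "slow"; },
                () -> { Thread.sleep(1000); return "fast"; },
                () -> { throw new IllegalStateException("boom"); });
        System.out.println(runAll(tasks, 2000));
    }
}
```

Every future returned by invokeAll is either completed or cancelled, so each task can be inspected without blocking, and a failed task shows up as an exception on its own future instead of breaking the others. The trade-off is that you lose what StructuredTaskScope gives you, such as the enforced parent/child relationship between the threads.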