Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to kill CompletableFuture related threads?

I have method that is checking the CompletableFuture execution time. If such CompletableFuture is executing for more than 2 seconds i want to kill this task. But how can I doit if i don't have control overy thread where CompletableFuture methods are executed ?

       final CompletableFuture<List<List<Student>>> responseFuture = new CompletableFuture<>();
responseFuture.supplyAsync(this::createAllRandomGroups)
        .thenAccept(this::printGroups)
        .exceptionally(throwable -> {
            throwable.printStackTrace();
            return null;
        });

createAllRandomGroups()

private List<List<Student>> createAllRandomGroups() {
    System.out.println("XD");
    List<Student> allStudents = ClassGroupUtils.getActiveUsers();
    Controller controller = Controller.getInstance();
    List<List<Student>> groups = new ArrayList<>();
    int groupSize = Integer.valueOf(controller.getGroupSizeComboBox().getSelectionModel().getSelectedItem());
    int numberOfGroupsToGenerate = allStudents.size() / groupSize;
    int studentWithoutGroup = allStudents.size() % groupSize;
    if (studentWithoutGroup != 0) groups.add(this.getListOfStudentsWithoutGroup(allStudents, groupSize));
    for(int i = 0; i < numberOfGroupsToGenerate; i++) {
        boolean isGroupCreated = false;
        while (!isGroupCreated){
            Collections.shuffle(allStudents);
            List<Student> newGroup = this.createNewRandomGroupOfStudents(allStudents, groupSize);
            groups.add(newGroup);
            if (!DataManager.isNewGroupDuplicated(newGroup.toString())) {
                isGroupCreated = true;
                allStudents.removeAll(newGroup);
            }
        }
    }
    DataManager.saveGroupsToCache(groups);
    return groups;
}

printGroups()

private void printGroups(List<List<Student>> lists) {
        System.out.println(lists);

    }

This statement responseFuture.cancel(true); does not kill thread where responseFuture is doing the methods. So what is the most elegant way to terminate CompletableFuture thread ?

like image 799
masterofdisaster Avatar asked Oct 20 '25 12:10

masterofdisaster


2 Answers

When you create a chain of CompletableFuture stages like b = a.thenApply(function), this handy method creates a setup of different components. Basically, these components refer to each other as a → function → b, so the completion of a will trigger the evaluation of function which will first pre-check whether b still is not completed, then evaluate your function and attempt to complete b with the result.

But b itself has no knowledge of function or the thread that will evaluate it. In fact, function is not special to b, anyone could call complete, completeExceptionally or cancel on it from any thread, the first one winning. Hence, the completable in the class name.

The only way to get hands on the threads evaluating the functions, is to be in control of them right from the start, e.g.

ExecutorService myWorkers = Executors.newFixedThreadPool(2);

CompletableFuture<FinalResultType> future
    = CompletableFuture.supplyAsync(() -> generateInitialValue(), myWorkers)
                       .thenApplyAsync(v -> nextCalculation(v), myWorkers)
                       .thenApplyAsync(v -> lastCalculation(v), myWorkers);
future.whenComplete((x,y) -> myWorkers.shutdownNow());

Now, the completion of future, e.g. via cancellation, will ensure that no new evaluation will be triggered by this chain and further makes an attempt to interrupt ongoing evaluations, if any.

So you can implement a timeout, e.g.

try {
    try {
        FinalResultType result = future.get(2, TimeUnit.SECONDS);
        System.out.println("got "+result);
    }
    catch(TimeoutException ex) {
        if(future.cancel(true)) System.out.println("cancelled");
        else System.out.println("got "+future.get());
    }
}
catch(ExecutionException|InterruptedException ex) {
    ex.printStackTrace();
}

Not that the rejection of tasks due to the shutdown of the thread pool may cause some of the intermediate future to never complete, but for this chain of stages, this is irrelevant. All that matters, is, that the final stage future is completed, which is guaranteed, as it is its completion which triggers the shutdown.

like image 161
Holger Avatar answered Oct 22 '25 00:10

Holger


The only way to terminate a thread is via interruption, which is a cooperative mechanism. This means the the thread must implement interruption logic, by handling the InterruptedException.

But it is a really bad practice to interrupt threads that you don't own, which I think is your case.

like image 45
vbuhlev Avatar answered Oct 22 '25 01:10

vbuhlev



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!