
Interrupt all threads if an exception occurs in any thread in the thread list

I am invoking a list of threads using invokeAll(). AFAIK, invokeAll() returns only when all the threads have completed their tasks.

ExecutorService w_threadExecutor = Executors.newFixedThreadPool(getThreadSize());
List<Future<Object>> w_future = w_threadExecutor.invokeAll(threadList);

This loop is reached only after all threads have finished:

for (Future<Object> w_inProgressThread : w_future)
{
//

This stops only the thread in which the exception occurred, not the remaining ones. Is there a way to stop all the other threads if any one of them throws an exception? Or do I have to submit each task individually instead of using invokeAll()?

I tried using invokeAny() instead of invokeAll(), but it does not cancel the remaining tasks. According to the reference below, with invokeAny(), if one of the tasks completes (or throws an exception), the rest of the Callables are cancelled. Ref: http://tutorials.jenkov.com/java-util-concurrent/executorservice.html

Update:

CompletionService<Object> completionService = new ExecutorCompletionService<Object>(w_threadExecutor);
List<Future<Object>> futures = new ArrayList<Future<Object>>();
// threadList must hold Callable<Object> instances for submit() and invokeAll() to compile
for (Callable<Object> w_mt : threadList) {
  futures.add(completionService.submit(w_mt));
}
for (int numTaken = 0; numTaken < futures.size(); numTaken++) {
  Future<Object> f = completionService.take();
  try {
    Object result = f.get();
    System.out.println(result);  // do something with the normal result
  } catch (Exception e) {
    System.out.println("Caught ExecutionException, shutdown now!");
    //threadExecutor.shutdownNow();
    Thread.currentThread().interrupt();

    // cancel(true) interrupts each task that is still running
    for (Future<Object> inProgressThread : futures) {
      inProgressThread.cancel(true);
    }
    break;
  }
}

Update 1:

As suggested by waltersu, I tried:

ExecutorService threadExecutor = Executors.newFixedThreadPool(3);
CompletionService<Object> completionService = new ExecutorCompletionService<Object>(threadExecutor);
List<Future<Object>> futures = new ArrayList<Future<Object>>();
futures.add(completionService.submit(new Callable<Object>() {
  @Override
  public Object call() throws Exception {
    String s = null;
    // Thread.sleep(1000);
    for (int i = 0; i < 1000000; i++) {
      int j = 10;
      if (i == 100) {
        s.toString();  // deliberately throws NullPointerException
      }
      System.out.println("dazfczdsa :: " + i);
    }
    //throw new Exception("This is an expected Exception");
    return s;
  }
}));
futures.add(completionService.submit(new Callable<Object>() {
  @Override
  public Object call() throws Exception {
    for (int i = 0; i < 1000000; i++) {
      int j = 0;
      j = j + 2;
      System.out.println("dasa :: " + i);
    }
    Thread.sleep(3000);

    return "My First Result";
  }
}));

while (futures.size() > 0) {
  Future<Object> f = completionService.take();
  futures.remove(f);
  try {
    Object result = f.get();
    System.out.println(result);  // do something with the normal result
  } catch (ExecutionException e) {
    System.out.println("Caught exception from one task: " + e.getCause().getMessage() + ". shutdown now!");
    f.cancel(true);
    threadExecutor.shutdownNow();
    break;
  }
}
System.out.println("Main exists");

This does not stop the other task when the exception occurs.

Asked Oct 21 '25 06:10 by happy


1 Answer

You have to submit() the tasks one by one instead of using invokeAll(), and then check whether each Future completed with an exception.

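import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService threadExecutor = Executors.newFixedThreadPool(3);
    CompletionService<Object> completionService = new ExecutorCompletionService<>(threadExecutor);
    List<Future<Object>> futures = new ArrayList<>();
    // First task: fails after one second
    futures.add(completionService.submit(new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        Thread.sleep(1000);
        throw new Exception("This is an expected Exception");
      }
    }));
    // Second task: would succeed after three seconds if left alone
    futures.add(completionService.submit(new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        Thread.sleep(3000);
        return "My First Result";
      }
    }));

    // take() hands back futures in completion order, so a failure is seen as soon as it happens
    while (futures.size() > 0) {
      Future<Object> f = completionService.take();
      futures.remove(f);
      try {
        Object result = f.get();
        System.out.println(result);  // do something with the normal result
      } catch (ExecutionException e) {
        System.out.println("Caught exception from one task: " + e.getCause().getMessage() + ". shutdown now!");
        threadExecutor.shutdownNow();  // interrupts the tasks that are still running
        break;
      }
    }
    threadExecutor.shutdown();  // lets the JVM exit when every task succeeded
    System.out.println("Main exists");
  }
}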
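
If the pool has to stay usable afterwards, one possible variation is to cancel the remaining futures individually instead of calling shutdownNow(). The following is only a sketch under assumptions: the class name CancelRemainingFutures, the helpers runAll() and demo(), and the sleep durations are made up for illustration. Note that cancel(true) also works by interruption, so it only helps for tasks that respond to being interrupted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; names and timings are illustrative assumptions.
final class CancelRemainingFutures {

  // Submits every task and, on the first failure, cancels all the others.
  // Returns how many futures were actually cancelled.
  static int runAll(ExecutorService executor, List<Callable<Object>> tasks)
      throws InterruptedException {
    CompletionService<Object> completionService = new ExecutorCompletionService<>(executor);
    List<Future<Object>> futures = new ArrayList<>();
    for (Callable<Object> task : tasks) {
      futures.add(completionService.submit(task));
    }
    for (int taken = 0; taken < futures.size(); taken++) {
      Future<Object> done = completionService.take();
      try {
        System.out.println(done.get());
      } catch (ExecutionException e) {
        System.out.println("Caught: " + e.getCause().getMessage());
        int cancelled = 0;
        for (Future<Object> f : futures) {
          // cancel() returns false for futures that have already completed
          if (f.cancel(true)) {
            cancelled++;
          }
        }
        return cancelled;
      }
    }
    return 0;
  }

  // Runs one failing task next to one slow, interruptible task.
  static int demo() {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    List<Callable<Object>> tasks = new ArrayList<>();
    tasks.add(() -> { Thread.sleep(200); throw new IllegalStateException("expected failure"); });
    tasks.add(() -> { Thread.sleep(5000); return "slow result"; });
    try {
      return runAll(executor, tasks);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // restore the caller's interrupt flag
      return -1;
    } finally {
      executor.shutdown();  // the pool was never shut down by the cancellation itself
    }
  }

  public static void main(String[] args) {
    System.out.println("Cancelled futures: " + demo());
  }
}
```

The design trade-off is that shutdownNow() is shorter but ends the whole pool, while per-future cancellation leaves the executor free to accept new work.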

Update 1 (to answer the OP's Update 1 question):

That's because your task runs a long loop that never checks for interruption, which makes the task non-cancelable. shutdownNow() and cancel(true) only interrupt the worker thread; they cannot force it to stop. So how do you stop it? I think you have to modify your other tasks to make them cancelable. As the official documentation says:

What if a thread goes a long time without invoking a method that throws InterruptedException? Then it must periodically invoke Thread.interrupted, which returns true if an interrupt has been received. For example:

for (int i = 0; i < inputs.length; i++) {
    heavyCrunch(inputs[i]);
    if (Thread.interrupted()) {
        // We've been interrupted: no more crunching.
        return;
    }
}

What if you don't want to modify your task but still want it to stop quickly? There is one method that was designed to stop a non-cancelable thread: Thread.stop(). However, you can't get hold of a thread pool's threads without using reflection. Besides, it is deprecated because "it is inherently unsafe", according to the Javadoc, and in recent JDK releases it no longer stops the thread and throws UnsupportedOperationException instead.

So the best practice (I think) is to check for interruption inside any task (or section of code) that is both non-cancelable and slow to finish.
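
As a rough sketch of that advice, the following self-contained program pairs a looping task that polls the interrupt flag with a task that fails, and calls shutdownNow() on the first failure. The class name CancelOnFailureDemo, the helpers loopingTask(), failingTask() and runDemo(), and the timings are assumptions made for illustration; they are not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names and timings are illustrative assumptions.
final class CancelOnFailureDemo {

  // A busy loop that never blocks, so it has to poll the interrupt flag itself.
  static Callable<Object> loopingTask() {
    return () -> {
      long iterations = 0;
      while (!Thread.currentThread().isInterrupted()) {
        iterations++;  // stand-in for one unit of real work
      }
      throw new InterruptedException("stopped after " + iterations + " iterations");
    };
  }

  // A task that fails shortly after it starts.
  static Callable<Object> failingTask() {
    return () -> {
      Thread.sleep(200);
      throw new IllegalStateException("This is an expected Exception");
    };
  }

  // Returns true if all workers terminated within the timeout after the first failure.
  static boolean runDemo() {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    CompletionService<Object> completionService = new ExecutorCompletionService<>(executor);
    List<Future<Object>> futures = new ArrayList<>();
    futures.add(completionService.submit(loopingTask()));
    futures.add(completionService.submit(failingTask()));
    try {
      for (int taken = 0; taken < futures.size(); taken++) {
        Future<Object> done = completionService.take();
        try {
          System.out.println(done.get());
        } catch (ExecutionException e) {
          System.out.println("Caught: " + e.getCause().getMessage());
          executor.shutdownNow();  // interrupts the looping task's worker thread
          break;
        }
      }
      executor.shutdown();  // harmless if shutdownNow() was already called
      return executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // restore the caller's interrupt flag
      executor.shutdownNow();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("All tasks stopped: " + runDemo());
  }
}
```

If the isInterrupted() check were removed from the loop, shutdownNow() would still return immediately, but the worker would keep spinning, which is the behaviour described in the question's Update 1.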

Answered Oct 22 '25 19:10 by waltersu