I am invoking List of threads using invokeAll(). AFAIK invokeAll() will return only when all the threads completes its task.
ExecutorService threadExecutor = Executors.newFixedThreadPool(getThreadSize());
List<Future<Object>> future = w_threadExecutor.invokeAll(threadList);
this is called when all threads finishes
for (Future<Object> w_inProgressThread : w_future)
{
//
It stops the thread in which exception occured and not the remaining one. Is there a way to stop all the other threads if any of the thread throws exception? Or do I have to submit each task instead of invokeAll()??
I tried using invokeAny() instead on invokeAll() but does not cancell remaining task invokeAny() : If one of the tasks complete (or throws an exception), the rest of the Callable's are cancelled. Ref : http://tutorials.jenkov.com/java-util-concurrent/executorservice.html
Update :
CompletionService<Object> completionService = new ExecutorCompletionService<Object>(w_threadExecutor);
List<Future<Object>> futures = new ArrayList<Future<Object>>();
for(Thread w_mt : threadList)
{
futures.add(completionService.submit(w_mt));
}
for (int numTaken = 0; numTaken < futures.size(); numTaken++) {
Future f = completionService.take();
try {
Object result = f.get();
System.out.println(result); // do something with the normal result
} catch (Exception e) {
System.out.println("Catched ExecutionException, shutdown now!");
//threadExecutor.shutdownNow();
Thread.currentThread().interrupt();
for (Future<Object> inProgressThread : futures)
{
inProgressThread.cancel(true);
}
break;
}
Update 1:
As suggested by waltersu I tried
ExecutorService threadExecutor = Executors.newFixedThreadPool(3);
CompletionService<Object> completionService = new ExecutorCompletionService<Object>(threadExecutor);
List<Future<Object>> futures = new ArrayList<Future<Object>>();
futures.add(completionService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
String s=null;
// Thread.sleep(1000);
for(int i=0; i < 1000000; i++){
int j =10 ;
if(i==100)
{
s.toString();
}
System.out.println("dazfczdsa :: " + i);
}
//throw new Exception("This is an expected Exception");
return s;
}
}));
futures.add(completionService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
for(int i=0; i < 1000000; i++){
int j =0 ;
j= j+2;
System.out.println("dasa :: " + i);
}
Thread.sleep(3000);
return "My First Result";
}
}));
while (futures.size() > 0) {
Future f = completionService.take();
futures.remove(f);
try {
Object result = f.get();
System.out.println(result); // do something with the normal result
} catch (ExecutionException e) {
System.out.println("Caught exception from one task: " + e.getCause().getMessage() + ". shutdown now!");
f.cancel(true);
threadExecutor.shutdownNow();
break;
}
}
System.out.println("Main exists");
this does not stop when exception occurs
You have to submit() one by one, instead of invokeAll(), then check if the Future has Exception.
public static void main(String[] args) throws InterruptedException {
ExecutorService threadExecutor = Executors.newFixedThreadPool(3);
CompletionService<Object> completionService = new ExecutorCompletionService<>(threadExecutor);
List<Future<Object>> futures = new ArrayList<>();
futures.add(completionService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
Thread.sleep(1000);
throw new Exception("This is an expected Exception");
}
}));
futures.add(completionService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
Thread.sleep(3000);
return "My First Result";
}
}));
while (futures.size() > 0) {
Future f = completionService.take();
futures.remove(f);
try {
Object result = f.get();
System.out.println(result); // do something with the normal result
} catch (ExecutionException e) {
System.out.println("Caught exception from one task: " + e.getCause().getMessage() + ". shutdown now!");
threadExecutor.shutdownNow();
break;
}
}
System.out.println("Main exists");
}
Update 1: (To answer op's update 1 question)
That's because your task has a long loop which doesn't check the interruption, which makes your task noncancelable. Then how do you stop it? I think you have to modify your other tasks to make them cancelable. As the official doc says:
What if a thread goes a long time without invoking a method that throws InterruptedException? Then it must periodically invoke Thread.interrupted, which returns true if an interrupt has been received. For example:
for (int i = 0; i < inputs.length; i++) {
heavyCrunch(inputs[i]);
if (Thread.interrupted()) {
// We've been interrupted: no more crunching.
return;
}
}
What if you don't want to modify your task and also want it to stop quickly? There's a method you can stop a noncancelable thread. It's Thread.stop(). But, at first, you can't get the thread from threadpool without using reflection. Besides, it's deprecated because "it is inherently unsafe" according to the javadoc.
So, the best practice(I think) is to check the interruption in your task(or part of code) which is both noncancelable and spending a long time to finish.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With