Here is my code:
class Processor implements Runnable {
private int id;
private Integer interaction;
private Set<Integer> subset;
private static volatile AtomicBoolean notRemoved = new AtomicBoolean(true);
public Object<E> dcp;
public Iterator<Integer> iterator;
public Processor(int id, Integer interaction, Set<Integer> subset, Object<E> dcp, Iterator<Integer> iterator) {
this.id = id;
this.interaction = interaction;
this.subset= subset;
this.dcp = dcp;
this.iterator = iterator;
}
public void run() {
while (Processor.notRemoved.get()){
System.out.println("Starting: " + this.id);
if (this.dcp.PA.contains(this.interaction)){
this.subset.add(this.interaction);
this.dcp.increaseScore(this.subset);
if (!this.subset.contains(this.interaction) && Processor.notRemoved.get()){
Processor.notRemoved.set(false);
iterator.remove();
}
}
System.out.println("Completed: " + this.id);
}
}
}
public class ConcurrentApp {
public void mainFunction (Object<E> dcp, int threads) {
ExecutorService executor = Executors.newFixedThreadPool(threads);
int i =1;
while ((dcp.PA.size() > i) && (i <= dcp.R)){
for (Iterator<Integer> iterator = dcp.PA.iterator(); iterator.hasNext();){
Integer interaction = iterator.next();
ArrayList<Integer> removed = new ArrayList<Integer>(dcp.PA);
removed.remove(interaction);
ArrayList<Set<Integer>> subsets = dcp.getSubsets(removed, i);
for (int j = 0; j< subsets.size(); j++){
executor.submit(new Processor(j, interaction, subsets.get(j), dcp, iterator));
}
executor.shutdown();
System.out.println("All tasks submitted");
try {
executor.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
System.out.println("HERE");
e.printStackTrace();
}
}
System.out.println("All tasks completed");
i++;
}
}
}
When I run mainFunction in ConcurrentApp, I get the following error: Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2c7b84de rejected from java.util.concurrent.ThreadPoolExecutor@3fee733d[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 8]
I know this is because I'm not using executor.shutdown() correctly but I'm not sure why?
EDIT: I print when each thread starts and completes its task. Here is the console output:
Starting: 1
Starting: 2
All tasks submitted
Starting: 0
Completed: 2
Completed: 1
Completed: 0
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2c7b84de rejected from java.util.concurrent.ThreadPoolExecutor@3fee733d[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 8]
This at least shows that the 3 threads in the thread pool complete their task before the error goes off.
I figured out the problem here! It was simply because I was calling executor.shutdown() within the while loop before all the tasks were completed. So the new code is:
public void multiRemoveParents (DirectCausalPredictor<BayesianScoresNew> dcp, int threads) {
ExecutorService executor = Executors.newFixedThreadPool(threads);
int i =1;
while ((dcp.PA.size() > i) && (i <= dcp.R)){
for (Iterator<Integer> iterator = dcp.PA.iterator(); iterator.hasNext();){
Integer interaction = iterator.next();
ArrayList<Integer> removed = new ArrayList<Integer>(dcp.PA);
removed.remove(interaction);
ArrayList<Set<Integer>> subsets = dcp.getSubsets(removed, i);
for (int j = 0; j< subsets.size(); j++){
try {
executor.submit(new Processor(j, interaction, subsets.get(j), dcp, iterator));
} catch (RejectedExecutionException e){
System.out.println("Task was rejected");
}
}
}
System.out.println("All tasks completed");
i++;
}
executor.shutdown();
System.out.println("All tasks submitted");
try {
executor.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With