I have this piece of code:
private ConcurrentLinkedQueue<Interval> intervals = new ConcurrentLinkedQueue<>();
@Override
public void run(){
  while(!intervals.isEmpty()){
    //remove one interval
    //do calculations
    //add some intervals
  }
}
This code is executed by a fixed number of threads at the same time. As you can see, the loop should run until no intervals are left in the collection, but there is a problem. At the beginning of each iteration an interval is removed from the collection, and at the end some number of intervals might be added back to the same collection.
The problem is that while one thread is inside the loop, the collection might become empty, so other threads trying to enter the loop will fail to do so and finish prematurely, even though the collection might be refilled once the first thread finishes its iteration. I want the thread count to remain constant (or no more than some number n) until all work is really finished.
That means no threads are currently working inside the loop and no elements are left in the collection. What are possible ways of accomplishing that? Any ideas are welcome.
One way to solve this in my specific case is to give every thread a different piece of the original collection. But once a thread finished its piece, it would no longer be used by the program, even though it could help the other threads with their calculations. I don't like this solution, because it's important to utilize all cores of the machine in my problem.
This is the simplest minimal working example I could come up with. It might be too lengthy.
public class Test{   
   private ConcurrentLinkedQueue<Interval> intervals = new ConcurrentLinkedQueue<>();
   private int threadNumber;
   private Thread[] threads;
   private double result;
   public Test(int threadNumber){
      intervals.add(new Interval(0, 1));
      this.threadNumber = threadNumber;
      threads = new Thread[threadNumber];
   }
   public double find(){
      for(int i = 0; i < threadNumber; i++){
         threads[i] = new Thread(new Finder());
         threads[i].start();
      }
      try{
         for(int i = 0; i < threadNumber; i++){
            threads[i].join();
         }
      }
      catch(InterruptedException e){
         System.err.println(e);
      }
      return result;
   }   
   private class Finder implements Runnable{   
      @Override
      public void run(){
         while(!intervals.isEmpty()){
            Interval interval = intervals.poll();
            if(interval.high - interval.low > 1e-6){    
               double middle = (interval.high + interval.low) / 2;
               boolean something = true;
               if(something){
                  intervals.add(new Interval(interval.low + 0.1, middle - 0.1));
                  intervals.add(new Interval(middle + 0.1, interval.high - 0.1));
               }
               else{
                  intervals.add(new Interval(interval.low + 0.1, interval.high - 0.1));
               }
            }
         }
      }
   }
   private class Interval{
      double low;
      double high;
      public Interval(double low, double high){
         this.low = low;
         this.high = high;
      }
   }
}
What you might need to know about the program: after every iteration, an interval should either disappear (because it's too small), become smaller, or split into two smaller intervals. Work is finished when no intervals are left. Also, I should be able to limit the number of threads doing this work to some number n. The actual program looks for the maximum value of some function by dividing the intervals and throwing away the parts that can't contain the maximum according to some rules, but this shouldn't really be relevant to my problem.
The CompletableFuture class is also an interesting solution for this kind of task. It automatically distributes the workload over a number of worker threads (by default, the async methods run on ForkJoinPool.commonPool()).
static CompletableFuture<Integer> fibonacci(int n) {
  if(n < 2) return CompletableFuture.completedFuture(n);
  else {
    return CompletableFuture.supplyAsync(() -> {
      System.out.println(Thread.currentThread());
      CompletableFuture<Integer> f1 = fibonacci(n - 1);
      CompletableFuture<Integer> f2 = fibonacci(n - 2);
      return f1.thenCombineAsync(f2, (a, b) -> a + b);
    }).thenComposeAsync(f -> f);
  }
}
public static void main(String[] args) throws Exception {
  int fib = fibonacci(10).get();
  System.out.println(fib);
}
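The same recursive pattern can be applied to the interval problem from the question. What follows is only a minimal sketch, not the asker's actual algorithm: the class name `IntervalFutures`, the method `process`, the 0.1 width threshold, and the plain bisection rule are all assumptions made for illustration. Passing a fixed thread pool as the executor is one way to cap the number of worker threads at n.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class IntervalFutures {
    // Hypothetical rule: intervals no wider than this are dropped instead of split.
    static final double MIN_WIDTH = 0.1;

    // Returns a future that completes with the number of intervals visited
    // in the subtree rooted at [low, high].
    static CompletableFuture<Integer> process(double low, double high, ExecutorService pool) {
        if (high - low <= MIN_WIDTH) {
            return CompletableFuture.completedFuture(1); // too small: nothing left to split
        }
        return CompletableFuture.supplyAsync(() -> {
            // Placeholder for the real calculation: bisect the interval.
            double middle = (low + high) / 2;
            CompletableFuture<Integer> left = process(low, middle, pool);
            CompletableFuture<Integer> right = process(middle, high, pool);
            // Combine without blocking, so pool threads are never parked waiting.
            CompletableFuture<Integer> total = left.thenCombine(right, (a, b) -> a + b + 1);
            return total;
        }, pool).thenCompose(f -> f);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // at most 4 worker threads
        try {
            System.out.println(process(0, 1, pool).join());
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the root future completes only after every child future has completed, there is no need for a separate "is all work finished" check: `join()` on the root returns exactly when no interval is left.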
You can use an atomic flag, e.g.:
private ConcurrentLinkedQueue<Interval> intervals = new ConcurrentLinkedQueue<>();
private AtomicBoolean inUse = new AtomicBoolean();
@Override
public void run() {
    while (!intervals.isEmpty() && inUse.compareAndSet(false, true)) {
        // work
        inUse.set(false);
    }
}
UPD
The question has been updated, so here is a better solution. It is a more "classic" solution using a blocking queue:
private BlockingQueue<Interval> intervals = new LinkedBlockingQueue<>(); // ArrayBlockingQueue needs a capacity argument; an unbounded queue is used here instead
private volatile boolean finished = false;
@Override
public void run() {
    try {
        while (!finished) {
            Interval next = intervals.take();
            // put work there
            // after you decide work is finished just set finished = true
            intervals.put(next); // anyway, return interval to queue
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
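One possible way to decide when work is finished is to count the intervals that are either queued or currently being processed, and to let the workers exit once that count reaches zero. The sketch below is an illustration under assumptions, not part of the original answer: the names `PendingCounterSketch`, `pending`, `submit`, and `work`, the 0.1 threshold, and the bisection rule are all made up for the example. It also uses a timed `poll` instead of `take`, since a thread blocked in `take()` would never notice that the others are done.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PendingCounterSketch {
    // Hypothetical stand-in for the question's Interval class.
    static final class Interval {
        final double low, high;
        Interval(double low, double high) { this.low = low; this.high = high; }
    }

    private final BlockingQueue<Interval> intervals = new LinkedBlockingQueue<>();
    // Intervals that are either queued or currently being processed.
    private final AtomicInteger pending = new AtomicInteger();
    // Only used to report how much work was done.
    private final AtomicInteger processed = new AtomicInteger();

    private void submit(Interval interval) {
        pending.incrementAndGet(); // count before enqueueing, so pending never under-reports
        intervals.add(interval);
    }

    private void work() {
        try {
            while (pending.get() > 0) {
                // A timed poll lets idle workers re-check the exit condition.
                Interval interval = intervals.poll(10, TimeUnit.MILLISECONDS);
                if (interval == null) continue; // queue empty, but another thread may still add work
                // Placeholder calculation (assumption): bisect until no wider than 0.1.
                if (interval.high - interval.low > 0.1) {
                    double middle = (interval.low + interval.high) / 2;
                    submit(new Interval(interval.low, middle));
                    submit(new Interval(middle, interval.high));
                }
                processed.incrementAndGet();
                pending.decrementAndGet(); // only after the children have been submitted
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs threadCount workers to completion and returns the number of intervals processed.
    int run(int threadCount) {
        submit(new Interval(0, 1));
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(this::work);
            threads[i].start();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(new PendingCounterSketch().run(4));
    }
}
```

The ordering matters: a worker decrements `pending` only after it has submitted the intervals it produced, so the counter cannot drop to zero while more work is about to appear.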
UPD2
Now it seems better to rewrite the solution and divide the range into sub-ranges, one for each thread.
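A rough sketch of that idea follows, assuming equal-width sub-ranges and the same placeholder bisection rule; `SubRangeSketch` and its `run` method are hypothetical names. Each thread keeps a private work list, so no shared queue and no termination check are needed. The trade-off raised in the question still applies: a thread that finishes its sub-range early stays idle.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

class SubRangeSketch {
    // Splits [low, high] into threadCount equal sub-ranges, gives each thread its own
    // private work list, and returns the total number of intervals processed.
    static int run(double low, double high, int threadCount) {
        AtomicInteger processed = new AtomicInteger();
        Thread[] threads = new Thread[threadCount];
        double width = (high - low) / threadCount;
        for (int i = 0; i < threadCount; i++) {
            double subLow = low + i * width;
            double subHigh = subLow + width;
            threads[i] = new Thread(() -> {
                // Thread-local deque: nothing is shared, so no synchronization is needed.
                ArrayDeque<double[]> local = new ArrayDeque<>();
                local.push(new double[] {subLow, subHigh});
                while (!local.isEmpty()) {
                    double[] interval = local.pop();
                    // Placeholder calculation (assumption): bisect until no wider than 0.1.
                    if (interval[1] - interval[0] > 0.1) {
                        double middle = (interval[0] + interval[1]) / 2;
                        local.push(new double[] {interval[0], middle});
                        local.push(new double[] {middle, interval[1]});
                    }
                    processed.incrementAndGet();
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(0, 1, 4));
    }
}
```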