DelayQueue with capacity

I'm using a DelayQueue so that elements can only be taken from the queue once their delay has passed. I also want to enforce a capacity, much like a bounded BlockingQueue. I can't seem to find a Collections implementation of this. Does one exist? If not, what's the best way of implementing it? A basic approach would be to do something like this:

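// synchronized is required: wait() throws IllegalMonitorStateException
// unless the calling thread holds this object's monitor
public synchronized void addSomethingToQueue(Delayed somethingToAdd) throws InterruptedException {
    int capacity = 4;

    // re-check the condition in a loop to guard against spurious wakeups
    while (queue.size() >= capacity) {
        wait(); // woken by notify / notifyAll after a removal
    }

    queue.add(somethingToAdd);
}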

This would mean calling notify / notifyAll each time something was removed. It's quite a small class, so that's doable, but it doesn't sound great. I'm also not sure whether wait / notify might cause further problems.
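For reference, the wait / notifyAll idea could be fleshed out roughly as follows. This is only a sketch under assumptions: the class name WaitNotifyDelayQueue and the Item element type are made up for illustration, and removal uses the non-blocking DelayQueue.poll() so that no thread ever blocks on the inner queue while holding the monitor.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper: a DelayQueue bounded with wait / notifyAll
final class WaitNotifyDelayQueue<E extends Delayed> {
    private final DelayQueue<E> queue = new DelayQueue<>();
    private final int capacity;

    WaitNotifyDelayQueue(int capacity) {
        this.capacity = capacity;
    }

    // Blocks while full; wait() releases the monitor so poll() can still run
    synchronized void add(E element) throws InterruptedException {
        while (queue.size() >= capacity) {
            wait();
        }
        queue.add(element);
    }

    // Non-blocking: returns null if no element's delay has expired yet
    synchronized E poll() {
        E element = queue.poll();
        if (element != null) {
            notifyAll(); // wake any producers waiting for space
        }
        return element;
    }

    synchronized int size() {
        return queue.size();
    }

    // Hypothetical element type, used only to exercise the queue
    static final class Item implements Delayed {
        private final long readyAtNanos;

        Item(long delayMillis) {
            readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }
}
```

The main pitfall with this approach is blocking on the inner queue (for example with take()) inside a synchronized method, since a waiting consumer would then keep producers out as well.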

Would it be better to sub-class DelayQueue and mess around with its methods? It feels a bit dodgy...

Spycho asked Dec 11 '25 11:12


1 Answer

Why not compose a BlockingQueue and a DelayQueue? For example:

class MyDelayBlockingQueue<T extends Delayed> {
    private final DelayQueue<T> delayQ = ...
    private final BlockingQueue<T> blockingQ = ...

    public synchronized void offer(T obj) throws InterruptedException {
        // put() blocks if the Q is full (BlockingQueue.offer() would return false instead)
        blockingQ.put(obj);
        delayQ.offer(obj);
    }

    public synchronized T poll() {
        T obj = delayQ.poll(); // returns null until an element's delay has expired
        if (obj != null) {
            blockingQ.poll(); // frees one capacity slot
        }
        return obj;
    }

    // ...    
}

EDIT

The code above will deadlock. If the Q is full, offer blocks while holding the intrinsic lock of the Q, so every later call to poll blocks trying to acquire that lock and can never free a slot. Try something like this instead:

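import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Semaphore;

public class DelayBlockingQueue<E extends Delayed>
{
    private final DelayQueue<E> delayQ = new DelayQueue<E>();
    // one permit per free slot; fair, so waiting producers are served in FIFO order
    private final Semaphore available;

    public DelayBlockingQueue(int capacity)
    {
        available = new Semaphore(capacity, true);
    }

    // blocks until a slot is free; no lock is held while waiting
    public void offer(E e) throws InterruptedException
    {
        available.acquire();
        delayQ.offer(e);
    }

    // returns null if no element's delay has expired yet
    public E poll()
    {
        E e = delayQ.poll();
        if (e != null)
        {
            available.release(); // hand the slot back to producers
        }
        return e;
    }
}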
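
The same Semaphore idea can be extended with a blocking take() and a non-blocking insert, which gets closer to the BlockingQueue behaviour asked about. The following is only a sketch: BoundedDelayQueue, its method names and the DelayedTask element are made up for illustration and are not part of the JDK.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of the class above with put / offer / take
final class BoundedDelayQueue<E extends Delayed> {
    private final DelayQueue<E> delayQ = new DelayQueue<>();
    private final Semaphore available;

    BoundedDelayQueue(int capacity) {
        available = new Semaphore(capacity, true);
    }

    // Blocks until a slot is free, like BlockingQueue.put
    void put(E e) throws InterruptedException {
        available.acquire();
        delayQ.put(e);
    }

    // Non-blocking insert: returns false instead of waiting when full
    boolean offer(E e) {
        if (!available.tryAcquire()) {
            return false;
        }
        delayQ.put(e);
        return true;
    }

    // Blocks until some element's delay has expired, then frees its slot
    E take() throws InterruptedException {
        E e = delayQ.take();
        available.release();
        return e;
    }

    int remainingCapacity() {
        return available.availablePermits();
    }
}

// Hypothetical element type, used only to exercise the queue
final class DelayedTask implements Delayed {
    final String name;
    private final long readyAtNanos;

    DelayedTask(String name, long delayMillis) {
        this.name = name;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}
```

Because take() waits on the DelayQueue without holding any lock of the wrapper, producers blocked in put() and consumers blocked in take() cannot lock each other out.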

Binil Thomas answered Dec 14 '25 00:12


