
PipedInputStream always blocks when calling read() with an empty buffer. Is there any way of stopping this?

I've searched through all the questions I can find relating to PipedInputStreams and PipedOutputStreams and have not found anything that can help me. Hopefully someone here will have come across something similar.

Background:

I have a class that reads data from any java.io.InputStream. The class has a method called hasNext(), which checks the given InputStream for data, returning true if data is found, false otherwise. This hasNext() method works perfectly with other InputStreams but when I try to use a PipedInputStream (fed from a PipedOutputStream in a different Thread, encapsulated in the inputSupplier variable below), it hangs. After looking into how the hasNext() method works, I recreated the problem with the following code:

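public static void main(String[] args) throws IOException {
    // inputSupplier (defined elsewhere) feeds its PipedOutputStream from its own thread
    PipedInputStream inputSourceStream = new PipedInputStream(inputSupplier.getOutputStream());
    byte[] input = new byte[4096];
    // Blocks here until the writer supplies data or closes its end of the pipe
    int bytesRead = inputSourceStream.read(input, 0, input.length);
}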

The inputSupplier is simply an instance of a small class I wrote that runs in its own thread with a local PipedOutputStream to avoid getting deadlocks.

The Problem:

The hasNext() method calls the PipedInputStream.read() method to ascertain whether there is any data to be read. On an empty pipe this read blocks and does not return until some data arrives. This means that hasNext() will never return false (or return at all) while the stream is empty.
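To make the behaviour concrete, here is a minimal sketch of a read()-based hasNext(). It is a hypothetical reconstruction, not the actual class from the question: the name PeekingReader and the use of PushbackInputStream to "peek" one byte are assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;

// Hypothetical reconstruction of a read()-based hasNext(); not the asker's class.
class PeekingReader {
    private final PushbackInputStream in;

    PeekingReader(InputStream source) {
        // A one-byte pushback buffer lets us peek without consuming data.
        this.in = new PushbackInputStream(source, 1);
    }

    // Returns true if a byte can be read, false at end of stream.
    // Blocks for as long as the underlying read() blocks.
    boolean hasNext() throws IOException {
        int b = in.read();
        if (b == -1) {
            return false;
        }
        in.unread(b); // put the byte back so the next read sees it
        return true;
    }

    public static void main(String[] args) throws IOException {
        PeekingReader empty = new PeekingReader(new ByteArrayInputStream(new byte[0]));
        PeekingReader full = new PeekingReader(new ByteArrayInputStream(new byte[] {42}));
        System.out.println(empty.hasNext()); // false
        System.out.println(full.hasNext());  // true
    }
}
```

With a ByteArrayInputStream the call returns at once because an empty array is already at end of stream. With an open but empty PipedInputStream the same in.read() call would wait for the writer, which is the hang described here.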

Disclaimer: I know about the available() method but all that tells me is that there are no bytes available, not that we are at the end of the stream (whatever implementation of a Stream that may be), and so read() is required to check this.
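The distinction between "no bytes right now" and "end of stream" can be seen in a small sketch. The class name PipeEofDemo and the probe() helper are made up for this example. It assumes nothing beyond the standard java.io pipe classes, and runs in a single thread only because no call in it can block.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical demo class; shows available() versus read() on a pipe.
class PipeEofDemo {
    // Returns {available() while the pipe is open and empty, read() after the writer closes}.
    static int[] probe() throws IOException {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);

        // Nothing has been written yet, so no bytes are available.
        int availableWhileOpen = in.available();

        // Closing the writer marks end of stream for the reader.
        out.close();

        // With the writer closed and the buffer empty, read() returns -1 without blocking.
        int readAfterClose = in.read();
        in.close();
        return new int[] {availableWhileOpen, readAfterClose};
    }

    public static void main(String[] args) throws IOException {
        int[] result = probe();
        System.out.println("available while open: " + result[0]);
        System.out.println("read after close: " + result[1]);
    }
}
```

So available() == 0 only says the buffer is empty at this moment, while read() == -1 is reserved for a writer that has closed its end.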

[Edit] The whole purpose of me initially using a PipedInputStream was to simulate a "bursty" source of data. That is, I need to have a Stream that I can write to sporadically to see if my hasNext() method will detect that there is new data on the Stream upon reading it. If there is a better way of doing this then I would be thrilled to hear it!
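One way to simulate such a "bursty" source is sketched below. The class BurstyPipeDemo and its parameters (bursts, burstSize, pauseMillis) are hypothetical names chosen for this example. The writer thread closes the pipe when it is done, so the blocking read loop on the other side ends with -1 instead of hanging.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical demo: a writer thread emits data in bursts, then closes the pipe.
class BurstyPipeDemo {
    // Reads everything the bursty writer produces and returns the byte count.
    static int readAll(int bursts, int burstSize, long pauseMillis)
            throws IOException, InterruptedException {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);

        Thread writer = new Thread(() -> {
            // try-with-resources closes the pipe, which signals end of stream.
            try (PipedOutputStream o = out) {
                for (int i = 0; i < bursts; i++) {
                    o.write(new byte[burstSize]);
                    o.flush();
                    Thread.sleep(pauseMillis); // quiet period between bursts
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        });
        writer.start();

        int total = 0;
        byte[] buf = new byte[4096];
        int n;
        // read() blocks during quiet periods and returns -1 once the writer closes.
        while ((n = in.read(buf, 0, buf.length)) != -1) {
            total += n;
        }
        writer.join();
        in.close();
        return total;
    }

    public static void main(String[] args) throws Exception {
        // 3 bursts of 100 bytes each, 50 ms apart
        System.out.println("bytes read: " + readAll(3, 100, 50));
    }
}
```

A reader that must not block during the quiet periods could check in.available() > 0 before calling read(), at the cost of not being able to tell an idle writer from a finished one.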

Jon Norman asked Dec 06 '25 03:12


1 Answer

I hate to necro a question this old, but this is near the top of Google's results, and I just found a solution for myself: this circular byte buffer exposes in and out streams, and the read method returns -1 immediately when no data is present. A little bit of threading, and your test classes can provide data exactly the way you want.

http://ostermiller.org/utils/src/CircularByteBuffer.java.html

Edit

Turns out I misunderstood the documentation of the above class: it only returns -1 when a thread calling read() is interrupted. I made a quick mod to the read method that gives me what I want (the original code is commented out; the only new code is the substitution of an else for the else if):

@Override
public int read(byte[] cbuf, int off, int len) throws IOException {
    //while (true){
        synchronized (CircularByteBuffer.this){
            if (inputStreamClosed) throw new IOException("InputStream has been closed; cannot read from a closed InputStream.");
            int available = CircularByteBuffer.this.available();
            if (available > 0){
                int length = Math.min(len, available);
                // The data may wrap around the end of the circular buffer, so copy in two parts
                int firstLen = Math.min(length, buffer.length - readPosition);
                int secondLen = length - firstLen;
                System.arraycopy(buffer, readPosition, cbuf, off, firstLen);
                if (secondLen > 0){
                    System.arraycopy(buffer, 0, cbuf, off+firstLen,  secondLen);
                    readPosition = secondLen;
                } else {
                    readPosition += length;
                }
                if (readPosition == buffer.length) {
                    readPosition = 0;
                }
                ensureMark();
                return length;
            //} else if (outputStreamClosed){
            } else {  // << new line of code
                // Buffer is empty: return -1 at once instead of waiting for data.
                // Note that callers normally interpret -1 as end of stream.
                return -1;
            }
        }
        //try {
        //    Thread.sleep(100);
        //} catch(Exception x){
        //    throw new IOException("Blocking read operation interrupted.");
        //}
    //}
}
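For readers without the Ostermiller source at hand, the same idea can be sketched in a self-contained way. The class below is a hypothetical stand-in, not the CircularByteBuffer class: the name NonBlockingBuffer and the ArrayDeque-backed storage are assumptions for illustration. Its reads return -1 immediately when the buffer is empty, which mirrors the modified method above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayDeque;

// Hypothetical stand-in for the modified buffer; not the Ostermiller class.
class NonBlockingBuffer {
    private final ArrayDeque<Byte> queue = new ArrayDeque<>();

    // Appends a burst of data to the buffer.
    synchronized void write(byte[] data) {
        for (byte b : data) {
            queue.add(b);
        }
    }

    // Stream view whose reads return -1 at once when the buffer is empty.
    InputStream getInputStream() {
        return new InputStream() {
            @Override
            public int read() {
                synchronized (NonBlockingBuffer.this) {
                    Byte b = queue.poll();
                    return (b == null) ? -1 : (b & 0xFF);
                }
            }

            @Override
            public int read(byte[] dst, int off, int len) {
                synchronized (NonBlockingBuffer.this) {
                    if (len == 0) return 0;
                    if (queue.isEmpty()) return -1; // no data right now: do not block
                    int n = Math.min(len, queue.size());
                    for (int i = 0; i < n; i++) {
                        dst[off + i] = queue.poll();
                    }
                    return n;
                }
            }
        };
    }

    public static void main(String[] args) throws IOException {
        NonBlockingBuffer buffer = new NonBlockingBuffer();
        InputStream in = buffer.getInputStream();
        byte[] dst = new byte[8];
        System.out.println(in.read(dst, 0, dst.length)); // -1, buffer is empty
        buffer.write(new byte[] {1, 2, 3});
        System.out.println(in.read(dst, 0, dst.length)); // 3
    }
}
```

One caveat with this design: the InputStream contract uses -1 to mean end of stream, so wrappers such as BufferedReader may treat the first empty read as the end of the data. That is fine for a test harness that calls read() directly, but it is worth keeping in mind before reusing the stream elsewhere.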


Chris Meyer answered Dec 08 '25 17:12