I have a non-seekable file-like object. In particular it is a file of indeterminate size coming from an HTTP request.
import requests
fileobj = requests.get(url, stream=True)
I am streaming this file to a call to an Amazon AWS SDK function which is writing the contents to Amazon S3. This is working fine.
import boto3
s3 = boto3.resource('s3')
s3.bucket('my-bucket').upload_fileobj(fileobj, 'target-file-name')
However, at the same time as streaming it to S3 I want to also stream the data to another process. This other process may not need the entire stream and might stop listening at some point; this is fine and should not affect the stream to S3.
It's important I don't use too much memory, since some of these files could be enormous. I don't want to write anything to disk for the same reason.
I don't mind if either sink is slowed down due to the other being slow, as long as S3 eventually gets the entire file, and the data goes to both sinks (rather, to each one which still wants it).
What's the best way to go about this in Python (3)? I know I can't just pass the same file object to both sinks, such as
s3.bucket('my-bucket').upload_fileobj(fileobj, 'target-file-name')
# At the same time somehow as
process = subprocess.Popen(['myapp'], stdin=fileobj)
I think I could write a wrapper for the file-like object which passes any data read not only to the caller (which would be the S3 sink) but also to the other process. Something like
class MyFilewrapper(object):
    def __init__(self, fileobj):
        self._fileobj = fileobj
        self._process = subprocess.Popen(['myapp'], stdin=popen.PIPE)
    def read(self, size=-1):
        data = self._fileobj.read(size)
        self._process.stdin.write(data)
        return data
filewrapper = MyFilewrapper(fileobj)
s3.bucket('my-bucket').upload_fileobj(filewrapper, 'target-file-name')
But is there a better way to do it? Perhaps something like
streams = StreamDuplicator(fileobj, streams=2)
s3.bucket('my-bucket').upload_fileobj(streams[0], 'target-file-name')
# At the same time somehow as
process = subprocess.Popen(['myapp'], stdin=streams[1])
The discomfort regarding your MyFilewrapper solution arises, because the IO loop inside upload_fileobj is now in control of feeding the data to a subprocess that is strictly speaking unrelated to the upload.
A "proper" solution would involve an upload API that provides a file-like object for writing the upload stream with an outside loop. That would allow you to feed the data to both target streams "cleanly".
The following example shows the basic concept. The fictional startupload method provides the file-like object for uploading. Of cource you would need to add proper error handling etc.
fileobj = requests.get(url, stream=True)
upload_fd = s3.bucket('my-bucket').startupload('target-file-name')
other_fd = ... # Popen or whatever
buf = memoryview(bytearray(4046))
while True:
    r = fileobj.read_into(buf)
    if r == 0:
        break
    read_slice = buf[:r]
    upload_fd.write(read_slice)
    other_fd.write(read_slice)
Here is an implementation of StreamDuplicator with requested functionality and use model. I verified that it handles correctly the case when one of the sinks stops consuming the respective stream half-way.
Usage:
./streamduplicator.py <sink1_command> <sink2_command> ...
Example:
$ seq 100000 | ./streamduplicator.py "sed -n '/0000/ {s/^/sed: /;p}'" "grep 1234"
Output:
sed: 10000
1234
11234
12340
12341
12342
12343
12344
12345
12346
12347
12348
12349
21234
sed: 20000
31234
sed: 30000
41234
sed: 40000
51234
sed: 50000
61234
sed: 60000
71234
sed: 70000
81234
sed: 80000
91234
sed: 90000
sed: 100000
streamduplicator.py:
#!/usr/bin/env python3
import sys
import os
from subprocess import Popen
from threading import Thread
from time import sleep
import shlex
import fcntl
WRITE_TIMEOUT=0.1
def write_or_timeout(stream, data, timeout):
    data_to_write = data[:]
    time_to_sleep = 1e-6
    time_remaining = 1.0 * timeout
    while time_to_sleep != 0:
        try:
            stream.write(data_to_write)
            return True
        except BlockingIOError as ex:
            data_to_write = data_to_write[ex.characters_written:]
            if ex.characters_written == 0:
                time_to_sleep *= 2
            else:
                time_to_sleep = 1e-6
                time_remaining = timeout
        time_to_sleep = min(time_remaining, time_to_sleep)
        sleep(time_to_sleep)
        time_remaining -= time_to_sleep
    return False
class StreamDuplicator(object):
    def __init__(self, stream, n, timeout=WRITE_TIMEOUT):
        self.stream = stream
        self.write_timeout = timeout
        self.pipereadstreams = []
        self.pipewritestreams = []
        for i in range(n):
            (r, w) = os.pipe()
            readStream = open(r, 'rb')
            self.pipereadstreams.append(readStream)
            old_flags = fcntl.fcntl(w, fcntl.F_GETFL);
            fcntl.fcntl(w, fcntl.F_SETFL, old_flags|os.O_NONBLOCK)
            self.pipewritestreams.append(os.fdopen(w, 'wb'))
        Thread(target=self).start()
    def __call__(self):
        while True:
            data = self.stream.read(1024*16)
            if len(data) == 0:
                break
            surviving_pipes = []
            for p in self.pipewritestreams:
                if write_or_timeout(p, data, self.write_timeout) == True:
                    surviving_pipes.append(p)
            self.pipewritestreams = surviving_pipes
    def __getitem__(self, i):
        return self.pipereadstreams[i]
if __name__ == '__main__':
    n = len(sys.argv)
    streams = StreamDuplicator(sys.stdin.buffer, n-1, 3)
    for (i,cmd) in zip(range(n-1), sys.argv[1:]):
        Popen(shlex.split(cmd), stdin=streams[i])
Implementation limitations:
usage of fcntl to set a pipe writing file descriptor to non-blocking mode probably makes it unusable under Windows.
a closed/unsubscribed sink is detected through a write timeout.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With