I have a Python function (implemented in C++) that reads from a file descriptor (wrapped in FILE* on the C++ side) and I need to feed the function from an asyncio.StreamReader. Specifically, the reader is the content of a HTTP response: aiohttp.ClientResponse.content.
I thought I might open a pipe, pass the read-end to the C++ function and connect the write-end to asyncio's event loop. However, how can I move the data from the stream reader to the pipe, with proper flow control and as little copying as possible?
The skeleton of the code with the missing parts is as following:
# obtain the StreamReader from aiohttp
content = aiohttp_client_response.content
# create a pipe
(pipe_read_fd, pipe_write_fd) = os.pipe()
# now I need a suitable protocol to manage the pipe transport
protocol = ?
(pipe_transport, __) = loop.connect_write_pipe(lambda: protocol, pipe_write_fd)
# the protocol should start reading from `content` and writing into the pipe
return pipe_read_fd
From the subprocess_attach_write_pipe asyncio example:
rfd, wfd = os.pipe()
pipe = open(wfd, 'wb', 0)
transport, _ = await loop.connect_write_pipe(asyncio.Protocol, pipe)
transport.write(b'data')
EDIT - For write flow control, see the following methods:
Here's a possible FlowControl implementation, inspired by StreamWriter.drain:
class FlowControl(asyncio.streams.FlowControlMixin):
async def drain(self):
await self._drain_helper()
Usage:
transport, protocol = await loop.connect_write_pipe(FlowControl, pipe)
transport.write(b'data')
await protocol.drain()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With