
Pump bytes from asyncio StreamReader into a file descriptor

I have a Python function (implemented in C++) that reads from a file descriptor (wrapped in a FILE* on the C++ side), and I need to feed this function from an asyncio.StreamReader. Specifically, the reader is the content of an HTTP response: aiohttp.ClientResponse.content.

I thought I might open a pipe, pass the read-end to the C++ function and connect the write-end to asyncio's event loop. However, how can I move the data from the stream reader to the pipe, with proper flow control and as little copying as possible?

The skeleton of the code, with the missing parts, is as follows:

# obtain the StreamReader from aiohttp
content = aiohttp_client_response.content
# create a pipe
(pipe_read_fd, pipe_write_fd) = os.pipe()

# now I need a suitable protocol to manage the pipe transport
protocol = ?
(pipe_transport, __) = loop.connect_write_pipe(lambda: protocol, pipe_write_fd)

# the protocol should start reading from `content` and writing into the pipe
return pipe_read_fd
Jan Špaček asked Dec 10 '25 13:12



1 Answer

From the subprocess_attach_write_pipe asyncio example:

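rfd, wfd = os.pipe()
# connect_write_pipe() expects a file-like object, so wrap the fd (unbuffered)
pipe = open(wfd, 'wb', 0)
# must run inside a coroutine; loop is e.g. asyncio.get_running_loop()
transport, _ = await loop.connect_write_pipe(asyncio.Protocol, pipe)
transport.write(b'data')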

EDIT - For write flow control, see the following methods:

  • WriteTransport.set_write_buffer_limits
  • BaseProtocol.pause_writing
  • BaseProtocol.resume_writing
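
As a rough illustration of how these three methods fit together, here is a minimal sketch assuming a Unix event loop. The names PausableProtocol, wait_writable and demo, as well as the buffer limits, are made up for this example and are not part of asyncio. The transport calls pause_writing() when its buffer grows past the high-water mark and resume_writing() once it drains below the low-water mark; the protocol only records that state in an asyncio.Event.

```python
import asyncio
import os


class PausableProtocol(asyncio.BaseProtocol):
    """Hypothetical protocol that records whether writing is currently allowed."""

    def __init__(self):
        self._can_write = asyncio.Event()
        self._can_write.set()  # writing is allowed until the transport objects

    def pause_writing(self):
        # Called by the transport when its buffer exceeds the high-water mark.
        self._can_write.clear()

    def resume_writing(self):
        # Called by the transport when its buffer drops below the low-water mark.
        self._can_write.set()

    async def wait_writable(self):
        # Hypothetical helper: suspend the caller while the transport is paused.
        await self._can_write.wait()


async def demo():
    rfd, wfd = os.pipe()
    loop = asyncio.get_running_loop()
    pipe = open(wfd, "wb", 0)
    transport, protocol = await loop.connect_write_pipe(PausableProtocol, pipe)
    # Illustrative limits only; tune them for the real workload.
    transport.set_write_buffer_limits(high=64 * 1024, low=16 * 1024)
    transport.write(b"data")
    await protocol.wait_writable()
    transport.close()
    data = os.read(rfd, 1024)
    os.close(rfd)
    return data
```

Calling asyncio.run(demo()) writes four bytes through the pipe and reads them back. With such a small payload the transport never pauses, so wait_writable() returns immediately.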

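Here's a possible FlowControl implementation, inspired by StreamWriter.drain. Note that FlowControlMixin and its _drain_helper method are undocumented asyncio internals, so this may break between Python versions: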

class FlowControl(asyncio.streams.FlowControlMixin):
    async def drain(self):
        await self._drain_helper()

Usage:

transport, protocol = await loop.connect_write_pipe(FlowControl, pipe)
transport.write(b'data')
await protocol.drain()
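
Putting the pieces together, the pump the question asks for might look like the sketch below. It is not a definitive implementation: the function name pump and the chunk_size default are assumptions, and the reader is only assumed to expose an awaitable read(n) that returns b"" at end of stream, which holds for asyncio.StreamReader and should also hold for aiohttp's response content.

```python
import asyncio
import os


class FlowControl(asyncio.streams.FlowControlMixin):
    # Relies on undocumented asyncio internals, as in the answer above.
    async def drain(self):
        await self._drain_helper()


async def pump(reader, write_fd, chunk_size=64 * 1024):
    """Copy everything from `reader` into `write_fd`, then close the pipe."""
    loop = asyncio.get_running_loop()
    pipe = open(write_fd, "wb", 0)
    transport, protocol = await loop.connect_write_pipe(FlowControl, pipe)
    try:
        while True:
            chunk = await reader.read(chunk_size)
            if not chunk:  # b"" signals end of stream
                break
            transport.write(chunk)
            # Suspends here while the transport's write buffer is too full.
            await protocol.drain()
    finally:
        # Closing the write end lets the consumer see EOF on the read end.
        transport.close()
```

Because the C++ function reads from the pipe with blocking calls, it should run outside the event loop thread (for example through loop.run_in_executor) while pump() runs as a task. Otherwise the reader and the writer would wait on each other.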
Vincent answered Dec 13 '25 02:12



