I have constructed a proof-of-concept where tokio::io::copy hangs forever when switching between Cellular / WiFi / Wired networks if the reader is a reqwest::async_impl::Response wrapped as a tokio::io::AsyncRead using FuturesAsyncReadCompatExt.
This happens on macOS and iOS, which are the platforms I have access to.
The proof-of-concept can be run with cargo run. (UPDATE: It now contains a fix/workaround as well.)
#[tokio::main()]
async fn main() {
    let mut target_file = std::env::current_dir().unwrap();
    target_file.push("bbb.mp4");
    println!("File will be downloaded to {target_file:?}");
    let client = ClientBuilder::default()
        // Doesn't seem to help
        .tcp_keepalive(Some(Duration::from_secs(1)))
        // Doesn't seem to help
        .connect_timeout(Duration::from_secs(1))
        .build()
        .unwrap();
    let response = client.get("http://distribution.bbb3d.renderfarming.net/video/mp4/bbb_sunflower_native_60fps_stereo_abl.mp4").send().await.unwrap();
    match response_to_file(response, target_file).await {
        Ok(_) => println!("Everything OK"),
        Err(err) => eprintln!("{err}"),
    }
}
async fn response_to_file(response: Response, path: PathBuf) -> Result<(), ApiError> {
    let download = response.bytes_stream();
    let download = download
        .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
        .into_async_read();
    let download = download.compat();
    // Wrap download to be able to get progress in terminal
    let mut download = ProgressReadAdapter::new(download);
    let temp_file = tokio::task::spawn_blocking(NamedTempFile::new)
        .await
        .wrap_api_err()?
        .wrap_api_err()?;
    let mut outfile = tokio::fs::File::create(temp_file.path())
        .await
        .wrap_api_err()?;
    // Code hangs here forever after a network switch
    tokio::io::copy(&mut download, &mut outfile)
        .await
        .wrap_api_err()?;
    outfile.flush().await.wrap_api_err()?;
    let _persisted_file: File = tokio::task::spawn_blocking(move || temp_file.persist(path))
        .await
        .wrap_api_err()?
        .wrap_api_err()?;
    Ok(())
}
The code above uses some helpers, such as wrap_api_err, that can be found on GitHub, but I don't think they're important for analyzing the problem.
The main question is: how can I make response_to_file exit with an Err after switching networks?
A second question: if there is no easy way to fix this code, how do I make a streaming copy of a network resource to a temp file that actually exits cleanly when there is an error?
I was able to finally reach some conclusions.
This issue on curl's GitHub page led me to believe that the reason was not reqwest but rather the underlying network stack on macOS / iOS.
I asked this question on seanmonstar/reqwest, which was answered by Sean stating that there was already an issue for a similar idea (low/no-activity timeouts in reqwest).
Basically, what I believe is happening is that the network stack still holds my outstanding response and keeps waiting for more data on the underlying TCP connection, even though the original WiFi connection has been "disconnected". As per the curl discussion, this is a thing that just happens: neither TCP nor HTTP is at fault, so client libraries can't really detect the disconnect itself.
What client libraries can do is to detect that there is no data coming into the Response (in the case of reqwest, at least). Currently, reqwest doesn't have this functionality built in, but it can be emulated with a little bit of work.
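To make the idea of "no data within a time window" concrete, here is a minimal synchronous sketch that uses only the standard library. It is an analogy, not reqwest or tokio code: chunks arrive over an std::sync::mpsc channel instead of a Response, and the names DownloadError and collect_with_idle_timeout are made up for this illustration.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum DownloadError {
    /// No chunk arrived within the idle window.
    Stalled,
}

/// Collects chunks until the sender hangs up (the "end of body" in this
/// analogy), and fails if no chunk arrives for longer than `idle`.
pub fn collect_with_idle_timeout(
    chunks: mpsc::Receiver<Vec<u8>>,
    idle: Duration,
) -> Result<Vec<u8>, DownloadError> {
    let mut body = Vec::new();
    loop {
        match chunks.recv_timeout(idle) {
            // Data arrived in time: keep it and wait for the next chunk.
            Ok(chunk) => body.extend_from_slice(&chunk),
            // The sender was dropped: the transfer finished normally.
            Err(RecvTimeoutError::Disconnected) => return Ok(body),
            // The sender is still alive but silent: treat it as a stall.
            Err(RecvTimeoutError::Timeout) => return Err(DownloadError::Stalled),
        }
    }
}
```

With an idle window of five seconds, this returns the collected bytes once the sender is dropped, or Err(DownloadError::Stalled) if the sender stays alive but sends nothing for five seconds. The important point is that the timeout applies to the gap between chunks, not to the whole transfer, so a slow but steady download is not aborted.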
Using this StackOverflow answer as a starting point I built an AsyncRead wrapper that detects a stalled Response and exits cleanly with an error after a given time has elapsed.
The full code can be found on my GitHub repo bes/network-switch-hang, which was originally the repo for the bug proof-of-concept, but is now also an answer.
For completeness, here are the most important parts of the code, at least until reqwest grows a native way of detecting stalled Responses.
#[tokio::main()]
async fn main() {
    let mut target_file = std::env::current_dir().unwrap();
    target_file.push("bbb.mp4");
    println!("File will be downloaded to {target_file:?}");
    let client = ClientBuilder::default()
        .connect_timeout(Duration::from_secs(5))
        .build()
        .unwrap();
    let response = client.get("http://distribution.bbb3d.renderfarming.net/video/mp4/bbb_sunflower_native_60fps_stereo_abl.mp4").send().await.unwrap();
    match response_to_file(response, target_file).await {
        Ok(_) => println!("Everything OK"),
        Err(err) => eprintln!("{err}"),
    }
}
async fn response_to_file(response: Response, path: PathBuf) -> Result<(), ApiError> {
    let download = response.bytes_stream();
    let download = download
        .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
        .into_async_read();
    let download = download.compat();
    // Wrap download to be able to detect stalled downloads
    let mut download = StalledReadMonitor::new(download);
    let temp_file = tokio::task::spawn_blocking(NamedTempFile::new)
        .await
        .wrap_api_err()?
        .wrap_api_err()?;
    let mut outfile = tokio::fs::File::create(temp_file.path())
        .await
        .wrap_api_err()?;
    // Code used to hang here, but will now exit with an error after being stalled for
    // more than 5 seconds. See StalledReadMonitor for details.
    tokio::io::copy(&mut download, &mut outfile)
        .await
        .wrap_api_err()?;
    outfile.flush().await.wrap_api_err()?;
    let _persisted_file: File = tokio::task::spawn_blocking(move || temp_file.persist(path))
        .await
        .wrap_api_err()?
        .wrap_api_err()?;
    Ok(())
}
/// This monitor can wrap an [AsyncRead] and make sure that it is making progress.
/// If the inner reader isn't making progress, we can stop the download.
/// The monitoring is done by keeping an [Interval] and measuring progress
/// by counting the number of bytes during each interval.
///
/// Please note that this monitor won't stop the download after _exactly_
/// five seconds of inactivity, but rather five seconds after the last interval
/// that had data. So the worst case is 10 seconds, and the average will be 7.5 seconds.
#[pin_project]
pub struct StalledReadMonitor<R: AsyncRead> {
    #[pin]
    inner: R,
    interval: Interval,
    interval_bytes: usize,
}
impl<R: AsyncRead> StalledReadMonitor<R> {
    pub fn new(inner: R) -> Self {
        Self {
            inner,
            interval: interval_at(
                Instant::now().add(Duration::from_millis(5_000)),
                Duration::from_millis(5_000),
            ),
            interval_bytes: 0,
        }
    }
}
impl<R: AsyncRead> AsyncRead for StalledReadMonitor<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<()>> {
        let this = self.project();
        let before = buf.filled().len();
        let mut result = this.inner.poll_read(cx, buf);
        let after = buf.filled().len();
        *this.interval_bytes += after - before;
        match this.interval.poll_tick(cx) {
            Poll::Pending => {}
            Poll::Ready(_) => {
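                // Only report a stall while the inner reader is still pending, so a
                // real result (data, EOF, or an I/O error) is never overwritten.
                if *this.interval_bytes == 0 && result.is_pending() {
                    println!("Rate is too low, aborting fetch");
                    result = Poll::Ready(Err(std::io::Error::new(
                        ErrorKind::TimedOut,
                        StalledError {},
                    )))
                }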
                *this.interval_bytes = 0;
            }
        };
        result
    }
}
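The timing described in the doc comment on StalledReadMonitor can be checked with a small, clock-free calculation. This is only a sketch: detection_delay_ms and average_delay_ms are hypothetical helpers that are not part of the repo, and they assume that ticks fire exactly at multiples of the interval and that a byte arriving exactly on a tick is counted in the following interval.

```rust
/// Milliseconds between the last received byte and the tick that reports
/// the stall, assuming ticks at interval_ms, 2 * interval_ms, and so on.
pub fn detection_delay_ms(last_byte_ms: u64, interval_ms: u64) -> u64 {
    // The tick that closes the interval containing the last byte still
    // sees a non-zero byte count, so it does not report a stall.
    let closing_tick = (last_byte_ms / interval_ms + 1) * interval_ms;
    // The next tick sees zero bytes and reports the stall.
    closing_tick + interval_ms - last_byte_ms
}

/// Average delay (rounded down) when the last byte is equally likely to
/// arrive at any millisecond within the first interval.
pub fn average_delay_ms(interval_ms: u64) -> u64 {
    let total: u64 = (0..interval_ms)
        .map(|t| detection_delay_ms(t, interval_ms))
        .sum();
    total / interval_ms
}
```

For a 5000 ms interval, a last byte at 0 ms is reported after 10000 ms, a last byte at 4999 ms after 5001 ms, and the average comes out at roughly 7500 ms, which matches the "between 5 and 10 seconds, 7.5 on average" estimate. A shorter interval tightens this window at the cost of aborting sooner on slow links.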