Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I simultaneously read messages from multiple Tokio channels in a single task?

I'd like to both read and process messages from two channels and construct another message and send this message via another channel.

Messages from the two channels arrive at different frequencies (simulated by the different sleep durations in the code below).

Example: "foo1" and "bar1" are received, so we process them and form "foo1bar1". "foo2" is received ("bar2" will be received in 2sec), so we will process it as "foo2bar1". "foo3" is received, so "foo3bar1" is constructed. When "bar2" is received, then we get "foo4bar2" and so on.

In the current implementation, since the two tasks don't communicate with one another, I cannot do the "fooNbarM" construction.

use std::time::Duration;
use tokio;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::time::sleep;
use futures::future::join_all;

/// Repeatedly sends `Ok(msg)` over `foo_tx`, pausing between sends.
///
/// The pause is 1000 ms when `msg` is `"foo"` and 3000 ms for any other
/// message, so two senders started with different messages produce values at
/// different rates. The loop ends the first time a send fails, after logging
/// which message could not be delivered.
async fn message_sender(msg: &'static str, foo_tx: UnboundedSender<Result<&str, Box<dyn std::error::Error + Send>>>) {
    // The delay depends only on `msg`, so compute it once instead of on
    // every iteration.
    let pause = if msg == "foo" {
        Duration::from_millis(1000)
    } else {
        Duration::from_millis(3000)
    };

    while foo_tx.send(Ok(msg)).is_ok() {
        sleep(pause).await;
    }

    // Name the message this sender actually carries; the function is used
    // for more than just "foo".
    println!("failed to send {msg}");
}

/// Spawns a "foo" sender and a "bar" sender, each paired with its own
/// receiver task that prints every message it gets, then waits for all four
/// tasks to finish.
#[tokio::main]
async fn main() {
    // Payload type carried by both channels, named once so the channel
    // declarations stay readable.
    type Message = Result<&'static str, Box<dyn std::error::Error + Send>>;

    let (foo_tx, mut foo_rx): (UnboundedSender<Message>, UnboundedReceiver<Message>) =
        tokio::sync::mpsc::unbounded_channel();
    let (bar_tx, mut bar_rx): (UnboundedSender<Message>, UnboundedReceiver<Message>) =
        tokio::sync::mpsc::unbounded_channel();

    let foo_sender_handle = tokio::spawn(message_sender("foo", foo_tx));

    // Each receiver runs in its own task and only sees its own channel.
    let foo_handle = tokio::spawn(async move {
        while let Some(v) = foo_rx.recv().await {
            println!("{:?}", v);
        }
    });

    let bar_sender_handle = tokio::spawn(message_sender("bar", bar_tx));

    let bar_handle = tokio::spawn(async move {
        while let Some(v) = bar_rx.recv().await {
            println!("{:?}", v);
        }
    });

    join_all([foo_sender_handle, foo_handle, bar_sender_handle, bar_handle]).await;
}

Cargo.toml

[package]
name = "play"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.16.1", features = ["full"] }
futures = "0.3.21"
like image 319
user270199 Avatar asked Sep 07 '25 07:09

user270199


1 Answers

Use the tokio::select! macro to wait for whichever channel has a message ready first:

use futures::future; // 0.3.19
use std::time::Duration;
use tokio::{
    sync::mpsc::{self, UnboundedSender},
    time,
}; // 1.16.1

/// Sends the messages `"{msg}0"`, `"{msg}1"`, ... over `foo_tx`, pausing
/// between sends.
///
/// The pause is 100 ms when `msg` is `"foo"` and 300 ms for any other
/// message. The function returns as soon as a send fails instead of
/// panicking the task.
async fn message_sender(msg: &'static str, foo_tx: UnboundedSender<String>) {
    // The delay depends only on `msg`, so compute it once up front.
    let pause = if msg == "foo" {
        Duration::from_millis(100)
    } else {
        Duration::from_millis(300)
    };

    // An explicit u64 counter avoids the default i32 range for a sender
    // that is meant to run indefinitely.
    for count in 0u64.. {
        if foo_tx.send(format!("{msg}{count}")).is_err() {
            // Nobody can receive any more; stop quietly rather than unwrap.
            break;
        }

        time::sleep(pause).await;
    }
}

/// Spawns a "foo" sender and a "bar" sender plus one receiver task that
/// listens on both channels and prints the latest foo/bar pair every time
/// either side delivers a new message.
#[tokio::main]
async fn main() {
    let (foo_tx, mut foo_rx) = mpsc::unbounded_channel();
    let (bar_tx, mut bar_rx) = mpsc::unbounded_channel();

    let foo_sender_handle = tokio::spawn(message_sender("foo", foo_tx));
    let bar_sender_handle = tokio::spawn(message_sender("bar", bar_tx));

    let receive_handle = tokio::spawn(async move {
        // Most recent message seen on each channel; `None` until the first
        // one arrives.
        let mut foo = None;
        let mut bar = None;

        loop {
            tokio::select! {
                // Matching on `Some(..)` disables a branch once its channel
                // is closed, so a finished sender neither wipes the last
                // value nor makes this loop spin on an endless stream of
                // `None`s.
                Some(f) = foo_rx.recv() => foo = Some(f),
                Some(b) = bar_rx.recv() => bar = Some(b),
                // Both channels are closed: nothing more can arrive.
                else => break,
            }

            // Only print once both sides have produced at least one message.
            if let (Some(foo), Some(bar)) = (&foo, &bar) {
                println!("{foo}{bar}");
            }
        }
    });

    future::join_all([foo_sender_handle, bar_sender_handle, receive_handle]).await;
}

You also have to handle the case where only one of the two channels has delivered a message so far, which is where Option comes in useful.

like image 108
Shepmaster Avatar answered Sep 10 '25 13:09

Shepmaster