At the moment I'm writing a pure Rust MQTT5 library (I know there are existing ones out there, but I'm mainly trying to learn Rust) and I stumbled upon this problem.
I'm using the latest stable Rust with tokio 1.0.1.
When I send out a packet over the wire, I often expect a response from the server (in the example below, PingReq/PingAck, Ping/Pong).
Leaving out a lot of logic regarding timeouts and packet clashes, I wrote a simplified version of the logic in JavaScript (since I know that fairly well).
How would this logic translate to Rust and its futures? Or to be more clear: Can I somehow recreate the resolve() callback function behavior of awaitPackage + onIncomingPacket?
class Client {
    awaitedPacketTypes = {};

    /**
     * A ping consists of sending a Ping and receiving a Pong
     */
    async ping() {
        await this.sendPacket("Ping");
        return await this.awaitPackage("Pong");
    }

    async sendPacket(packetType) { /*...*/ }

    /**
     * This expects a specific packet type to be received in the future
     * @param {*} packetType
     */
    awaitPackage(packetType) {
        return new Promise((resolve, reject) => {
            this.awaitedPacketTypes[packetType] = {
                resolve,
                reject
            };
        });
    }

    /**
     * This gets called for every packet from the network side and calls
     * the correct resolver if something waits for this packet type
     * @param {*} packet
     */
    onIncomingPacket(packet) {
        if (this.awaitedPacketTypes[packet.type]) {
            this.awaitedPacketTypes[packet.type].resolve(packet);
            this.awaitedPacketTypes[packet.type] = undefined;
        } else {
            /*...*/
        }
    }
}
Or to be more clear: Can I somehow recreate the resolve() callback function behavior of awaitPackage + onIncomingPacket?
Kind of. A Rust Future is only "something which can be polled for readiness"; it is a much lower-level concept than a JS promise.
There are libraries that claim to provide JS-style promises, but almost every async library already provides a similar object under a different name. In Tokio, you probably want a oneshot channel (tokio::sync::oneshot), that is, a channel on which a single value can be sent. That results in something along the lines of:
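    use std::collections::HashMap;
    use std::sync::Mutex;
    use tokio::sync::oneshot::{channel, Receiver, Sender};

    struct Packet { r#type: &'static str }

    struct Client {
        // One pending sender per awaited packet type.
        awaited: Mutex<HashMap<&'static str, Sender<Packet>>>
    }

    impl Client {
        async fn ping(&self) -> Packet {
            // Register interest before sending, so a fast Pong cannot be missed.
            let rx = self.await_package("Pong");
            self.send_packet("Ping").await;
            // Err means the sender was dropped without a packet being delivered.
            rx.await.unwrap()
        }

        async fn send_packet(&self, _: &'static str) {}

        fn await_package(&self, packet_type: &'static str) -> Receiver<Packet> {
            let (tx, rx) = channel();
            self.awaited.lock().unwrap().insert(packet_type, tx);
            rx
        }

        fn on_incoming_packet(&self, packet: Packet) {
            if let Some(tx) = self.awaited.lock().unwrap().remove(packet.r#type) {
                // send() fails only if the receiver is gone; nothing to do then.
                let _ = tx.send(packet);
            }
        }
    }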
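To make the "polled for readiness" point concrete, here is a rough, std-only sketch of what a oneshot-style future does under the hood. It is not how Tokio implements its oneshot channel; every name in it (Shared, PacketFuture, resolve, ThreadWaker, block_on, wait_for_pong) is made up for illustration, and a plain thread stands in for the network side.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

/// State shared between the waiting future and whoever "resolves" it.
/// (Hypothetical type, for illustration only.)
struct Shared {
    value: Option<String>, // the packet, once it has arrived
    waker: Option<Waker>,  // how to ask the executor for another poll
}

/// A hand-written future: ready as soon as `value` has been filled in.
struct PacketFuture(Arc<Mutex<Shared>>);

impl Future for PacketFuture {
    type Output = String;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        let mut shared = self.0.lock().unwrap();
        match shared.value.take() {
            Some(v) => Poll::Ready(v),
            None => {
                // Not ready yet: store the waker so `resolve` can request a re-poll.
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Plays the role of the JS `resolve()` callback.
fn resolve(shared: &Arc<Mutex<Shared>>, value: String) {
    let waker = {
        let mut s = shared.lock().unwrap();
        s.value = Some(value);
        s.waker.take()
    };
    // Wake outside the lock; if nobody has polled yet there is no waker to call.
    if let Some(w) = waker {
        w.wake();
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny single-future executor: poll, park until woken, poll again.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Waits for a "Pong" that another thread delivers via `resolve`.
fn wait_for_pong() -> String {
    let shared = Arc::new(Mutex::new(Shared { value: None, waker: None }));
    let network_side = Arc::clone(&shared);
    // Stand-in for the network task handing over an incoming packet.
    let handle = thread::spawn(move || resolve(&network_side, "Pong".to_string()));
    let packet = block_on(PacketFuture(shared));
    handle.join().unwrap();
    packet
}

fn main() {
    println!("received {}", wait_for_pong());
}
```

The future itself does nothing until an executor polls it, and resolve only stores the value and asks for another poll. A oneshot channel packages this same split (receiver as the future, sender as the resolver), which is why it maps well onto the resolve() callback from the JavaScript version.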