Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I achieve shared application state with Warp async routes?

I have a Rust application using warp. It implements a RESTful CRUD API. I need each route handler (i.e., the function that ends up being ultimately called by the warp filters) to have access to, and (in most cases) mutate shared application state.

The only way I can get this to compile is by cloning an Arc<Mutex<State>> for each route:

    /* internal_state is loaded from a dump file earlier on and is of type `State` */

    let state: Arc<Mutex<State>> = Arc::new(Mutex::new(internal_state));
    let index_book_state: Arc<Mutex<State>> = state.clone();
    let create_book_state: Arc<Mutex<State>> = state.clone();
    let read_book_state: Arc<Mutex<State>> = state.clone();

    let create_order_state: Arc<Mutex<State>> = state.clone();
    let read_order_state: Arc<Mutex<State>> = state.clone();
    let update_order_state: Arc<Mutex<State>> = state.clone();
    let destroy_order_state: Arc<Mutex<State>> = state.clone();

/* define CRUD routes for order books */
    let book_prefix = warp::path!("book");
    let index_book_route = book_prefix
        .and(warp::get())
        .and(warp::any().map(move || index_book_state.clone()))
        .and_then(handler::index_book_handler);
    let create_book_route = book_prefix
        .and(warp::post())
        .and(warp::body::json())
        .and(warp::any().map(move || create_book_state.clone()))
        .and_then(handler::create_book_handler);
    let read_book_route = warp::path!("book" / String)
        .and(warp::get())
        .and(warp::any().map(move || read_book_state.clone()))
        .and_then(handler::read_book_handler);

    /* define CRUD routes for orders */
    let create_order_route = warp::path!("book" / String)
        .and(warp::post())
        .and(warp::body::json())
        .and(warp::any().map(move || create_order_state.clone()))
        .and_then(handler::create_order_handler);
    let read_order_route = warp::path!("book" / String / "order" / String)
        .and(warp::get())
        .and(warp::any().map(move || read_order_state.clone()))
        .and_then(handler::read_order_handler);
    let update_order_route = warp::path!("book" / String / "order" / String)
        .and(warp::put())
        .and(warp::body::json())
        .and(warp::any().map(move || update_order_state.clone()))
        .and_then(handler::update_order_handler);
    let destroy_order_route = warp::path!("book" / String / "order" / String)
        .and(warp::delete())
        .and(warp::any().map(move || destroy_order_state.clone()))
        .and_then(handler::destroy_order_handler);

    /* aggregate all of our order book routes */
    let book_routes =
        index_book_route.or(create_book_route).or(read_book_route);

    /* aggregate all of our order routes */
    let order_routes = create_order_route
        .or(read_order_route)
        .or(update_order_route)
        .or(destroy_order_route);

    /* aggregate all of our routes */
    let routes = book_routes.or(order_routes);
  1. I doubt that this is actually correct behaviour (despite compiling and running).

  2. This seems extremely ugly for what is a relatively simple requirement.

  3. Most importantly, inside my route handlers I will need to make calls to async functions, thus requiring the handlers themselves to be marked as async, etc. When I mark the handlers as async, the compiler complains due to futures being unable to be sent across threads.

How can I achieve shared application state while having route handlers themselves be async?

A signature of a route handler (they're all the same):

/* matches routes like POST `http://example.com/[market]/` */
pub async fn create_order_handler(market: String, request: CreateOrderRequest, state: Arc<Mutex<State>>, rpc_endpoint: String) -> Result<impl Reply, Rejection>
like image 444
sporejack Avatar asked Oct 18 '25 09:10

sporejack


1 Answers

You share state via shared ownership (such as an Arc) paired with thread-safe interior mutability (such as Mutex, RwLock, or an atomic):

use std::sync::{Arc, Mutex};
use warp::Filter;

#[tokio::main]
async fn main() {
    let state = Arc::new(Mutex::new(0));

    let market = warp::path!("market" / String).map({
        let state = state.clone();
        move |market| {
            *state.lock().unwrap() += 1;
            format!("Market: {}", market)
        }
    });

    let plaza = warp::path!("plaza" / String).map({
        let state = state.clone();
        move |plaza| {
            let state = *state.lock().unwrap();
            format!("Plaza: {} ({})", plaza, state)
        }
    });

    let routes = market.or(plaza);

    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}
% curl 127.0.0.1:3030/market/one
Market: one

% curl 127.0.0.1:3030/plaza/one
Plaza: one (1)

To perform asynchronous work, use Filter::and_then:

use std::{
    convert::Infallible,
    sync::{Arc, Mutex},
};
use warp::Filter;

#[tokio::main]
async fn main() {
    let state = Arc::new(Mutex::new(0));

    let market = warp::path!("market" / String).and_then({
        let state = state.clone();
        move |market| {
            let state = state.clone();
            async move {
                *state.lock().unwrap() += 1;
                Ok::<_, Infallible>(format!("Market: {}", market))
            }
        }
    });

    let plaza = warp::path!("plaza" / String).and_then({
        let state = state.clone();
        move |plaza| {
            let state = state.clone();
            async move {
                let state = *state.lock().unwrap();
                Ok::<_, Infallible>(format!("Plaza: {} ({})", plaza, state))
            }
        }
    });

    let routes = market.or(plaza);

    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}

These can even be separate functions:

use std::{
    convert::Infallible,
    sync::{Arc, Mutex},
};
use warp::Filter;

#[tokio::main]
async fn main() {
    let state = Arc::new(Mutex::new(0));

    let market = warp::path!("market" / String).and_then({
        let state = state.clone();
        move |m| market(m, state.clone())
    });

    let plaza = warp::path!("plaza" / String).and_then({
        let state = state.clone();
        move |p| plaza(p, state.clone())
    });

    let routes = market.or(plaza);

    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}

type State = Arc<Mutex<i32>>;

async fn market(market: String, state: State) -> Result<String, Infallible> {
    *state.lock().unwrap() += 1;
    Ok::<_, Infallible>(format!("Market: {}", market))
}

async fn plaza(plaza: String, state: State) -> Result<String, Infallible> {
    let state = *state.lock().unwrap();
    Ok::<_, Infallible>(format!("Plaza: {} ({})", plaza, state))
}

There's a second set of clones here because there are two distinct things owning data:

  1. The handler itself (the closure)
  2. The future returned by the closure (the async code)

See also:

  • Is there another option to share an Arc in multiple closures besides cloning it before each closure?
  • Dependency Injection in Rust Warp
  • Is there a way to do validation as part of a filter in Warp?

[dependencies]
warp = "0.3.0"
tokio = { version = "1.2.0", features = ["full"] }
like image 89
Shepmaster Avatar answered Oct 21 '25 01:10

Shepmaster