Trying to implement a thread-safe cache

I'm trying to implement a shared-state, thread-safe cache, using a HashMap behind an RwLock. Ideally, I would like my interface to expose a single function which, given the key, returns a reference to the value in the cache (populating it first, if necessary). However, my intermediate-Rust intuition tells me that this approach will suffer from at least the following problems:

  1. Is it even possible to return a reference to the value? AFAIK, the HashMap is free to reallocate when required(*), so the lifetime of the reference is not tied to that of the HashMap and is, effectively, non-deterministic as far as the compiler is concerned. The HashMap::entry API returns a reference, but of course it isn't allowed to outlive its enclosing scope. Is it therefore necessary to return a concrete value, forcing me to implement Copy for the value type?

    (*) Is this still true if the values don't implement Copy?

  2. RwLock is new to me. I started with Mutex, but RwLock looks more appropriate, given the expected access pattern for the cache (i.e., write once per key, read many times). However, I'm not sure whether one can "upgrade" a read lock into a write lock. The Rust documentation suggests that a read lock will block a write lock, which would lead to a deadlock in the flow I'm proposing (see the sketch just after this list). Can I drop the read lock and replace it with the write lock, or is it easier to use a full Mutex from the outset?
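
To make (2) concrete, here's a hypothetical toy of the flow I'm worried about; if I understand the documentation, the write() call can never be granted while the read guard is still alive (a deadlock, or possibly a panic, depending on the platform):

use std::collections::HashMap;
use std::sync::RwLock;

fn main() {
    let lock = RwLock::new(HashMap::<String, u32>::new());
    let guard = lock.read().unwrap();
    if !guard.contains_key("abc") {
        // `guard` is still alive here; per the docs, this either
        // deadlocks or panics, because a read lock blocks a write lock,
        // even within the same thread.
        let mut map = lock.write().unwrap();
        map.insert("abc".to_owned(), 42);
    }
}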

As an aside, the function that fetches the value is async, so while I can use the HashMap::entry API, I can't use the Entry convenience functions (like or_insert)...
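
I suppose I could await the value up front and then use or_insert, something like this hypothetical shape (get_value stands in for my real async fetch), but that pays for the fetch even when the entry is already occupied:

use std::collections::HashMap;

// Hypothetical stand-in for my real async fetch.
async fn get_value(key: &str) -> u32 {
    key.len() as u32
}

async fn fetch(cache: &mut HashMap<String, u32>, key: &str) -> u32 {
    // `or_insert_with` only accepts a plain closure, so the value has
    // to be awaited before touching the entry...
    let value = get_value(key).await;
    *cache.entry(key.to_owned()).or_insert(value)
}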

Anyway, maybe I'm a bit in over my head, but this is what I've got so far:

// NOTE The `Key` struct contains a reference, with lifetime `'key`
pub struct Cache<'key>(RwLock<HashMap<Key<'key>, Value>>);

impl<'cache, 'key> Cache<'key> {
    pub fn new() -> Arc<Self> {
        Arc::new(Self(RwLock::new(HashMap::new())))
    }

    // NOTE The key and value for the cache is derived from the `Payload` type
    pub async fn fetch(
        &'cache mut self,
        data: &'cache Payload<'key>
    ) -> Result<&'cache Value> {
        let cache = self.0.read()?;
        let key = data.into();

        Ok(match &mut cache.entry(key) {
            // Match arm type: &Value
            // FIXME This reference outlives the scope, which is a borrow violation
            Entry::Occupied(entry) => entry.get(),

            // Dragons be here...
            Entry::Vacant(ref mut slot) => {
                // FIXME How do I create a write lock here, without deadlocking on
                // the read lock that's still in scope?

                let value = data.get_value().await?;

                // FIXME `VacantEntry<'_, Key<'_>, Value>` doesn't implement `Copy`
                slot.insert(value)
            }
        })
    }
}
Xophmeister asked Oct 12 '25


1 Answer

About the second part of your question (upgrading a read lock into a write lock), I have nothing better to suggest than dropping the read lock before acquiring the write lock. :-(
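
For illustration, here is a minimal, self-contained sketch of that dance (the map type and the computed value are hypothetical stand-ins). Note the re-check under the write lock: another thread may have inserted the value in the window between releasing the read lock and acquiring the write lock.

use std::collections::HashMap;
use std::sync::RwLock;

fn get_or_insert(lock: &RwLock<HashMap<String, u32>>, key: &str) -> u32 {
    // Fast path: the read guard here is a temporary, dropped at the end
    // of the `if let`, so it cannot deadlock with the write lock below.
    if let Some(&v) = lock.read().unwrap().get(key) {
        return v;
    }
    // Slow path: re-check via the entry API, because another writer may
    // have beaten us to it.
    *lock
        .write()
        .unwrap()
        .entry(key.to_owned())
        .or_insert_with(|| key.len() as u32)
}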

However, about the first part (returning references to a hash map's contents while that hash map may be mutated), I do have a suggestion.

In such a situation, the referenced elements are fundamentally shared, and their ownership is not solely the concern of the cache itself. For example, if we decide to clear the cache at some point, the elements that are still referenced elsewhere must not be dropped. Hence, we need shared ownership of each value in the cache: instead of storing a Value, we store an Arc<Value>, so that a value can outlive the cache if needed.

Please find below an attempt to implement this for an overly simplified situation (no generic types, a hardcoded example): associating a string with the vector of its chars' codes. The serial implementation relies on RefCell and Rc, while the parallel implementation relies on their synchronised equivalents, RwLock and Arc. In each demo, we can see that the address of the obtained slice stays the same when the same string is used for multiple fetches (i.e., the vector is cached).

mod serial {
    use std::{cell::RefCell, collections::HashMap, rc::Rc};

    pub struct Cache {
        data: RefCell<HashMap<String, Rc<Vec<u32>>>>,
    }

    impl Cache {
        pub fn new() -> Self {
            Self {
                data: RefCell::new(HashMap::new()),
            }
        }

        pub fn fetch(
            &self,
            key: &str,
        ) -> Rc<Vec<u32>> {
            // Fast path: already cached; hand out another shared owner.
            if let Some(v) = self.data.borrow().get(key) {
                return Rc::clone(v);
            }
            // Slow path: compute the value, cache one shared owner and
            // return another. The borrow above was a temporary, so
            // borrow_mut() cannot panic here.
            let k = key.to_owned();
            let v = Rc::new(key.chars().map(|c| c as u32).collect());
            self.data.borrow_mut().insert(k, Rc::clone(&v));
            v
        }
    }

    pub fn demo() {
        println!("~~~~ serial demo ~~~~");
        let cache = Cache::new();
        for key in ["abc", "def", "ghi"] {
            for it in 0..2 {
                let v = cache.fetch(key);
                println!("iter {}: {:?} @ {:?}", it, v, v.as_ptr());
            }
        }
    }
}

mod parallel {
    use std::{collections::HashMap, sync::Arc, sync::RwLock};

    pub struct Cache {
        data: RwLock<HashMap<String, Arc<Vec<u32>>>>,
    }

    impl Cache {
        pub fn new() -> Self {
            Self {
                data: RwLock::new(HashMap::new()),
            }
        }

        pub fn fetch(
            &self,
            key: &str,
        ) -> Arc<Vec<u32>> {
            // Fast path: a read lock suffices when the key is cached.
            if let Some(v) = self.data.read().unwrap().get(key) {
                return Arc::clone(v);
            }
            // Slow path: the read guard above was a temporary and has
            // already been released, so taking the write lock here
            // cannot deadlock against our own read lock.
            let k = key.to_owned();
            let v = Arc::new(key.chars().map(|c| c as u32).collect());
            self.data.write().unwrap().insert(k, Arc::clone(&v));
            v
        }
    }

    pub fn demo() {
        println!("~~~~ parallel demo ~~~~");
        let cache = Cache::new();
        std::thread::scope(|s| {
            for it in 0..2 {
                s.spawn({
                    let cache = &cache;
                    move || {
                        for key in ["abc", "def", "ghi"] {
                            std::thread::yield_now();
                            let v = cache.fetch(key);
                            println!(
                                "thread {}: {:?} @ {:?}",
                                it,
                                v,
                                v.as_ptr()
                            );
                        }
                    }
                });
            }
        });
    }
}

fn main() {
    serial::demo();
    parallel::demo();
}
/*
~~~~ serial demo ~~~~
iter 0: [97, 98, 99] @ 0x557c23797bc0
iter 1: [97, 98, 99] @ 0x557c23797bc0
iter 0: [100, 101, 102] @ 0x557c23797c00
iter 1: [100, 101, 102] @ 0x557c23797c00
iter 0: [103, 104, 105] @ 0x557c23797c40
iter 1: [103, 104, 105] @ 0x557c23797c40
~~~~ parallel demo ~~~~
thread 1: [97, 98, 99] @ 0x7ff378000cc0
thread 0: [97, 98, 99] @ 0x7ff378000cc0
thread 1: [100, 101, 102] @ 0x7ff378000d00
thread 0: [100, 101, 102] @ 0x7ff378000d00
thread 1: [103, 104, 105] @ 0x7ff378000d40
thread 0: [103, 104, 105] @ 0x7ff378000d40
*/
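
One caveat with the parallel version: two threads can both miss on the read lock for the same key and both compute the vector; the second insert then replaces the first, so, transiently, the two threads can hold two distinct (though equal) allocations. If computing the value at most once matters, a hypothetical drop-in variant of the parallel fetch can re-check under the write lock with the entry API:

        pub fn fetch(
            &self,
            key: &str,
        ) -> Arc<Vec<u32>> {
            if let Some(v) = self.data.read().unwrap().get(key) {
                return Arc::clone(v);
            }
            // Re-check under the write lock: only the first writer computes.
            let mut map = self.data.write().unwrap();
            let v = map
                .entry(key.to_owned())
                .or_insert_with(|| Arc::new(key.chars().map(|c| c as u32).collect()));
            Arc::clone(v)
        }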
prog-fh answered Oct 14 '25


