Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Wait until a value in a map becomes available in Go

Tags:

go

mutex

wait

I have a program where I basically have three scenarios - set value for a key, get value if it's there, or wait until the value for a given key becomes available. My original idea - create a new type with a map[string]interface{} - where "persisted" values are stored. Besides that, for waiting on a value, I planned on using a map[string](chan struct{}). When a Set() method is invoked, I would write to that channel and anyone waiting on it would know the value is there.

I don't know the keys in advance - they're random. I'm not sure how to correctly implement the Wait() method.

type Map struct {
    sync.Mutex

    m    map[string]interface{}
    wait map[string]chan (struct{})
}


func (m *Map) Set(key string, value interface{}) {
    m.ensureWaitChan(key)

    m.Lock()
    defer m.Unlock()

    m.m[key] = value

    // Signal to all waiting.
    m.wait[key] <- struct{}{}
}


func (m *Map) Wait(key string) interface{} {
    m.ensureWaitChan(key)

    m.Lock()
    
    value, ok := m.m[key]
    if ok {
        m.Unlock()
        return value
    }

    m.Unlock()
    // <------ Unlocked state where something might happen.
    <-m.wait[key]

    value := m.m[key]

    return value    
}

// If the channel does not exist for those waiting - create it.
func (m *Map) ensureWaitChan(key string) {
    m.Lock()
    defer m.Unlock()

    _, ok := m.wait[key]
    if ok {
        return
    }

    m.wait[key] = make(chan struct{}, 100)
}

The problem is - there's a race condition in Wait() - after I release the mutex, and before I start listening on a channel for incoming value.

What would be the best way to handle this? Open to any other suggestions on how to implement this, I believe there must be a better way to do this. I would refrain from polling the value at fixed intervals or anything like that.

like image 915
Aerol Avatar asked Sep 03 '25 02:09

Aerol


1 Answers

What you are looking for is a mix between a synchronized map and a message broker. We can do this by leveraging the channels for communication as well as synchronization, so that subscribers can receive the messages as soon as they are published if they are not yet in the cache.

type Map struct {
    sync.Mutex

    m    map[string]any
    subs map[string][]chan any
}

func (m *Map) Set(key string, value any) {
    m.Lock()
    defer m.Unlock()

    m.m[key] = value

    // Send the new value to all waiting subscribers of the key
    for _, sub := range m.subs[key] {
        sub <- value
    }
    delete(m.subs, key)
}

func (m *Map) Wait(key string) any {
    m.Lock()
    // Unlock cannot be deferred so we can unblock Set() while waiting

    value, ok := m.m[key]
    if ok {
        m.Unlock()
        return value
    }

    // if there is no value yet, subscribe to any new values for this key
    ch := make(chan any)
    m.subs[key] = append(m.subs[key], ch)
    m.Unlock()

    return <-ch
}

Because subscribers must unlock the map mutex while they wait, they cannot safely access new messages added to the map. We send the new value directly to all subscribers over their own channel so that we don't need to add more synchronization within Set to ensure that all subscribers are satisfied before unlocking the map itself. Unlocking the map early would allow subscribers to read it directly, but would also allow new values to be inserted in the meantime causing inconsistent results.

A running version, also including a generic Map implementation with type parameters: https://go.dev/play/p/AN7VRSPdGmO

like image 161
JimB Avatar answered Sep 07 '25 00:09

JimB