diff --git a/go.mod b/go.mod
index 6f547da90..d2582a4ca 100644
--- a/go.mod
+++ b/go.mod
@@ -14,7 +14,7 @@ require (
 	codeberg.org/gruf/go-iotools v0.0.0-20230811115124-5d4223615a7f
 	codeberg.org/gruf/go-kv v1.6.4
 	codeberg.org/gruf/go-logger/v2 v2.2.1
-	codeberg.org/gruf/go-mutexes v1.3.0
+	codeberg.org/gruf/go-mutexes v1.3.1
 	codeberg.org/gruf/go-runners v1.6.1
 	codeberg.org/gruf/go-sched v1.2.3
 	codeberg.org/gruf/go-store/v2 v2.2.4
diff --git a/go.sum b/go.sum
index 7ce3d72b3..35adafbb1 100644
--- a/go.sum
+++ b/go.sum
@@ -67,8 +67,8 @@ codeberg.org/gruf/go-mangler v1.2.3 h1:sj0dey2lF5GRQL7fXmCY0wPNaI5JrROiThb0VDbzF
 codeberg.org/gruf/go-mangler v1.2.3/go.mod h1:X/7URkFhLBAVKkTxmqF11Oxw3A6pSSxgPeHssQaiq28=
 codeberg.org/gruf/go-maps v1.0.3 h1:VDwhnnaVNUIy5O93CvkcE2IZXnMB1+IJjzfop9V12es=
 codeberg.org/gruf/go-maps v1.0.3/go.mod h1:D5LNDxlC9rsDuVQVM6JObaVGAdHB6g2dTdOdkh1aXWA=
-codeberg.org/gruf/go-mutexes v1.3.0 h1:EJXLL1UCit/ZJtTZ/Q9MMFO5c8iCwS4bIesXu1CKGpQ=
-codeberg.org/gruf/go-mutexes v1.3.0/go.mod h1:1j/6/MBeBQUedAtAtysLLnBKogfOZAxdym0E3wlaBD8=
+codeberg.org/gruf/go-mutexes v1.3.1 h1:8ibAjWwx08GJSq5R+lM9nwtJw2aAhMPKSXbfJ9EpDsA=
+codeberg.org/gruf/go-mutexes v1.3.1/go.mod h1:1j/6/MBeBQUedAtAtysLLnBKogfOZAxdym0E3wlaBD8=
 codeberg.org/gruf/go-runners v1.6.1 h1:0KNiEfGnmNUs9intqxEAWqIKUyxVOmYTtn3kPVOHsjQ=
 codeberg.org/gruf/go-runners v1.6.1/go.mod h1:QRcSExqXX8DM0rm8Xs6qX7baOzyvw0JIe4mu3TsQT+Y=
 codeberg.org/gruf/go-sched v1.2.3 h1:H5ViDxxzOBR3uIyGBCf0eH8b1L8wMybOXcdtUUTXZHk=
diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go
index 511130d4f..8a73a96d9 100644
--- a/vendor/codeberg.org/gruf/go-mutexes/map.go
+++ b/vendor/codeberg.org/gruf/go-mutexes/map.go
@@ -2,6 +2,7 @@
 
 import (
 	"sync"
+	"sync/atomic"
 	"unsafe"
 )
 
@@ -24,11 +25,11 @@
 // and performs self-eviction of keys.
 //
 // Under the hood this is achieved using a single mutex for the
-// map, state tracking for individual keys, and some simple waitgroup
-// type structures to park / block goroutines waiting for keys.
+// map, state tracking for individual keys, and some sync.Cond{}
+// like structures for sleeping / waking waiting goroutines.
 type MutexMap struct {
 	mapmu  sync.Mutex
-	mumap  map[string]*rwmutexish
+	mumap  map[string]*rwmutex
 	mupool rwmutexPool
 	count  uint32
 }
@@ -36,7 +37,7 @@ type MutexMap struct {
 
 // checkInit ensures MutexMap is initialized (UNSAFE).
 func (mm *MutexMap) checkInit() {
 	if mm.mumap == nil {
-		mm.mumap = make(map[string]*rwmutexish)
+		mm.mumap = make(map[string]*rwmutex)
 	}
 }
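For reviewers unfamiliar with this dependency: the map.go hunks in this bump swap the internal per-key type (rwmutexish) and its hand-rolled 'trigger' for a leaner rwmutex backed by the runtime's notifyList; the exported surface is unchanged. A minimal usage sketch of that surface, assuming the package's exported Lock/RLock wrappers, which return an unlock callback (the wrappers themselves are not shown in this diff):

package main

import (
	"fmt"

	"codeberg.org/gruf/go-mutexes"
)

func main() {
	// The zero value is ready to use: checkInit()
	// lazily allocates the key map on first lock.
	var mm mutexes.MutexMap

	// Write-lock one key; other keys stay uncontended,
	// and the returned callback releases this key only.
	unlock := mm.Lock("user:1")
	defer unlock()

	// Read locks on a second key may be held concurrently.
	r1 := mm.RLock("user:2")
	r2 := mm.RLock("user:2")
	r1()
	r2()

	fmt.Println("per-key locking done")
}

Keys self-evict once fully unlocked with no waiters, so an idle MutexMap holds no per-key state at all.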
@@ -82,7 +83,7 @@
 	}
 }
 
-func (mm *MutexMap) unlock(key string, mu *rwmutexish) {
+func (mm *MutexMap) unlock(key string, mu *rwmutex) {
 	// Get map lock.
 	mm.mapmu.Lock()
 
@@ -109,12 +110,12 @@
 
 // rwmutexPool is a very simple memory pool.
 type rwmutexPool struct {
-	current []*rwmutexish
-	victim  []*rwmutexish
+	current []*rwmutex
+	victim  []*rwmutex
 }
 
 // Acquire returns a rwmutex from the pool (or allocs new).
-func (p *rwmutexPool) Acquire() *rwmutexish {
+func (p *rwmutexPool) Acquire() *rwmutex {
 	// First try the current queue
 	if l := len(p.current) - 1; l >= 0 {
 		mu := p.current[l]
@@ -130,12 +131,12 @@ func (p *rwmutexPool) Acquire() *rwmutexish {
 	}
 
 	// Lastly, alloc new.
-	mu := new(rwmutexish)
+	mu := new(rwmutex)
 	return mu
 }
 
 // Release places a rwmutex back in the pool.
-func (p *rwmutexPool) Release(mu *rwmutexish) {
+func (p *rwmutexPool) Release(mu *rwmutex) {
 	p.current = append(p.current, mu)
 }
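The two queues above form a freelist with a victim cache, much like the current/victim generations inside sync.Pool: Acquire() prefers the current queue, falls back to the victim queue, and only then allocates, while GC() (only its tail is visible in the next hunk) demotes current to victim, so a mutex that sits unused for two GC cycles gets dropped. A standalone sketch of the same idea, made generic purely for illustration:

package main

import "fmt"

// pool is a minimal model of the two-queue design; the
// real rwmutexPool is specialized to *rwmutex and its
// GC() is driven by the MutexMap itself.
type pool[T any] struct {
	current []*T
	victim  []*T
}

func (p *pool[T]) acquire() *T {
	// Prefer recently released items...
	if l := len(p.current) - 1; l >= 0 {
		t := p.current[l]
		p.current = p.current[:l]
		return t
	}
	// ...then survivors of the last gc()...
	if l := len(p.victim) - 1; l >= 0 {
		t := p.victim[l]
		p.victim = p.victim[:l]
		return t
	}
	// ...else allocate fresh.
	return new(T)
}

func (p *pool[T]) release(t *T) {
	p.current = append(p.current, t)
}

// gc demotes current to victim and drops the old victims,
// leaving them to the runtime garbage collector.
func (p *pool[T]) gc() {
	p.victim = p.current
	p.current = nil
}

func main() {
	var p pool[int]
	x := p.acquire()
	p.release(x)
	p.gc()
	fmt.Println(p.acquire() == x) // true: promoted from victim
}

The two-phase eviction avoids allocator churn when a hot key is locked and unlocked in quick succession.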
@@ -146,20 +147,28 @@ func (p *rwmutexPool) GC() {
 	p.victim = current
 }
 
-// rwmutexish is a RW mutex (ish), i.e. the representation
-// of one only to be accessed within
-type rwmutexish struct {
-	tr trigger
-	ln int32 // no. locks
-	wn int32 // no. waiters
-	lt uint8 // lock type
+// rwmutex represents a RW mutex when used correctly within
+// a MutexMap. It should ONLY be accessed when protected by
+// the outer map lock, except for the 'notifyList' which is
+// a runtime internal structure borrowed from sync.Cond{}.
+//
+// This functions very similarly to a sync.Cond{}, but with
+// lock state tracking, and 'Broadcast()' returning whether
+// any goroutines were actually awoken. It also has a less
+// confusing API than sync.Cond{} given the outer locking
+// mechanism we use, as otherwise every Cond{}.L would
+// reference the same outer map mutex.
+type rwmutex struct {
+	n notifyList // 'trigger' mechanism
+	l int32      // no. locks
+	t uint8      // lock type
 }
 
 // Lock will lock the mutex for given lock type, in the
 // sense that it will update the internal state tracker
 // accordingly. Return value is true on successful lock.
-func (mu *rwmutexish) Lock(lt uint8) bool {
-	switch mu.lt {
+func (mu *rwmutex) Lock(lt uint8) bool {
+	switch mu.t {
 	case lockTypeRead:
 		// already read locked,
 		// only permit more reads.
@@ -173,77 +182,83 @@
 		return false
 
 	default:
-		// Fully unlocked.
-		mu.lt = lt
+		// Fully unlocked,
+		// set incoming type.
+		mu.t = lt
 	}
 
 	// Update
 	// count.
-	mu.ln++
+	mu.l++
 	return true
 }
 
-// Unlock will unlock the mutex, in the sense that
-// it will update the internal state tracker accordingly.
-// On any unlock it will awaken sleeping waiting threads.
-// Returned boolean is if unlocked=true AND waiters=0.
-func (mu *rwmutexish) Unlock() bool {
-	var ok bool
-
-	switch mu.ln--; {
-	case mu.ln > 0 && mu.lt == lockTypeWrite:
+// Unlock will unlock the mutex, updating the internal
+// state tracker accordingly. Once fully unlocked it
+// awakens all goroutines sleeping on this mutex, and
+// returns true when fully unlocked with no waiters.
+func (mu *rwmutex) Unlock() bool {
+	switch mu.l--; {
+	case mu.l > 0 && mu.t == lockTypeWrite:
 		panic("BUG: multiple writer locks")
-	case mu.ln < 0:
+	case mu.l < 0:
 		panic("BUG: negative lock count")
-	case mu.ln == 0:
+
+	case mu.l == 0:
 		// Fully unlocked.
-		mu.lt = 0
+		mu.t = 0
 
-		// Only return true
-		// with no waiters.
-		ok = (mu.wn == 0)
+		// Awaken all blocked goroutines and check
+		// for change in the last notified ticket.
+		before := atomic.LoadUint32(&mu.n.notify)
+		runtime_notifyListNotifyAll(&mu.n)
+		after := atomic.LoadUint32(&mu.n.notify)
+
+		// If the ticket changed, this indicates
+		// AT LEAST one goroutine was awoken.
+		//
+		// (before != after) => (waiters > 0)
+		// (before == after) => (waiters = 0)
+		return (before == after)
+
+	default:
+		// i.e. mutex still
+		// locked by others.
+		return false
 	}
-
-	// Awake all waiting
-	// goroutines for mu.
-	mu.tr.Trigger()
-	return ok
 }
 
-// WaitRelock expects a mutex to be passed in already in
-// the lock state. It incr the rwmutexish waiter count before
-// unlocking the outer mutex and blocking on internal trigger.
-// On awake it will relock outer mutex and decr wait count.
-func (mu *rwmutexish) WaitRelock(outer *sync.Mutex) {
-	mu.wn++
+// WaitRelock expects a mutex to be passed in, already in the
+// locked state. It increments the notifyList waiter count before
+// unlocking the outer mutex and blocking on the notifyList wait.
+// On waking it decrements the wait count and relocks the outer mutex.
+func (mu *rwmutex) WaitRelock(outer *sync.Mutex) {
+
+	// Add ourselves to the list while still
+	// under protection of the outer map lock.
+	t := runtime_notifyListAdd(&mu.n)
+
+	// Finished with
+	// outer map lock.
 	outer.Unlock()
-	mu.tr.Wait()
+
+	// Block until awoken by another
+	// goroutine within mu.Unlock().
+	runtime_notifyListWait(&mu.n, t)
+
+	// Relock!
 	outer.Lock()
-	mu.wn--
 }
 
-// trigger uses the internals of sync.Cond to provide
-// a waitgroup type structure (including goroutine parks)
-// without such a heavy reliance on a delta value.
-type trigger struct{ notifyList }
-
-func (t *trigger) Trigger() {
-	runtime_notifyListNotifyAll(&t.notifyList)
-}
-
-func (t *trigger) Wait() {
-	v := runtime_notifyListAdd(&t.notifyList)
-	runtime_notifyListWait(&t.notifyList, v)
-}
-
-// Approximation of notifyList in runtime/sema.go.
+// Unused fields left
+// un-named for safety.
 type notifyList struct {
-	wait   uint32
-	notify uint32
-	lock   uintptr // key field of the mutex
-	head   unsafe.Pointer
-	tail   unsafe.Pointer
+	_      uint32         // wait   uint32
+	notify uint32         // notify uint32
+	_      uintptr        // lock   mutex
+	_      unsafe.Pointer // head   *sudog
+	_      unsafe.Pointer // tail   *sudog
 }
 
 // See runtime/sema.go for documentation.
@@ -260,3 +275,12 @@ func runtime_notifyListWait(l *notifyList, t uint32)
 //
 //go:linkname runtime_notifyListNotifyAll sync.runtime_notifyListNotifyAll
 func runtime_notifyListNotifyAll(l *notifyList)
+
+// Ensure that sync and runtime agree on size of notifyList.
+//
+//go:linkname runtime_notifyListCheck sync.runtime_notifyListCheck
+func runtime_notifyListCheck(size uintptr)
+func init() {
+	var n notifyList
+	runtime_notifyListCheck(unsafe.Sizeof(n))
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index f8f808556..b0ccbe757 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -47,7 +47,7 @@ codeberg.org/gruf/go-mangler
 # codeberg.org/gruf/go-maps v1.0.3
 ## explicit; go 1.19
 codeberg.org/gruf/go-maps
-# codeberg.org/gruf/go-mutexes v1.3.0
+# codeberg.org/gruf/go-mutexes v1.3.1
 ## explicit; go 1.14
 codeberg.org/gruf/go-mutexes
 # codeberg.org/gruf/go-runners v1.6.1
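The subtle piece in the new Unlock() is the ticket comparison around the broadcast: the runtime only advances notifyList.notify when there is at least one parked waiter to hand a ticket to, so (before == after) proves nobody was waiting and the key can self-evict immediately. sync.Cond cannot express this, since Broadcast() returns nothing. A userland sketch of the same "broadcast that reports whether anyone woke" property, standard library only; every name here is hypothetical, and the real code keeps the runtime internals above:

package main

import (
	"fmt"
	"sync"
	"time"
)

// wakeTrigger mimics the notify-ticket property: broadcast()
// reports whether any goroutine was actually parked.
type wakeTrigger struct {
	waiters int           // guarded by the outer mutex
	ch      chan struct{} // closed to broadcast, then replaced
}

// wait must be called with outer held; outer is released while
// parked and reacquired on waking, like WaitRelock() above.
func (t *wakeTrigger) wait(outer *sync.Mutex) {
	t.waiters++
	ch := t.ch
	outer.Unlock()
	<-ch // park until broadcast closes the channel
	outer.Lock()
}

// broadcast must be called with outer held. True means at least
// one waiter woke: the analogue of (before != after) in Unlock().
func (t *wakeTrigger) broadcast() bool {
	woke := t.waiters > 0
	t.waiters = 0
	close(t.ch)
	t.ch = make(chan struct{})
	return woke
}

func main() {
	var outer sync.Mutex
	t := &wakeTrigger{ch: make(chan struct{})}

	outer.Lock()
	fmt.Println(t.broadcast()) // false: nobody waiting

	go func() {
		outer.Lock()
		t.wait(&outer)
		outer.Unlock()
	}()
	outer.Unlock()

	time.Sleep(50 * time.Millisecond) // let the goroutine park
	outer.Lock()
	fmt.Println(t.broadcast()) // true: one waiter woken
	outer.Unlock()
}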
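That boolean is what drives key self-eviction. MutexMap.unlock() is truncated in the hunk near the top of map.go, so the following is a hedged reconstruction of its likely shape from the visible pieces, not code taken from this diff:

func (mm *MutexMap) unlock(key string, mu *rwmutex) {
	// Get map lock.
	mm.mapmu.Lock()

	if mu.Unlock() {
		// Fully unlocked with nobody parked on the
		// notifyList: no other goroutine still holds
		// a reference to mu, so the key self-evicts
		// and the mutex is recycled straight away.
		delete(mm.mumap, key)
		mm.mupool.Release(mu)
	}

	// Any goroutine that *was* awoken is inside
	// WaitRelock(), about to reacquire mm.mapmu
	// and retry its lock attempt on this key.
	mm.mapmu.Unlock()
}

This is why Unlock() must return false whenever a waiter was woken: evicting and recycling mu while a woken goroutine still holds a pointer to it would hand the same rwmutex to two different keys.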