mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-02-05 07:37:02 +01:00
93 lines
2.4 KiB
Go
93 lines
2.4 KiB
Go
|
package xsync
|
||
|
|
||
|
import (
|
||
|
"sync/atomic"
|
||
|
)
|
||
|
|
||
|
// A SPSCQueue is a bounded single-producer single-consumer concurrent
|
||
|
// queue. This means that not more than a single goroutine must be
|
||
|
// publishing items to the queue while not more than a single goroutine
|
||
|
// must be consuming those items.
|
||
|
//
|
||
|
// SPSCQueue instances must be created with NewSPSCQueue function.
|
||
|
// A SPSCQueue must not be copied after first use.
|
||
|
//
|
||
|
// Based on the data structure from the following article:
|
||
|
// https://rigtorp.se/ringbuffer/
|
||
|
type SPSCQueue struct {
|
||
|
cap uint64
|
||
|
pidx uint64
|
||
|
//lint:ignore U1000 prevents false sharing
|
||
|
pad0 [cacheLineSize - 8]byte
|
||
|
pcachedIdx uint64
|
||
|
//lint:ignore U1000 prevents false sharing
|
||
|
pad1 [cacheLineSize - 8]byte
|
||
|
cidx uint64
|
||
|
//lint:ignore U1000 prevents false sharing
|
||
|
pad2 [cacheLineSize - 8]byte
|
||
|
ccachedIdx uint64
|
||
|
//lint:ignore U1000 prevents false sharing
|
||
|
pad3 [cacheLineSize - 8]byte
|
||
|
items []interface{}
|
||
|
}
|
||
|
|
||
|
// NewSPSCQueue creates a new SPSCQueue instance with the given
|
||
|
// capacity.
|
||
|
func NewSPSCQueue(capacity int) *SPSCQueue {
|
||
|
if capacity < 1 {
|
||
|
panic("capacity must be positive number")
|
||
|
}
|
||
|
return &SPSCQueue{
|
||
|
cap: uint64(capacity + 1),
|
||
|
items: make([]interface{}, capacity+1),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// TryEnqueue inserts the given item into the queue. Does not block
|
||
|
// and returns immediately. The result indicates that the queue isn't
|
||
|
// full and the item was inserted.
|
||
|
func (q *SPSCQueue) TryEnqueue(item interface{}) bool {
|
||
|
// relaxed memory order would be enough here
|
||
|
idx := atomic.LoadUint64(&q.pidx)
|
||
|
nextIdx := idx + 1
|
||
|
if nextIdx == q.cap {
|
||
|
nextIdx = 0
|
||
|
}
|
||
|
cachedIdx := q.ccachedIdx
|
||
|
if nextIdx == cachedIdx {
|
||
|
cachedIdx = atomic.LoadUint64(&q.cidx)
|
||
|
q.ccachedIdx = cachedIdx
|
||
|
if nextIdx == cachedIdx {
|
||
|
return false
|
||
|
}
|
||
|
}
|
||
|
q.items[idx] = item
|
||
|
atomic.StoreUint64(&q.pidx, nextIdx)
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
// TryDequeue retrieves and removes the item from the head of the
|
||
|
// queue. Does not block and returns immediately. The ok result
|
||
|
// indicates that the queue isn't empty and an item was retrieved.
|
||
|
func (q *SPSCQueue) TryDequeue() (item interface{}, ok bool) {
|
||
|
// relaxed memory order would be enough here
|
||
|
idx := atomic.LoadUint64(&q.cidx)
|
||
|
cachedIdx := q.pcachedIdx
|
||
|
if idx == cachedIdx {
|
||
|
cachedIdx = atomic.LoadUint64(&q.pidx)
|
||
|
q.pcachedIdx = cachedIdx
|
||
|
if idx == cachedIdx {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
item = q.items[idx]
|
||
|
q.items[idx] = nil
|
||
|
ok = true
|
||
|
nextIdx := idx + 1
|
||
|
if nextIdx == q.cap {
|
||
|
nextIdx = 0
|
||
|
}
|
||
|
atomic.StoreUint64(&q.cidx, nextIdx)
|
||
|
return
|
||
|
}
|