golang实现无锁队列的三种方式
更新时间:2026年04月14日 10:04:00 作者:游学四方
本文主要介绍了golang实现无锁队列的三种方式,包括基于CAS操作的简单有界队列、Michael-Scott算法的无锁链表队列及Go简化指针操作方法,感兴趣的可以了解一下
1. 基于 CAS 的简单有界队列
使用固定大小的环形缓冲区,通过原子索引实现无锁。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// LockFreeQueue 基于 CAS 的有界无锁队列
type LockFreeQueue struct {
buffer []interface{}
head uint64 // 读取位置
tail uint64 // 写入位置
cap uint64
}
func NewLockFreeQueue(capacity int) *LockFreeQueue {
return &LockFreeQueue{
buffer: make([]interface{}, capacity),
cap: uint64(capacity),
}
}
// Enqueue 入队
func (q *LockFreeQueue) Enqueue(val interface{}) bool {
for {
tail := atomic.LoadUint64(&q.tail)
head := atomic.LoadUint64(&q.head)
// 队列已满
if tail-head >= q.cap {
return false
}
// 尝试 CAS 更新 tail
if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
idx := tail % q.cap
q.buffer[idx] = val
return true
}
// CAS 失败,重试
}
}
// Dequeue 出队
func (q *LockFreeQueue) Dequeue() (interface{}, bool) {
for {
head := atomic.LoadUint64(&q.head)
tail := atomic.LoadUint64(&q.tail)
// 队列为空
if head == tail {
return nil, false
}
// 尝试 CAS 更新 head
if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
idx := head % q.cap
val := q.buffer[idx]
q.buffer[idx] = nil // 帮助 GC
return val, true
}
// CAS 失败,重试
}
}
func main() {
q := NewLockFreeQueue(100)
var wg sync.WaitGroup
// 生产者
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
for j := 0; j < 100; j++ {
q.Enqueue(n*100 + j)
}
}(i)
}
// 消费者
var count int64
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
if val, ok := q.Dequeue(); ok {
atomic.AddInt64(&count, 1)
_ = val
} else {
// 空队列时短暂休眠避免忙等
// 实际生产可用 runtime.Gosched()
}
}
}()
}
wg.Wait()
fmt.Printf("Processed: %d\n", atomic.LoadInt64(&count))
}2. 无界链表队列(Michael-Scott 算法)
经典的无锁队列算法,使用链表实现,支持动态扩容。
package main
import (
"fmt"
"sync"
"sync/atomic"
"unsafe"
)
// node 链表节点
type node struct {
value interface{}
next unsafe.Pointer // *node
}
// LockFreeListQueue 基于 Michael-Scott 算法的无锁队列
type LockFreeListQueue struct {
head unsafe.Pointer // *node
tail unsafe.Pointer // *node
}
func NewLockFreeListQueue() *LockFreeListQueue {
n := unsafe.Pointer(&node{})
return &LockFreeListQueue{
head: n,
tail: n,
}
}
// Enqueue 入队
func (q *LockFreeListQueue) Enqueue(val interface{}) {
newNode := &node{value: val}
newNodePtr := unsafe.Pointer(newNode)
for {
tail := (*node)(atomic.LoadPointer(&q.tail))
next := (*node)(atomic.LoadPointer(&tail.next))
// 再次检查 tail 是否变化
if tail != (*node)(atomic.LoadPointer(&q.tail)) {
continue
}
if next == nil {
// 尝试将新节点链接到尾部
if atomic.CompareAndSwapPointer(&tail.next, unsafe.Pointer(nil), newNodePtr) {
// 尝试更新 tail 指针
atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), newNodePtr)
return
}
} else {
// 帮助推进 tail 指针
atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), unsafe.Pointer(next))
}
}
}
// Dequeue 出队
func (q *LockFreeListQueue) Dequeue() (interface{}, bool) {
for {
head := (*node)(atomic.LoadPointer(&q.head))
tail := (*node)(atomic.LoadPointer(&q.tail))
next := (*node)(atomic.LoadPointer(&head.next))
if head != (*node)(atomic.LoadPointer(&q.head)) {
continue
}
if head == tail {
if next == nil {
return nil, false // 空队列
}
// 帮助推进 tail
atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), unsafe.Pointer(next))
} else {
val := next.value
if atomic.CompareAndSwapPointer(&q.head, unsafe.Pointer(head), unsafe.Pointer(next)) {
return val, true
}
}
}
}
func main() {
q := NewLockFreeListQueue()
var wg sync.WaitGroup
var count int64
// 生产者
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
for j := 0; j < 1000; j++ {
q.Enqueue(n*1000 + j)
}
}(i)
}
// 消费者
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
if val, ok := q.Dequeue(); ok {
atomic.AddInt64(&count, 1)
_ = val
}
}
}()
}
wg.Wait()
fmt.Printf("Total dequeued: %d\n", atomic.LoadInt64(&count))
}3、使用 sync/atomic 的简化版本(Go 1.19+)
Go 1.19 引入了 atomic.Pointer,可以简化指针操作:
locklessqueue.go
//locklessqueue.go
package lockless
import (
"sync/atomic"
)
type LockFreeQueue struct {
buf []interface{}
len int32
head int32
tail int32
}
func NewQueue(n int32) *LockFreeQueue {
q := &LockFreeQueue{buf: make([]interface{}, n+1, n+1), len: n + 1}
return q
}
func (s *LockFreeQueue) PushBack(v interface{}) {
for {
tail := atomic.LoadInt32(&s.tail)
n := (tail + 1) % s.len
if atomic.CompareAndSwapInt32(&s.head, n, n) {
continue // 队列满了
}
if !atomic.CompareAndSwapInt32(&s.tail, tail, n) {
continue // 获取失败
}
s.buf[tail] = v
break
}
}
func (s *LockFreeQueue) PopFront() interface{} {
for {
tail := atomic.LoadInt32(&s.tail)
head := atomic.LoadInt32(&s.head)
if tail == head {
continue
}
n := (head + 1) % s.len
if !atomic.CompareAndSwapInt32(&s.head, head, n) {
continue
}
return s.buf[head]
}
}测试代码
locklessqueue_test.go
//locklessqueue_test.go
package lockless
import (
"sync"
"testing"
)
func TestName(t *testing.T) {
lq := NewQueue(10)
w := sync.WaitGroup{}
for i := 0; i < 100; i++ {
w.Add(1)
go func(gi int) {
lq.PushBack(gi)
w.Done()
}(i)
}
go func() {
for {
lq.PopFront()
// time.Sleep(1 * time.Second)
}
}()
w.Wait()
}
var ch = make(chan interface{}, 50000)
func BenchmarkGo_Chan(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
ch <- 123
go func() {
<-ch
}()
}
}
func BenchmarkGo_LockFree(b *testing.B) {
lq := NewQueue(1000000000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
lq.PushBack(123)
go func() {
lq.PopFront()
}()
}
}执行命令
PS D:\golang\src\Test\Test> go test -v locklessqueue_test.go locklessqueue.go === RUN TestName --- PASS: TestName (0.00s) PASS ok command-line-arguments 0.173s
性能测试
D:\golang\src\Test\Test> go test -v -bench="." locklessqueue_test.go locklessqueue.go === RUN TestName --- PASS: TestName (0.00s) goos: windows goarch: amd64 cpu: Intel(R) Core(TM) i5-9500F CPU @ 3.00GHz BenchmarkGo_Chan BenchmarkGo_Chan-6 2957491 414.7 ns/op BenchmarkGo_LockFree BenchmarkGo_LockFree-6 4356723 272.4 ns/op PASS ok command-line-arguments 85.410s
到此这篇关于golang实现无锁队列的文章就介绍到这了,更多相关golang 无锁队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
golang结构化日志log/slog包之slog.Record的用法简介
这篇文章主要为大家详细介绍了golang结构化日志log/slog包中slog.Record结构体的使用方法和需要注意的点,文中的示例代码讲解详细,需要的可以学习一下2023-10-10


最新评论