关于redigo中PubSub的一点小坑分析

 更新时间:2019年01月03日 10:41:02   作者:Chen Jiehua  
这篇文章主要给大家介绍了关于redigo中PubSub的一点小坑的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

前言

最近在用 golang 做一些 redis 相关的操作,选用了 redigo 这个第三方库。然后在使用 Pub/Sub 的时候,却发现了一个小坑……

Redis Client

首先,我们来初始化一个带连接池的 Redis Client:

import (
	"github.com/gomodule/redigo/redis"
)

type RedisClient struct {
	pool *redis.Pool
}

func NewRedisClient(addr string, db int, passwd string) *RedisClient {
	pool := &redis.Pool{
		MaxIdle:  10,
		IdleTimeout: 300 * time.Second,
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", addr, redis.DialPassword(passwd), redis.DialDatabase(db))
			if err != nil {
				return nil, err
			}
			return c, nil
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			if time.Since(t) < time.Minute {
				return nil
			}
			_, err := c.Do("PING")
			return err
		},
	}
	log.Printf("new redis pool at %s", addr)
	client := &RedisClient{
		pool: pool,
	}
	return client
}

Publish

然后我们可以简单的实现一个 publish 方法:

func (r *RedisClient) Publish(channel, message string) (int, error) {
	c := r.pool.Get()
	defer c.Close()
	n, err := redis.Int(c.Do("PUBLISH", channel, message))
	if err != nil {
		return 0, fmt.Errorf("redis publish %s %s, err: %v", channel, message, err)
	}
	return n, nil
}

Subscribe

接下来就是一个稍微复杂点的带有心跳的 subscribe 方法:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
	psc := redis.PubSubConn{Conn: r.pool.Get()}
	defer psc.Close()
	log.Printf("redis pubsub subscribe channel: %v", channel)
	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		return err
	}
	done := make(chan error, 1)
	// start a new goroutine to receive message
	go func() {
		for {
			switch msg := psc.Receive().(type) {
			case error:
				done <- fmt.Errorf("redis pubsub receive err: %v", msg)
				return
			case redis.Message:
				if err := consume(msg); err != nil {
					done <- err
					return
				}
			case redis.Subscription:
				if msg.Count == 0 {
					// all channels are unsubscribed
					done <- nil
					return
				}
			}
		}
	}()

	// health check
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			if err := psc.Unsubscribe(); err != nil {
				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
			}
			return nil
		case err := <-done:
			return err
		case <-tick.C:
			if err := psc.Ping(""); err != nil {
				return err
			}
		}
	}

	return nil
}

最后,我们写一个简单地 main 函数来调用 publish & subscribe:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
	psc := redis.PubSubConn{Conn: r.pool.Get()}
	defer psc.Close()
	log.Printf("redis pubsub subscribe channel: %v", channel)
	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		return err
	}
	done := make(chan error, 1)
	// start a new goroutine to receive message
	go func() {
		for {
			switch msg := psc.Receive().(type) {
			case error:
				done <- fmt.Errorf("redis pubsub receive err: %v", msg)
				return
			case redis.Message:
				if err := consume(msg); err != nil {
					done <- err
					return
				}
			case redis.Subscription:
				if msg.Count == 0 {
					// all channels are unsubscribed
					done <- nil
					return
				}
			}
		}
	}()

	// health check
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			if err := psc.Unsubscribe(); err != nil {
				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
			}
			return nil
		case err := <-done:
			return err
		case <-tick.C:
			if err := psc.Ping(""); err != nil {
				return err
			}
		}
	}

	return nil
}


咋一看之下,好像并没有什么异常?然而,如果我们这时候去看 redis 的 tcp 连接,就可以发现一些猫腻:

$sudo netstat -antp | grep redis
tcp  0  0 0.0.0.0:6379   0.0.0.0:*    LISTEN  940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55010  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55015  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55009  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55005  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55012  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55011  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55013  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55007  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55006  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55014  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:54972  ESTABLISHED 940/redis-server 0. 

竟然是每一次 subscribe 就新建了一个连接,而 connection pool 似乎没有什么作用。

更进一步地调试,我们发现在 defer psc.Close() 的时候就卡住了,也就是上面的 10 个 goroutine 其实并没有正常退出。

Concurrent

排查许久之后,终于定位到了问题!引用 redigo 的说明

Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.

For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph.

也就是说,虽然一个连接可以在不同的 goroutine 并发调用 Receive() 和 Subscribe()(subscribe调用了send和flush) ,但是却不能再有其他并发操作(比如 Close())。

其他相似的问题还可以参考 issue

Fix

知道了上面的原因之后,我们稍微修改一下 defer psc.Close() 的位置即可解决问题:

	// start a new goroutine to receive message
	go func() {
		// IMPORTANT!
		defer psc.Close()
		for {
			switch msg := psc.Receive().(type) {
			case error:

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对脚本之家的支持。

相关文章

  • Redis高并发超卖问题解决方案图文详解

    Redis高并发超卖问题解决方案图文详解

    Redis是一种基于内存的数据存储系统,被广泛用于解决高并发问题,下面这篇文章主要给大家介绍了关于Redis高并发超卖问题解决方案的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-02-02
  • Redis开启键空间通知实现超时通知的步骤详解

    Redis开启键空间通知实现超时通知的步骤详解

    这篇文章主要介绍了Redis开启键空间通知实现超时通知的步骤,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-06-06
  • Redis常用的数据结构及实际应用场景

    Redis常用的数据结构及实际应用场景

    本文介绍了Redis中常用的数据结构,包括字符串、列表、集合、哈希表、有序集合和Bitmap,并详细说明了它们在各种场景下的使用,需要的朋友可以参考下
    2024-05-05
  • Redis 数据类型Streams详解

    Redis 数据类型Streams详解

    Redis Streams是Redis 5.0新增的数据类型,提供了一种日志结构化数据存储方式,这种类型适合用于构建消息队列、事件日志和处理时间序列数据的应用,本文介绍Redis 数据类型Streams相关知识,感兴趣的朋友一起看看吧
    2024-10-10
  • Redis处理高并发之布隆过滤器详解

    Redis处理高并发之布隆过滤器详解

    这篇文章主要为大家介绍了Redis处理高并发之布隆过滤器详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-12-12
  • Redis远程连接Redis客户端的实现步骤

    Redis远程连接Redis客户端的实现步骤

    本文主要介绍了Redis远程连接Redis客户端的实现步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-06-06
  • 关于Redis中bitmap的原理和使用详解

    关于Redis中bitmap的原理和使用详解

    这篇文章主要介绍了关于Redis中bitmap的原理和使用详解,BitMap即位图,使用每个位表示某种状态,适合处理整型的海量数据,本质上是哈希表的一种应用实现,需要的朋友可以参考下
    2023-05-05
  • 浅谈Redis哨兵模式的使用

    浅谈Redis哨兵模式的使用

    这篇文章主要介绍了浅谈Redis哨兵模式的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-12-12
  • 使用Redis实现分布式锁的方法

    使用Redis实现分布式锁的方法

    为了保证我们线上服务的并发性和安全性,目前我们的服务一般抛弃了单体应用,采用的都是扩展性很强的分布式架构,这篇文章主要介绍了使用Redis实现分布式锁的方法,需要的朋友可以参考下
    2022-06-06
  • redis热key问题怎样解决

    redis热key问题怎样解决

    这篇文章主要介绍了redis热key问题怎样解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-04-04

最新评论