Redis Subscribe timeout 报错的问题解决

 更新时间:2025年08月15日 11:04:59   作者:一乐小哥  
最近系统偶尔报出org.redisson.client.RedisTimeoutException: Subscribe timeout: (7500ms)的错误,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

🐛 介绍

Redisson版本 2.8.2

最近公司系统偶尔报出org.redisson.client.RedisTimeoutException: Subscribe timeout: (7500ms)的错误,观察堆栈信息看到报错是一段使用Redisson的redis锁的地方,去除业务逻辑代码基本如下

public void mockLock(String phoneNum) {
log.info("{} - prepare lock", threadName);
RLock lock = redissonClient.getLock("redis_cache_test" + phoneNum);
try {
    lock.lock();
    log.info("{} - get lock", threadName);
    //睡眠10s
    Thread.sleep(10000);
} catch (Exception e) {
    log.info("{} - exception", threadName,e);
} finally {
    log.info("{} - unlock lock", threadName);
    lock.unlock();
}

导致报错的代码是lock.lock()的实现

@Override
public void syncSubscription(RFuture<?> future) {
    MasterSlaveServersConfig config = connectionManager.getConfig();
    try {
        int timeout = config.getTimeout() + config.getRetryInterval()*config.getRetryAttempts();
        if (!future.await(timeout)) {
            throw new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms)");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    future.syncUninterruptibly();
}

溯因

syncSubscription中的futureRedissonLock.subscribe(long threadId)方法

protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
}

这里可以看出大概是在PUBSUB中获取一个订阅,再往下看源码

public RFuture<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) {
    //监听持有
    final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
    //获取锁订阅队列
    final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
    //订阅拒绝实现
    final RPromise<E> newPromise = new PromiseDelegator<E>(connectionManager.<E>newPromise()) {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return semaphore.remove(listenerHolder.get());
        }
    };

    Runnable listener = new Runnable() {

        @Override
        public void run() {
        //判断是否已经存在相同的entry
            E entry = entries.get(entryName);
            if (entry != null) {
                entry.aquire();
                semaphore.release();
                entry.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            //没有则新建
            E value = createEntry(newPromise);
            value.aquire();
            
            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                oldValue.aquire();
                semaphore.release();
                oldValue.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            //监听对应的entry
            RedisPubSubListener<Object> listener = createListener(channelName, value);
            //订阅事件
            connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore);
        }
    };
    //用semaphore管理监听队列,因为可能存在多个线程等待一个锁
    semaphore.acquire(listener);
    //保证订阅拒绝逻辑
    listenerHolder.set(listener);
    
    return newPromise;
}

这里可以看到这个方法其实只是定义了一个名叫listener的Runnable, semaphore.acquire(listener);则保证了同一个channel仅会有一个线程去监听,其他的继续等待,而订阅逻辑还在connectionManager.subscribe里面

private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener, 
        final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock) {
    final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
    if (connEntry != null) {
        connEntry.addListener(channelName, listener);
        connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                lock.release();
                promise.trySuccess(connEntry);
            }
        });
        return;
    }

    freePubSubLock.acquire(new Runnable() {

        @Override
        public void run() {
            if (promise.isDone()) {
                return;
            }
            //如果没有获取到公共的连接直接返回
            final PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
            if (freeEntry == null) {
                connect(codec, channelName, listener, promise, type, lock);
                return;
            }
            //entry有个计数器subscriptionsPerConnection
            如果为-1报错因为下面有0的判断
            int remainFreeAmount = freeEntry.tryAcquire();
            if (remainFreeAmount == -1) {
                throw new IllegalStateException();
            }
            
            final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
            if (oldEntry != null) {
                freeEntry.release();
                freePubSubLock.release();
                
                oldEntry.addListener(channelName, listener);
                oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                    @Override
                    public void operationComplete(Future<Void> future) throws Exception {
                        lock.release();
                        promise.trySuccess(oldEntry);
                    }
                });
                return;
            }
            //subscriptionsPerConnection为0时从公共连接池中吐出
            if (remainFreeAmount == 0) {
                freePubSubConnections.poll();
            }
            freePubSubLock.release();
            
            freeEntry.addListener(channelName, listener);
            freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                @Override
                public void operationComplete(Future<Void> future) throws Exception {
                    lock.release();
                    promise.trySuccess(freeEntry);
                }
            });
            
            if (PubSubType.PSUBSCRIBE == type) {
                freeEntry.psubscribe(codec, channelName);
            } else {
                freeEntry.subscribe(codec, channelName);
            }
        }
        
    });
}

这里在没有连接的情况下会进到connect(codec, channelName, listener, promise, type, lock);中去

private void connect(final Codec codec, final String channelName, final RedisPubSubListener<?> listener,
        final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock) {
    final int slot = calcSlot(channelName);
    //根据subscriptionConnectionPoolSize获取下一个链接
    RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
    connFuture.addListener(new FutureListener<RedisPubSubConnection>() {

        @Override
        public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
            if (!future.isSuccess()) {
                freePubSubLock.release();
                lock.release();
                promise.tryFailure(future.cause());
                return;
            }

            RedisPubSubConnection conn = future.getNow();
            
            final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
            entry.tryAcquire();
            
            final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
            if (oldEntry != null) {
                releaseSubscribeConnection(slot, entry);
                
                freePubSubLock.release();
                
                oldEntry.addListener(channelName, listener);
                oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                    @Override
                    public void operationComplete(Future<Void> future) throws Exception {
                        lock.release();
                        promise.trySuccess(oldEntry);
                    }
                });
                return;
            }
            
            freePubSubConnections.add(entry);
            freePubSubLock.release();
            
            entry.addListener(channelName, listener);
            entry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                @Override
                public void operationComplete(Future<Void> future) throws Exception {
                    lock.release();
                    promise.trySuccess(entry);
                }
            });
            
            if (PubSubType.PSUBSCRIBE == type) {
                entry.psubscribe(codec, channelName);
            } else {
                entry.subscribe(codec, channelName);
            }
            
        }
    });
}

这里的RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);最终会调用ClientConnectionsEntry#acquireSubscribeConnection方法的
freeSubscribeConnectionsCounter.acquire(runnable) 至此我们找到原因 当同时等待锁订阅消息达到subscriptionConnectionPoolSize*subscriptionsPerConnection个时,再多一个订阅消息,连接一直无法获取导致MasterSlaveConnectionManager中的freePubSubLock没有释放。 另外由于在超时场景下MasterSlaveConnectionManager向连接池获取连接后是直接缓存下来,不把分发订阅链接释返回给连接池的,因此导致freeSubscribeConnectionsCounter一直等待,出现死锁情况。

最终表现就是org.redisson.client.RedisTimeoutException: Subscribe timeout: (7500ms)

复现

Redis配置

public RedissonClient redissonClient(RedisConfig redisConfig) {

    Config config = new Config();
    config.useSingleServer()
        .setAddress(redisConfig.getHost() + ":" + redisConfig.getPort())
        .setPassword(redisConfig.getPassword())
        .setDatabase(redisConfig.getDatabase())
        .setConnectTimeout(redisConfig.getConnectionTimeout())
        .setTimeout(redisConfig.getTimeout())
        //把两个配置项设置为1
        .setSubscriptionConnectionPoolSize(1)
        .setSubscriptionsPerConnection(1);
    return Redisson.create(config);
}

测试方法

void contextLoads() throws InterruptedException {
    Runnable runnable = () -> {
        redissonLock.tryRedissonLock();
    };
    new Thread(runnable, "线程1").start();
    new Thread(runnable, "线程12").start();
    new Thread(runnable, "线程23").start();
    new Thread(runnable, "线程21").start();
    
    Thread.sleep(200000);
}

结果

org.redisson.client.RedisTimeoutException: Subscribe timeout: (5500ms)
	at org.redisson.command.CommandAsyncService.syncSubscription(CommandAsyncService.java:126) ~[redisson-2.8.2.jar:na]
	at org.redisson.RedissonLock.lockInterruptibly(RedissonLock.java:121) ~[redisson-2.8.2.jar:na]
	at org.redisson.RedissonLock.lockInterruptibly(RedissonLock.java:108) ~[redisson-2.8.2.jar:na]
	at org.redisson.RedissonLock.lock(RedissonLock.java:90) ~[redisson-2.8.2.jar:na]
	at com.rick.redislock.lock.RedissonLock.registerPersonalMember(RedissonLock.java:30) ~[classes/:na]
	at com.rick.redislock.RedisLockApplicationTests.lambda$contextLoads$0(RedisLockApplicationTests.java:15) [test-classes/:na]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_144]

符合预期

到此这篇关于Redis Subscribe timeout 报错的问题解决的文章就介绍到这了,更多相关Redis Subscribe timeout 报错内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Redis凭啥可以这么快

    Redis凭啥可以这么快

    本文详细的介绍了为啥使用Redis的时候,可以做到非常快的读取速度,对于大家学习Redis非常有帮助,希望大家喜欢
    2021-02-02
  • Redis 脚本和连接命令示例详解

    Redis 脚本和连接命令示例详解

    Redis脚本是一种可以实现复杂任务的脚本语言,可以用来快速履行复杂任务,灵活处理数据管理和管理复杂的利用场景,这篇文章主要介绍了Redis 脚本和连接命令,需要的朋友可以参考下
    2023-09-09
  • redis快速部署为docker容器的方法实现

    redis快速部署为docker容器的方法实现

    部署 Redis 作为 Docker 容器是一种快速、灵活且可重复使用的方式,特别适合开发、测试和部署环境,本文主要介绍了redis快速部署为docker容器的方法实现,具有一定的参考价值,感兴趣的可以了解一下
    2024-05-05
  • Redis的数据类型和内部编码详解

    Redis的数据类型和内部编码详解

    Redis是通过Key-Value的形式来组织数据的,而Key的类型都是String,而Value的类型可以有很多,在Redis中最通用的数据类型大致有这几种:String、List、Set、Hash、Sorted Set,下面通过本文介绍Redis数据类型和内部编码,感兴趣的朋友一起看看吧
    2024-04-04
  • 详解Redis SCAN命令实现有限保证的原理

    详解Redis SCAN命令实现有限保证的原理

    这篇文章主要介绍了Redis SCAN命令实现有限保证的原理,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值 ,需要的朋友可以参考下
    2019-07-07
  • Redis集群扩容的实现示例

    Redis集群扩容的实现示例

    本文介绍了在虚拟机上新建Redis集群,并将新增节点加入现有集群,通过配置文件和`redis-cli`命令,成功实现了Redis集群的扩容,感兴趣的可以了解一下
    2025-02-02
  • Windows安装Redis的几种方式与测试流程总结

    Windows安装Redis的几种方式与测试流程总结

    本文系统梳理了在 Windows 系统上安装和使用 Redis 的多种方式,涵盖通过端口号识别运行中的 Redis 实例、进程定位方法,并提供了 Java 环境下的连接与测试示例,同时还介绍了常见的图形化管理工具,便于可视化管理与调试,需要的朋友可以参考下
    2025-05-05
  • Redis的五种基本类型和业务场景和使用方式

    Redis的五种基本类型和业务场景和使用方式

    Redis是一种高性能的键值存储数据库,支持多种数据结构如字符串、列表、集合、哈希表和有序集合等,它提供丰富的API和持久化功能,适用于缓存、消息队列、排行榜等多种场景,Redis能够实现高速读写操作,尤其适合需要快速响应的应用
    2024-10-10
  • 从MySQL到Redis的简单数据库迁移方法

    从MySQL到Redis的简单数据库迁移方法

    这篇文章主要介绍了从MySQL到Redis的简单数据库迁移方法,注意Redis数据库基于内存,并不能代替传统数据库,需要的朋友可以参考下
    2015-06-06
  • Redis报错UnrecognizedPropertyException: Unrecognized field问题

    Redis报错UnrecognizedPropertyException: Unrecognized 

    在使用SpringBoot访问Redis时,报错提示识别不了属性headPart,经过排查,发现并非Serializable或getset方法问题,而是存在一个方法getHeadPart,但无headPart属性,解决方案是将getHeadPart改为makeHeadPart
    2024-10-10

最新评论