springboot基于Redis发布订阅集群下WebSocket的解决方案

 更新时间:2021年01月28日 11:15:20   作者:毅大师  
这篇文章主要介绍了springboot基于Redis发布订阅集群下WebSocket的解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

一、背景

单机节点下,WebSocket连接成功后,可以直接发送消息。而多节点下,连接时通过nginx会代理到不同节点。

假设一开始用户连接了node1的socket服务。触发消息发送的条件的时候也通过nginx进行代理,假如代理转到了node2节点上,那么node2节点的socket服务就发送不了消息,因为一开始用户注册的是node1节点。这就导致了消息发送失败。

为了解决这一方案,消息发送时,就需要一个中间件来记录,这样,三个节点都可以获取消息,然后在根据条件进行消息推送。

二、解决方案(springboot 基于 Redis发布订阅)

1、依赖

<!-- redis -->    
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- websocket --> 
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2、创建业务处理类 Demo.class,该类可以实现MessageListener接口后重写onMessage方法,也可以不实现,自己写方法。

import com.alibaba.fastjson.JSON;
import com.dy.service.impl.OrdersServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
 
import java.util.HashMap;
 
/**
 * @program: 
 * @description: redis消息订阅-业务处理
 * @author: zhang yi
 * @create: 2021-01-25 16:46
 */
@Component
public class Demo implements MessageListener {
  Logger logger = LoggerFactory.getLogger(this.getClass());
 
  @Override
  public void onMessage(Message message, byte[] pattern) {
    logger.info("消息订阅成功---------");
    logger.info("内容:"+message.getBody());
    logger.info("交换机:"+message.getChannel());
  }
}

3、创建PubSubConfig配置类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
 
/**
 * @program: 
 * @description: redis发布订阅配置
 * @author: zhang yi
 * @create: 2021-01-25 16:49
 */
@Configuration
@EnableCaching
public class PubSubConfig {
  Logger logger = LoggerFactory.getLogger(this.getClass());
 
  //如果是多个交换机,则参数为(RedisConnectionFactory connectionFactory,
  //              MessageListenerAdapter listenerAdapter,
  //              MessageListenerAdapter listenerAdapter2)
  @Bean
  RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                      MessageListenerAdapter listenerAdapter) {
 
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    // 可以添加多个 messageListener,配置不同的交换机
    container.addMessageListener(listenerAdapter, new PatternTopic("channel:demo"));
    //container.addMessageListener(listenerAdapter2, new PatternTopic("channel:demo2"));
    return container;
  }
 
  /**
   * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
   * @param demo 第一步的业务处理类
   * @return
   */
  @Bean
  MessageListenerAdapter listenerAdapter(Demo demo) {
    logger.info("----------------消息监听器加载成功----------------");
    // onMessage 就是方法名,基于反射调用
    return new MessageListenerAdapter(demo, "onMessage");
  }
 
  /**
   * 多个交换机就多写一个
   * @param subCheckOrder
   * @return
   */
  //@Bean
  //MessageListenerAdapter listenerAdapter2(SubCheckOrder subCheckOrder) {
  //  logger.info("----------------消息监听器加载成功----------------");
  //  return new MessageListenerAdapter(subCheckOrder, "onMessage");
  //}
 
  @Bean
  StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
    return new StringRedisTemplate(connectionFactory);
  }
}

4、消息发布

@Autowired
private RedisTemplate<String, Object> redisTemplate;
 
redisTemplate.convertAndSend("channel:demo", "我是内容");

三、具体用法

  • socket连接成功。
  • socket消息推送时,把信息发布到redis中。socket服务订阅redis的消息,订阅成功后进行推送。集群下的socket都能订阅到消息,但是只有之前连接成功的节点能推送成功,其余的无法推送。

相关文章

  • java HashMap的keyset实例

    java HashMap的keyset实例

    简单地说,在keyset方法返回的set上做修改会改变原来hashmap,这也许不是你想要的,于是形成一个隐藏的bug
    2013-04-04
  • 一文掌握JVM Safe Point

    一文掌握JVM Safe Point

    关于 Safe Point 是 JVM 中很关键的一个概念,但我估计有不少同学不是很懂,于是今天跟大家来深入聊聊 Safe Point,通过本文学习你会了解什么是 Safe Point?为啥需要 Safe Point?Safe Point 与 Stop the World 的关系?感兴趣的朋友一起看看吧
    2022-10-10
  • IDEA中设置Tab健为4个空格的方法

    IDEA中设置Tab健为4个空格的方法

    这篇文章给大家介绍了代码缩进用空格还是Tab?(IDEA中设置Tab健为4个空格)的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2021-03-03
  • SpringBoot3解决跨域请求的方案小结

    SpringBoot3解决跨域请求的方案小结

    解决跨域请求,主要有JSONP,iframe,window.name,CORS等方式,其中CORS方式是最常用的跨域实现方式,而且是对各种请求方法、各种数据请求类型都是完美支持的,本文介绍了SpringBoot3解决跨域请求的方案小结,需要的朋友可以参考下
    2024-07-07
  • 前端往后端传递参数的方式有哪些举例详解

    前端往后端传递参数的方式有哪些举例详解

    这篇文章主要介绍了前端向后端传递参数的多种方式,包括URL参数(查询参数、路径参数)、请求体(JSON数据、表单数据、文件上传)、请求头和Cookie,并总结了每种方式的适用场景,需要的朋友可以参考下
    2025-03-03
  • 关于break和continue以及label的区别和作用(详解)

    关于break和continue以及label的区别和作用(详解)

    下面小编就为大家带来一篇关于break和continue以及label的区别和作用(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-05-05
  • IDEA “Cannot resolve symbol”爆红问题解决

    IDEA “Cannot resolve symbol”爆红问题解决

    最近发现个问题,IDEA 无法识别同一个 package 里的其他类,将其显示为红色,本文就来介绍一下IDEA “Cannot resolve symbol”爆红问题解决,感兴趣的可以了解一下
    2023-10-10
  • Java CompletableFuture的使用详解

    Java CompletableFuture的使用详解

    这篇文章主要介绍了Java CompletableFuture的使用详解,帮助大家更好的理解和学习使用Java,感兴趣的朋友可以了解下
    2021-03-03
  • Java线程中的线程本地变量ThreadLocal详解

    Java线程中的线程本地变量ThreadLocal详解

    这篇文章主要介绍了Java线程中的线程本地变量ThreadLocal详解,ThreadLocal存放的值是线程内共享的,线程间互斥的,主要用于线程内共享一些数据,避免通过参数来传递,这样处理后,能够优雅的解决一些实际问题,需要的朋友可以参考下
    2023-11-11
  • Java 深入探究讲解工厂方法模式

    Java 深入探究讲解工厂方法模式

    工厂方法模式(FACTORY METHOD)是一种常用的类创建型设计模式,此模式的核心精神是封装类中变化的部分,提取其中个性化善变的部分为独立类,通过依赖注入以达到解耦、复用和方便后期维护拓展的目的。它的核心结构有四个角色,分别是抽象工厂、具体工厂、抽象产品、具体产品
    2022-04-04

最新评论