SpringBoot+netty-socketio实现服务器端消息推送

 更新时间:2021年03月17日 10:48:00   作者:ATwill...  
这篇文章主要介绍了SpringBoot+netty-socketio实现服务器端消息推送,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

首先:因为工作需要,需要对接socket.io框架对接,所以目前只能使用netty-socketio。websocket是不支持对接socket.io框架的。

netty-socketio顾名思义他是一个底层基于netty'实现的socket。

在springboot项目中的集成,请看下面的代码

maven依赖

<dependency>
 <groupId>com.corundumstudio.socketio</groupId>
 <artifactId>netty-socketio</artifactId>
 <version>1.7.11</version>
</dependency>

 下面就是代码了

首先是配置参数

#socketio配置
socketio:
 host: localhost
 port: 9099
 # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
 maxFramePayloadLength: 1048576
 # 设置http交互最大内容长度
 maxHttpContentLength: 1048576
 # socket连接数大小(如只监听一个端口boss线程组为1即可)
 bossCount: 1
 workCount: 100
 allowCustomRequests: true
 # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
 upgradeTimeout: 1000000
 # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
 pingTimeout: 6000000
 # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
 pingInterval: 25000

上面的注释写的很清楚。下面是config代码

import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * kcm
 */
@Component
public class PushServer implements InitializingBean {

  @Autowired
  private EventListenner eventListenner;

  @Value("${socketio.port}")
  private int serverPort;

  @Value("${socketio.host}")
  private String serverHost;

  @Value("${socketio.bossCount}")
  private int bossCount;

  @Value("${socketio.workCount}")
  private int workCount;

  @Value("${socketio.allowCustomRequests}")
  private boolean allowCustomRequests;

  @Value("${socketio.upgradeTimeout}")
  private int upgradeTimeout;

  @Value("${socketio.pingTimeout}")
  private int pingTimeout;

  @Value("${socketio.pingInterval}")
  private int pingInterval;

  @Override
  public void afterPropertiesSet() throws Exception {
    Configuration config = new Configuration();
    config.setPort(serverPort);
    config.setHostname(serverHost);
    config.setBossThreads(bossCount);
    config.setWorkerThreads(workCount);
    config.setAllowCustomRequests(allowCustomRequests);
    config.setUpgradeTimeout(upgradeTimeout);
    config.setPingTimeout(pingTimeout);
    config.setPingInterval(pingInterval);

    SocketConfig socketConfig = new SocketConfig();
    socketConfig.setReuseAddress(true);
    socketConfig.setTcpNoDelay(true);
    socketConfig.setSoLinger(0);
    config.setSocketConfig(socketConfig);

    SocketIOServer server = new SocketIOServer(config);
    server.addListeners(eventListenner);
    server.start();
    System.out.println("启动正常");
  }
}

在就是监听代码

import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import org.apache.commons.lang3.StringUtils;
import org.bangying.auth.JwtSupport;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

@Component
public class EventListenner {
  @Resource
  private ClientCache clientCache;

  @Resource
  private JwtSupport jwtSupport;

  /**
   * 客户端连接
   *
   * @param client
   */
  @OnConnect
  public void onConnect(SocketIOClient client) {
    String userId = client.getHandshakeData().getSingleUrlParam("userId");
//    userId = jwtSupport.getApplicationUser().getId().toString();
//    userId = "8";
    UUID sessionId = client.getSessionId();
    clientCache.saveClient(userId, sessionId, client);
    System.out.println("建立连接");
  }

  /**
   * 客户端断开
   *
   * @param client
   */
  @OnDisconnect
  public void onDisconnect(SocketIOClient client) {
    String userId = client.getHandshakeData().getSingleUrlParam("userId");
    if (StringUtils.isNotBlank(userId)) {
      clientCache.deleteSessionClient(userId, client.getSessionId());
      System.out.println("关闭连接");
    }
  }

  //消息接收入口,当接收到消息后,查找发送目标客户端,并且向该客户端发送消息,且给自己发送消息
  // 暂未使用
  @OnEvent("messageevent")
  public void onEvent(SocketIOClient client, AckRequest request) {
  }
}

本地缓存信息

import com.corundumstudio.socketio.SocketIOClient;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * kcm
 */
@Component
public class ClientCache {

  //本地缓存
  private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap=new ConcurrentHashMap<>();
  /**
   * 存入本地缓存
   * @param userId 用户ID
   * @param sessionId 页面sessionID
   * @param socketIOClient 页面对应的通道连接信息
   */
  public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){
    if(StringUtils.isNotBlank(userId)){
      HashMap<UUID, SocketIOClient> sessionIdClientCache=concurrentHashMap.get(userId);
      if(sessionIdClientCache==null){
        sessionIdClientCache = new HashMap<>();
      }
      sessionIdClientCache.put(sessionId,socketIOClient);
      concurrentHashMap.put(userId,sessionIdClientCache);
    }
  }
  /**
   * 根据用户ID获取所有通道信息
   * @param userId
   * @return
   */
  public HashMap<UUID, SocketIOClient> getUserClient(String userId){
    return concurrentHashMap.get(userId);
  }
  /**
   * 根据用户ID及页面sessionID删除页面链接信息
   * @param userId
   * @param sessionId
   */
  public void deleteSessionClient(String userId,UUID sessionId){
    concurrentHashMap.get(userId).remove(sessionId);
  }
}

下面是存储客户端连接信息

import com.corundumstudio.socketio.SocketIOClient;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * kcm
 */
@Component
public class ClientCache {

  //本地缓存
  private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap=new ConcurrentHashMap<>();
  /**
   * 存入本地缓存
   * @param userId 用户ID
   * @param sessionId 页面sessionID
   * @param socketIOClient 页面对应的通道连接信息
   */
  public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){
    if(StringUtils.isNotBlank(userId)){
      HashMap<UUID, SocketIOClient> sessionIdClientCache=concurrentHashMap.get(userId);
      if(sessionIdClientCache==null){
        sessionIdClientCache = new HashMap<>();
      }
      sessionIdClientCache.put(sessionId,socketIOClient);
      concurrentHashMap.put(userId,sessionIdClientCache);
    }
  }
  /**
   * 根据用户ID获取所有通道信息
   * @param userId
   * @return
   */
  public HashMap<UUID, SocketIOClient> getUserClient(String userId){
    return concurrentHashMap.get(userId);
  }
  /**
   * 根据用户ID及页面sessionID删除页面链接信息
   * @param userId
   * @param sessionId
   */
  public void deleteSessionClient(String userId,UUID sessionId){
    concurrentHashMap.get(userId).remove(sessionId);
  }
}

控制层推送方法

@RestController
@RequestMapping("/push")
public class PushController {
  @Resource
  private ClientCache clientCache;

  @Autowired
  private JwtSupport jwtSupport;

  @GetMapping("/message")
  public String pushTuUser(@Param("id") String id){
    Integer userId = jwtSupport.getApplicationUser().getId();
    HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(String.valueOf(userId));
    userClient.forEach((uuid, socketIOClient) -> {
      //向客户端推送消息
      socketIOClient.sendEvent("chatevent","服务端推送消息");
    });
    return "success";
  }
}

到此这篇关于SpringBoot+netty-socketio实现服务器端消息推送的文章就介绍到这了,更多相关SpringBoot netty-socketio服务器端推送内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java转换流(InputStreamReader/OutputStreamWriter)的使用

    Java转换流(InputStreamReader/OutputStreamWriter)的使用

    本文主要介绍了Java转换流(InputStreamReader/OutputStreamWriter)的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-01-01
  • springboot中server.ssl.key-store配置路径的问题小结

    springboot中server.ssl.key-store配置路径的问题小结

    这篇文章主要介绍了springboot中server.ssl.key-store配置路径的问题,文中还记录了Spring Boot SSL(https)实例,介绍在web程序中使用自签名的SSL(HTTPS)证书及创建SSL认证,感兴趣的朋友跟随小编一起看看吧
    2024-02-02
  • Java三个类加载器及它们的相互关系

    Java三个类加载器及它们的相互关系

    Java在需要使用类别的时候,才会将类别加载,Java的类别载入是由类别载入器(Class loader)来达到的,预设上,在程序启动之后,主要会有三个类别加载器,文中详细介绍了这三个类加载器,需要的朋友可以参考下
    2021-06-06
  • Jdbc连接数据库基本步骤详解

    Jdbc连接数据库基本步骤详解

    这篇文章主要为大家详细介绍了Jdbc连接数据库的基本步骤,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-10-10
  • java中如何实现 zip rar 7z 压缩包解压

    java中如何实现 zip rar 7z 压缩包解压

    这篇文章主要介绍了java中如何实现 zip rar 7z 压缩包解压问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-07-07
  • rabbitmq中routingkey的作用说明

    rabbitmq中routingkey的作用说明

    这篇文章主要介绍了rabbitmq中routingkey的作用说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06
  • springboot整合redis实现发送邮箱并验证

    springboot整合redis实现发送邮箱并验证

    大家好,本篇文章主要讲的是springboot整合redis实现发送邮箱并验证,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下
    2022-01-01
  • java 单播、广播、组播详解及实例代码

    java 单播、广播、组播详解及实例代码

    这篇文章主要介绍了java 单播、广播、组播详解及实例代码的相关资料,需要的朋友可以参考下
    2017-02-02
  • SpringCloud Gateway实现API接口加解密

    SpringCloud Gateway实现API接口加解密

    这篇文章主要为大家介绍了SpringCloud Gateway如何实现API接口加解密的,文中的示例代码讲解详细,对我们学习有一定的帮助,需要的可以参考一下
    2022-06-06
  • Java实现中文算数验证码的实现示例(算数运算+-*/)

    Java实现中文算数验证码的实现示例(算数运算+-*/)

    这篇文章主要介绍了Java实现中文算数验证码的实现示例(算数运算+-*/),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07

最新评论