java整合socket通信的完整步骤记录

 更新时间:2025年07月25日 10:50:24   作者:kimloner  
在Java编程中Socket通信是实现网络应用程序之间通信的基础,Socket是TCP/IP协议的一部分,它提供了进程间的网络通信,这篇文章主要介绍了java整合socket通信的相关资料,需要的朋友可以参考下

前言

大家好,由于工作上业务的需要,在java项目中引入了socket通信,特此记录一下,用以备份,本文章中的socket通信实现了,服务端与客户端的双向通讯,以及二者之间的心跳通信,服务端重启之后,客户端的自动重连功能。

原理

Socket通信是计算机网络中常用的一种通信机制,它是基于TCP/IP协议实现的,提供了两个应用程序之间通过网络进行数据交换的能力。Socket本质上是一种抽象概念,为网络服务提供了一组API接口。

  • Socket通信模型

Socket通信模型通常包括客户端和服务器端两部分。
服务器端:负责在特定的端口监听来自客户端的连接请求,当一个请求到达时,服务器会与客户端建立连接,并为客户端提供相应的服务。
客户端:主动向服务器的特定IP地址和端口发起连接请求,连接成功后,客户端可以通过建立的连接向服务器发送请求并接收响应。

  • Socket通信过程

Socket通信过程一般包括以下几个步骤:

  • 服务器监听:

服务器通过socket()函数创建一个Socket,并通过bind()函数将其绑定到一个IP地址和端口上。然后,服务器调用listen()函数开始监听该端口上的连接请求。

  • 客户端请求连接:

客户端也通过socket()函数创建一个Socket,然后调用connect()函数尝试与服务器的指定IP地址和端口建立连接。

  • 服务器接受连接:

服务器在接收到客户端的连接请求后,通过accept()函数接受这个连接。如果成功,accept()函数会返回一个新的Socket(通常称为“子Socket”),用于与该客户端进行通信。

数据传输:连接建立成功后,客户端和服务器就可以通过新建立的Socket进行数据传输了。数据传输可以是单向的也可以是双向的。应用程序可以使用send(), write(), recv(), read()等函数进行数据发送和接收操作。

  • 断开连接:

当通信结束后,客户端和服务器都可以调用close()函数来关闭自己持有的Socket,从而断开两者之间的连接。
TCP vs UDP
在实际使用中,基于Socket的通信方式主要有两种:基于TCP和基于UDP。
TCP Socket:提供可靠、面向连接、基于字节流的通信方式。适用对数据完整性和顺序有要求的应用场景。
UDP Socket:提供无连接、不保证可靠性、基于消息(数据报)的通信方式。适用于对实时性要求高、容忍部分数据丢失或乱序的应用场景。

代码实现

服务端

服务端主体逻辑:和每个接入的客户端都会使用独立线程建立起长连接,二者之间使用心跳保持联系,使用clientSockets 存储了每个客户端的信息便于和客户端建立起联系。

package com.example.demo2.server.socket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.*;

/**
 * @author kim
 */
@Component
public class TcpServer implements DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(TcpServer.class);

    private final SocketServerConfig config;
    private ServerSocket serverSocket;
    private ExecutorService executorService;
    private volatile boolean running = true;

    // 存储客户端连接
    private final Map<String, Socket> clientSockets = new ConcurrentHashMap<>();

    public TcpServer(SocketServerConfig config) {
        this.config = config;
    }

    @PostConstruct
    public void start() throws IOException {
        executorService = Executors.newFixedThreadPool(config.getMaxThreads());
        serverSocket = new ServerSocket(config.getPort());
        logger.info("平台socket服务已启动, 监听端口为 {}", config.getPort());

        new Thread(this::acceptConnections).start();
    }

    private void acceptConnections() {
        while (running) {
            try {
                Socket clientSocket = serverSocket.accept();
                String clientAddress = clientSocket.getInetAddress().getHostAddress();
                clientSockets.put(clientAddress, clientSocket);
                executorService.execute(new ClientHandler(clientSocket, clientAddress));
            } catch (IOException e) {
                if (running) {
                    logger.error("Connection accept error", e);
                }
            }
        }
    }

    // 用于发送消息到特定客户端
    public void sendMessageToClient(String clientAddress, String message) throws IOException {
        Socket socket = clientSockets.get(clientAddress);
        if (socket != null && !socket.isClosed()) {
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
            out.println(message);
            logger.info("Sent message to {}: {}", clientAddress, message);
        } else {
            logger.warn("Client {} is not connected or socket is closed", clientAddress);
        }
    }

    @Override
    public void destroy() throws Exception {
        running = false;
        executorService.shutdown();
        for (Socket socket : clientSockets.values()) {
            if (!socket.isClosed()) {
                socket.close();
            }
        }
        if (serverSocket != null && !serverSocket.isClosed()) {
            serverSocket.close();
        }
        logger.info("TCP Server stopped");
    }

    private class ClientHandler implements Runnable {
        private final Socket clientSocket;
        private final String clientAddress;

        ClientHandler(Socket socket, String address) {
            this.clientSocket = socket;
            this.clientAddress = address;
        }

        @Override
        public void run() {
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(clientSocket.getInputStream()));
                 PrintWriter out = new PrintWriter(
                         clientSocket.getOutputStream(), true)) {
                logger.info("Client connected: {}", clientAddress);
                String input;
                while ((input = in.readLine()) != null) {
                    logger.debug("Received: {}", input);
                    out.println(input);
                    logger.info("Client connected: {}", clientAddress);
                }
            } catch (IOException e) {
                logger.warn("Client connection closed: {}", e.getMessage());
            } finally {
                try {
                    clientSockets.remove(clientAddress);
                    clientSocket.close();
                } catch (IOException e) {
                    logger.error("Error closing socket", e);
                }
            }
        }


    }
}

配置类

package com.example.demo2.server.socket;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "socket.server")
@Data
public class SocketServerConfig {
    private int port;
    private int maxThreads;
    
    // Getters and Setters
}

配置文件

server:
  port: 8080

socket:
  server:
    port: 8088
    maxThreads: 50

向客户端发送测试信息

 @GetMapping("/send")
    public String sendMessage(String clientAddress) throws IOException {
       tcpServer.sendMessageToClient("192.168.3.8","77777777777");
       return "success";
    }

服务端发送日志

客户端接收日志

客户端

客户端主体逻辑,使用自己设计的心跳机制,监听服务端状态,如果服务端断开连接,则客户端会尝试重新建立联系。

package com.example.demo1.socketclient;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

@Service
public class TcpClientService implements ApplicationRunner, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(TcpClientService.class);

    private final SocketClientConfig config;
    private Socket socket;
    private PrintWriter out;
    private BufferedReader in;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final MessageListener messageListener;

    @Autowired
    public TcpClientService(SocketClientConfig config, MessageListener messageListener) {
        this.config = config;
        this.messageListener = messageListener;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        initializeConnection();
    }

    @Override
    public void destroy() throws Exception {
        running.set(false);
        closeResources();
        executor.shutdown();
    }

    private synchronized void initializeConnection() {
        new Thread(() -> {
            while (running.get()) {
                try {
                    socket = new Socket();
                    socket.setKeepAlive(true);
                    socket.setSoTimeout(config.getHeartbeatTimeout());
                    socket.connect(new InetSocketAddress(config.getHost(), config.getPort()), config.getTimeout());

                    out = new PrintWriter(socket.getOutputStream(), true);
                    in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

                    logger.info("Connected to server {}:{}", config.getHost(), config.getPort());

                    executor.execute(this::listenToServer);

                    startHeartbeat();

                    while (!socket.isClosed() && socket.isConnected()) {
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    logger.warn("Connection error: {}", e.getMessage());
                } finally {
                    closeResources();
                    if (running.get()) {
                        logger.info("Attempting to reconnect in 5 seconds...");
                        sleepSafely(5000);
                    }
                }
            }
        }, "tcp-client-connector").start();
    }

    private void listenToServer() {
        try {
            String response;
            while (running.get() && !socket.isClosed()) {
                try {
                    response = in.readLine();
                    if (response == null) {
                        logger.warn("Server closed connection");
                        break; // 终止循环,表示连接已关闭
                    }
                    logger.debug("Received server message: {}", response);
                    messageListener.onMessage(response);
                } catch (SocketTimeoutException e) {
                    logger.debug("Socket read timeout");
                } catch (IOException e) {
                    if (!socket.isClosed()) {
                        logger.warn("Connection lost: {}", e.getMessage());
                        break; // 终止循环,表示连接已中断
                    }
                }
            }
        } finally {
            closeResources(); // 确保资源关闭
        }
    }

    private void startHeartbeat() {
        new Thread(() -> {
            while (running.get() && !socket.isClosed()) {
                try {
                    sendMessageInternal("HEARTBEAT");
                    sleepSafely(config.getHeartbeatInterval());
                } catch (Exception e) {
                    logger.warn("Heartbeat failed: {}", e.getMessage());
                    break;
                }
            }
        }, "heartbeat-thread").start();
    }

    public synchronized void sendMessage(String message) throws IOException {
        if (socket == null || !socket.isConnected()) {
            throw new IOException("Not connected to server");
        }
        out.println(message);
        logger.debug("Sent message: {}", message);
    }

    private synchronized void sendMessageInternal(String message) {
        try {
            if (socket != null && socket.isConnected()) {
                out.println(message);
            }
        } catch (Exception e) {
            logger.warn("Failed to send heartbeat");
        }
    }

    private synchronized void closeResources() {
        try {
            if (out != null) out.close();
            if (in != null) in.close();
            if (socket != null) socket.close();
        } catch (IOException e) {
            logger.warn("Error closing resources: {}", e.getMessage());
        }
    }

    private void sleepSafely(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public interface MessageListener {
        void onMessage(String message);
    }
}

消息监听:监听服务发送的消息

package com.example.demo1.socketclient;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class ServerMessageHandler implements TcpClientService.MessageListener {
    private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);

    @Override
    public void onMessage(String message) {
       if(StringUtils.isNotEmpty(message)){
           if (!message.contains("HEARTBEAT")){
               //处理其他逻辑
               System.out.println("接收服务端消息成功:"+message);
           }else{
               //心跳消息
               System.out.println(message);
           }
       }
    }


}

配置类

package com.example.demo1.socketclient;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "socket.client")
@Data
public class SocketClientConfig {
    private String host;
    private int port;
    private int timeout;
    private int heartbeatInterval;
    private int heartbeatTimeout;

    // Getters and Setters
}

发送测试方法

 @GetMapping("/send")
    public ResponseEntity<String> sendMessage() {
        try {
            tcpClient.sendMessage("客户端发送信息");
            return ResponseEntity.ok("Message sent");
        } catch (IOException e) {
            return ResponseEntity.status(503).body("Server unavailable");
        }
    }

配置文件

socket:
  client:
    host: 192.168.3.8 #服务端ip地址
    port: 8088 #监听端口
    timeout: 5000
    heartbeat-interval: 3000    # 心跳间隔30秒
    heartbeat-timeout: 6000     # 心跳超时60秒
server:
  port: 8082


客户端发送信息后,服务端会接收到信息。

总结

以上就是java接入socket通信服务端与客户端的全部代码,二者实现了互相通信,具体的业务场景则需要小伙伴们在此基础上额外的设计逻辑了,有其他疑问或者想要测试demo的可以后台私我,看到会回复。

到此这篇关于java整合socket通信的文章就介绍到这了,更多相关java整合socket通信内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java框架Struts2实现图片上传功能

    Java框架Struts2实现图片上传功能

    这篇文章主要为大家详细介绍了Java框架Struts2实现图片上传功能,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-08-08
  • Java性能优化之关于大对象复用的目标和注意点

    Java性能优化之关于大对象复用的目标和注意点

    这篇文章主要介绍了Java性能优化之关于大对象复用的目标和注意点,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-03-03
  • Spring的UnsatisfiedDependencyException异常的解决

    Spring的UnsatisfiedDependencyException异常的解决

    在使用Spring框架开发应用程序时,我们经常会遇到各种异常,本文主要介绍了Spring的UnsatisfiedDependencyException异常的解决,感兴趣的可以了解一下
    2023-11-11
  • Jmeter BeanShell 内置变量vars、props、prev的使用详解

    Jmeter BeanShell 内置变量vars、props、prev的使用详解

    这篇文章主要介绍了Jmeter BeanShell 内置变量vars、props、prev的使用 ,文中给大家介绍了Jmeter中关于BeanShell的相关知识,结合实例代码给大家介绍的非常详细,需要的朋友可以参考下
    2022-10-10
  • 基于SpringBoot解决CORS跨域的问题(@CrossOrigin)

    基于SpringBoot解决CORS跨域的问题(@CrossOrigin)

    这篇文章主要介绍了基于SpringBoot解决CORS跨域的问题(@CrossOrigin),具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-01-01
  • JDBC连接SQL Server数据库实现增删改查的全过程

    JDBC连接SQL Server数据库实现增删改查的全过程

    实际开发中手动的输入SQL语句是少之又少,大多数情况下是通过编译代码进行来控制自动执行,下面这篇文章主要给大家介绍了关于JDBC连接SQL Server数据库实现增删改查的相关资料,需要的朋友可以参考下
    2023-04-04
  • Spring JPA之save方法示例详解

    Spring JPA之save方法示例详解

    这篇文章主要为大家介绍了Spring JPA之save方法示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-04-04
  • mybatis mapper.xml 注释带参数的坑及解决

    mybatis mapper.xml 注释带参数的坑及解决

    这篇文章主要介绍了mybatis mapper.xml 注释带参数的坑及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • java有序二叉树的删除节点方式

    java有序二叉树的删除节点方式

    文章描述了在二叉树中删除节点的三种情况及其对应的操作步骤,通过递归找到节点及其父节点,并根据节点的子树情况(无子树、单子树、双子树)进行相应的删除操作,文章还提供了一个测试类来验证删除操作的正确性
    2024-12-12
  • Java进阶学习之如何开启远程调式

    Java进阶学习之如何开启远程调式

    Java开发中的远程调试是一项至关重要的技能,特别是在处理生产环境的问题或者协作开发时,这篇文章主要介绍了Java进阶学习之如何开启远程调式的相关资料,需要的朋友可以参考下
    2025-03-03

最新评论