java Socket编程实现I/O多路复用的示例

 更新时间:2023年09月05日 14:28:41   作者:nika_yo_nihao  
本文主要介绍了java Socket编程实现I/O多路复用的示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

I/O多路复用

最基本的Socket模型:可以让进程跨主机通信.

具体流程是这样:服务端开辟一个Socket,为这个Socket绑定IP地址(找到机器的网卡)和端口号(找到进程),通过listen()函数进行监听。在客户端通过connect()函数发起连接后,服务端的Socket会维护两个队列,一个是还没有完全建立连接的队列,叫TCP半连接队列;另一个是已经建立连接(完成了三次握手)的队列,叫全连接队列。服务端会通过accept()函数从全连接队列拿出一个socket,后续传输都用这个Socket。

多进程模型

为了让一个服务端服务多个客户端,由此产生了多进程模型。

本质就是让一个父进程去处理和多个客户端的连接,然后每连一个客户端,都fork一个父进程的子进程,让这个子进程用于和新增的客户端进行数据的读写。

但这种方式弊端还是很大的,因为创建进程的开销是很大的,需要为这个进程分配虚拟内存,栈,全局变量等信息,在这个进程进行上下文切换的时候是很费劲的。
因此这个模型也无法支持太多的并发量。

多线程模型

既然进程这个单位是很重且开销很大的,因此我们可以用线程去替代线程。

同一个进程的多个线程可以去共享进程中的部分资源,如文件描述符列表,全局数据,堆等,这些共享资源是无需切换的。需要切换的只有线程的私有数据,如帮助线程上下文切换的寄存器。所以线程的创建的开销是很小的。

多线程模型的过程是这样的,父进程同样负责跟多个客户端创建连接,然后将已经创建好连接的socket放入到队列中。在这个父进程中的多个线程负责拿锁去从这个队列里面取出socket,然后往对应的取出的socket中进行读写。

但就算是这样,创建线程的开销即使不大,也没办法支撑太多的并发,只是比多进程模型更优秀。

既然为每个请求分配一个线程或者进程的方式开销都有些大,那么有可能只用一个进程取维护多个Socket呢?

下面我们就要说说I/O复用技术了,其实I/O多路复用技术优点类似于CPU时间片的利用。虽然一个进程同一时间内只能处理一个socket,但是如果说这个socket的处理时间只有1ms,那么放到1s内来看,它其实也是有100的并发量的。

select/poll/epoll是三个多路复用的接口。

select/poll/epoll

select实现多路复用的方式是,把已经连接的socket的文件描述符放入内核中,内核负责遍历这些socket,当检查有读或者写事件的时候,就把这个socket标记为可读或者可写,接着把这些文件描述符集合拷贝回用户态。用户态再遍历取出可读或者可写的socket并对其进行处理。

文件描述符集合就是每一项都指向一个打开的文件。

poll与select的区别就是poll是用链表存储。

epoll的内核对socket的存储结构和select/poll就不同了,它们是线性的存储结构,每次执行这两个方法的时候都是把整个socket传给内核。而epoll是用红黑树存储,当有一个新的需要检测的socket来临时,只需要传一个即可,大幅减少了内核和用户空间的拷贝过程。

此外,epoll的内核还维护了一个链表来记录就绪事件,当检测的socket有就绪事件发生时,就会通过回调函数把这个事件加入这个就绪事件链表当中,当用户态需要这个就绪链表的时候,只会返回有事件发生的文件描述符列表,拷贝回用户态,无需像select/poll一样,无论这个socket是否有事件发生,都全部拷贝回去。

Socket编程实现I/O多路复用模型

Server端:主线程负责处理连接事件,读线程交给异步支线线程处理,达到I/O多路复用的效果,同时这也是Netty框架的部分底层思想实现噢~

public class SimpleServer {
    public static void main(String[] args) throws IOException {
​
        System.out.println("这里是服务端");
​
        //创建服务端Channel
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
​
        //设置非阻塞
        serverChannel.configureBlocking(false);
​
        //创建Selector
        Selector selector = Selector.open();
​
        //0代表不对任何事件感兴趣
        SelectionKey selectionKey = serverChannel.register(selector, 0, serverChannel);
​
        //对连接接收事件感兴趣
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);
​
        //绑定端口
        serverChannel.bind(new InetSocketAddress(8080));
​
        NioEventLoop executor = new NioEventLoop();
        //主线程负责连接
        while (true){
            //当没有事件到来的时候,这里是阻塞的,有事件的时候会自动运行
            selector.select();
            //如果有事件到来,这里可以得到注册到该selector上的所有的key,每一个key上都有一个channel
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            //得到集合的迭代器
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while (keyIterator.hasNext()){
                //得到每一个key
                SelectionKey key = keyIterator.next();
                //首先要从集合中把key删除,否则会一直报告该key
                keyIterator.remove();
                //接下来就要处理事件,判断selector轮询到的是什么事件,并根据事件作出回应
                //如果是连接事件
                if(key.isAcceptable()){
                    //之前把服务端channel注册到selector上时候,把serverChannel放进来了
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    //接收客户端channel
                    SocketChannel clientChannel = channel.accept();
                    clientChannel.configureBlocking(false);
                    SelectionKey clientSocketKey = clientChannel.register(selector, 0, clientChannel);
                    //将客户端channel设置为可读事件
                    clientSocketKey.interestOps(SelectionKey.OP_READ);
                    System.out.println("客户端连接成功"+System.currentTimeMillis());
                    //worker线程开始从客户端读数据,把客户端的channel交给worker
                    executor.register(clientChannel,executor);
                    //用channel写回一条信息
                    clientChannel.write(ByteBuffer.wrap("服务端写回客户端成功".getBytes()));
                    System.out.println("向客户端发送数据成功"+System.currentTimeMillis());
                }
            }
        }
​
    }
}
public class NioEventLoop extends SingleThreadEventLoop{
​
    protected Selector selector ;
​
    public NioEventLoop() throws IOException {
        this.selector = Selector.open();
    }
​
    public Selector getSelector(){
        return this.selector;
    }
​
    //循环阻塞,如果有事件发生,或者队列有东西,就放行
    private void select() throws IOException {
        while (true){
            //阻塞等待事件,如果3s都没有事件过来,可能是没有初始化。
            int select = selector.select(3000);
            if(select != 0 || hasTasks()){
                break;
            }
        }
    }
​
    private void runAllTasks() {
​
        for (;;){
            Runnable task = tasksQueue.poll();
            if(task == null){
                break;
            }
            System.out.println("开始处理注册事件");
            task.run();
        }
    }
​
    private void processSelectedKeys() throws IOException {
        System.out.println("开始处理I/O事件");
        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
        while (iterator.hasNext()){
            SelectionKey key = iterator.next();
            iterator.remove();
            //由于连接事件被处理完了,只剩下只读事件了
            if (key.isReadable()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int len = 0;
                len = socketChannel.read(buffer);
                if (len == -1) {
                    socketChannel.close();
                    break;
                } else {
                    buffer.flip();
                    System.out.println(Charset.defaultCharset().decode(buffer).toString() + System.currentTimeMillis());
                }
            }
        }
    }
​
    @Override
    public void run(){
        while (true) {
            try {
                //等待任务
                select();
                //I/O任务来了先处理I/O任务
                processSelectedKeys();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }finally {
                //最后处理队列任务。第一遍走过来的时候会先处理队列任务
                runAllTasks();
            }
        }
    }
}
public abstract class SingleThreadEventExecutor implements Executor {
​
    private volatile boolean isSingle = false;
​
    private RejectedExecutionHandler rejectedExecutionHandler;
​
    protected Queue<Runnable> tasksQueue;
​
    protected Thread thread;
​
    //初始化的时候,1.构建线程池的属性,如队列,拒绝策略。2.构建当前线程的Selector
    public SingleThreadEventExecutor() {
        this.tasksQueue = new LinkedBlockingQueue<>(Integer.MAX_VALUE);
        this.rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
    }
​
    protected final void reject(Runnable task) {
//        rejectedExecutionHandler.rejectedExecution(task, this);
    }
​
    protected boolean inEventLoop(Thread thread){
        return this.thread == thread;
    }
​
    protected boolean hasTasks(){
        return this.tasksQueue.isEmpty();
    }
​
​
    @Override
    public void execute(Runnable task) {
​
        //此时仍然是主线程。先把任务放入队列,后续处理
        addTask(task);
        System.out.println("任务添加完成");
        startThread(task);
    }
​
    protected final void addTask(Runnable task){
        this.tasksQueue.add(task);
    }
​
    private void startThread(Runnable task) {
        if (isSingle) {
            return;
        }
        isSingle = true;
        System.out.println("新线程任务成功创建");
        //这是个异步线程,单线程执行器的核心
        new Thread(()->{
            thread = Thread.currentThread();
            SingleThreadEventExecutor.this.run();
            System.out.println("新线程任务跑完了");
        }).start();
    }
​
​
    protected abstract void run();
​
​
}
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor{
​
    private void register0(SocketChannel socketChannel,NioEventLoop nioEventLoop){
        try {
            socketChannel.configureBlocking(false);
            socketChannel.register(nioEventLoop.getSelector(), SelectionKey.OP_READ);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
​
    //注册socketChannel到当前的selector上
    public void register(SocketChannel socketChannel,NioEventLoop nioEventLoop){
        //先判断当前线程是否是执行器线程,如果不是,说明是主线程,那么就还没被构造。
        if(inEventLoop(Thread.currentThread())){
            register0(socketChannel,nioEventLoop);
        }else {
            //否则是第一次注册,就先构造线程即可。
            execute(()->{
                register0(socketChannel,nioEventLoop);
                System.out.println("executor执行完成");
            });
        }
    }
}

到此这篇关于java Socket编程实现I/O多路复用的示例的文章就介绍到这了,更多相关jav I/O多路复用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java获取Process子进程进程ID方法详解

    Java获取Process子进程进程ID方法详解

    这篇文章主要介绍了Java获取Process子进程进程ID方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧
    2022-12-12
  • rocketmq如何修改存储路径

    rocketmq如何修改存储路径

    这篇文章主要介绍了rocketmq如何修改存储路径的操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • Spring Boot 2.4新特性减少95%内存占用问题

    Spring Boot 2.4新特性减少95%内存占用问题

    这篇文章主要介绍了Spring Boot 2.4新特性减少95%内存占用问题,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-12-12
  • 七段小代码解决Java程序常见的崩溃场景

    七段小代码解决Java程序常见的崩溃场景

    这篇文章主要为大家介绍了七段小代码解决Java程序常见的崩溃场景,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-06-06
  • 关于SpringSecurity配置403权限访问页面的完整代码

    关于SpringSecurity配置403权限访问页面的完整代码

    本文给大家分享SpringSecurity配置403权限访问页面的完整代码,配置之前和配置之后的详细介绍,代码简单易懂,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2021-06-06
  • spring boot拦截器的使用场景示例详解

    spring boot拦截器的使用场景示例详解

    这篇文章主要给大家介绍了关于spring boot拦截器的使用场景,文中通过示例代码介绍的非常详细,对大家学习或者使用Spring Boot具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2020-05-05
  • SpringBoot整合Druid数据源的方法实现

    SpringBoot整合Druid数据源的方法实现

    Druid是阿里开发的一款开源的数据源,被很多人认为是Java语言中最好的数据库连接池,本文主要介绍了SpringBoot整合Druid数据源的方法实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-06-06
  • springboot整合mybatis-plus代码生成器的配置解析

    springboot整合mybatis-plus代码生成器的配置解析

    这篇文章主要介绍了springboot整合mybatis-plus代码生成器的配置解析,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-02-02
  • 详解Elastic Search搜索引擎在SpringBoot中的实践

    详解Elastic Search搜索引擎在SpringBoot中的实践

    本篇文章主要介绍了Elastic Search搜索引擎在SpringBoot中的实践,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-01-01
  • java使用MulticastSocket实现组播

    java使用MulticastSocket实现组播

    这篇文章主要为大家详细介绍了java使用MulticastSocket实现组播,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-01-01

最新评论