Java AQS信号量Semaphore的使用

 更新时间:2023年02月02日 10:00:24   作者:飞奔的小付  
Semaphore来自于JDK1.5的JUC包,直译过来就是信号量,被作为一种多线程并发控制工具来使用。本文将详解其原理与使用方法,感兴趣的可以学习一下

一.什么是Semaphore

Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,它也是基于AbstractQueuedSynchronizer实现的。

Semaphore的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同时获取信号量。

PV操作是操作系统一种实现进程互斥与同步的有效方法。PV操作与信号量(S)的处理相关,P表示通过的意思,V表示释放的意思。用PV操作来管理共享资源时,首先要确保PV操作自身执行的正确性。
P操作的主要动作是:
①S减1;
②若S减1后仍大于或等于0,则进程继续执行;
③若S减1后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,然后转进程调度。
V操作的主要动作是:
①S加1;
②若相加后结果大于0,则进程继续执行;
③若相加后结果小于或等于0,则从该信号的等待队列中释放一个等待进程,然后再返回原进程继续执行或转进程调度。

二.Semaphore的使用

构造器

  public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
  public Semaphore(int permits, boolean fair) {
       sync = fair ? new FairSync(permits) : new NonfairSync(permits);
   }

permits 表示许可证的数量(资源数)

fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程

常用方法

public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength()
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)

  • acquire() 表示阻塞并获取许可
  • tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
  • release() 表示释放许可
  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermit(int reduction):减少 reduction 个许可证
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合

应用场景

可以用于做流量控制,特别是公用资源有限的应用场景

限流

/**
     * 实现一个同时只能处理5个请求的限流器
     */
    private static Semaphore semaphore = new Semaphore(5);
    /**
     * 定义一个线程池
     */
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));
    public static void exec() {
        try {
            semaphore.acquire(1);
            // 模拟真实方法执行
            System.out.println("执行exec方法" );
            Thread.sleep(2000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            semaphore.release(1);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        {
            for (; ; ) {
                Thread.sleep(100);
                // 模拟请求以10个/s的速度
                executor.execute(() -> exec());
            }
        }
    }

三.Semaphore源码分析

主要关注 Semaphore的加锁解锁(共享锁)逻辑实现,线程竞争锁失败入队阻塞逻辑和获取锁的线程释放锁唤醒阻塞线程竞争锁的逻辑实现

	public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
            //尝试获取共享锁,大于等于0则直接获取锁成功,小于0时则共享锁阻塞
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared的实现

   final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // 当减一之后的值小于0 或者 
                // compareAndSetState成功,把state变为remaining,即将状态减一
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

入队阻塞

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //入队,创建节点 使用共享模式
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
            //获取当前节点的前躯节点
                final Node p = node.predecessor();
                //如果节点为head节点
                if (p == head) {
                	//阻塞动作比较重,通常会再尝试获取资源,没有获取到返回负数
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //判断是否可以阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

入队操作

private Node addWaiter(Node mode) {
		//构建节点,模式是共享模式
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
        	//设置前一节点为tail 
            node.prev = pred;
            //设置当前节点为尾节点
            if (compareAndSetTail(pred, node)) {
            // 前一节点的next为当前节点
                pred.next = node;
                return node;
            }
        }
        //创建队列
        enq(node);
        return node;
    }

创建队列,经典的for循环创建双向链表

 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { 
            	//节点为空 则new一个节点 设置头节点
                if (compareAndSetHead(new Node()))
                //把这个节点的头节点赋值给尾节点
                    tail = head;
            } else {
            // 如果尾节点存在 就将该节点的前节点指向tail
                node.prev = t;
                //设置当前节点为tail
                if (compareAndSetTail(t, node)) {
                //前一个节点的next指向当前节点,入队操作就完成了
                    t.next = node;
                    return t;
                }
            }
        }
    }

设置waitStatus状态及获取waitStatus状态,waitStatus为-1时可以唤醒后续节点

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 状态是-1 就可以唤醒后续节点
             *
             */
            return true;
        if (ws > 0) {
            /*
             * 前置任务已取消,删掉节点
             *
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * cas设置waitstatus状态,设置为-1
             * 
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

阻塞 调用LockSupport.park

 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

释放锁的逻辑

	public void release() {
        sync.releaseShared(1);
    }
	public final boolean releaseShared(int arg) {
		//cas成功则进行释放共享锁
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

state状态+1操作,cas成功,返回true

		protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            //头节点不为空并且不是尾节点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //waitstatus为-1
                if (ws == Node.SIGNAL) {
                //将SIGNAL置为0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;         
                    //唤醒   
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;               
            }
            if (h == head)                  
                break;
        }
    }

唤醒操作

 private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        //ws小于0就将其设置为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        //当前节点的下一个节点为空或者ws大于0
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //s不为空 则进行唤醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }

唤醒下一个线程之后,要把上一个节点移除队列

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                //下一个线程进来,如果前置节点是头节点,则将前置节点出队
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    //cas获取资源成功
                    if (r >= 0) {
                    	//出队操作
                        setHeadAndPropagate(node, r);
                        //将p.next移除
                        p.next = null;
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

出队操作

private void setHeadAndPropagate(Node node, int propagate) {
		//设置当前节点为head节点,前一节点的head属性被删除
        Node h = head; 
        setHead(node);
        //如果是传播属性
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
           //并且是共享模式,可以持续唤醒下一个,
           //只要资源数充足 就可以一直往下唤醒,提高并发量
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
 private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

至此,线程的阻塞唤醒核心逻辑就这么多,共享锁与独占锁的区别是可以唤醒后续的线程,如果资源数充足的话,可以一直往下唤醒,提高了并发量。

到此这篇关于Java AQS信号量Semaphore的使用的文章就介绍到这了,更多相关Java信号量Semaphore内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • IntelliJ IDEA失焦自动重启服务的解决方法

    IntelliJ IDEA失焦自动重启服务的解决方法

    在使用 IntelliJ IDEA运行 SpringBoot 项目时,你可能会遇到一个令人困扰的问题,一旦你的鼠标指针离开当前IDE窗口,点击其他位置时, IDE 窗口会失去焦点,你的 SpringBoot 服务就会自动重启,所以本文给大家介绍了IntelliJ IDEA失焦自动重启服务的解决方法
    2023-10-10
  • 解析SpringBoot自定义参数校验注解

    解析SpringBoot自定义参数校验注解

    这篇文章主要介绍了SpringBoot自定义参数校验注解,引入依赖,spring validation是在hibernate-validator上做了一层封装,文中提到了定义参数校验注解与处理器的示例代码,感兴趣的朋友跟随小编一起看看吧
    2023-10-10
  • SpringBoot项目启动时预加载操作方法

    SpringBoot项目启动时预加载操作方法

    Spring Boot是一种流行的Java开发框架,它提供了许多方便的功能来简化应用程序的开发和部署,这篇文章主要介绍了SpringBoot项目启动时预加载,需要的朋友可以参考下
    2023-09-09
  • java字符串压缩解压示例

    java字符串压缩解压示例

    这篇文章主要介绍了java字符串压缩解压示例,先压缩,再加密,再压缩,数据越大,压缩比例越高,需要的朋友可以参考下
    2014-03-03
  • JavaCV获取视频文件时长的方法

    JavaCV获取视频文件时长的方法

    这篇文章主要为大家详细介绍了JavaCV获取视频文件时长的方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-07-07
  • java接口自动化测试框架及断言详解

    java接口自动化测试框架及断言详解

    这篇文章主要介绍了java接口自动化测试框架及断言详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-07-07
  • java微信公众号企业付款开发

    java微信公众号企业付款开发

    这篇文章主要为大家详细介绍了java微信公众号企业付款开发,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-09-09
  • Spring的组合注解和元注解原理与用法详解

    Spring的组合注解和元注解原理与用法详解

    这篇文章主要介绍了Spring的组合注解和元注解原理与用法,结合实例形式详细分析了spring组合注解和元注解相关功能、原理、配置及使用方法,需要的朋友可以参考下
    2019-11-11
  • Java开发中synchronized的定义及用法详解

    Java开发中synchronized的定义及用法详解

    这篇文章主要介绍了Java开发中synchronized的定义及用法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07
  • spring boot中使用RabbitMQ routing路由详解

    spring boot中使用RabbitMQ routing路由详解

    本篇文章主要介绍了spring boot中使用RabbitMQ routing路由详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-03-03

最新评论