Java并发编程之Semaphore详解

 更新时间:2023年11月20日 08:47:14   作者:西瓜游侠  
这篇文章主要介绍了Java并发编程之Semaphore详解,Semaphore信号量可以用来控制同时访问特定资源的线程数量,常用于限流场景,Semaphore接收一个int整型值,表示 许可证数量,需要的朋友可以参考下

1 概念

Semaphore(信号量,发音:三马佛儿),可以用来控制同时访问特定资源的线程数量,常用于限流场景。

Semaphore接收一个int整型值,表示 许可证数量。

线程通过调用acquire()获取许可证,执行完成之后通过调用release()归还许可证。只有获取到许可证的线程才能运行,获取不到许可证的线程将会阻塞。

Semaphore支持公平锁和非公平锁。

2 方法

Semaphore提供了一些方法,如下:

方法说明
acquire()获取一个许可证,在获取到许可证、或者被其他线程调用中断之前线程一直处于阻塞状态。
acquire(int permits)一次性获取多个许可证,在获取到多个许可证、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。
acquireUninterruptibly()获取一个许可证,在获取到许可证之前线程一直处于阻塞状态(忽略中断)。
tryAcquire()尝试获取许可证,返回获取许可证成功或失败,不阻塞线程。
tryAcquire(long timeout, TimeUnit unit)尝试获取许可证,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。
release()释放一个许可证,唤醒等待获取许可证的阻塞线程。
release(int permits)一次性释放多个许可证。
drainPermits()清空许可证,把可用许可证数置为0,返回清空许可证的数量。

3 例子

public class SemaphoreTest {

    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(5);
        System.out.println("初始总许可数 5");
        WorkerThread workerThread1 = new WorkerThread("worker-thread-1", semaphore);
        WorkerThread workerThread2 = new WorkerThread("worker-thread-2", semaphore);
        workerThread1.start();
        Thread.sleep(20);
        workerThread2.start();
    }
}

/**
 * 工作线程
 */
class WorkerThread extends Thread {

    private String name;
    private Semaphore semaphore;

    public WorkerThread(String name, Semaphore semaphore) {
        this. name = name;
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try {
            System.out.println(this.name + " 尝试获取许可.");
            // 获取许可证
            semaphore.acquire();
            System.out.println(this.name + " 获取许可成功,当前许可还剩 " + semaphore.availablePermits());
            Thread.sleep(3000);
            System.out.println(this.name + " 尝试释放许可.");
            // 释放许可证
            semaphore.release();
            System.out.println(this.name + " 释放许可成功,当前许可还剩 " + semaphore.availablePermits());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

初始总许可数 5
worker-thread-1 尝试获取许可.
worker-thread-2 尝试获取许可.
worker-thread-1 获取许可成功,当前许可还剩 4
worker-thread-2 获取许可成功,当前许可还剩 3
worker-thread-1 尝试释放许可.
worker-thread-1 释放许可成功,当前许可还剩 4
worker-thread-2 尝试释放许可.
worker-thread-2 释放许可成功,当前许可还剩 5
Process finished with exit code 0

4 源码解析

4.1 构造函数

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

Semaphore有两个构造函数:

  • 第一个构造函数接收一个int型参数permits,表示初始化许可证的数量,并且默认使用非公平锁。
  • 第二个构造函数接收两个参数,第二个boolean型参数fair可以用来选择是使用公平锁还是非公平锁。

4.2 Sync、FairSync、NonfairSync

公平锁FairSync和非公平锁NonfairSync都继承了抽象类Sync。

Sync源码:

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        // 设置同步状态的值为初始化的许可证数量
        Sync(int permits) {
            setState(permits);
        }

        // 获取同步状态的值,也就是剩余可使用的许可证的数量
        final int getPermits() {
            return getState();
        }

        // 非公平锁尝试获取许可证
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        // 尝试释放许可证
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        // 尝试减少许可证的数量
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        // 将许可证数量清0
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

Sync继承AQS抽象类,构造函数调用的是AQS的setState(int newState)方法,将同步状态变量state的值设置为指定的初始化许可证的数量。

公平锁源码:

    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        // 公平锁获取许可证
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

非公平锁源码:

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        // 非公平锁获取许可证
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

从源码中看出,公平锁和非公平锁在获取许可证的时候,逻辑是不一样的。

4.3 acquire获取许可证

调用Semaphore的acquire()函数可以获取许可证,源码如下:

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

实际上,调用的是Sync对象的acquireSharedInterruptibly(int arg)方法,而Sync继承了AQS,并且没有重写这个方法,因此调用的是AQS的acquireSharedInterruptibly(int arg)方法,源码如下:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 如果中断,抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 尝试获取同步变量
        if (tryAcquireShared(arg) < 0)
            // 如果获取失败,则将当前线程加入同步队列中去排队
            doAcquireSharedInterruptibly(arg);
    }

可以看出,调用tryAcquireShared(arg)来尝试获取同步变量,在这里也就是获取许可证。这个方法在AQS和Sync中都没有实现,但是被FairSync和NonfairSync分别实现了。 如果获取同步变量失败,则将当前线程放入同步队列中排队。

4.3.1 公平锁获取许可证

如果是FairSync公平锁,则实现如下:

    protected int tryAcquireShared(int acquires) {
        // 自旋
        for (;;) {
            // 判断有没有等待获取同步状态的线程,有则直接返回-1
            if (hasQueuedPredecessors())
                return -1;
            // 没有线程在等待获取同步状态,那么当前线程去获取同步状态
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                // 通过CAS更新同步状态的值
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

公平锁获取许可证的原理大致如下:

  1. 首先查看有没有线程在同步队列中排队等待获取许可证,如果有排队的,那么直接返回-1,这样将会执行doAcquireSharedInterruptibly(arg),将当前线程加入同步队列中去排队;
  2. 如果没有线程在排队,那么当前线程获取同步状态的值available,减掉想要获取的资源值acquires,也就是想要获取的许可证的数量,得到剩余的资源量remaining。如果remaining < 0,说明资源不够,本次获取失败,返回remaining值(这个时候返回的是< 0的值),外层代码会调用doAcquireSharedInterruptibly(arg)将当前线程排队;如果remaining > 0,说明资源是够用的,那么直接通过CAS原理更新同步状态的值。

4.3.2 非公平锁获取许可证

如果是NonfairSync非公平锁,则实现如下:

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }

调用的是父类Sync的nonfairTryAcquireShared(int acquires)方法:

    final int nonfairTryAcquireShared(int acquires) {
        // 自旋
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

非公平锁相对来说去掉了查看同步队列的逻辑。也就是说,在非公平锁的实现中,当前线程获取许可证的时候,不用去查看同步队列是否有线程在等待获取同步状态,而是直接去尝试获取许可证(改变同步状态的值)。

当然,如果remaining < 0,说明当前线程没能获取到期望数量的许可证,获取失败,返回< 0的值,在外部逻辑中,将会调用doAcquireSharedInterruptibly(arg)使当前线程进入同步队列中进行等待;如果remaining > 0,则通过CAS原理更新同步状态的值。

4.4 release释放许可证

通过调用Semaphore的release()方法可以释放许可证。

    public void release() {
        sync.releaseShared(1);
    }

实际上,调用的是Sync的releaseShared(int arg),而Sync并没有重写这个方法,因此调用的是AQS的releaseShared(int arg)方法:

    public final boolean releaseShared(int arg) {
        // 尝试释放同步变量
        if (tryReleaseShared(arg)) {
            // 如果成功,则唤醒后继节点
            doReleaseShared();
            return true;
        }
        return false;
    }

通过tryReleaseShared(arg)尝试释放同步变量,如果成功,则通过doReleaseShared()唤醒后继节点。

AQS并没有实现tryReleaseShared(arg)方法,而是被Semaphore的Sync实现了:

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }

这里通过CAS改变同步状态的值,释放了许可证。

下面来看看doReleaseShared()是如何唤醒后继节点的:

    private void doReleaseShared() {
        for (;;) {
            // 首先获取头节点
            Node h = head;
            // 如果头节点存在
            if (h != null && h != tail) {
                // 获取头节点的状态
                int ws = h.waitStatus;
                // 如果头节点的状态是Node.SIGNAL,说明头节点的后继节点正等待被唤醒
                if (ws == Node.SIGNAL) {
                    // 将头节点的状态设置为初始状态
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 唤醒后继节点
                    unparkSuccessor(h);
                }
                // 如果头节点的状态已经是0了,则设置头节点状态为Node.PROPAGATE
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

尝试唤醒后继节点的逻辑比较简单:

  1. 首先获取同步队列中的头节点;
  2. 如果头节点存在,并且不是尾节点,接着获取头节点的状态;
  3. 如果头节点的状态是Node.SIGNAL,说明他的后继节点正等待着被他唤醒。这个时候通过CAS原理将头节点的状态置为0,如果成功了,则通过调用unparkSuccessor(h)唤醒后继节点,最后实际上调用的是LockSupport.unpark(Thread thread)方法唤醒线程的。

到此这篇关于Java并发编程之Semaphore详解的文章就介绍到这了,更多相关Semaphore详细解析内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 如何用java给文件加密的简单实现

    如何用java给文件加密的简单实现

    文件加密,简单来说就是把文件读取出来,把读取出来的字节码数组进行遍历,把每一个码值和一个秘钥(随便一个数)进行异或运算,将运算后的结果全部写入到文件里,这篇文章主要介绍了如何用java给文件加密的简单实现,需要的朋友可以参考下
    2023-12-12
  • JavaWeb文件上传下载实例讲解(酷炫的文件上传技术)

    JavaWeb文件上传下载实例讲解(酷炫的文件上传技术)

    在Web应用系统开发中,文件上传功能是非常常用的功能,今天来主要讲讲JavaWeb中的文件上传功能的相关技术实现,本文给大家介绍的非常详细,具有参考借鉴价值,感兴趣的朋友一起看看吧
    2016-11-11
  • 关于ZooKeeper的会话机制Session解读

    关于ZooKeeper的会话机制Session解读

    这篇文章主要介绍了关于ZooKeeper的会话机制Session解读,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-02-02
  • Mybatis实现动态SQL编写详细代码示例

    Mybatis实现动态SQL编写详细代码示例

    这篇文章主要为大家详细介绍了Mybatis中动态SQL的编写使用,动态SQL技术是一种根据特定条件动态拼装SQL语句的功能,它存在的意义是为了解决拼接SQL语句字符串时的痛点问题,感兴趣想要详细了解可以参考下文
    2023-05-05
  • Mybatis实现自动生成增删改查代码

    Mybatis实现自动生成增删改查代码

    这篇文章主要为大家详细介绍了Mybatis如何实现自动生成增删改查代码的功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-01-01
  • 轻松掌握Java策略模式

    轻松掌握Java策略模式

    这篇文章主要帮助大家轻松掌握Java策略模式,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-09-09
  • Springboot启动执行特定代码的方式汇总

    Springboot启动执行特定代码的方式汇总

    这篇文章主要介绍了Springboot启动执行特定代码的几种方式,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-12-12
  • nacos配置在代码中引用的方法讲解

    nacos配置在代码中引用的方法讲解

    这篇文章主要介绍了nacos配置在代码中如何引用,如果主配置中配置的内容和拓展配置的内容重复则按主配置的配置 ,如果拓展配置中的内容和另一个拓展配置中的内容重复,则按下标大的配置作为最终的配置,对nacos配置代码引用相关知识感兴趣朋友一起看看吧
    2022-12-12
  • Java多线程之等待队列DelayQueue详解

    Java多线程之等待队列DelayQueue详解

    这篇文章主要介绍了Java多线程之等待队列DelayQueue详解,    DelayQueue被称作"等待队列"或"JDK延迟队列",存放着实现了Delayed接口的对象,对象需要设置到期时间,当且仅当对象到期,才能够从队列中被取走(并非一定被取走),需要的朋友可以参考下
    2023-12-12
  • 浅析Java ReentrantLock锁的原理与使用

    浅析Java ReentrantLock锁的原理与使用

    这篇文章主要为大家详细介绍了Java中ReentrantLock锁的原理与使用,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以了解下
    2023-08-08

最新评论