Java并发编程之锁、并发容器、阻塞队列与异步编程实战代码

 更新时间:2026年06月02日 08:54:08   作者:fengxin_rou  
这篇文章主要介绍了Java并发编程之锁、并发容器、阻塞队列与异步编程的相关资料,分别是ReentrantLock的灵活锁机制、ConcurrentHashMap的高性能并发设计、BlockingQueue的生产者、消费者模式及CompletableFuture的异步编程能力,需要的朋友可以参考下

前言

在多核 CPU 成为主流的今天,并发编程已成为后端开发者必备的核心技能。本文系统梳理 Java 并发编程中的锁机制、并发容器、阻塞队列与异步编程四大核心模块,深入解析底层实现原理,结合实战代码帮助读者掌握高并发场景下的技术选型与避坑策略。

一、显式锁 ReentrantLock 深度解析

1.1 ReentrantLock 与 synchronized 核心区别

synchronized是 Java 原生的隐式锁,基于 JVM 实现,自动完成锁的获取与释放;而ReentrantLock是 JDK 提供的显式锁,基于 AQS(AbstractQueuedSynchronizer)框架实现,需要手动控制加锁与释放。

对比维度

synchronized

ReentrantLock

实现层面

JVM 层面

JDK API 层面

锁释放

自动释放

必须手动 unlock ()

可中断

不支持

支持 lockInterruptibly ()

可超时

不支持

支持 tryLock (timeout)

公平锁

仅非公平

支持公平 / 非公平

条件变量

仅 1 个等待队列

支持多个 Condition

synchronized在 JDK6 后进行了大量优化,包括偏向锁、轻量级锁、自旋锁等,性能已大幅提升。但在需要灵活控制锁行为的场景下,ReentrantLock 仍是首选。

1.2 可重入性实现原理

可重入锁指同一个线程可以多次获取同一把锁而不会产生死锁。ReentrantLock 通过 AQS 的 state 状态变量和 exclusiveOwnerThread 实现可重入。

当线程首次获取锁时,state 从 0 变为 1,并记录当前持有线程;当同一线程再次获取锁时,state 进行累加;释放锁时 state 递减,直到 state 归 0 才真正释放锁。

// ReentrantLock.NonfairSync.tryAcquire()核心逻辑
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 锁空闲,CAS尝试获取
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 同一线程重入,state累加
        int nextc = c + acquires;
        if (nextc < 0) throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

可重入性避免了同一线程反复获取锁导致的死锁,是递归调用场景的必备特性。

1.3 可中断锁实现原理

synchronized获取锁时不可中断,线程会一直阻塞直到获取锁;ReentrantLock 通过lockInterruptibly()方法支持中断响应。

当调用lockInterruptibly()时,如果线程在等待队列中被中断,会直接抛出 InterruptedException,不再继续等待锁。这为取消阻塞操作提供了可能,是实现超时获取锁的基础。

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

// AQS.acquireInterruptibly()
public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg); // 可中断的等待逻辑
}

1.4 可超时获取锁实现原理

tryLock(long timeout, TimeUnit unit)方法支持在指定时间内尝试获取锁,超时则返回 false。其实现基于 LockSupport.parkNanos () 进行限时等待,在等待过程中同时检测中断和超时。

// 超时获取锁使用示例
public boolean tryLockWithTimeout(ReentrantLock lock, long timeoutMs) {
    try {
        return lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
    }
}

可超时特性在分布式系统中尤为重要,可以有效避免因网络波动导致的线程永久阻塞。

1.5 公平锁与非公平锁实现原理

ReentrantLock 默认采用非公平锁,可通过构造函数参数指定为公平锁:

ReentrantLock fairLock = new ReentrantLock(true);    // 公平锁
ReentrantLock unfairLock = new ReentrantLock(false); // 非公平锁(默认)

公平锁严格按照线程请求顺序分配锁,新线程必须加入等待队列尾部;非公平锁允许新线程在锁释放时直接尝试抢占,不考虑队列中等待的线程。

公平锁的tryAcquire()会额外检查hasQueuedPredecessors(),确保只有等待队列中没有前驱节点时才尝试获取锁。

非公平锁性能通常优于公平锁(吞吐量高约 30%),但可能产生线程饥饿。公平锁保证了顺序性,但增加了上下文切换开销。

1.6 实战代码示例

/**
 * ReentrantLock完整使用示例
 */
public class ReentrantLockDemo {
    private final ReentrantLock lock = new ReentrantLock(true);
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Queue<Integer> queue = new LinkedList<>();
    private static final int CAPACITY = 10;

    public void produce(int data) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == CAPACITY) {
                notFull.await(); // 队列满,生产者等待
            }
            queue.offer(data);
            notEmpty.signal(); // 唤醒消费者
        } finally {
            lock.unlock(); // 必须在finally中释放锁
        }
    }

    public int consume() throws InterruptedException {
        lock.lockInterruptibly(); // 可中断方式获取锁
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            int data = queue.poll();
            notFull.signal();
            return data;
        } finally {
            lock.unlock();
        }
    }
}

注意事项

  • unlock()必须放在 finally 块中,防止异常导致锁泄漏

  • 使用多个 Condition 可以精确控制唤醒条件,这是 synchronized 不具备的能力

二、ConcurrentHashMap 底层原理与演进

2.1 JDK7 分段锁设计原理

JDK7 的 ConcurrentHashMap 采用 \\ 分段锁(Segment)\\ 设计,整个哈希表被拆分为 16 个 Segment 数组,每个 Segment 独立加锁。

核心数据结构:

ConcurrentHashMap
└── Segment[] (默认16个)
    └── HashEntry[] (每个Segment独立的哈希表)
        └── HashEntry链表

Segment 继承自 ReentrantLock,每次 put 操作只锁定对应的 Segment,其他 Segment 的读写不受影响。理论上最大并发度等于 Segment 数量(默认 16)。

但分段锁存在明显缺陷:

  1. 并发度固定,无法随数组扩容动态提升

  2. 空间浪费,每个 Segment 都需要独立的锁和数据结构

  3. 跨段操作(如 size ())需要加锁所有 Segment,性能较差

2.2 JDK8 架构演进:CAS + synchronized

JDK8 彻底放弃分段锁,采用数组 + 链表 + 红黑树结构,与 HashMap 保持一致。锁的粒度从 Segment 级别细化到每个哈希桶(Node)级别。

核心改进:

  • 使用synchronized替代 ReentrantLock,JVM 对 synchronized 的优化更成熟

  • 无锁竞争时使用 CAS 进行无锁化更新

  • 仅在哈希桶发生哈希冲突时才对首节点加锁

  • 链表长度超过 8 时自动转为红黑树,解决哈希碰撞攻击

// JDK8 ConcurrentHashMap.putVal()核心逻辑
final V putVal(K key, V value, boolean onlyIfAbsent) {
    int hash = spread(key.hashCode());
    for (Node<K,V>[] tab = table;;) {
        int i = (n - 1) & hash;
        Node<K,V> f = tabAt(tab, i);
        
        if (f == null) {
            // 桶为空,CAS直接插入
            if (casTabAt(tab, i, null, new Node(hash, key, value)))
                break;
        } else {
            synchronized (f) { // 仅锁定桶的首节点
                if (tabAt(tab, i) == f) { // 双重检查
                    // 链表或红黑树插入逻辑
                }
            }
        }
    }
}

2.3 放弃分段锁的深层原因

JDK8 放弃分段锁主要基于以下考量:

1. 锁粒度更细

  • 分段锁最小粒度是 Segment,JDK8 最小粒度是哈希桶

  • 并发度随数组容量动态提升,理论最大并发度等于数组长度

2. synchronized 性能优化

  • JDK6 后 synchronized 引入偏向锁、轻量级锁、自适应自旋

  • 无锁竞争时偏向锁性能优于 ReentrantLock

  • JVM 可以对 synchronized 进行逃逸分析等深度优化

3. 减少内存开销

  • 消除 Segment 对象的内存占用

  • 数据结构与 HashMap 统一,代码复用性更高

4. 红黑树引入

  • 解决哈希碰撞导致的链表过长问题

  • 极端情况下时间复杂度从 O (n) 降至 O (logn)

2.4 扩容机制深度解析

ConcurrentHashMap 的扩容是并发协作式的,支持多线程共同参与扩容,这是其核心亮点。

扩容触发条件

  • 元素数量达到阈值(容量 × 加载因子)

  • 单桶链表长度超过 8 但数组容量小于 64

扩容核心流程

  1. 扩容准备:创建 nextTable,容量为原数组 2 倍

  2. 扩容标记:sizeCtl 设为负数,标记正在扩容

  3. 任务分配:每个线程负责连续的 16 个桶的迁移

  4. 并发迁移

    1. 处理完的桶设置为 ForwardingNode

    2. 遇到 ForwardingNode 自动跳过或协助扩容

  5. 扩容完成:table 指向 nextTable,重置 sizeCtl

// 扩容时的ForwardingNode标记
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
}

并发扩容的精妙之处

  • 读操作遇到 ForwardingNode 会转发到新数组,不阻塞

  • 写操作遇到 ForwardingNode 会主动协助扩容,实现 "多线程帮忙"

  • 迁移过程采用 "复制 + 清除" 方式,保证数据一致性

2.5 完整工作流程梳理

以 put 操作为例,完整执行流程:

  1. 计算 key 的哈希值,定位哈希桶位置

  2. 如果数组未初始化,CAS 触发初始化

  3. 如果目标桶为空,CAS 直接插入新节点

  4. 如果遇到 ForwardingNode,协助扩容后重试

  5. 否则,对桶首节点加 synchronized 锁

  6. 遍历链表或红黑树,key 存在则更新,不存在则追加

  7. 链表长度超过 8,触发树化或扩容

  8. 释放锁,CAS 更新元素计数,检查是否需要扩容

整个过程中,锁的持有时间极短,大部分操作都是无锁的 CAS 操作,这是 ConcurrentHashMap 高性能的根本原因。

三、BlockingQueue 阻塞队列实战

3.1 阻塞队列核心作用

BlockingQueue是 Java 并发包中最重要的数据结构之一,专门解决生产者 - 消费者模式的线程协作问题。

核心特性:

  • 队列满时,生产者线程自动阻塞,直到有消费者消费

  • 队列空时,消费者线程自动阻塞,直到有生产者生产

  • 所有操作都是线程安全的,内部通过锁和条件变量实现

四种核心操作模式:

操作方式

抛出异常

返回特殊值

阻塞等待

超时等待

插入

add(e)

offer(e)

put(e)

offer(e, time, unit)

移除

remove()

poll()

take()

poll(time, unit)

检查

element()

peek()

-

-

阻塞队列是线程池的核心组件,也是解耦生产消费速率不匹配的标准解决方案。

3.2 ArrayBlockingQueue 详解

ArrayBlockingQueue是基于数组实现的有界阻塞队列,创建时必须指定容量。

核心特点:

  • 有界队列,容量固定不可扩容

  • 单锁双 Condition 机制(notEmpty + notFull)

  • 支持公平 / 非公平模式

  • 读写共用同一把锁,并发度较低

// ArrayBlockingQueue核心结构
public class ArrayBlockingQueue<E> {
    final Object[] items;      // 存储数组
    int takeIndex;             // 取元素位置
    int putIndex;              // 放元素位置
    int count;                 // 元素数量
    final ReentrantLock lock;  // 单锁
    private final Condition notEmpty;
    private final Condition notFull;
}

适用场景:队列大小可预估、对内存占用敏感的场景。

3.3 LinkedBlockingQueue 详解

LinkedBlockingQueue是基于链表实现的阻塞队列,默认容量为 Integer.MAX_VALUE。

核心特点:

  • 双锁分离设计(takeLock + putLock),读写互不阻塞

  • 默认无界(实际最大 2^31-1),也可指定容量

  • 吞吐量高于 ArrayBlockingQueue

  • 内存占用相对较高

// LinkedBlockingQueue双锁设计
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

注意:无界模式下如果生产速度远大于消费速度,可能导致 OOM。

适用场景:生产消费速率差异较大、追求高吞吐量的场景。

3.4 SynchronousQueue 详解

SynchronousQueue是一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作。

核心特点:

  • 内部没有缓冲区,队列容量始终为 0

  • 支持公平 / 非公平模式

  • 直接传递,不存储元素

  • 是 Executors.newCachedThreadPool () 的默认队列

// SynchronousQueue典型用法
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
// 生产者线程
new Thread(() -> {
    queue.put(1); // 会阻塞直到有消费者take
}).start();
// 消费者线程
new Thread(() -> {
    queue.take(); // 会阻塞直到有生产者put
}).start();

适用场景:任务必须立即处理、不允许排队的场景,如 CachedThreadPool。

3.5 DelayQueue 原理与延时任务应用

DelayQueue是支持延时获取元素的无界阻塞队列,元素必须实现 Delayed 接口。

核心原理:

  • 内部基于 PriorityQueue 实现,按过期时间排序

  • 只有当元素的延迟时间到期后才能被取出

  • 队首元素永远是最早过期的元素

// 延时任务元素定义
public class DelayedTask implements Delayed {
    private final long executeTime;
    private final Runnable task;

    public DelayedTask(long delayMs, Runnable task) {
        this.executeTime = System.currentTimeMillis() + delayMs;
        this.task = task;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(this.executeTime, ((DelayedTask)other).executeTime);
    }
}

典型应用场景

  1. 订单超时自动取消

  2. 会话超时清理

  3. 定时任务调度

  4. 缓存过期失效

// 延时队列使用示例
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
delayQueue.put(new DelayedTask(30000, () -> System.out.println("30秒后执行")));
delayQueue.put(new DelayedTask(60000, () -> System.out.println("60秒后执行")));

// 消费者线程
while (true) {
    DelayedTask task = delayQueue.take(); // 阻塞直到任务到期
    task.run();
}

3.6 四者对比与选型建议

特性

ArrayBlockingQueue

LinkedBlockingQueue

SynchronousQueue

DelayQueue

容量

有界

可选有界 / 无界

0

无界

数据结构

数组

链表

直接传递

优先级队列

锁机制

单锁

双锁分离

CAS

单锁

公平性

支持

不支持

支持

不支持

吞吐量

中等

极高

典型应用

固定大小线程池

固定大小线程池

缓存线程池

延时任务

选型建议

  • 需要控制队列大小 → ArrayBlockingQueue

  • 追求高吞吐量 → LinkedBlockingQueue

  • 任务必须立即执行 → SynchronousQueue

  • 需要延时执行 → DelayQueue

四、CompletableFuture 异步编程

4.1 异步编程背景

传统 Future 接口的局限性:

  • 无法手动完成

  • 不支持链式调用

  • 不支持异常处理

  • 无法组合多个 Future

CompletableFuture在 Java8 中引入,实现了 CompletionStage 接口,提供了丰富的异步编程能力,支持函数式编程风格。

4.2 核心 API 分类详解

1. 创建类 API

// 使用默认线程池
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("异步任务"));
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "返回结果");

// 使用自定义线程池(推荐)
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "自定义线程池", executor);

// 手动完成
CompletableFuture<String> future4 = new CompletableFuture<>();
future4.complete("手动设置结果");
future4.completeExceptionally(new RuntimeException("手动异常"));

注意:默认使用 ForkJoinPool.commonPool (),所有 CompletableFuture 共享,CPU 密集型任务建议使用自定义线程池。

2. 链式转换类 API

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(s -> s + " World")           // 同步转换
    .thenApplyAsync(s -> s.toUpperCase())   // 异步转换
    .thenAccept(s -> System.out.println(s)) // 消费结果
    .thenRun(() -> System.out.println("执行完成")); // 仅执行,不消费结果
  • thenApply:输入 T,输出 U,类似 map

  • thenAccept:输入 T,无输出,消费型

  • thenRun:无输入无输出,仅执行动作

3. 组合类 API

// AND组合:两个都完成才执行
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "World");

f1.thenCombine(f2, (s1, s2) -> s1 + " " + s2)
  .thenAccept(System.out::println); // 输出 Hello World

// OR组合:任意一个完成就执行
CompletableFuture<String> fast = f1.applyToEither(f2, s -> s + " faster");

4. 异常处理 API

CompletableFuture.supplyAsync(() -> {
    if (true) throw new RuntimeException("出错了");
    return "正常";
})
.exceptionally(ex -> {
    System.out.println("捕获异常: " + ex.getMessage());
    return "默认值"; // 异常时返回默认值
})
.handle((result, ex) -> {
    if (ex != null) {
        return "处理异常";
    }
    return result;
})
.whenComplete((result, ex) -> {
    // 无论成功失败都会执行,不改变结果
    System.out.println("执行完成");
});

5. 多任务组合

// 所有任务都完成
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);

// 任意一个任务完成
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);

4.3 实战示例:并行调用优化

/**
 * 并行调用多个服务,聚合结果
 */
public class CompletableFutureDemo {
    public UserInfo getUserInfo(Long userId) {
        // 并行调用三个接口
        CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> userService.getUser(userId));
        CompletableFuture<List<Order>> orderFuture = CompletableFuture.supplyAsync(() -> orderService.getOrders(userId));
        CompletableFuture<List<Address>> addrFuture = CompletableFuture.supplyAsync(() -> addressService.getAddresses(userId));

        // 等待所有完成,聚合结果
        return CompletableFuture.allOf(userFuture, orderFuture, addrFuture)
            .thenApply(v -> {
                UserInfo info = new UserInfo();
                info.setUser(userFuture.join());
                info.setOrders(orderFuture.join());
                info.setAddresses(addrFuture.join());
                return info;
            })
            .exceptionally(ex -> {
                log.error("获取用户信息失败", ex);
                return null;
            })
            .join();
    }
}

通过 CompletableFuture,原本串行 3 秒的调用可以优化到 1 秒完成,这是微服务架构下的常用优化手段。

结语

本文系统解析了 Java 并发编程的四大核心模块:ReentrantLock 的灵活锁机制、ConcurrentHashMap 的高性能并发设计、BlockingQueue 的生产者 - 消费者模式、CompletableFuture 的异步编程能力。

这些技术是构建高并发系统的基石。建议结合实际项目深入实践,重点关注各组件的适用场景与性能特性,避免在生产环境中出现并发安全问题。进阶可深入研究 AQS 框架、JMM 内存模型与无锁算法。

到此这篇关于Java并发编程之锁、并发容器、阻塞队列与异步编程实战代码的文章就介绍到这了,更多相关Java并发编程内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Springboot整合log4j2日志全解总结

    Springboot整合log4j2日志全解总结

    这篇文章主要介绍了Springboot整合log4j2日志全解总结,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-12-12
  • 解决springmvc使用@PathVariable路径匹配问题

    解决springmvc使用@PathVariable路径匹配问题

    这篇文章主要介绍了解决springmvc使用@PathVariable路径匹配问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-02-02
  • Java中Agent的使用详解

    Java中Agent的使用详解

    Java Agent是一种特殊类型的软件组件,它允许在Java虚拟机(JVM)运行时修改应用程序的字节码,下面我们就来一起深入了解一下Agent的具体使用吧
    2023-12-12
  • Java EasyExcel利用填充模版动态生成多个sheet页

    Java EasyExcel利用填充模版动态生成多个sheet页

    这篇文章主要为大家详细介绍了Java EasyExcel如何利用填充模版动态生成多个sheet页,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-12-12
  • Java中Json字符串和Java对象的互转

    Java中Json字符串和Java对象的互转

    本文介绍了JSON作为一种轻量级的数据交换格式,广泛应用于前后端数据传输,本文就来详细的介绍一下Java中Json字符串和Java对象的互转,感兴趣的可以了解一下
    2025-12-12
  • Spring Boot 中启用定时任务的操作方法

    Spring Boot 中启用定时任务的操作方法

    文章主要介绍了如何在Spring Boot中启用定时任务,包括使用@EnableScheduling注解、配置项控制定时任务是否开启以及如何关闭cron定时任务,感兴趣的朋友跟随小编一起看看吧
    2024-11-11
  • Java中时间戳的获取和转换的示例分析

    Java中时间戳的获取和转换的示例分析

    这篇文章主要介绍了Java中时间戳的获取和转换的示例分析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07
  • Java中高效获取IP地域信息方案全解析

    Java中高效获取IP地域信息方案全解析

    在当今互联网应用中,IP地域信息分析已成为许多业务场景的核心需求,本文将全面解析Java中获取IP地域信息的各种方案,大家可以根据需要进行选择
    2025-08-08
  • java程序打包成exe与jar的图文教程

    java程序打包成exe与jar的图文教程

    这篇文章主要介绍了java程序打包成exe与jar的图文教程,有需要的朋友可以参考一下
    2014-01-01
  • SpringBoot读取配置文件的四种方式

    SpringBoot读取配置文件的四种方式

    在 Spring Boot 中,application.yml 文件用于配置应用程序的属性,Spring Boot 默认会从 src/main/resources 目录下的 application.properties 或 application.yml 文件中读取配置,本文介绍了SpringBoot读取配置文件的四种方式,需要的朋友可以参考下
    2024-08-08

最新评论