Java并发编程实战之从线程池到CompletableFuture详解

 更新时间:2026年04月18日 14:30:45   作者:程序员鸭梨  
文章总结了Java并发编程的关键点,包括线程池的最佳实践和监控、CompletableFuture的异步编程、并发集合与工具类的使用、原子类与CAS、Fork/Join框架等内容,强调了理解happens-before规则和内存模型的重要性

一、线程池最佳实践

1.1 自定义线程池

@Configuration
public class ThreadPoolConfig {

    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        
        // 核心线程数
        executor.setCorePoolSize(10);
        
        // 最大线程数
        executor.setMaxPoolSize(50);
        
        // 队列容量
        executor.setQueueCapacity(200);
        
        // 线程活跃时间
        executor.setKeepAliveSeconds(60);
        
        // 线程名前缀
        executor.setThreadNamePrefix("task-");
        
        // 拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        
        // 优雅关闭
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        
        executor.initialize();
        return executor;
    }

    @Bean("ioExecutor")
    public Executor ioExecutor() {
        // IO 密集型任务使用更多线程
        return Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * 2,
            new ThreadFactoryBuilder().setNameFormat("io-pool-%d").build()
        );
    }

    @Bean("cpuExecutor")
    public Executor cpuExecutor() {
        // CPU 密集型任务使用核心数线程
        return Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(),
            new ThreadFactoryBuilder().setNameFormat("cpu-pool-%d").build()
        );
    }
}

1.2 线程池监控

@Component
public class ThreadPoolMonitor {

    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    @Scheduled(fixedRate = 60000)
    public void monitor() {
        ThreadPoolExecutor executor = taskExecutor.getThreadPoolExecutor();
        
        log.info("Thread Pool Status - " +
            "Active: {}, " +
            "Pool Size: {}, " +
            "Core Pool Size: {}, " +
            "Max Pool Size: {}, " +
            "Task Count: {}, " +
            "Completed Task Count: {}, " +
            "Queue Size: {}",
            executor.getActiveCount(),
            executor.getPoolSize(),
            executor.getCorePoolSize(),
            executor.getMaximumPoolSize(),
            executor.getTaskCount(),
            executor.getCompletedTaskCount(),
            executor.getQueue().size()
        );
    }
}

二、CompletableFuture 异步编程

2.1 基础用法

@Service
public class AsyncOrderService {

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private NotificationService notificationService;

    // 异步创建订单
    public CompletableFuture<Order> createOrderAsync(OrderRequest request) {
        return CompletableFuture.supplyAsync(() -> validateRequest(request))
            .thenApplyAsync(this::reserveInventory)
            .thenApplyAsync(this::processPayment)
            .thenApplyAsync(this::saveOrder)
            .whenComplete((order, ex) -> {
                if (ex != null) {
                    log.error("Order creation failed", ex);
                    // 补偿操作
                    compensate(order);
                } else {
                    notificationService.sendOrderConfirmation(order);
                }
            });
    }

    // 并行处理
    public CompletableFuture<OrderDetails> getOrderDetailsAsync(String orderId) {
        CompletableFuture<Order> orderFuture = CompletableFuture
            .supplyAsync(() -> orderRepository.findById(orderId).orElseThrow());
        
        CompletableFuture<List<OrderItem>> itemsFuture = CompletableFuture
            .supplyAsync(() -> orderItemRepository.findByOrderId(orderId));
        
        CompletableFuture<ShippingInfo> shippingFuture = CompletableFuture
            .supplyAsync(() -> shippingService.getShippingInfo(orderId));
        
        // 等待所有完成
        return CompletableFuture.allOf(orderFuture, itemsFuture, shippingFuture)
            .thenApply(v -> {
                OrderDetails details = new OrderDetails();
                details.setOrder(orderFuture.join());
                details.setItems(itemsFuture.join());
                details.setShipping(shippingFuture.join());
                return details;
            });
    }

    // 带超时的异步操作
    public CompletableFuture<Order> createOrderWithTimeout(OrderRequest request) {
        return createOrderAsync(request)
            .orTimeout(30, TimeUnit.SECONDS)
            .exceptionally(ex -> {
                log.error("Order creation timeout", ex);
                throw new OrderTimeoutException("Order creation timed out");
            });
    }
}

2.2 异常处理

@Service
public class AsyncExceptionHandlingService {

    public CompletableFuture<Result> processWithFallback(String input) {
        return CompletableFuture
            .supplyAsync(() -> riskyOperation(input))
            .exceptionally(ex -> {
                log.error("Primary operation failed", ex);
                return fallbackOperation(input);
            })
            .thenApply(this::transformResult);
    }

    // 组合异常处理
    public CompletableFuture<Result> processWithMultipleFallbacks(String input) {
        return CompletableFuture
            .supplyAsync(() -> primaryOperation(input))
            .handle((result, ex) -> {
                if (ex != null) {
                    log.warn("Primary failed, trying fallback 1", ex);
                    return fallbackOperation1(input);
                }
                return result;
            })
            .thenCompose(result -> {
                if (result == null) {
                    return CompletableFuture.supplyAsync(() -> fallbackOperation2(input));
                }
                return CompletableFuture.completedFuture(result);
            });
    }
}

三、并发集合与工具类

3.1 ConcurrentHashMap 使用

@Service
public class CacheService {

    private final ConcurrentHashMap<String, CacheEntry> cache = new ConcurrentHashMap<>();

    public Object get(String key, Supplier<Object> loader) {
        CacheEntry entry = cache.get(key);
        
        if (entry != null && !entry.isExpired()) {
            return entry.getValue();
        }
        
        // 使用 computeIfAbsent 保证原子性
        return cache.computeIfAbsent(key, k -> {
            Object value = loader.get();
            return new CacheEntry(value, System.currentTimeMillis() + 60000);
        }).getValue();
    }

    public void put(String key, Object value, long ttlMillis) {
        cache.put(key, new CacheEntry(value, System.currentTimeMillis() + ttlMillis));
    }

    // 批量操作
    public Map<String, Object> getAll(Set<String> keys) {
        return cache.entrySet().stream()
            .filter(e -> keys.contains(e.getKey()) && !e.getValue().isExpired())
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> e.getValue().getValue()
            ));
    }

    // 定期清理
    @Scheduled(fixedRate = 60000)
    public void cleanup() {
        cache.entrySet().removeIf(e -> e.getValue().isExpired());
    }
}

3.2 其他并发工具

@Component
public class ConcurrentToolsDemo {

    // CountDownLatch - 等待多个任务完成
    public void processWithLatch(List<Task> tasks) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(tasks.size());
        
        for (Task task : tasks) {
            CompletableFuture.runAsync(() -> {
                try {
                    task.execute();
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await(5, TimeUnit.MINUTES);
    }

    // CyclicBarrier - 多阶段任务同步
    public void multiPhaseProcessing(List<Worker> workers) throws Exception {
        CyclicBarrier barrier = new CyclicBarrier(workers.size());
        
        for (Worker worker : workers) {
            new Thread(() -> {
                try {
                    // 阶段 1
                    worker.phase1();
                    barrier.await();
                    
                    // 阶段 2
                    worker.phase2();
                    barrier.await();
                    
                    // 阶段 3
                    worker.phase3();
                } catch (Exception e) {
                    log.error("Worker failed", e);
                }
            }).start();
        }
    }

    // Semaphore - 限流
    public void limitedConcurrentOperation(List<Task> tasks, int maxConcurrent) {
        Semaphore semaphore = new Semaphore(maxConcurrent);
        
        for (Task task : tasks) {
            CompletableFuture.runAsync(() -> {
                try {
                    semaphore.acquire();
                    task.execute();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release();
                }
            });
        }
    }

    // Exchanger - 线程间数据交换
    public void exchangeData() {
        Exchanger<Data> exchanger = new Exchanger<>();
        
        // 生产者
        new Thread(() -> {
            try {
                Data data = produceData();
                Data processed = exchanger.exchange(data);
                // 使用处理后的数据
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
        
        // 消费者
        new Thread(() -> {
            try {
                Data data = exchanger.exchange(null);
                Data processed = processData(data);
                exchanger.exchange(processed);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

四、原子类与 CAS

@Component
public class AtomicOperationsDemo {

    // 原子计数器
    private final AtomicLong counter = new AtomicLong(0);

    // 原子累加器(高并发下性能更好)
    private final LongAdder longAdder = new LongAdder();

    // 原子引用
    private final AtomicReference<Config> configRef = new AtomicReference<>(new Config());

    public long incrementAndGet() {
        return counter.incrementAndGet();
    }

    public void add(long value) {
        longAdder.add(value);
    }

    public long getSum() {
        return longAdder.sum();
    }

    // CAS 更新配置
    public boolean updateConfig(Config newConfig) {
        Config current;
        do {
            current = configRef.get();
            if (!shouldUpdate(current, newConfig)) {
                return false;
            }
        } while (!configRef.compareAndSet(current, newConfig));
        return true;
    }

    // LongAccumulator - 自定义累加逻辑
    private final LongAccumulator maxAccumulator = new LongAccumulator(Long::max, 0);

    public void updateMax(long value) {
        maxAccumulator.accumulate(value);
    }

    public long getMax() {
        return maxAccumulator.get();
    }
}

五、Fork/Join 框架

@Component
public class ForkJoinDemo {

    // 递归任务示例
    public long parallelSum(long[] array) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(array, 0, array.length));
        } finally {
            pool.shutdown();
        }
    }

    // 递归任务
    private static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 10000;
        
        private final long[] array;
        private final int start;
        private final int end;

        SumTask(long[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                // 直接计算
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            }

            // 拆分任务
            int mid = (start + end) / 2;
            SumTask left = new SumTask(array, start, mid);
            SumTask right = new SumTask(array, mid, end);
            
            left.fork();
            long rightResult = right.compute();
            long leftResult = left.join();
            
            return leftResult + rightResult;
        }
    }

    // 递归动作示例
    public void parallelProcess(List<Task> tasks) {
        ForkJoinPool.commonPool().invoke(new ProcessTask(tasks, 0, tasks.size()));
    }

    private static class ProcessTask extends RecursiveAction {
        private static final int THRESHOLD = 100;
        
        private final List<Task> tasks;
        private final int start;
        private final int end;

        ProcessTask(List<Task> tasks, int start, int end) {
            this.tasks = tasks;
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            if (end - start <= THRESHOLD) {
                for (int i = start; i < end; i++) {
                    tasks.get(i).execute();
                }
                return;
            }

            int mid = (start + end) / 2;
            ProcessTask left = new ProcessTask(tasks, start, mid);
            ProcessTask right = new ProcessTask(tasks, mid, end);
            
            invokeAll(left, right);
        }
    }
}

六、总结

Java 并发编程的关键点:

  1. 线程池:合理配置核心参数,做好监控
  2. 异步编程:CompletableFuture 让异步代码更优雅
  3. 并发集合:选择合适的线程安全集合
  4. 原子操作:CAS 实现无锁编程
  5. 任务分解:Fork/Join 处理大数据量任务

这其实可以更优雅一点。并发编程的核心是理解 happens-before 规则和内存模型。

参考资源:

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Java 10 局部变量类型推断浅析

    Java 10 局部变量类型推断浅析

    这篇文章主要介绍了Java 10 局部变量类型推断浅析,Java 10 引进一种新的闪闪发光的特性叫做局部变量类型推断。文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,,需要的朋友可以参考下
    2019-06-06
  • 基于RestTemplate的使用方法(详解)

    基于RestTemplate的使用方法(详解)

    下面小编就为大家带来一篇基于RestTemplate的使用方法(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-07-07
  • SpringBoot 文件或图片上传与下载功能的实现

    SpringBoot 文件或图片上传与下载功能的实现

    这篇文章主要介绍了SpringBoot 文件或图片上传与下载功能的实现,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-02-02
  • jvm调优常用命令行工具详解

    jvm调优常用命令行工具详解

    这篇文章主要介绍了jvm调优常用命令行工具的用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2025-03-03
  • 一文了解Java读写锁ReentrantReadWriteLock的使用

    一文了解Java读写锁ReentrantReadWriteLock的使用

    ReentrantReadWriteLock称为读写锁,它提供一个读锁,支持多个线程共享同一把锁。这篇文章主要讲解一下ReentrantReadWriteLock的使用和应用场景,感兴趣的可以了解一下
    2022-10-10
  • Java中计算时间差的方法

    Java中计算时间差的方法

    这篇文章主要介绍了Java中计算时间差的方法,实例分析了java常见的三种计算时间差的技巧,需要的朋友可以参考下
    2015-06-06
  • Java 11 正式发布,这 8 个逆天新特性教你写出更牛的代码

    Java 11 正式发布,这 8 个逆天新特性教你写出更牛的代码

    美国当地时间9月25日,Oracle 官方宣布 Java 11 (18.9 LTS) 正式发布,可在生产环境中使用!这是自 Java 8 后的首个长期支持版本
    2018-09-09
  • Java输入数据的知识点整理

    Java输入数据的知识点整理

    在本篇文章里小编给大家整理的是关于Java如何输入数据的相关知识点内容,有兴趣的朋友们学习参考下。
    2020-01-01
  • Java中文件写入内容的几种常见方法

    Java中文件写入内容的几种常见方法

    本文主要介绍了Java中文件写入内容的几种常见方法,主要包括使用NIO的Files工具类、通过commons-io的FileUtils工具类、RandomAccessFile、PrintWriter和BufferedWriter这几种,具有一定的参考价值,感兴趣的可以了解一下
    2023-10-10
  • Java聊天室之实现使用Socket传递音频

    Java聊天室之实现使用Socket传递音频

    这篇文章主要为大家详细介绍了Java简易聊天室之使用Socket实现传递音频功能,文中的示例代码讲解详细,具有一定的借鉴价值,需要的可以了解一下
    2022-10-10

最新评论