如何使用JCTools实现Java并发程序

 更新时间:2021年03月18日 14:25:03   作者:老K  
这篇文章主要介绍了如何使用JCTools实现Java并发程序,帮助大家更好的理解和学习使用Java,感兴趣的朋友可以了解下

概述

在本文中,我们将介绍JCTools(Java并发工具)库。

简单地说,这提供了许多适用于多线程环境的实用数据结构。

非阻塞算法

传统上,在可变共享状态下工作的多线程代码使用锁来确保数据一致性和发布(一个线程所做的更改对另一个线程可见)。

这种方法有许多缺点:

  • 线程在试图获取锁时可能会被阻塞,在另一个线程的操作完成之前不会取得任何进展—这有效地防止了并行性
  • 锁争用越重,JVM处理调度线程、管理争用和等待线程队列的时间就越多,实际工作就越少
  • 如果涉及多个锁,并且它们以错误的顺序获取/释放,则可能出现死锁
  • 优先级反转的危险是可能的——高优先级线程被锁定,试图获得由低优先级线程持有的锁
  • 大多数情况下,使用粗粒度锁会严重损害并行性—细粒度锁需要更仔细的设计,增加锁开销,并且更容易出错

另一种方法是使用非阻塞算法,即任何线程的故障或挂起都不会导致另一个线程的故障或挂起的算法。

如果所涉及的线程中至少有一个能够在任意时间段内取得进展,即在处理过程中不会出现死锁,则非阻塞算法是无锁的。

此外,如果保证每个线程的进程,这些算法是无等待的。

下面是一个非阻塞堆栈示例,它定义了基本状态:

public class ConcurrentStack<E> {

  AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

  private static class Node <E> {
    public E item;
    public Node<E> next;

    // standard constructor
  }
}

还有一些API方法:

public void push(E item){
  Node<E> newHead = new Node<E>(item);
  Node<E> oldHead;
  
  do {
    oldHead = top.get();
    newHead.next = oldHead;
  } while(!top.compareAndSet(oldHead, newHead));
}

public E pop() {
  Node<E> oldHead;
  Node<E> newHead;
  do {
    oldHead = top.get();
    if (oldHead == null) {
      return null;
    }
    newHead = oldHead.next;
  } while (!top.compareAndSet(oldHead, newHead));
  
  return oldHead.item;
}

我们可以看到,该算法使用细粒度比较和交换(CAS)指令,并且是无锁的(即使多个线程调用top.compareAndSet()同时,它们中的一个保证会成功)但不能无等待,因为不能保证CAS最终会对任何特定线程成功。

依赖

首先,让我们将JCTools依赖项添加到pom.xml文件:

<dependency>
  <groupId>org.jctools</groupId>
  <artifactId>jctools-core</artifactId>
  <version>2.1.2</version>
</dependency>

请注意,Maven Central上提供了最新的可用版本。

JCTools队列

该库提供了许多队列以在多线程环境中使用,即一个或多个线程以线程安全的无锁方式写入队列,一个或多个线程以线程安全的无锁方式从队列中读取。

所有队列实现的通用接口是org.jctools.queues.MessagePassingQueue。

队列类型
所有队列都可以根据其生产者/消费者策略进行分类:

  • 单个生产者,单个消费者–此类类使用前缀Spsc命名,例如SpscArrayQueue
  • 单个生产者,多个消费者–使用Spmc前缀,例如SpmcArrayQueue
  • 多个生产者,单个消费者-使用Mpsc前缀,例如MpscArrayQueue
  • 多个生产者、多个消费者—使用Mpmc前缀,例如MpmcArrayQueue

需要注意的是,在内部没有策略检查,也就是说,如果使用不正确,队列可能会无声地发生故障。

例如,下面的测试从两个线程填充单个生产者队列并通过,即使不能保证使用者看到来自不同生产者的数据:

SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(2);

Thread producer1 = new Thread(() -> queue.offer(1));
producer1.start();
producer1.join();

Thread producer2 = new Thread(() -> queue.offer(2));
producer2.start();
producer2.join();

Set<Integer> fromQueue = new HashSet<>();
Thread consumer = new Thread(() -> queue.drain(fromQueue::add));
consumer.start();
consumer.join();

assertThat(fromQueue).containsOnly(1, 2);

队列实现

总结以上分类,以下是JCTools队列列表:

  • SpscArrayQueue–单个生产者,单个消费者,在内部使用一个数组,限制容量
  • SpscLinkedQueue–单个生产者,单个消费者,内部使用链表,未绑定容量
  • SpscChunkedArrayQueue–单生产商、单消费者,从初始容量开始,一直增长到最大容量
  • SpscGrowableArrayQueue–单生产者、单消费者,从初始容量开始,一直增长到最大容量。这与SpscChunkedArrayQueue是相同的契约,唯一的区别是内部块管理。建议使用SpscChunkedArrayQueue,因为它有一个简化的实现
  • SpscUnboundedArrayQueue–单个生产者,单个消费者,在内部使用数组,未绑定容量
  • SpmcArrayQueue–单个生产者、多个使用者,在内部使用一个阵列,限制容量
  • MpscArrayQueue—多个生产者、单个消费者在内部使用一个阵列,限制容量
  • MpscLinkedQueue–多个生产者,单个消费者,在内部使用链表,未绑定容量
  • MpmcArrayQueue—多个生产者、多个消费者在内部使用一个阵列,限制容量

原子队列

前面提到的所有队列都使用sun.misc.Unsafe. 然而,随着java9和JEP-260的出现,这个API在默认情况下变得不可访问。

因此,有其他队列使用java.util.concurrent.atomic.AtomicLongFieldUpdater(公共API,性能较差)而不是sun.misc.Unsafe.

它们是从上面的队列生成的,它们的名称中间插入了单词Atomic,例如SpscChunkedAtomicArrayQueue或MpmcAtomicArrayQueue。

如果可能,建议使用“常规”队列,并且仅在sun.misc.Unsafe像Hot Java9+和JRockit一样被禁止/无效。

容量

所有JCTools队列也可能具有最大容量或未绑定。当队列已满且受容量限制时,它将停止接受新元素。

在以下示例中,我们:

  • 填满队列
  • 确保在此之后停止接受新元素
  • 从中排出,并确保之后可以添加更多元素

请注意,为了可读性,删除了几个代码语句。

SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16);
CountDownLatch startConsuming = new CountDownLatch(1);
CountDownLatch awakeProducer = new CountDownLatch(1);

Thread producer = new Thread(() -> {
  IntStream.range(0, queue.capacity()).forEach(i -> {
    assertThat(queue.offer(i)).isTrue();
  });
  assertThat(queue.offer(queue.capacity())).isFalse();
  startConsuming.countDown();
  awakeProducer.await();
  assertThat(queue.offer(queue.capacity())).isTrue();
});

producer.start();
startConsuming.await();

Set<Integer> fromQueue = new HashSet<>();
queue.drain(fromQueue::add);
awakeProducer.countDown();
producer.join();
queue.drain(fromQueue::add);

assertThat(fromQueue).containsAll(
 IntStream.range(0, 17).boxed().collect(toSet()));

其他数据结构工具

JCTools还提供了一些非队列数据结构。

它们都列在下面:

  • NonBlockingHashMap–一个无锁的ConcurrentHashMap替代方案,具有更好的伸缩性和通常更低的突变成本。它是实现sun.misc.Unsafe,因此,不建议在Java9+或JRockit环境中使用此类
  • NonBlockingHashMapLong–与NonBlockingHashMap类似,但使用基本长键
  • NonBlockingHashSet–一个简单的包装器,围绕着像JDK的java.util.Collections.newSetFromMap()一样的NonBlockingHashMap
  • NonBlockingIdentityHashMap–与NonBlockingHashMap类似,但按标识比较键。
  • NonBlockingSetInt–一个多线程位向量集,实现为一个原始long数组。在无声自动装箱的情况下工作无效

性能测试

让我们使用JMH来比较JDK的ArrayBlockingQueue和JCTools队列的性能。JMH是Sun/Oracle JVM gurus提供的一个开源微基准框架,它保护我们不受编译器/JVM优化算法的不确定性的影响。

请注意,为了提高可读性,下面的代码段遗漏了几个语句。

public class MpmcBenchmark {

  @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK})
  public volatile String implementation;

  public volatile Queue<Long> queue;

  @Benchmark
  @Group(GROUP_NAME)
  @GroupThreads(PRODUCER_THREADS_NUMBER)
  public void write(Control control) {
    // noinspection StatementWithEmptyBody
    while (!control.stopMeasurement && !queue.offer(1L)) {
      // intentionally left blank
    }
  }

  @Benchmark
  @Group(GROUP_NAME)
  @GroupThreads(CONSUMER_THREADS_NUMBER)
  public void read(Control control) {
    // noinspection StatementWithEmptyBody
    while (!control.stopMeasurement && queue.poll() == null) {
      // intentionally left blank
    }
  }
}

结果:

MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op

我们可以看到,MpmcArrayQueue的性能略好于MpmcAtomicArrayQueue,而ArrayBlockingQueue的速度慢了两倍。

使用JCTools的缺点

使用JCTools有一个重要的缺点——不可能强制正确使用库类。例如,考虑在我们的大型成熟项目中开始使用MpscArrayQueue的情况(注意,必须有一个使用者)。

不幸的是,由于项目很大,有可能有人出现编程或配置错误,现在从多个线程读取队列。这个系统看起来像以前一样工作,但现在有可能消费者错过了一些信息。这是一个真正的问题,可能会有很大的影响,是很难调试。

理想情况下,应该可以运行具有特定系统属性的系统,该属性强制JCTools确保线程访问策略。例如,本地/测试/暂存环境(而不是生产环境)可能已启用它。遗憾的是,JCTools没有提供这样的属性。

另一个需要考虑的问题是,尽管我们确保JCTools比JDK的对应工具快得多,但这并不意味着我们的应用程序获得了与我们开始使用自定义队列实现时相同的速度。大多数应用程序不会在线程之间交换很多对象,而且大多是I/O绑定的。

结论

现在,我们对JCTools提供的实用程序类有了基本的了解,并了解了它们在重载下与JDK的对应类相比的性能。

总之,只有当我们在线程之间交换大量对象时,才有必要使用该库,即使这样,也有必要非常小心地保留线程访问策略。

以上示例的完整源代码地址:https://github.com/eugenp/tutorials/tree/master/libraries-5

JCTools git地址:https://github.com/JCTools/JCTools

以上就是如何使用JCTools实现Java并发程序的详细内容,更多关于使用JCTools实现Java并发程序的资料请关注脚本之家其它相关文章!

相关文章

  • 新手了解java 泛型基础知识

    新手了解java 泛型基础知识

    这篇文章主要给大家介绍了关于Java中泛型使用的简单方法,文中通过示例代码介绍的非常详细,对大家学习或者使用Java具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2021-07-07
  • springboot返回前端中文乱码的解决

    springboot返回前端中文乱码的解决

    这篇文章主要介绍了springboot返回前端中文乱码的解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • IntelliJ IDEA : .java文件左下角显示

    IntelliJ IDEA : .java文件左下角显示"J"图标的问题

    IntelliJ IDEA : .java文件 左下角显示“J”,并且不能执行代码,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2020-10-10
  • Java guava monitor监视器线程的使用详解

    Java guava monitor监视器线程的使用详解

    工作中的场景中是否存在类似这样的场景,需要提交的线程在某个触发条件下执行。本文主要就是使用guava中的monitor来优雅的实现带监视器的线程
    2021-11-11
  • SpringBoot4.5.2 整合HikariCP 数据库连接池操作

    SpringBoot4.5.2 整合HikariCP 数据库连接池操作

    这篇文章主要介绍了SpringBoot4.5.2 整合HikariCP 数据库连接池操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • Java中的包(package)是什么和使用方法

    Java中的包(package)是什么和使用方法

    包是Java中一种强大的组织代码的工具,它们帮助开发者将代码分组,防止命名冲突,并通过控制访问级别来增强代码的安全性,这篇文章主要介绍了Java中的包(package)是什么和如何使用它们,需要的朋友可以参考下
    2024-07-07
  • SpringBoot集成Redis使用Cache缓存的实现方法

    SpringBoot集成Redis使用Cache缓存的实现方法

    SpringBoot通过配置RedisConfig类和使用Cache注解可以轻松集成Redis实现缓存,主要包括@EnableCaching开启缓存,自定义key生成器,改变序列化规则,以及配置RedisCacheManager,本文为使用SpringBoot与Redis处理缓存提供了详实的指导和示例,感兴趣的朋友一起看看吧
    2024-10-10
  • 基于kafka实现Spring Cloud Bus消息总线

    基于kafka实现Spring Cloud Bus消息总线

    消息总线是一种通信工具,可以在机器之间互相传输消息、文件等,这篇文章主要介绍了如何利用kafka实现SpringCloud Bus消息总线,感兴趣的可以学习一下
    2022-04-04
  • MybatisPlus BaseMapper 实现对数据库增删改查源码

    MybatisPlus BaseMapper 实现对数据库增删改查源码

    MybatisPlus 是一款在 Mybatis 基础上进行的增强 orm 框架,可以实现不写 sql 就完成数据库相关的操作,这篇文章主要介绍了MybatisPlus BaseMapper 实现对数据库增删改查源码解析,需要的朋友可以参考下
    2023-01-01
  • 2021年最新Redis面试题汇总(4)

    2021年最新Redis面试题汇总(4)

    在程序员面试过程中redis相关的知识是常被问到的话题。这篇文章主要介绍了几道Redis面试题,整理一下分享给大家,感兴趣的小伙伴们可以参考一下
    2021-07-07

最新评论