用JAVA实现一套背压机制
Reactive Streams:一种支持背压的异步数据流处理标准,主流实现有RxJava和Reactor,Spring WebFlux默认集成的是Reactor。
Reactive Streams主要解决背压(back-pressure)问题。当传入的任务速率大于系统处理能力时,数据处理将会对未处理数据产生一个缓冲区。
背压依我的理解来说,是指订阅者能和发布者交互(通过代码里面的调用request和cancel方法交互),可以调节发布者发布数据的速率,解决把订阅者压垮的问题。关键在于上面例子里面的订阅关系Subscription这个接口,他有request和cancel 2个方法,用于通知发布者需要数据和通知发布者不再接受数据。
我们重点理解背压在jdk9里面是如何实现的。关键在于发布者Publisher的实现类SubmissionPublisher的submit方法是block方法。订阅者会有一个缓冲池,默认为Flow.defaultBufferSize() = 256。当订阅者的缓冲池满了之后,发布者调用submit方法发布数据就会被阻塞,发布者就会停(慢)下来;订阅者消费了数据之后(调用Subscription.request方法),缓冲池有位置了,submit方法就会继续执行下去,就是通过这样的机制,实现了调节发布者发布数据的速率,消费得快,生成就快,消费得慢,发布者就会被阻塞,当然就会慢下来了。
单线程版本:
一个生产者,一个消费者
import lombok.SneakyThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class BackpressureExample {
public static void main(String[] args) throws InterruptedException {
BackpressureSubscriber subscriber = new BackpressureSubscriber();
BackpressurePublisher publisher = new BackpressurePublisher(subscriber);
publisher.start();
subscriber.start();
// 为了演示效果,这里让主线程休眠一段时间
Thread.sleep(50000);
publisher.stop();
subscriber.stop();
}
@SneakyThrows
public static void processDataLogic(List<Integer> batch) {
//模拟任务执行
int r = new Random().nextInt(3000);
Thread.sleep(r);
System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r);
}
static class BackpressurePublisher {
private final BackpressureSubscriber subscriber;
private volatile boolean running;
public BackpressurePublisher(BackpressureSubscriber subscriber) {
this.subscriber = subscriber;
this.running = true;
}
public void start() {
Thread thread = new Thread(() -> {
int item = 1;
while (running) {
List<Integer> batch = new ArrayList<>();
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "-----produce data = " + item);
batch.add(item++);
}
while (!subscriber.accept(batch)) {
if (!running) {
break;
}
}
}
});
thread.start();
}
public void stop() {
running = false;
}
}
static class BackpressureSubscriber {
private volatile boolean running;
public BackpressureSubscriber() {
this.running = true;
}
public boolean accept(List<Integer> batch) {
if (running) {
processDataLogic(batch);
return true;
} else {
return false;
}
}
public void start() {
// Subscriber 在 JDK 8 中没有异步处理的能力,因此不需要单独开启线程
}
public void stop() {
running = false;
}
}
}多线程版本
一个生产者,多个消费者
import lombok.SneakyThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class BackpressureExample {
public static void main(String[] args) throws InterruptedException {
BackpressureSubscriber subscriber = new BackpressureSubscriber();
BackpressurePublisher publisher = new BackpressurePublisher(subscriber);
publisher.start();
subscriber.start();
// 为了演示效果,这里让主线程休眠一段时间
Thread.sleep(50000);
publisher.stop();
subscriber.stop();
}
@SneakyThrows
public static void processDataLogic(List<Integer> batch) {
//模拟任务执行
int r = new Random().nextInt(3000);
Thread.sleep(r);
System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r);
}
static class BackpressurePublisher {
private final BackpressureSubscriber subscriber;
private volatile boolean running;
public BackpressurePublisher(BackpressureSubscriber subscriber) {
this.subscriber = subscriber;
this.running = true;
}
public void start() {
Thread thread = new Thread(() -> {
int item = 1;
while (running) {
List<Integer> batch = new ArrayList<>();
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "-----produce data = " + item);
batch.add(item++);
}
while (!subscriber.accept(batch)) {
if (!running) {
break;
}
}
}
});
thread.start();
}
public void stop() {
running = false;
}
}
static class BackpressureSubscriber {
private volatile boolean running;
private final ExecutorService executor;
private final int workerSize = 2;
private final List<Future> futures;
public BackpressureSubscriber() {
this.running = true;
this.executor = Executors.newFixedThreadPool(workerSize);
futures = new ArrayList<>(workerSize);
}
public boolean accept(List<Integer> batch) {
if (running) {
Future f = executor.submit(() -> processDataLogic(batch));
futures.add(f);
waitForTaskDone(futures);
return true;
} else {
return false;
}
}
public void waitForTaskDone(List<Future> futures) {
while (futures.size() >= workerSize) {
for (Future future : futures) {
if (future.isDone()) {
// 只要有一个worker是空闲就重新获取任务
futures.remove(future);
return;
}
}
}
}
public void start() {
// Subscriber 在 JDK 8 中没有异步处理的能力,因此不需要单独开启线程
}
public void stop() {
running = false;
executor.shutdown();
}
}
}到此这篇关于用JAVA自己实现一套背压机制的文章就介绍到这了,更多相关java背压机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
使用ServletUtil.write方法下载接口文件中文乱码问题解决
本文主要介绍了使用ServletUtil.write方法下载接口文件中文乱码问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2024-05-05
springboot controller 增加指定前缀的两种实现方法
这篇文章主要介绍了springboot controller 增加指定前缀的两种实现方法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2022-02-02
Spring Boot LocalDateTime格式化处理的示例详解
这篇文章主要介绍了Spring Boot LocalDateTime格式化处理的示例详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧2018-10-10
Spring Cache自定义缓存key和过期时间的实现代码
使用 Redis的客户端 Spring Cache时,会发现生成 key中会多出一个冒号,而且有一个空节点的存在,查看源码可知,这是因为 Spring Cache默认生成key的策略就是通过两个冒号来拼接,本文给大家介绍了Spring Cache自定义缓存key和过期时间的实现,需要的朋友可以参考下2024-05-05
spring应用中多次读取http post方法中的流遇到的问题
这篇文章主要介绍了spring应用中多次读取http post方法中的流,文中给大家列举处理问题描述及解决方法,需要的朋友可以参考下2018-11-11


最新评论