用JAVA实现一套背压机制

 更新时间:2023年06月30日 08:47:27   作者:hwp0710  
背压依我的理解来说,是指订阅者能和发布者交互,可以调节发布者发布数据的速率,解决把订阅者压垮的问题,这篇文章主要介绍了用JAVA自己实现一套背压机制,需要的朋友可以参考下

Reactive Streams:一种支持背压的异步数据流处理标准,主流实现有RxJava和Reactor,Spring WebFlux默认集成的是Reactor。

Reactive Streams主要解决背压(back-pressure)问题。当传入的任务速率大于系统处理能力时,数据处理将会对未处理数据产生一个缓冲区。

背压依我的理解来说,是指订阅者能和发布者交互(通过代码里面的调用request和cancel方法交互),可以调节发布者发布数据的速率,解决把订阅者压垮的问题。关键在于上面例子里面的订阅关系Subscription这个接口,他有request和cancel 2个方法,用于通知发布者需要数据和通知发布者不再接受数据。

我们重点理解背压在jdk9里面是如何实现的。关键在于发布者Publisher的实现类SubmissionPublisher的submit方法是block方法。订阅者会有一个缓冲池,默认为Flow.defaultBufferSize() = 256。当订阅者的缓冲池满了之后,发布者调用submit方法发布数据就会被阻塞,发布者就会停(慢)下来;订阅者消费了数据之后(调用Subscription.request方法),缓冲池有位置了,submit方法就会继续执行下去,就是通过这样的机制,实现了调节发布者发布数据的速率,消费得快,生成就快,消费得慢,发布者就会被阻塞,当然就会慢下来了。

单线程版本:

一个生产者,一个消费者

import lombok.SneakyThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class BackpressureExample {
    public static void main(String[] args) throws InterruptedException {
        BackpressureSubscriber subscriber = new BackpressureSubscriber();
        BackpressurePublisher publisher = new BackpressurePublisher(subscriber);
        publisher.start();
        subscriber.start();
        // 为了演示效果,这里让主线程休眠一段时间
        Thread.sleep(50000);
        publisher.stop();
        subscriber.stop();
    }
    @SneakyThrows
    public static void processDataLogic(List<Integer> batch) {
        //模拟任务执行
        int r = new Random().nextInt(3000);
        Thread.sleep(r);
        System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r);
    }
    static class BackpressurePublisher {
        private final BackpressureSubscriber subscriber;
        private volatile boolean running;
        public BackpressurePublisher(BackpressureSubscriber subscriber) {
            this.subscriber = subscriber;
            this.running = true;
        }
        public void start() {
            Thread thread = new Thread(() -> {
                int item = 1;
                while (running) {
                    List<Integer> batch = new ArrayList<>();
                    for (int i = 0; i < 5; i++) {
                        System.out.println(Thread.currentThread().getName() + "-----produce data = " + item);
                        batch.add(item++);
                    }
                    while (!subscriber.accept(batch)) {
                        if (!running) {
                            break;
                        }
                    }
                }
            });
            thread.start();
        }
        public void stop() {
            running = false;
        }
    }
    static class BackpressureSubscriber {
        private volatile boolean running;
        public BackpressureSubscriber() {
            this.running = true;
        }
        public boolean accept(List<Integer> batch) {
            if (running) {
                processDataLogic(batch);
                return true;
            } else {
                return false;
            }
        }
        public void start() {
            // Subscriber 在 JDK 8 中没有异步处理的能力,因此不需要单独开启线程
        }
        public void stop() {
            running = false;
        }
    }
}

多线程版本

一个生产者,多个消费者

import lombok.SneakyThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class BackpressureExample {
    public static void main(String[] args) throws InterruptedException {
        BackpressureSubscriber subscriber = new BackpressureSubscriber();
        BackpressurePublisher publisher = new BackpressurePublisher(subscriber);
        publisher.start();
        subscriber.start();
        // 为了演示效果,这里让主线程休眠一段时间
        Thread.sleep(50000);
        publisher.stop();
        subscriber.stop();
    }
    @SneakyThrows
    public static void processDataLogic(List<Integer> batch) {
        //模拟任务执行
        int r = new Random().nextInt(3000);
        Thread.sleep(r);
        System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r);
    }
    static class BackpressurePublisher {
        private final BackpressureSubscriber subscriber;
        private volatile boolean running;
        public BackpressurePublisher(BackpressureSubscriber subscriber) {
            this.subscriber = subscriber;
            this.running = true;
        }
        public void start() {
            Thread thread = new Thread(() -> {
                int item = 1;
                while (running) {
                    List<Integer> batch = new ArrayList<>();
                    for (int i = 0; i < 5; i++) {
                        System.out.println(Thread.currentThread().getName() + "-----produce data = " + item);
                        batch.add(item++);
                    }
                    while (!subscriber.accept(batch)) {
                        if (!running) {
                            break;
                        }
                    }
                }
            });
            thread.start();
        }
        public void stop() {
            running = false;
        }
    }
    static class BackpressureSubscriber {
        private volatile boolean running;
        private final ExecutorService executor;
        private final int workerSize = 2;
        private final List<Future> futures;
        public BackpressureSubscriber() {
            this.running = true;
            this.executor = Executors.newFixedThreadPool(workerSize);
            futures = new ArrayList<>(workerSize);
        }
        public boolean accept(List<Integer> batch) {
            if (running) {
                Future f = executor.submit(() -> processDataLogic(batch));
                futures.add(f);
                waitForTaskDone(futures);
                return true;
            } else {
                return false;
            }
        }
        public void waitForTaskDone(List<Future> futures) {
            while (futures.size() >= workerSize) {
                for (Future future : futures) {
                    if (future.isDone()) {
                        // 只要有一个worker是空闲就重新获取任务
                        futures.remove(future);
                        return;
                    }
                }
            }
        }
        public void start() {
            // Subscriber 在 JDK 8 中没有异步处理的能力,因此不需要单独开启线程
        }
        public void stop() {
            running = false;
            executor.shutdown();
        }
    }
}

到此这篇关于用JAVA自己实现一套背压机制的文章就介绍到这了,更多相关java背压机制内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java中的volatile关键字原理深入解析

    Java中的volatile关键字原理深入解析

    这篇文章主要介绍了Java中的volatile关键字原理深入解析,volatile是Java 编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致地更新,线程应该确保通过排他锁单独获得这个变量,需要的朋友可以参考下
    2023-12-12
  • java HashMap,TreeMap与LinkedHashMap的详解

    java HashMap,TreeMap与LinkedHashMap的详解

    这篇文章主要介绍了 java HashMap,TreeMap与LinkedHashMap的详解的相关资料,这里提供实例代码,帮助大家学习理解 这部分的内容,需要的朋友可以参考下
    2016-11-11
  • Java并发编程中的volatile关键字详解

    Java并发编程中的volatile关键字详解

    这篇文章主要介绍了Java并发编程中的volatile关键字详解,volatile 用于保证我们某个变量的可见性,使其一直存放在主存中,不被移动到某个线程的私有工作内存中,需要的朋友可以参考下
    2023-08-08
  • springboot使用校验框架validation校验的示例

    springboot使用校验框架validation校验的示例

    这篇文章主要介绍了springboot使用校验框架validation校验的示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-02-02
  • Java中@valid和@Validated注解的使用详解

    Java中@valid和@Validated注解的使用详解

    这篇文章主要介绍了Java中@valid和@Validated注解的使用详解,@Validated可以用在类型、方法和方法参数上,但是不能用在成员属性(字段)上,不支持嵌套检测,@Valid可以用在方法、构造函数、方法参数和成员属性(字段)上,支持嵌套检测,需要的朋友可以参考下
    2024-01-01
  • Spring boot如何集成kaptcha并生成验证码

    Spring boot如何集成kaptcha并生成验证码

    这篇文章主要介绍了Spring boot如何集成kaptcha并生成验证码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • 使用controller传boolean形式值

    使用controller传boolean形式值

    这篇文章主要介绍了使用controller传boolean形式值,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03
  • java中@JSONField和@JsonProperty注解的使用说明及对比

    java中@JSONField和@JsonProperty注解的使用说明及对比

    @JSONField与@JsonProperty隶属两个不同的包,前者是阿里系的fastjson包,后者是spring boot官方使用的jackson包,本文主要介绍了java中@JSONField和@JsonProperty注解的使用说明及对比,感兴趣的可以了解一下
    2023-11-11
  • Springboot关于自定义stater的yml无法提示问题解决方案

    Springboot关于自定义stater的yml无法提示问题解决方案

    这篇文章主要介绍了Springboot关于自定义stater的yml无法提示问题及解决方案,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-06-06
  • Java中通过三级缓存解决Spring循环依赖详解

    Java中通过三级缓存解决Spring循环依赖详解

    这篇文章主要介绍了Java中通过三级缓存解决Spring循环依赖详解,当出现两个或多个 Bean 在初始化时相互依赖的情况时,Spring Boot 会将其中一个 Bean 提前暴露出来,以便其他 Bean 能够在初始化时正确地引用它,这一策略能有效避免循环依赖导致的问题,需要的朋友可以参考下
    2023-09-09

最新评论