Spring响应式编程之Reactor操作符详解

 更新时间:2025年09月05日 08:44:53   作者:minh_coo  
文章介绍了Reactor库中常用的响应式流操作符,分为创建、转换、组合、条件和错误处理五类,详细列举了每类操作符的功能和用途,这些操作符旨在提高响应式流的可读性和开发效率,帮助开发者更高效地处理数据流

操作符Processo<T,R>

操作符并不是响应式流规范的一部分,但为了改进响应式代码的可读性并降低开发成本,Reactor 库中的 API 提供了一组丰富的操作符,这些操作符为响应式流规范提供了最大的附加值。

下面介绍一些常用的操作符。

(1)创建操作符

  • just:创建一个包含单个元素的Mono或多个元素的Flux;
  • empty:创建一个空的Flux或Mono;
  • defer:在订阅时动态创建一个新的Flux或Mono;
  • fromArray:从数组创建Flux;
  • fromIterable:从Iterable对象创建Flux;
  • range:创建一个从start到end的整数序列Flux;
  • interval:创建一个按时间间隔发布数据的Flux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CreationExample {
    public static void main(String[] args) {
        // 示例 1: 使用 Mono 创建操作符
        Mono<String> monoJust = Mono.just("Hello, Mono");
        Mono<String> monoEmpty = Mono.empty();
        Mono<String> monoDefer = Mono.defer(() -> Mono.just("Deferred Mono"));

        // 订阅 Mono 并打印结果
        monoJust.subscribe(System.out::println);
        monoEmpty.subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Completed"));
        monoDefer.subscribe(System.out::println);

        // 示例 2: 使用 Flux 创建操作符
        Flux<String> fluxJust = Flux.just("A", "B", "C");
        Flux<String> fluxFromArray = Flux.fromArray(new String[]{"A", "B", "C"});
        List<String> list = Arrays.asList("A", "B", "C");
        Flux<String> fluxFromIterable = Flux.fromIterable(list);
        Flux<String> fluxFromStream = Flux.fromStream(Stream.of("A", "B", "C"));
        Flux<Integer> fluxRange = Flux.range(1, 5);
        Flux<Long> fluxInterval = Flux.interval(Duration.ofSeconds(1));
        Flux<String> fluxDefer = Flux.defer(() -> Flux.just("Deferred Flux"));

        // 订阅 Flux 并打印结果
        fluxJust.subscribe(System.out::println);
        fluxFromArray.subscribe(System.out::println);
        fluxFromIterable.subscribe(System.out::println);
        fluxFromStream.subscribe(System.out::println);
        fluxRange.subscribe(System.out::println);
        fluxInterval.take(5).subscribe(System.out::println);
        fluxDefer.subscribe(System.out::println);
    }
}

(2)转换操作符

  • map:将Mono中的值或Flux中的每个元素转换为另一种类型;
  • flatmap:将Mono中的值或Flux中的每个元素转换为另一个Mono或另一个Publisher,并展平结果;
  • flatMapSequential:类似于flatMap,但保持顺序并并行处理;
  • flatMapMany:将Mono中的值转换为Flux;
  • collectList: 将Flux中的所有元素收集到一个List中,返回Mono<List<T>>;
  • collectMap:将Flux中的元素收集到一个Map中,返回Mono<Map<K,V>>;
  • reduce:聚合Flux中的元素,返回Mono;
  • buffer:将Flux中的元素收集到List中,按指定大小进行分组;
  • window:将Flux中的元素分组到Flux中,每组包含指定数量的元素;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

public class ConversionExample {
    public static void main(String[] args) {
        // 示例 1: 使用 Mono 转换操作符
        Mono<Integer> mono = Mono.just("123")
                                 .map(Integer::parseInt)
                                 .flatMap(i -> Mono.just(i * 2))
                                 .doOnNext(System.out::println);

        mono.subscribe();

        // 示例 2: 使用 Flux 转换操作符
        Flux<Integer> flux = Flux.just("1", "2", "3", "4", "5")
                                 .map(Integer::parseInt)
                                 .filter(i -> i % 2 == 0)
                                 .flatMap(i -> Flux.just(i * 2))
                                 .concatMap(i -> Flux.just(i + 1))
                                 .buffer(2)
                                 .doOnNext(System.out::println);

        flux.subscribe();
    }
}

(3)组合操作符

  • zipWith:将两个Mono的值组合成一个新的Mono;
  • zip:将多个Flux的元素组合成一个Flux;
  • then:在当前Mono或Flux完成后执行另一个Mono或Flux;
  • thenReturn:在当前Mono或Flux完成后返回一个指定的值;
  • thenMany:在当前Mono完成后返回一个Flux;
  • when:等待多个Publisher完成
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CombinationExample {
    public static void main(String[] args) {
        // 示例 1: 使用 Mono 组合操作符
        Mono<String> mono1 = Mono.just("Hello");
        Mono<String> mono2 = Mono.just("World");
        Mono<String> combined = mono1.zipWith(mono2, (a, b) -> a + " " + b);
        combined.subscribe(System.out::println); // 输出: Hello World

        Mono<Void> when = Mono.when(mono1, mono2);
        when.subscribe(null, Throwable::printStackTrace, () -> System.out.println("Completed")); // 输出: Completed

        // 示例 2: 使用 Flux 组合操作符
        Flux<String> flux1 = Flux.just("A", "B", "C");
        Flux<String> flux2 = Flux.just("1", "2", "3");
        Flux<String> merged = Flux.merge(flux1, flux2);
        merged.subscribe(System.out::println); // 输出: A 1 B 2 C 3

        Flux<String> concatenated = Flux.concat(flux1, flux2);
        concatenated.subscribe(System.out::println); // 输出: A B C 1 2 3

        Flux<String> zipped = Flux.zip(flux1, flux2, (a, b) -> a + b);
        zipped.subscribe(System.out::println); // 输出: A1 B2 C3

        Flux<String> combinedLatest = Flux.combineLatest(flux1, flux2, (a, b) -> a + b);
        combinedLatest.subscribe(System.out::println); // 输出: C3

        Flux<String> started = flux1.startWith("Start");
        started.subscribe(System.out::println); // 输出: Start A B C
    }
}

(4)条件操作符

  • hasElement:判断Mono是否包含元素;
  • hasElements:判断Flux是否包含元素;
  • hasElementWith:判断Mono是否包含与给定Predicate匹配的元素;
  • all:判断Flux中的所有元素是否都满足给定的条件;
  • any:判断Flux中是否有任意一个元素满足给定的条件;
  • isEmpty:判断Flux是否为空;
  • switchIfEmpty:如果Mono或Flux为空,则切换到另一个Mono或Flux;
  • defaultIfEmpty:如果Mono或Flux为空,则返回默认值;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConditionalExample {
    public static void main(String[] args) {
        // 示例 1: 使用 Mono 条件操作符
        Mono<String> mono = Mono.just("Hello");
        Mono<Boolean> hasElement = mono.hasElement();
        hasElement.subscribe(System.out::println); // 输出: true

        Mono<String> emptyMono = Mono.<String>empty();
        Mono<String> switchIfEmptyMono = emptyMono.switchIfEmpty(Mono.just("Default"));
        switchIfEmptyMono.subscribe(System.out::println); // 输出: Default

        Mono<String> defaultIfEmptyMono = emptyMono.defaultIfEmpty("Default");
        defaultIfEmptyMono.subscribe(System.out::println); // 输出: Default

        // 示例 2: 使用 Flux 条件操作符
        Flux<Integer> flux = Flux.just(1, 2, 3, 4);
        Mono<Boolean> allMatch = flux.all(i -> i > 0);
        allMatch.subscribe(System.out::println); // 输出: true

        Mono<Boolean> anyMatch = flux.any(i -> i > 3);
        anyMatch.subscribe(System.out::println); // 输出: true

        Mono<Boolean> hasElements = flux.hasElements();
        hasElements.subscribe(System.out::println); // 输出: true

        Mono<Boolean> isEmpty = flux.isEmpty();
        isEmpty.subscribe(System.out::println); // 输出: false

        Flux<Integer> emptyFlux = Flux.<Integer>empty();
        Flux<Integer> switchIfEmptyFlux = emptyFlux.switchIfEmpty(Flux.just(10, 20, 30));
        switchIfEmptyFlux.subscribe(System.out::println); // 输出: 10 20 30

        Flux<Integer> defaultIfEmptyFlux = emptyFlux.defaultIfEmpty(999);
        defaultIfEmptyFlux.subscribe(System.out::println); // 输出: 999
    }
}

(5)错误处理操作符

  • onErrorResume:当发生错误时,切换到另一个数据流;
  • onErrorReturn:当发生错误时,返回一个默认值;
  • onErrorMap:将错误映射为另一个错误;
  • retry重试操作一定次数;
  • retryWhen:当错误发生时,根据提供的Publisher逻辑重试;
  • doOnError:当发生错误时执行一些额外的逻辑;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;

public class ErrorHandlingExample {
    public static void main(String[] args) {
        // 示例 1: 使用 Mono 错误处理操作符
        Mono<String> mono1 = Mono.error(new RuntimeException("Error"))
                                 .onErrorResume(e -> Mono.just("Recovered"));
        mono1.subscribe(System.out::println); // 输出: Recovered

        Mono<String> mono2 = Mono.error(new RuntimeException("Error"))
                                 .onErrorReturn("Default");
        mono2.subscribe(System.out::println); // 输出: Default

        Mono<String> mono3 = Mono.error(new RuntimeException("Error"))
                                 .onErrorMap(e -> new IllegalArgumentException("Mapped Error", e));
        mono3.subscribe(System.out::println, Throwable::printStackTrace); // 输出: Mapped Error

        Mono<String> mono4 = Mono.error(new RuntimeException("Error"))
                                 .retry(3);
        mono4.subscribe(System.out::println, Throwable::printStackTrace);

        Mono<String> mono5 = Mono.error(new RuntimeException("Error"))
                                 .retryWhen(companion -> companion.take(3));
        mono5.subscribe(System.out::println, Throwable::printStackTrace);

        Mono<String> mono6 = Mono.error(new RuntimeException("Error"))
                                 .doOnError(e -> System.out.println("Error occurred: " + e.getMessage()));
        mono6.subscribe(System.out::println, Throwable::printStackTrace);

        // 示例 2: 使用 Flux 错误处理操作符
        Flux<String> flux1 = Flux.just("A", "B")
                                 .concatWith(Mono.error(new RuntimeException("Error")))
                                 .onErrorResume(e -> Flux.just("Recovered"));
        flux1.subscribe(System.out::println); // 输出: A B Recovered

        Flux<String> flux2 = Flux.just("A", "B")
                                 .concatWith(Mono.error(new RuntimeException("Error")))
                                 .onErrorReturn("Default");
        flux2.subscribe(System.out::println); // 输出: A B Default

        Flux<String> flux3 = Flux.just("A", "B")
                                 .concatWith(Mono.error(new RuntimeException("Error")))
                                 .onErrorMap(e -> new IllegalArgumentException("Mapped Error", e));
        flux3.subscribe(System.out::println, Throwable::printStackTrace); // 输出: Mapped Error

        Flux<String> flux4 = Flux.just("A", "B")
                                 .concatWith(Mono.error(new RuntimeException("Error")))
                                 .retry(3);
        flux4.subscribe(System.out::println, Throwable::printStackTrace);

        Flux<String> flux5 = Flux.just("A", "B")
                                 .concatWith(Mono.error(new RuntimeException("Error")))
                                 .retryWhen(companion -> companion.take(3));
        flux5.subscribe(System.out::println, Throwable::printStackTrace);

        Flux<String> flux6 = Flux.just("A", "B")
                                 .concatWith(Mono.error(new RuntimeException("Error")))
                                 .doOnError(e -> System.out.println("Error occurred: " + e.getMessage()));
        flux6.subscribe(System.out::println, Throwable::printStackTrace);
    }
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Intellij Idea 多模块Maven工程中模块之间无法相互引用问题

    Intellij Idea 多模块Maven工程中模块之间无法相互引用问题

    这篇文章主要介绍了Intellij Idea 多模块Maven工程中模块之间无法相互引用问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-01-01
  • ReentrantLock从源码解析Java多线程同步学习

    ReentrantLock从源码解析Java多线程同步学习

    这篇文章主要为大家介绍了ReentrantLock从源码解析Java多线程同步学习,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-04-04
  • MyBatis动态SQL特性详解

    MyBatis动态SQL特性详解

    动态SQL可以省略很多拼接SQL的步骤,使用类似于JSTL方式,下面这篇文章主要给大家介绍了关于Mybatis动态SQL特性的相关资料,文字通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-11-11
  • 下一代Eclipse 步入云端

    下一代Eclipse 步入云端

    代号Che的下一代Eclipse IDE将运行在云端,可以在任何机器上打开浏览器写代码。项目的建立、编辑、debug、部署可以都在一个仓库中进行,需要的朋友可以参考下
    2015-12-12
  • J2EE验证码图片如何生成和点击刷新验证码

    J2EE验证码图片如何生成和点击刷新验证码

    这篇文章主要介绍了J2EE如何生成验证码图片如何生成,如何点击刷新验证码的相关方法,感兴趣的小伙伴们可以参考一下
    2016-04-04
  • java语言中封装类代码示例

    java语言中封装类代码示例

    这篇文章主要介绍了java语言中封装类,涉及相关代码示例及结果分析,以及封装的好处简单介绍,具有一定借鉴价值,需要的朋友可以参考下
    2018-01-01
  • springboot实现邮箱验证码功能

    springboot实现邮箱验证码功能

    这篇文章主要为大家详细介绍了springboot实现邮箱验证码功能,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-02-02
  • Springboot内嵌SQLite配置使用详解

    Springboot内嵌SQLite配置使用详解

    这篇文章主要介绍了Springboot内嵌SQLite配置使用详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-08-08
  • Java并发工具类Phaser详解

    Java并发工具类Phaser详解

    这篇文章主要介绍了Java并发工具类Phaser详解,Phaser(阶段协同器)是一个Java实现的并发工具类,用于协调多个线程的执行,它提供了一些方便的方法来管理多个阶段的执行,可以让程序员灵活地控制线程的执行顺序和阶段性的执行,需要的朋友可以参考下
    2023-11-11
  • Java集合框架超详细小结

    Java集合框架超详细小结

    Java中提供的一种容器,可以用来存储多个数据。java集合大致可以分为Set,List,Queue和Map四种体系。这篇文章主要介绍了Java集合框架超详细小结,需要的朋友可以参考下
    2021-08-08

最新评论