一篇非常好的Spring Integration 教程

 更新时间:2026年06月15日 10:19:11   作者:Full Stack Developme  
Spring Integration 是 Spring 家族里专门用来搞定‌企业应用集成‌的框架,核心就是让不同系统之间能‌松耦合‌地通过‌消息‌来通信,本文给大家介绍Spring Integration 教程,感兴趣的朋友一起看看吧

一、什么是 Spring Integration

Spring Integration 是 Spring 生态系统中的一个扩展模块,用于实现企业应用集成 (EAI, Enterprise Application Integration)。它基于 Spring 框架,提供了一套声明式的适配器,用于集成不同的系统和服务。

核心特点:

  • 基于消息驱动的架构
  • 支持多种传输协议(HTTP, TCP, JMS, AMQP, FTP, File 等)
  • 提供开箱即用的端点适配器
  • 支持企业集成模式 (EIP, Enterprise Integration Patterns)

二、核心概念

1. Message

// 消息由消息头和消息体组成
public interface Message<T> {
    T getPayload();
    MessageHeaders getHeaders();
}
// 创建消息
Message<String> message = MessageBuilder.withPayload("Hello")
    .setHeader("key", "value")
    .build();

2. Message Channel

消息通道用于在发送者和接收者之间传递消息。

// 点对点通道
@Bean
public MessageChannel directChannel() {
    return new DirectChannel();
}
// 发布订阅通道
@Bean
public MessageChannel publishSubscribeChannel() {
    return new PublishSubscribeChannel();
}
// 队列通道
@Bean
public MessageChannel queueChannel() {
    return new QueueChannel(10);
}

3. Message Endpoint

消息端点负责处理消息。

三、快速入门示例

Maven 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- 可选:特定协议支持 -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-http</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
</dependency>

基础配置示例

@Configuration
@EnableIntegration
public class IntegrationConfig {
    // 定义消息通道
    @Bean
    public MessageChannel inputChannel() {
        return new DirectChannel();
    }
    @Bean
    public MessageChannel outputChannel() {
        return new DirectChannel();
    }
    // 定义集成流程
    @Bean
    public IntegrationFlow simpleFlow() {
        return IntegrationFlow.from(inputChannel())
            .transform(String.class, s -> s.toUpperCase())
            .filter(s -> s.startsWith("A"))
            .handle(System.out::println)
            .get();
    }
}

使用 @MessagingGateway

// 定义网关接口
@MessagingGateway(defaultRequestChannel = "inputChannel")
public interface SimpleGateway {
    void sendMessage(String message);
    @Gateway(requestChannel = "requestChannel", replyChannel = "replyChannel")
    String sendAndReceive(String message);
}
// 使用网关
@Service
public class MessageService {
    @Autowired
    private SimpleGateway gateway;
    public void send(String message) {
        gateway.sendMessage(message);
    }
}

四、常用企业集成模式

1. 消息转换器 (Transformer)

@Bean
public IntegrationFlow transformerFlow() {
    return IntegrationFlow.from("inputChannel")
        .transform(new GenericTransformer<String, User>() {
            @Override
            public User transform(String source) {
                return new User(source);
            }
        })
        .channel("outputChannel")
        .get();
}

2. 消息过滤器 (Filter)

@Bean
public IntegrationFlow filterFlow() {
    return IntegrationFlow.from("inputChannel")
        .filter(payload -> payload instanceof User)
        .filter("payload.age > 18")  // SpEL 表达式
        .channel("adultChannel")
        .get();
}

3. 消息路由器 (Router)

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlow.from("inputChannel")
        .route(payload -> {
            if (payload instanceof Order) return "orderChannel";
            if (payload instanceof Payment) return "paymentChannel";
            return "errorChannel";
        })
        .get();
}

4. 消息拆分器 (Splitter) 和聚合器 (Aggregator)

@Bean
public IntegrationFlow splitterAggregatorFlow() {
    return IntegrationFlow.from("inputChannel")
        .split()  // 拆分消息
        .channel("splitChannel")
        .aggregate()  // 聚合消息
        .channel("outputChannel")
        .get();
}

五、常用适配器示例

1. 文件适配器

@Configuration
public class FileIntegrationConfig {
    // 读取文件
    @Bean
    public IntegrationFlow fileReaderFlow() {
        return IntegrationFlow.from(
            Files.inboundAdapter(new File("/input"))
                .patternFilter("*.txt"),
            e -> e.poller(Pollers.fixedDelay(1000))
        )
        .transform(File.class, File::getAbsolutePath)
        .handle(System.out::println)
        .get();
    }
    // 写入文件
    @Bean
    public IntegrationFlow fileWriterFlow() {
        return IntegrationFlow.from("fileInputChannel")
            .handle(Files.outboundAdapter(new File("/output"))
                .autoCreateDirectory(true))
            .get();
    }
}

2. HTTP 适配器

@Configuration
public class HttpIntegrationConfig {
    // HTTP 入站网关
    @Bean
    public IntegrationFlow httpInboundFlow() {
        return IntegrationFlow.from(
            Http.inboundGateway("/api/message")
                .requestMapping(m -> m.methods(HttpMethod.POST))
                .requestPayloadType(String.class)
                .replyTimeout(30000)
        )
        .transform(String.class, s -> "Processed: " + s)
        .get();
    }
    // HTTP 出站网关
    @Bean
    public IntegrationFlow httpOutboundFlow() {
        return IntegrationFlow.from("requestChannel")
            .handle(Http.outboundGateway("https://api.example.com/data")
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String.class))
            .channel("responseChannel")
            .get();
    }
}

3. JMS 适配器

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jms</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-broker</artifactId>
</dependency>
@Configuration
public class JmsIntegrationConfig {
    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory("tcp://localhost:61616");
    }
    // JMS 入站适配器
    @Bean
    public IntegrationFlow jmsInboundFlow() {
        return IntegrationFlow.from(
            Jms.inboundAdapter(connectionFactory())
                .destination("queue.in")
        )
        .transform(String.class, String::toUpperCase)
        .handle(message -> System.out.println("Received: " + message))
        .get();
    }
    // JMS 出站适配器
    @Bean
    public IntegrationFlow jmsOutboundFlow() {
        return IntegrationFlow.from("jmsOutputChannel")
            .handle(Jms.outboundAdapter(connectionFactory())
                .destination("queue.out"))
            .get();
    }
}

六、高级特性

1. 错误处理

@Bean
public IntegrationFlow errorHandlingFlow() {
    return IntegrationFlow.from("inputChannel")
        .transform(...)
        .handle(..., e -> e
            .advice(ExpressionEvaluatingRequestHandlerAdvice.class)
            .advice(advice -> advice
                .onFailureExpression("payload.message")
                .trapException(true))
        )
        .get();
}
// 全局错误通道
@Bean
public IntegrationFlow errorFlow() {
    return IntegrationFlow.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
        .handle(message -> {
            Exception exception = (Exception) message.getPayload();
            log.error("Error: ", exception);
        })
        .get();
}

2. 消息历史

@Configuration
@EnableIntegration
@EnableMessageHistory
public class HistoryConfig {
    @Bean
    public IntegrationFlow historyFlow() {
        return IntegrationFlow.from("inputChannel")
            .transform(...)
            .enrichHeaders(s -> s.header(MessageHistory.HEADER_NAME, new MessageHistory()))
            .handle(...)
            .get();
    }
}

3. 控制总线

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlow.from("controlBus")
        .controlBus()
        .get();
}
// 使用控制总线
@Component
public class ControlBusService {
    @Autowired
    @Qualifier("controlBus")
    private MessageChannel controlBus;
    public void stopChannel() {
        controlBus.send(MessageBuilder.withPayload("@myChannel.stop()").build());
    }
}

七、完整示例:文件处理系统

@SpringBootApplication
@EnableIntegration
public class FileProcessingApplication {
    public static void main(String[] args) {
        SpringApplication.run(FileProcessingApplication.class, args);
    }
}
@Configuration
public class FileProcessingFlow {
    private static final Logger log = LoggerFactory.getLogger(FileProcessingFlow.class);
    // 文件输入目录
    @Value("${input.directory:/input}")
    private String inputDirectory;
    // 处理成功目录
    @Value("${success.directory:/success}")
    private String successDirectory;
    // 处理失败目录
    @Value("${failed.directory:/failed}")
    private String failedDirectory;
    @Bean
    public IntegrationFlow fileProcessingFlow() {
        return IntegrationFlow.from(
            Files.inboundAdapter(new File(inputDirectory))
                .patternFilter("*.csv")
                .preventDuplicates(true)
                .autoCreateDirectory(true),
            e -> e.poller(Pollers.fixedDelay(5000)
                .maxMessagesPerPoll(5)
                .advice(expressionAdvice()))
        )
        .channel(MessageChannels.queue("processingChannel", 10))
        .transform(Files.toStringTransformer())  // 文件转字符串
        .split(s -> s.delimiters("\n"))  // 按行拆分
        .filter(line -> !line.trim().isEmpty())
        .transform(line -> parseCsvLine(line))  // 解析CSV
        .aggregate(aggregatorSpec -> aggregatorSpec
            .releaseStrategy(new SimpleSequenceSizeReleaseStrategy())
            .correlationStrategy(message -> "batch"))
        .handle(message -> processBatch((List<Map<String, String>>) message.getPayload()))
        .handle(Files.outboundAdapter(new File(successDirectory))
            .autoCreateDirectory(true)
            .fileNameGenerator(message -> generateFileName(message)))
        .get();
    }
    // 错误处理:失败的文件移动到失败目录
    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlow.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
            .handle(message -> {
                Message<?> failedMessage = (Message<?>) message.getHeaders().get("inputMessage");
                File failedFile = (File) failedMessage.getPayload();
                FileUtils.moveFileToDirectory(failedFile, new File(failedDirectory), true);
                log.error("Failed to process file: {}", failedFile.getName());
            })
            .get();
    }
    private Map<String, String> parseCsvLine(String line) {
        // CSV解析逻辑
        return new HashMap<>();
    }
    private void processBatch(List<Map<String, String>> batch) {
        // 批量处理逻辑
        log.info("Processing batch of {} records", batch.size());
    }
    private String generateFileName(Message<?> message) {
        return "processed_" + System.currentTimeMillis() + ".json";
    }
    @Bean
    public Advice expressionAdvice() {
        return new ExpressionEvaluatingRequestHandlerAdvice();
    }
}

八、最佳实践

  • 合理使用通道类型:DirectChannel 用于同步,QueueChannel 用于缓冲,PublishSubscribeChannel 用于广播
  • 避免阻塞操作:使用 QueueChannel 时注意配置合适的大小和 poller
  • 错误处理:始终配置错误通道,记录异常并适当重试
  • 监控和管理:使用 Spring Boot Actuator 监控集成端点
management:
  endpoints:
    web:
      exposure:
        include: integration

测试:使用 @SpringIntegrationTest 进行集成测试

@SpringBootTest
@SpringIntegrationTest(noAutoStartup = {"inputChannel"})
class IntegrationFlowTest {
    @Test
    void testFlow() {
        // 测试逻辑
    }
}

到此这篇关于一篇非常好的Spring Integration 教程的文章就介绍到这了,更多相关Spring Integration 教程内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 在zuulFilter中注入bean失败的解决方案

    在zuulFilter中注入bean失败的解决方案

    这篇文章主要介绍了在zuulFilter中注入bean失败的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • idea以文本形式输出idea目录结构方式

    idea以文本形式输出idea目录结构方式

    本文介绍了使用Alt+F12打开Terminal终端,cd到项目路径下,使用tree命令查看项目结构的方法,并表示这是个人经验,仅供参考
    2026-05-05
  • SpringBoot项目如何连接MySQL8.0数据库

    SpringBoot项目如何连接MySQL8.0数据库

    这篇文章主要介绍了SpringBoot项目如何连接MySQL8.0数据库,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-11-11
  • Java大数据开发Hadoop MapReduce

    Java大数据开发Hadoop MapReduce

    MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系
    2023-03-03
  • Java中String类的常用方法总结

    Java中String类的常用方法总结

    java.lang.String 类代表字符串。Java程序中所有的字符串文字(例如"abc" )都可以被看作是实现此类的实例。本文主要为大家介绍了String类的常用方法,需要的可以参考一下
    2022-11-11
  • Java枚举之EnumSet详解

    Java枚举之EnumSet详解

    这篇文章主要介绍了Java枚举之EnumSet详解,使用时进行与或运算,但是定义多了之后,会很乱、臃肿,编写容易出错,EnumSet可以实现类似的功能,且使用起来很简洁,需要的朋友可以参考下
    2023-12-12
  • java中超过long范围的超大整数相加算法详解(面试高频)

    java中超过long范围的超大整数相加算法详解(面试高频)

    这篇文章主要介绍了java中超过long范围的超大整数相加算法(面试高频),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • maven打包时候修改包名称带上git版本号和打包时间方式

    maven打包时候修改包名称带上git版本号和打包时间方式

    这篇文章主要介绍了maven打包时候修改包名称带上git版本号和打包时间方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-04-04
  • Groovy正则表达式使用解读

    Groovy正则表达式使用解读

    文章总结了Groovy中的正则表达式特点,指出其作为Java扩展支持=~和==~操作符,转义字符使用需特别处理,正则表达式在双引号字符串中不需要转义,但需双斜线表示特殊字符,与Java相比,Groovy更简便,基于Pattern和Matcher类,用于模式匹配和捕获子字符串
    2025-09-09
  • SpringBoot实现IP地址解析的示例代码

    SpringBoot实现IP地址解析的示例代码

    本篇带大家实践在springboot项目中获取请求的ip与详细地址,我们的很多网站app中都已经新增了ip地址显示,具有一定的参考价值,感兴趣的可以了解一下
    2024-01-01

最新评论