Spring Batch大数据量处理之从入门到精通实践

 更新时间:2026年04月18日 14:33:29   作者:程序员鸭梨  
文章介绍了SpringBatch的基本架构和核心配置,接着通过CSV导入数据库和数据库导出到文件两个简单任务示例进行讲解,最后介绍了如何处理多步骤复杂任务

一、Spring Batch 基础架构

1.1 核心配置

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Bean
    public JobExplorer jobExplorer(DataSource dataSource) throws Exception {
        JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
        factoryBean.setDataSource(dataSource);
        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }

    @Bean
    public JobRegistry jobRegistry() {
        return new MapJobRegistry();
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry());
        return postProcessor;
    }
}

1.2 数据库表结构

-- Spring Batch 元数据表
-- BATCH_JOB_INSTANCE: 作业实例
-- BATCH_JOB_EXECUTION: 作业执行
-- BATCH_JOB_EXECUTION_PARAMS: 作业参数
-- BATCH_STEP_EXECUTION: 步骤执行
-- BATCH_JOB_EXECUTION_CONTEXT: 作业上下文
-- BATCH_STEP_EXECUTION_CONTEXT: 步骤上下文

-- 创建自定义监控表
CREATE TABLE batch_job_monitoring (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    job_name VARCHAR(100) NOT NULL,
    job_instance_id BIGINT,
    job_execution_id BIGINT,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    status VARCHAR(20),
    read_count BIGINT DEFAULT 0,
    write_count BIGINT DEFAULT 0,
    skip_count BIGINT DEFAULT 0,
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

二、简单任务示例

2.1 CSV 导入数据库

@Configuration
public class CsvToDatabaseJobConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Bean
    public Job csvToDatabaseJob() {
        return new JobBuilder("csvToDatabaseJob", jobRepository)
            .start(csvToDatabaseStep())
            .listener(jobExecutionListener())
            .build();
    }

    @Bean
    public Step csvToDatabaseStep() {
        return new StepBuilder("csvToDatabaseStep", jobRepository)
            .<ProductInput, Product>chunk(1000, transactionManager)
            .reader(csvItemReader())
            .processor(productItemProcessor())
            .writer(databaseItemWriter())
            .faultTolerant()
            .skipLimit(10)
            .skip(ValidationException.class)
            .retryLimit(3)
            .retry(TransientDataAccessException.class)
            .listener(stepExecutionListener())
            .build();
    }

    @Bean
    public FlatFileItemReader<ProductInput> csvItemReader() {
        return new FlatFileItemReaderBuilder<ProductInput>()
            .name("csvItemReader")
            .resource(new FileSystemResource("input/products.csv"))
            .delimited()
            .names("id", "name", "description", "price", "category")
            .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
                setTargetType(ProductInput.class);
            }})
            .linesToSkip(1) // 跳过表头
            .build();
    }

    @Bean
    public ItemProcessor<ProductInput, Product> productItemProcessor() {
        return input -> {
            Product product = new Product();
            product.setId(input.getId());
            product.setName(input.getName().trim());
            product.setDescription(input.getDescription());
            product.setPrice(new BigDecimal(input.getPrice()));
            product.setCategory(input.getCategory());
            product.setCreatedAt(LocalDateTime.now());
            return product;
        };
    }

    @Bean
    public JdbcBatchItemWriter<Product> databaseItemWriter() {
        return new JdbcBatchItemWriterBuilder<Product>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql("INSERT INTO products (id, name, description, price, category, created_at) " +
                 "VALUES (:id, :name, :description, :price, :category, :createdAt) " +
                 "ON DUPLICATE KEY UPDATE " +
                 "name = VALUES(name), description = VALUES(description), " +
                 "price = VALUES(price), category = VALUES(category)")
            .dataSource(dataSource)
            .build();
    }
}

2.2 数据库导出到文件

@Configuration
public class DatabaseToFileJobConfig {

    @Bean
    public Job exportOrdersJob() {
        return new JobBuilder("exportOrdersJob", jobRepository)
            .start(exportOrdersStep())
            .build();
    }

    @Bean
    public Step exportOrdersStep() {
        return new StepBuilder("exportOrdersStep", jobRepository)
            .<Order, OrderOutput>chunk(500, transactionManager)
            .reader(orderItemReader())
            .processor(orderItemProcessor())
            .writer(orderItemWriter())
            .build();
    }

    @Bean
    public JdbcPagingItemReader<Order> orderItemReader() {
        return new JdbcPagingItemReaderBuilder<Order>()
            .name("orderItemReader")
            .dataSource(dataSource)
            .queryProvider(new PagingQueryProvider() {
                @Override
                public void init(DataSource dataSource) {}
                
                @Override
                public String getSortKey() {
                    return "id";
                }
                
                @Override
                public String getSelectClause() {
                    return "SELECT id, user_id, total_amount, status, created_at";
                }
                
                @Override
                public String getFromClause() {
                    return "FROM orders";
                }
                
                @Override
                public String getWhereClause() {
                    return "WHERE created_at >= :startDate AND created_at <= :endDate";
                }
            })
            .parameterValues(Map.of("startDate", startDate, "endDate", endDate))
            .pageSize(1000)
            .rowMapper(new OrderRowMapper())
            .build();
    }

    @Bean
    public FlatFileItemWriter<OrderOutput> orderItemWriter() {
        return new FlatFileItemWriterBuilder<OrderOutput>()
            .name("orderItemWriter")
            .resource(new FileSystemResource("output/orders.csv"))
            .delimited()
            .delimiter(",")
            .names("orderId", "userId", "amount", "status", "createdDate")
            .headerCallback(writer -> writer.write("OrderID,UserID,Amount,Status,CreatedDate"))
            .footerCallback(writer -> writer.write("Total records exported"))
            .build();
    }
}

三、复杂任务处理

3.1 多步骤任务

@Configuration
public class ComplexBatchJobConfig {

    @Bean
    public Job orderProcessingJob() {
        return new JobBuilder("orderProcessingJob", jobRepository)
            .start(validateOrderStep())
            .next(processPaymentStep())
            .next(updateInventoryStep())
            .next(sendNotificationStep())
            .on("FAILED").to(errorHandlingStep())
            .from(sendNotificationStep()).on("*").to(cleanupStep())
            .end()
            .build();
    }

    @Bean
    public Step validateOrderStep() {
        return new StepBuilder("validateOrderStep", jobRepository)
            .<Order, ValidatedOrder>chunk(100, transactionManager)
            .reader(pendingOrderReader())
            .processor(orderValidator())
            .writer(validatedOrderWriter())
            .build();
    }

    @Bean
    public Step processPaymentStep() {
        return new StepBuilder("processPaymentStep", jobRepository)
            .tasklet((contribution, chunkContext) -> {
                // 处理支付逻辑
                JobParameters params = chunkContext.getStepContext().getJobParameters();
                String batchId = params.getString("batchId");
                
                paymentService.processBatchPayments(batchId);
                
                return RepeatStatus.FINISHED;
            }, transactionManager)
            .build();
    }

    @Bean
    public Step updateInventoryStep() {
        return new StepBuilder("updateInventoryStep", jobRepository)
            .<OrderItem, InventoryUpdate>chunk(200, transactionManager)
            .reader(orderItemReader())
            .processor(inventoryProcessor())
            .writer(inventoryWriter())
            .build();
    }

    @Bean
    public Flow splitFlow() {
        return new FlowBuilder<SimpleFlow>("splitFlow")
            .split(taskExecutor())
            .add(flow1(), flow2(), flow3())
            .build();
    }

    @Bean
    public Flow flow1() {
        return new FlowBuilder<Simple

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • 在spring中手写全局异常拦截器

    在spring中手写全局异常拦截器

    这篇文章主要介绍了如何在spring中手写全局异常拦截器,帮助大家更好的理解和使用spring框架,感兴趣的朋友可以了解下
    2020-11-11
  • JAVA开发环境搭建教程

    JAVA开发环境搭建教程

    这篇文章主要为大家详细介绍了JAVA开发环境搭建教程,配置JAVA开发环境,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-06-06
  • Java实现带GUI的气泡诗词效果

    Java实现带GUI的气泡诗词效果

    这篇文章主要为大家介绍了如何利用Java实现带GUI的气泡诗词效果,文中的示例代码讲解详细,对我们学习Java有一定帮助,感兴趣的可以了解一下
    2022-12-12
  • Java基于IO流读取文件的方法

    Java基于IO流读取文件的方法

    这篇文章主要介绍了Java基于IO流读取文件的方法,涉及Java文件流操作的相关技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-10-10
  • 详解Java如何使用责任链默认优雅地进行参数校验

    详解Java如何使用责任链默认优雅地进行参数校验

    项目中参数校验十分重要,它可以保护我们应用程序的安全性和合法性。这篇文章主要介绍了如何使用责任链默认优雅地进行参数校验,需要的可以参考一下
    2023-03-03
  • SpringBoot实现热部署的三种方式

    SpringBoot实现热部署的三种方式

    本文主要介绍了SpringBoot实现热部署的三种方式,主要包括配置pom.xml文件,使用插件的执行命令mvn spring-boot:run启动项,使用springloader本地启动修改jvm参数,使用devtools工具包,感兴趣的可以了解一下
    2023-12-12
  • java并发编程实例分析

    java并发编程实例分析

    在本文里我们给大家分享了关于java并发编程实例分析以及相关知识点,需要的朋友们学习下。
    2019-03-03
  • Java编译和解释执行对比及原理解析

    Java编译和解释执行对比及原理解析

    这篇文章主要介绍了Java编译和解释执行对比及原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05
  • Nacos配置文件使用经验及CAP原则详解

    Nacos配置文件使用经验及CAP原则详解

    这篇文章主要为大家介绍了Nacos配置文件使用经验及CAP规则详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2024-02-02
  • SpringBoot 集成短信和邮件的配置示例详解

    SpringBoot 集成短信和邮件的配置示例详解

    这篇文章主要介绍了SpringBoot 集成短信和邮件的相关知识,项目中使用lombok插件和swagger依赖,无相关依赖的请自行修改,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下
    2022-04-04

最新评论