SpringBatch结合SpringBoot简单使用实现工资发放批处理操作方式

 更新时间:2023年09月11日 15:59:56   作者:回炉重造P  
这篇文章主要介绍了SpringBatch结合SpringBoot简单使用实现工资发放批处理操作方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

最近有接触到批处理相关的需求,学习了下SpringBatch的使用方法。SpringBatch能把复杂的批处理任务进行step分解,并能通过reader和writer满足不同来源数据的处理需求,支持在step定义时设置异常重试策略等,比较方便拓展。

简单记录下基于SpringBoot写的使用demo。

需求

两张表,user_with_role和role_num,分别有user信息和工资流水信息,role_num冗余。

user_with_role表,包含员工信息和role字段信息

CREATE TABLE `user_with_role` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `username` VARCHAR(45) NULL,
  `role` VARCHAR(45) NULL,
  PRIMARY KEY (`id`));

role_num表,包含发钱信息

CREATE TABLE `role_num1` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `role` VARCHAR(45) NULL,
  `account` INT NULL,
  `username` VARCHAR(45) NULL,
  `inputtime` VARCHAR(45) NULL,
  PRIMARY KEY (`id`));

pom设置和application配置文件

在springboot的基础上使用batch,pom.xml中增加dependency

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>

还需要加上jdbc依赖,不过这个一般都有加。

application中增加:

spring:
  batch:
    initialize-schema: always

这个配置主要是让springbatch自动在数据库中创建运行所需的表,SpringBoot2.7版本后修改为 spring.batch.jdbc.initialize-schema 了,不过我还在用老土2.3,先这样吧。

bean类

UserWithRole

@Data
public class UserWithRole {
    private int id;
    private String userName;
	private String role;
}

RoleNum

@Data
public class RoleNum {
    private int id;
    private String role;
    private int account;
    private String userName;
	private String inputTime;
}

关键job配置类

包含reader,writer,step和整体job的配置,通过@Bean注解的方法来进行spring管理,下面一步步来。

@Configuration
@EnableBatchProcessing
// batch job设置
// 将user_with_role中的role_user工资信息,输出到role_num表中
public class RoleCountingBatchJobConfig {
    //注入任务对象工厂
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    //任务的执行由Step决定,注入step对象的factory
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DataSource dataSource;
    @Autowired
    private NamedParameterJdbcTemplate namedParameterJdbcTemplate;

SpringBatch默认会构建两个Factory用来进行job构建,DataSource默认为Spring配置的数据库,NamedParameterJdbcTemplate由Spring提供,用来将statement中的问号占位符改为利用命名参数映射更方便,不过我后面没用。

    // 设置itemReader
    @Bean
    public JdbcCursorItemReader<UserWithRole> getItemReader(){
        JdbcCursorItemReader<UserWithRole> itemReader = new JdbcCursorItemReader<>();
        itemReader.setDataSource(dataSource);//设置数据源
        // 实体映射
        itemReader.setRowMapper(new RowMapper<UserWithRole>() {
            @Override
            public UserWithRole mapRow(ResultSet rs, int rowNum) throws SQLException {
                UserWithRole uwr = new UserWithRole();
                uwr.setId(rs.getInt("id"));
                uwr.setUserName(rs.getString("userName"));
                uwr.setRole(rs.getString("role"));
                return uwr;
            }
        });
        String sql = "select u.id as id, u.username as userName, u.role as role from user_with_role as u";
        itemReader.setSql(sql);
        return itemReader;
    }

itemReader 用的是 JdbcCursorItemReader ,用来从user_with_role表中读取数据,主要是配置 RowMapper ,将行数据封装成 UserWithRole 对象,sql就是简单查询。

    // 设置itemWriter
    @Bean
    public JdbcBatchItemWriter<RoleNum> getItemWriter(){
        JdbcBatchItemWriter<RoleNum> itemWriter = new JdbcBatchItemWriter<>();
        itemWriter.setDataSource(dataSource);
        itemWriter.setJdbcTemplate(namedParameterJdbcTemplate);
        String sql = "insert into role_num(role, account, username, inputtime) values(?, ?, ?, ?)";
        itemWriter.setSql(sql);
        itemWriter.setItemPreparedStatementSetter(new RoleNumPreparedStatementSetter());
        return itemWriter;
    }

itemWriter 用的是 JdbcBatchItemWriter ,用来把RoleNum对象存到数据库中,主要的参数写入逻辑写在 RoleNumPreparedStatementSetter 类中。

public class RoleNumPreparedStatementSetter implements ItemPreparedStatementSetter<RoleNum> {
    // 将role_num的批处理计算结果存入的sql
    @Override
    public void setValues(RoleNum roleNum, PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, roleNum.getRole());
        preparedStatement.setInt(2, roleNum.getAccount());
        preparedStatement.setString(3, roleNum.getUserName());
        preparedStatement.setString(4, roleNum.getInputTime());
    }
}

RoleNumPreparedStatementSetter 主要是将参数通过占位符放到sql中。

    // 设置process
    @Bean
    public RoleUserCountingProcess getProcess(){
        return new RoleUserCountingProcess();
    }
public class RoleUserCountingProcess implements ItemProcessor<UserWithRole, RoleNum> {
    // 将user的信息转换成roleNum信息,当作发工资
    @Override
    public RoleNum process(UserWithRole userWithRole) throws Exception {
        RoleNum newRoleNum = new RoleNum();
        newRoleNum.setUserName(userWithRole.getUserName());
        newRoleNum.setRole(userWithRole.getRole());
        newRoleNum.setInputTime(new LocalDateTime(new Date()).toString());
        if(userWithRole.getRole()==null)
            return newRoleNum;
        switch (userWithRole.getRole()) {
            case "employee":
                newRoleNum.setAccount(100);
                break;
            case "manager":
                newRoleNum.setAccount(10000);
                break;
            case "boss":
                newRoleNum.setAccount(100000);
                break;
        }
        System.out.println(newRoleNum.getUserName() + "---" + newRoleNum.getAccount());
        return newRoleNum;
    }
}

process中定义了具体的工资写入逻辑,实现了 ItemProcess 接口的process方法,将itemReader的输出UserWithRole类和itemWriter的输入RoleNum类进行连接,从而完成process过程。

    // 设置step
    @Bean
    public Step getStep(){
        return stepBuilderFactory.get("user_role_convert_step")
                .<UserWithRole, RoleNum>chunk(10)
                .reader(getItemReader())
                .processor(getProcess())
                .writer(getItemWriter())
                .build();
    }
    // 获取job对象
    @Bean
    public Job RoleCountingBatchJob(JobBuilderFactory jobBuilders,
                                    StepBuilderFactory stepBuilders){
        return jobBuilders.get("user_role_convert_step")
                .start(getStep())
                .build();
    }

最后定义step和job整体流程。

在配置文件中加入:

spring:
  batch:
    job:
      enabled: false

可以阻止job在项目启动时自动执行。

controller启动jobInstance

@Controller
public class BatchRunnerController {
    @Autowired
    JobLauncher jobLauncher;
    // 自定义job任务
    @Autowired
    Job roleCountingBatchJob;
    @RequestMapping("/countingRoleUser")
    @ResponseBody
    public String countingRollUser() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        // 设置时间parameter来区分instance
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addDate("startDate", new Date());
        jobLauncher.run(roleCountingBatchJob, jobParametersBuilder.toJobParameters());
        return "role user counting batch success --- " +
                Objects.requireNonNull(jobParametersBuilder.toJobParameters().getDate("startDate")).toString();
    }
}

利用 jobLauncher 来执行对应的job,date时间来区分不同的jobInstance。

实现效果

在这里插入图片描述

user_with_role表

在这里插入图片描述

批处理job正常执行

在这里插入图片描述

duideduide没有role所以没发到工资

总结

本文简单记录了下简单的springbatch的使用,包括item相关,process编写,step构建和最终的job使用。springbatch还有一点是可以设置出现异常的处理策略,比如容忍数次异常,调过某些异常等,在真实使用中比较灵活,有机会再补充。

好了,以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

最新评论