SpringBoot定时任务实现数据库数据同步全过程

 更新时间:2025年12月18日 09:18:50   作者:小小初霁  
文章详细介绍了从简单到企业级数据库同步需求的技术方案,包括选型、实现步骤、优化方案、异常处理策略、生产环境配置建议等

一、技术方案选型

1. 核心组件

  • Spring Scheduler:轻量级定时任务框架
  • Spring Data JPA:数据库操作(可替换为MyBatis)
  • Quartz:复杂调度需求(集群/持久化)
  • Spring Batch:大批量数据处理

2. 架构示意图

[定时触发器] -> [数据抽取] -> [数据转换] -> [数据加载] -> [结果通知]

二、基础实现步骤

1. 添加依赖

<!-- Spring Boot Starter Web (包含Scheduler) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Spring Data JPA -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<!-- 数据库驱动(示例使用MySQL) -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>

2. 启用定时任务

@SpringBootApplication
@EnableScheduling
public class DataSyncApplication {
    public static void main(String[] args) {
        SpringApplication.run(DataSyncApplication.class, args);
    }
}

3. 实现定时任务类

@Component
public class DataSyncScheduler {
    
    private static final Logger logger = LoggerFactory.getLogger(DataSyncScheduler.class);
    
    @Autowired
    private SourceRepository sourceRepo;
    
    @Autowired
    private TargetRepository targetRepo;

    // 每天凌晨1点执行
    @Scheduled(cron = "0 0 1 * * ?")
    @Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED)
    public void syncDataDaily() {
        try {
            logger.info("开始数据同步任务...");
            
            // 1. 获取增量数据
            LocalDateTime lastSyncTime = getLastSyncTime();
            List<SourceEntity> newData = sourceRepo.findByUpdateTimeAfter(lastSyncTime);
            
            // 2. 数据转换
            List<TargetEntity> transformedData = transformData(newData);
            
            // 3. 批量保存
            targetRepo.saveAll(transformedData);
            
            // 4. 更新同步时间
            updateLastSyncTime(LocalDateTime.now());
            
            logger.info("数据同步完成,处理记录数:{}", newData.size());
        } catch (Exception e) {
            logger.error("数据同步任务异常:", e);
            // 添加重试或报警逻辑
        }
    }
    
    // 数据转换方法
    private List<TargetEntity> transformData(List<SourceEntity> sourceList) {
        return sourceList.stream()
                .map(entity -> new TargetEntity(
                        entity.getId(),
                        entity.getName(),
                        entity.getData(),
                        LocalDateTime.now()))
                .collect(Collectors.toList());
    }
    
    // 获取上次同步时间(示例)
    private LocalDateTime getLastSyncTime() {
        // 可从数据库或缓存获取
        return LocalDateTime.now().minusDays(1);
    }
    
    // 更新同步时间
    private void updateLastSyncTime(LocalDateTime time) {
        // 持久化存储逻辑
    }
}

三、其他优化方案

1. 分布式锁机制

// 使用Redis实现分布式锁
@Scheduled(cron = "${sync.cron}")
public void distributedSyncTask() {
    String lockKey = "data_sync_lock";
    String requestId = UUID.randomUUID().toString();
    try {
        // 尝试获取锁
        boolean locked = redisTemplate.opsForValue()
                .setIfAbsent(lockKey, requestId, 30, TimeUnit.MINUTES);
        
        if (locked) {
            // 执行同步逻辑
            performSync();
        }
    } finally {
        // 释放锁
        if (requestId.equals(redisTemplate.opsForValue().get(lockKey))) {
            redisTemplate.delete(lockKey);
        }
    }
}

2. 分页批量处理

private void batchSyncWithPagination() {
    int pageSize = 1000;
    int page = 0;
    
    Page<SourceEntity> dataPage;
    do {
        dataPage = sourceRepo.findAll(PageRequest.of(page, pageSize));
        List<TargetEntity> targetList = transformData(dataPage.getContent());
        targetRepo.saveAll(targetList);
        page++;
    } while (dataPage.hasNext());
}

3. 事务优化配置

# application.yml
spring:
  jpa:
    properties:
      hibernate:
        jdbc:
          batch_size: 500
        order_inserts: true
        order_updates: true

4. 性能监控配置

@Aspect
@Component
public class SyncMonitorAspect {
    
    @Around("@annotation(org.springframework.scheduling.annotation.Scheduled)")
    public Object monitorTask(ProceedingJoinPoint joinPoint) throws Throwable {
        long start = System.currentTimeMillis();
        try {
            return joinPoint.proceed();
        } finally {
            long duration = System.currentTimeMillis() - start;
            Metrics.timer("sync.task.duration")
                    .tag("task", joinPoint.getSignature().getName())
                    .record(duration, TimeUnit.MILLISECONDS);
        }
    }
}

四、异常处理策略

1. 重试机制

@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 5000))
public void performSync() {
    // 同步逻辑
}

@Recover
public void recoverSync(Exception e) {
    // 报警通知
    alertService.sendAlert("数据同步失败:" + e.getMessage());
}

2. 死信队列处理

// 使用Spring Retry + RabbitMQ实现
@Bean
public RetryOperationsInterceptor retryInterceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(3)
            .backOffOptions(1000, 2.0, 5000)
            .recoverer(new RepublishMessageRecoverer(rabbitTemplate, "dead-letter-exchange"))
            .build();
}

五、生产环境建议

  • 配置中心管理:将cron表达式放在配置中心实现动态调整
  • 多数据源配置:使用AbstractRoutingDataSource实现动态数据源切换
  • 版本控制:维护数据版本号实现幂等同步
  • 数据校验:添加MD5校验机制保证数据一致性
  • 监控告警:集成Prometheus + Grafana实现可视化监控

六、完整配置示例

# application.properties
# 定时任务配置
sync.cron=0 0 2 * * *
sync.batch-size=1000
sync.max-retry=3

# 数据源配置
spring.datasource.source.url=jdbc:mysql://source-db:3306/db
spring.datasource.source.username=user
spring.datasource.source.password=pass

spring.datasource.target.url=jdbc:mysql://target-db:3306/db
spring.datasource.target.username=user
spring.datasource.target.password=pass

七、常见问题排查

任务未执行

  • 检查@EnableScheduling是否启用
  • 确认cron表达式格式正确
  • 查看线程池配置

数据不一致

  • 检查事务隔离级别
  • 验证数据转换逻辑
  • 添加数据校验机制

性能瓶颈

  • 优化SQL查询(添加索引)
  • 调整批量提交大小
  • 增加JVM内存分配

内存溢出

  • 使用分页查询代替全量加载
  • 优化对象重用机制
  • 增加JVM堆内存

建议使用Arthas进行运行时诊断:https://arthas.aliyun.com

总结

通过以上方案,可以实现从简单到企业级的数据库同步需求。实际应用中应根据数据量级、同步频率和业务需求选择合适的实现策略,并建立完善的监控告警体系。

这些仅为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • java开发工作中对InheritableThreadLocal使用思考

    java开发工作中对InheritableThreadLocal使用思考

    这篇文章主要为大家介绍了java开发工作中对InheritableThreadLocal使用思考详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-11-11
  • Java数据结构贪心算法的实现

    Java数据结构贪心算法的实现

    本文主要介绍了Java数据结构贪心算法的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2007-03-03
  • java处理异常的机制关键字throw和throws使用解析

    java处理异常的机制关键字throw和throws使用解析

    这篇文章主要介绍了java处理异常的机制关键字throw和throws使用解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-09-09
  • java Wrapper类基本用法详解

    java Wrapper类基本用法详解

    在本篇文章里小编给大家整理的是一篇关于java Wrapper类基本用法详解,有兴趣的朋友们可以参考下。
    2021-01-01
  • 解决IDEA Maven下载依赖时报错ERROR - #org.jetbrains.idea.maven - Cannot reconnect.

    解决IDEA Maven下载依赖时报错ERROR - #org.jetbrains.ide

    这篇文章主要介绍了解决IDEA Maven下载依赖时报错ERROR - #org.jetbrains.idea.maven - Cannot reconnect.问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-08-08
  • Java使用sftp定时下载文件的示例代码

    Java使用sftp定时下载文件的示例代码

    SFTP 为 SSH的其中一部分,是一种传输档案至 Blogger 伺服器的安全方式。接下来通过本文给大家介绍了Java使用sftp定时下载文件的示例代码,感兴趣的朋友跟随脚本之家小编一起看看吧
    2018-05-05
  • 一篇文章带你入门java方法

    一篇文章带你入门java方法

    这篇文章主要介绍了java基础之方法详解,文中有非常详细的代码示例,对正在学习java基础的小伙伴们有非常好的帮助,需要的朋友可以参考下
    2021-08-08
  • Spring JDK动态代理实现过程详解

    Spring JDK动态代理实现过程详解

    这篇文章主要介绍了Spring JDK动态代理实现过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-02-02
  • Kryo序列化及反序列化用法示例

    Kryo序列化及反序列化用法示例

    这篇文章主要介绍了Kryo序列化及反序列化用法示例,小编觉得挺不错的,这里分享给大家,需要的朋友可以参考下。
    2017-10-10
  • 深入JVM剖析Java的线程堆栈

    深入JVM剖析Java的线程堆栈

    这篇文章主要介绍了深入JVM剖析Java的线程堆栈,Java中的堆内存和堆栈原理的应用等知识是深入学习Java的重点,需要的朋友可以参考下
    2015-07-07

最新评论