基于Java实现一个简单的数据同步组件

 更新时间:2023年06月19日 10:07:18   作者:磊叔的技术博客  
这篇文章主要为大家详细介绍了如何基于Java实现一个简单的数据同步组件,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以了解一下

目前关于数据同步的组件开源社区有很多,如:Flink CDC, DataX, seaTunel,Kattle 等等,大体上可以分为两种:基于日志的和基于 JDBC 的。这些同步组件在进行整库同步或者 schema 差异性不大的情况下通过可视化界面或者配置文件映射的方式可以直接达到我们库-库的同步诉求,但是对于一些定制化较大的场景处理起来还比较麻烦。为了满足这样的一个场景,笔者写了一个小的同步组件。

PS: 我们的业务场景比较特殊,源的种类比较多,有 Oracle、Mysql、文件以及 API 接口等。另外一点是需要同步的数据量不是很大,引入额外的数据同步组件对于我们来说会有额外的运维成本和学习成本,因此基于上述两个点,决定自己写一个小的组件。

业务框架

两个基本需求

  • 支持双写,即外部源经过数据同步组件,一方面是根据自定义的标准化模型转换成需要的专题数据;另一方面是需要将原始数据原封不动的重新落到我们自己的库中;
  • 能够支持将专题库数据通过逆向转换,写成不同的原始库的数据格式

组件模型

以 Mysql 为例,同步组件的主要模块如下:

  • 1、绿色背景是业务插件,每个业务都会对应一个业务插件,编写同步器 Syncer 和 转换器 Convertor
  • 2、点虚线是一条单独的通道,用于将目标库数据转换成不同的原始库数据
  • 3、借助了一些队列实现了简单的生产者消费者模型,队列作为一个缓冲区,以便于后续实现对于读取原始库和写目标库的控制。

项目模块划分如下:

  • api-web 对外提供 api 服务,能够发起数据同步任务、取消数据同步任务、暂停数据同步任务和取消数据同步任务等接口
  • common 是一些公共的工具、模型、枚举。
  • connectors 连接器的具体实现,比如 MysqlConnector 就是基于 JDBC API 实现对原始库的数据读。
  • core 核心包,主要包括一些接口的定义和逻辑处理的标准化流程
  • executors 执行器层,包括任务管理、线程池资源管理等
  • plugins 具体业务插件,主要使用 Syncer 同步器和 Convertor 模型转换器

核心问题

针对数据同步组件,解决的核心问题可以抽象成如下模型:A_DB -> A -> B -> B_DB;即将 A 数据库中的数据读取出来之后转换成 A class instance,然后将 A class instance 转换成 B class instance,再将 B class instance 写到 B 数据库。

解决 A_DB 到 A

从 A_DB -> A 或者 B -> B_DB 这个过程,就是我们所熟知的 ORM 解决的问题;不管是 hibernate、mybatis 还是 SpringBoot JPA 都是围绕着这个问题展开的。

在本篇的组件中,因没有引入 ORM,所以将数据库行映射成一个 java 对象也需要自己实现。DataX 中是通过配置文件来描述的,在本篇中没有才采用这种描述方式,而是通过语言耦合性更高的注解的方式来实现的(由业务属性决定);

如下是一个描述具体业务的 Java 对象的定义,@Table 注解用来描述 JmltModel 和哪个表是关联的, @Colum 注解用来描述属性是和哪个字段关联的.

@Data
// @Table 注解用来描述 JmltModel 和哪个表是关联 的
@Table(name = "user_info") 
public class JmltModel implements Serializable {
    // @Colum 注解用来描述属性是和哪个字段关联的
    @Colum(name = "id")
    private Long id;
    @Colum(name = "email")
    private String email;
    @Colum(name = "name")
    private String name;
    @Colum(name = "create_time")
    private Date create_time;
}

有了这个描述关系,即可以在 runtime 时通过泛型 + 反射来实现 A_DB -> A 过程的模板设计。

MysqlConnector 的实现来进行说明,下面抽取了 MysqlConnector 组件中的部分代码(做了一些删减);下面这段代码中有 1-6 6 的步骤,这部分属于生产端,即从原始 Mysql 表中分页读取数据,并将读取到的数据映射成实际的对象,再通过业务定义的 convertor 转换成目标的对象,最后丢到队列中去等待消费。

// 1、originClass 是原始库对象,这里通过反射获取 Table 注解,从而拿到表名
Table table = (Table) originClass.getDeclaredAnnotation(Table.class);
String tableName = table.name();
// 2、计算所有的条数,然后按照分页的方式进行 fetch
SqlTemplate sqlTemplate = new SqlTemplate(this.originDataSource);
int totalCount = sqlTemplate.count();
RowBounds rowBounds = new RowBounds(totalCount);
int totalPage = rowBounds.getTotalPage();
// 3、这里是按分页批量拉取
for (int i = 1; i <= totalPage; i++) {
    int offset = rowBounds.getOffset(i);
    String condition = " limit " + offset + "," + rowBounds.getPageSize();
    ResultSet resultSet = sqlTemplate.select(condition);
    // 4、将 ResultSet 转成 A 这里就是从 A_DB 到 A 的过程
    ResultSetExtractor<R> extractor = (ResultSetExtractor<R>) new ResultSetExtractor<>(originClass);
    List<R> result = extractor.extractData(resultSet);
    // 5、将 A 转成 B
    List<T> targetResult = convertor.batchConvertFrom(result);
    // 6、丢到队列中去等待消费
    this.rowObjectManager.pushToQueue(targetResult);
}

上述代码片段中的 4 ,就是从 A_DB 到 A 的过程,实际上这部分是 ResultSet 到 Java 对象的过程。一般情况下,我们基于 JDBC API 编程时, ResultSet 到 Java,对于业务来说是非常明确的。大体是这样:

String selectSql = "SELECT * FROM employees"; 
try (ResultSet resultSet = stmt.executeQuery(selectSql)) { 
List<Employee> employees = new ArrayList<>(); 
while (resultSet.next()) 
{ 
    Employee emp = new Employee(); 
    emp.setId(resultSet.getInt("emp_id"));
    emp.setName(resultSet.getString("name"));
    emp.setPosition(resultSet.getString("position")); 
    emp.setSalary(resultSet.getDouble("salary")); 
    employees.add(emp); 
} 

这种从对于明确 Java 对象的情况下是可以的;但是对于一个通用组件来说肯定无法满足。那么就需要解决如何将 ResultSet 转换成 Java 对象变得的更普适一些。思路是:ResultSet -> Map -> Java Object,通过 ResultSet 的 getMetaData 可以取到所有的列名(K)和值(V),并将其存储到 Map 中,代码如下:

/**
 * 将 resultSet 转成 Map
 *
 * @param resultSet
 * @return
 * @throws SQLException
 */
private Map<String, Object> resultSetToMap(ResultSet resultSet) throws Exception {
    Map<String, Object> resultMap = new HashMap<>();
    // 获取 ResultSet 的元数据
    int columnCount = resultSet.getMetaData().getColumnCount();
    // 遍历每一列,将列名和值存储到 Map 中
    for (int i = 1; i <= columnCount; i++) {
        String columnName = resultSet.getMetaData().getColumnName(i);
        Object value = resultSet.getObject(i);
        resultMap.put(columnName, value);
    }
    return resultMap;
}

接着是将 Map 转换成 Java 对象,当然在框架层面的实现上,这里通过泛型机制来实现更加通用的场景:

private T mapResultSetToObject(Map<String, Object> resultMap, Class<T> objectType) throws Exception {
    // 通过目标对象类型构建一个对象
    T object = objectType.newInstance();  
    // 将 map 的 key 作为 field 的名字,map 的 value 作为 field 的值
    for (Map.Entry<String, Object> entry : resultMap.entrySet()) {  
        String fieldName = entry.getKey();  
        Object value = entry.getValue();  
        try {  
            Field declaredField = objectType.getDeclaredField(fieldName);  
            declaredField.setAccessible(true);  
            declaredField.set(object, value);  
        } catch (NoSuchFieldException e) {  
            LOGGER.error("ignore exception, fieldName: " + fieldName + ", objectType: " + objectType);  
        }  
    }
    // 完成对象的填充并返回
    return object;  
}

从 A 到 B 属于业务自己定义的,即 Convertor 部分,下面是 Convertor 的接口定义,业务实现此接口用于实现对象到对象的转换(包括批量转换)

// T 是目标对象类型,R 是原始对象类型
public interface Convertor<T, R> {  
/**  
* 将 T 转换成 R  
*  
* @param origin  
* @return  
*/  
T convertFrom(R origin) throws SQLException;  
/**  
* 将 R 转换成 T  
*  
* @param target  
* @return  
*/  
R convertTo(T target);  
/**  
* 批量转换  
* @param origin  
* @return  
* @throws SQLException  
*/  
List<T> batchConvertFrom(List<R> origin) throws SQLException;  
/**  
* 批量转换将 R 转换成 T  
*  
* @param target  
* @return  
*/ 
List<R> batchConvertTo(List<T> target);

从 B 到 B_DB

前面是 A_DB 到 A 再到 B 的过程,那么接下来就是 B 到 B_DB 的过程。从前面的流程图中可以看出,组件中借助了队列。本篇文章的实现是基于 Disruptor 实现的。在 Convertor 中 A-> B 的逻辑完成之后,就会将 B 的 List 丢到 Disruptor 的 ringBuffer 中等待消费。消费逻辑如下:

public void onEvent(RowObjectEvent rowObjectEvent, long sequence, boolean endOfBatch) {  
    // targetResult  
    List<T> targetResult = (List) rowObjectEvent.getRowObject();  
    // 将 T 写到 目标库  
    Connection tc = null;  
    PreparedStatement pstm = null;  
    try {  
        // 下面即为获取连接,创建 prepareStatement 和执行
        tc = this.targetDataSource.getConnection();  
        SqlTemplate<T> sqlTemplate = new SqlTemplate<>(this.targetDataSource); 
        sqlTemplate.setObj(targetResult.get(0));  
        String sql = sqlTemplate.createBaseSql();  
        pstm = tc.prepareStatement(sql);  
        for (T item : targetResult) {  
            Object[] objects = sqlTemplate.createInsertSql(item);  
            for (int i = 1; i <= objects.length; i++) {  
                if (objects[i - 1] instanceof Long) {  
                    objects[i - 1] = (Long) objects[i - 1] + 1;  
                }  
                pstm.setObject(i, objects[i - 1]);  
            } 
            // 这里执行时批量操作的
            pstm.addBatch();  
        }  
        pstm.executeUpdate();  
    } catch (Exception e) {  
        // ignore some code...
    }  
}

通过上述几个片段,大体阐述了数据同步的过程,以及本篇所实现的组件的部分核心逻辑。

总结

本篇中的代码片段是比较零碎的,其中有相关部分的逻辑并没有在本篇中体现;本篇的主要目的是为了阐述基于 JDBC API 如何实现同步(实际上不仅仅是基于 JDBC 和面向关系型数据库),并且为围绕 DB -> A -> B -> DB 这样的思路给出了每一步实现的代码,有兴趣的同学可以尝试自己实现一个数据同步组件。

以上就是基于Java实现一个简单的数据同步组件的详细内容,更多关于Java数据同步组件的资料请关注脚本之家其它相关文章!

相关文章

  • java调用未知类的指定方法简单实例

    java调用未知类的指定方法简单实例

    这篇文章介绍了java调用未知类的指定方法简单实例,有需要的朋友可以参考一下
    2013-09-09
  • java堆排序原理与实现方法分析

    java堆排序原理与实现方法分析

    这篇文章主要介绍了java堆排序原理与实现方法,结合实例形式分析了java堆排序的相关原理、实现方法与操作注意事项,需要的朋友可以参考下
    2018-12-12
  • java实现百度云文字识别接口代码

    java实现百度云文字识别接口代码

    这篇文章主要为大家详细介绍了java实现百度云文字识别的接口代码,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-11-11
  • 小白必看toString(),String.valueOf,(String)强转

    小白必看toString(),String.valueOf,(String)强转

    在Java中,往往需要把一个类型的变量转换成String 类型,本文主要介绍了toString(),String.valueOf,(String)强转,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-06-06
  • idea中service或者mapper引入报红的问题及解决

    idea中service或者mapper引入报红的问题及解决

    在使用IntelliJ IDEA开发SpringBoot项目时,有时会遇到Service或Mapper接口引入时报红但不影响项目运行的情况,这主要是因为IDEA的检查级别设置问题,解决方法是将有问题的Error级别改为编译通过的安全级别,即可消除报红
    2024-09-09
  • 简单聊聊Java程序中的换行符

    简单聊聊Java程序中的换行符

    Java程序中的换行符一般使用“\n”表示,它是一个转义字符,表示换行符。根据操作系统的不同,换行符的实际表示可能不同,本文就来简单聊聊他们的区别与使用吧
    2023-03-03
  • jwt原理及Java中实现过程

    jwt原理及Java中实现过程

    JWT是无状态认证的JSON令牌,包含头部、载荷和签名,用于身份验证与权限管理,需注意设置过期时间、使用非对称算法、安全存储传输,及合理刷新策略,确保系统安全与效率
    2025-08-08
  • Spring中基于xml的AOP实现详解

    Spring中基于xml的AOP实现详解

    这篇文章主要介绍了Spring中基于xml的AOP实现详解,基于xml与基于注解的AOP本质上是非常相似的,都是需要封装横切关注点,封装到切面中,然后把横切关注点封装为一个方法,再把该方法设置为当前的一个通知,再通过切入点表达式定位到横切点就可以了,需要的朋友可以参考下
    2023-09-09
  • Springboot ApplicationRunner的使用解读

    Springboot ApplicationRunner的使用解读

    这篇文章主要介绍了Springboot ApplicationRunner的使用解读,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-05-05
  • SpringBoot日志文件的实现示例

    SpringBoot日志文件的实现示例

    日志是程序中的重要组成部分,使用日志可以快速的发现和定位问题,本文主要介绍了SpringBoot日志文件的实现示例,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-08-08

最新评论