使用Canal和Kafka解决MySQL与缓存的数据一致性问题

 更新时间:2024年07月28日 09:47:02   作者:Master_hl  
这篇文章主要介绍了使用Canal和Kafka解决MySQL与缓存的数据一致性问题,文中通过图文结合的方式给大家介绍的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下

1. 准备工作

1. 开启并配置MySQL的 BinLog(MySQL 8.0 默认开启)

修改配置:C:\ProgramData\MySQL\MySQL Server 8.0\my.ini

log-bin="HELONG-bin"
binlog_format=ROW     # 只能配置行模式, 因为 Cannal 不具备将SQL转化成数据的能力
binlog-do-db=aicloud    # 监控 AI Cloud 项目

如果要同步多个项目:

binlog-do-db=aicloud
binlog-do-db=aicloud2
binlog-do-db=aicloud3

2. 重启MySQL服务

3. 赋值数据同步权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

4. 安装并配置 Canal

下载地址:https://github.com/alibaba/canal/releases

① 修改canal.properties

canal.serverMode=kafka
canal.mq.servers=127.0.0.1:9092

canal 监控 binlog 日志,binlog 日志的传输默认使用 MySQL 的复制协议(基于 TCP/IP),

可以使用写代码的方式直接从 MySQL 服务器读取数据,此处使用本地 kafka 进行存储。

② 修改instance.properties

canal.instance.mysql.slaveId=100   # 大于 1 即可
canal.instance.master.address=127.0.0.1:3306
canal.mq.topic=ai-cloud-canal-to-kafka

slaveId 表示从节点 id,canal 的执行原理就是伪装成一个从库去主库同步数据

(主节点的 slaveId = 1)

address 配置连接本地的 MySQL

topic 配置数据发送到 Kafka 的某个主题下

5. 拷贝 Jar 包到 lib

将 canal 下 plugin 下的所有 jar 包拷贝到 lib 目录下。

6. 删除 bin 目录下 startup.bat 里的参数

如果启动时报错:

Unrecognized VM option 'PermSize=128m'

Error: Could not create the Java Virtual Machine.

Error: A fatal exception has occurred. Program will exit.

删除 -XX:PermSize=128m 参数即可。

7. 启动 canal

打开 cmd ,cd 到 bin 目录下,输入 startup.bat 回车

2. 将需要缓存的数据存储 Redis

此时我将这个查询列表接口的数据,存储在 Redis 中:

/**
 * 获取历史聊天记录(对话/绘图)
 *
 * @param type
 * @return {@link ResponseEntity }
 */
@RequestMapping("/list")
public ResponseEntity getHistoryList(Integer type, Integer model) {
    String listCacheKey = RedisUtil.getListCacheKey(SecurityUtil.getCurrentUser().getUid(), model, type);
    Object list = redisTemplate.opsForValue().get(listCacheKey);
    if (ObjectUtil.isNull(list)) {
        LambdaQueryWrapper<Answer> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(Answer::getUid, SecurityUtil.getCurrentUser().getUid());
        queryWrapper.eq(Answer::getType, type);
        queryWrapper.eq(Answer::getModel, model);
        queryWrapper.orderByDesc(Answer::getAid);
        List<Answer> answerList = answerService.list(queryWrapper);
        List<Long> userIds = answerList.stream().map(Answer::getUid).collect(Collectors.toList());
        Map<Long, User> userIdMap = userService.selectByIds(userIds).stream().collect(Collectors.toMap(User::getUid, Function.identity()));
        List<AnswerVo> answerVoList = answerList.stream().map(answer -> AnswerVoUtil.getListAnswerVo(answer, userIdMap)).collect(Collectors.toList());
        // 缓存 1 天
        redisTemplate.opsForValue().set(listCacheKey, answerVoList, 1, TimeUnit.DAYS);
        return ResponseEntity.success(answerVoList);
    } else {
        return ResponseEntity.success(list);
    }
}
/**
 * 查询列表存储 Redis 缓存
 *
 * @param uid
 * @param model
 * @param type
 * @return {@link String }
 */
public static String getListCacheKey(Long uid, Integer model, Integer type) {
    return "LIST_CACHE_KEY_" + uid + "_" + model + "_" + type;
}

3. 监听 Kafka Topic 中数据并删除 Redis 缓存

首先对数据库中需要缓存的数据进行一些修改操作:

此时,使用 kafka ui(下载地址划到最底下),刷新 kafka 对应 topic 下的 message,就可以看到当前所作出的修改:

执行修改操作:将 “如何学习Spring???”修改成 “如何学习Spring??”

执行删除操作:

由此可见,对数据库的每一个修改操作,都是对应固定格式的一个数据,所以可以监听对应的  topic 并针对 data 中的数据进行一个提取,得到一个  cacheKey,然后删除对应的缓存,使得下一次的查询去访问数据库,并同步缓存。

【代码示例】

/**
 * canal 监控 binlog 日志,将修改的数据存储 kafka topic 中
 * 监听 kafka topic 中的数据
 *
 * @param data
 * @param ack
 * @throws JsonProcessingException
 */
@KafkaListener(topics = {KafkaConstant.CANAL_TOPIC})
public void canalListen(String data, Acknowledgment ack) throws JsonProcessingException {
    HashMap<String, Object> map = objectMapper.readValue(data, HashMap.class);
    if (map.isEmpty()) {
        ack.acknowledge();
        return;
    }
    // 匹配上对应的数据库和数据表
    if (KafkaConstant.TARGET_DATABASE.equals(map.get(KafkaConstant.DATABASE_KEY).toString()) &&
            KafkaConstant.TARGET_TABLE.equals(map.get(KafkaConstant.TABLE_KEY).toString())) {
        // 更新缓存 
        List<Map<String, Object>> list = (List<Map<String, Object>>) map.get(KafkaConstant.DATA_KEY);
        if (!CollectionUtils.isEmpty(list)) {
            for (Map<String, Object> answerMap : list) {
                String answerListCacheKey = RedisUtil.getListCacheKey(
                        Long.valueOf(answerMap.get("uid").toString()),
                        Integer.parseInt(answerMap.get("model").toString()),
                        Integer.parseInt(answerMap.get("type").toString()));
                // 删除缓存,让下一次查询走数据库,并同步缓存
                redisTemplate.delete(answerListCacheKey);
            }
        }
    }
    //  手动确认应答
    ack.acknowledge();
}
/**
 * canal 同步数据到 kafka
 */
public static final String CANAL_TOPIC = "ai-cloud-canal-to-kafka";
 
 
/**
 * 数据库,缓存数据一致性的
 */
 
public static final String DATABASE_KEY = "database";
 
public static final String TABLE_KEY = "table";
 
public static final String DATA_KEY = "data";
 
public static final String TARGET_DATABASE = "aicloud";
 
public static final String TARGET_TABLE = "answer";

【补充】

kafka ui 下载地址:​​​​​​https://github.com/provectus/kafka-ui/tags

修改配置

kafka:
  clusters:
    - name: kafka3_cluster
      bootstrapServers: 127.0.0.1:9092

以上就是使用Canal和Kafka解决MySQL与缓存的数据一致性问题的详细内容,更多关于MySQL与缓存的数据一致性的资料请关注脚本之家其它相关文章!

相关文章

  • mysql中insert与select的嵌套使用解决组合字段插入问题

    mysql中insert与select的嵌套使用解决组合字段插入问题

    本节主要介绍了mysql中insert与select的嵌套使用解决组合字段插入问题,需要的朋友可以参考下
    2014-07-07
  • MySQL8.0.28安装教程详细图解(windows 64位)

    MySQL8.0.28安装教程详细图解(windows 64位)

    如果电脑上已经有MySQL数据库再进行重做往往会遇到问题,下面这篇文章主要给大家介绍了关于windows 64位系统下MySQL8.0.28安装教程的详细教程,文章通过图文介绍的非常详细,需要的朋友可以参考下
    2023-04-04
  • MySQL中UPDATE JOIN语句的使用详细

    MySQL中UPDATE JOIN语句的使用详细

    UPDATE JOIN是MySQL中一种结合UPDATE语句和JOIN操作的技术,本文主要介绍了MySQL中UPDATE JOIN语句的使用详细,具有一定的参考价值,感兴趣的可以了解一下
    2024-04-04
  • MySQL不同表之前的字段复制

    MySQL不同表之前的字段复制

    今天小编就为大家分享一篇关于MySQL不同表之前的字段复制,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-03-03
  • mysql中InnoDB事务隔离的记录锁、间隙锁和临键锁

    mysql中InnoDB事务隔离的记录锁、间隙锁和临键锁

    mysql中InnoDB默认的事务隔离级别为可重复读(Repeated Read, RR),我们当下的所有介绍都是基于这个隔离级别为前提的,记录锁锁定索引关联的具体记录,间隙锁锁定间隔,防止间隔中被其他事务插入,临键锁锁定索引记录+间隔,防止幻读
    2023-12-12
  • mysql数据库密码忘记解决方法

    mysql数据库密码忘记解决方法

    大家好,本篇文章主要讲的是mysql数据库密码忘记解决方法,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下,方便下次浏览
    2021-12-12
  • mysql复制data文件迁移的实现步骤

    mysql复制data文件迁移的实现步骤

    有时候,我们需要迁移整个数据库,包括数据文件,本文将介绍如何通过复制MySQL的data文件来完成数据库迁移,具有一定的参考价值,感兴趣的可以了解一下
    2023-11-11
  • 关于MySQL 大批量插入时如何过滤掉重复数据

    关于MySQL 大批量插入时如何过滤掉重复数据

    这篇文章主要介绍关于MySQL 大批量插入时如何过滤重复数据,比如线上库有6个表存在重复数据,其中2个表比较大,96万+和30万+,因为之前处理过相同的问题,就直接拿来了上次的Python去重脚本,脚本很简单,就是连接数据库,查出来重复数据,循环删除,需要的朋友可以参考下
    2021-09-09
  • mysql和oracle默认排序的方法 - 不指定order by

    mysql和oracle默认排序的方法 - 不指定order by

    这篇文章主要介绍了mysql和oracle默认排序的方法 - 不指定order by。具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-07-07
  • Mysql的增删改查语句简单实现

    Mysql的增删改查语句简单实现

    这篇文章主要介绍了Mysql的增删改查语句简单实现的相关资料,需要的朋友可以参考下
    2017-04-04

最新评论