Golang实现Redis事务深入探究

 更新时间:2024年01月24日 09:33:20   作者:绍纳 nullbody笔记  
这篇文章主要介绍了Golang实现Redis事务深入探究,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

引言

用11篇文章实现一个可用的Redis服务,姑且叫EasyRedis吧,希望通过文章将Redis掰开撕碎了呈现给大家,而不是仅仅停留在八股文的层面,并且有非常爽的感觉,欢迎持续关注学习。

  • [x] easyredis之TCP服务
  • [x] easyredis之网络请求序列化协议(RESP)
  • [x] easyredis之内存数据库
  • [x] easyredis之过期时间 (时间轮实现)
  • [x] easyredis之持久化 (AOF实现)
  • [x] easyredis之发布订阅功能
  • [x] easyredis之有序集合(跳表实现)
  • [x] easyredis之 pipeline 客户端实现
  • [x] easyredis之事务(原子性/回滚)
  • [ ] easyredis之连接池
  • [ ] easyredis之分布式集群存储

EasyRedis事务

本篇实现的Redis命令如下:

Multi : 开启事务
Exec [rb]:执行事务
Watch key [key...]:监视key在事务执行之前不能发生变更,如果事务执行时发现有变更,事务终止执行
Unwatch:取消监视
Discard: 退出事务

如何实现事务的原子性和回滚?

  • 这里我们可以联想MYSQL如何实现事务的原子性。当mysql的存储引擎采用Innodb并且事务的隔离级别为可重复读(RR)的时候, MYSQL的读可以分成【当前读】和【快照读】;【当前读】采用的机制就是加锁。【快照读】的机制就是利用MVCC的原理,在开启事务的时候,生成一个事务版本,并根据记录中的undo log链表中的每个记录的事务版本号比对,判断自己是否可以读写该条记录(这个相当于无锁)。如果对MYSQL的八股文不了解的可以看这篇文章 https://xiaolincoding.com/mysql/transaction/mvcc.html
  • 回滚如何实现:很简单,比如本次我们要插入一条记录,回滚就删除一条记录。如果是修改一条记录,那就把修改前的记录记下来,回滚就将记录恢复回去即可。如果是删除记录,那就把记录先记录下来,回滚就是再插入回去即可。

在本次的项目中,我们将使用【锁】保证原子性,使用【记录原始数据】的方式实现回滚。

我们依次看下各个命令的关键代码:

本篇代码的入口位于engine/database.go

func (db *DB) Exec(c abstract.Connection, redisCommand [][]byte) protocol.Reply {
	cmdName := strings.ToLower(string(redisCommand[0]))
	if cmdName == "multi" {
		iflen(redisCommand) != 1 {
			return protocol.NewArgNumErrReply(cmdName)
		}
		return StartMulti(c) // 开启事务
	} elseif cmdName == "discard" {
		iflen(redisCommand) != 1 {
			return protocol.NewArgNumErrReply(cmdName)
		}
		return DiscardMulti(c) // 取消事务
	} elseif cmdName == "watch" {
		return Watch(db, c, redisCommand[1:]) // 监视watch key [key...]
	} elseif cmdName == "unwatch" {
		iflen(redisCommand) != 1 {
			return protocol.NewArgNumErrReply(cmdName)
		}
		return UnWatch(db, c) // 取消监视
	} elseif cmdName == "exec" {
		return ExecMulti(db, c, redisCommand[1:]) // 执行事务
	}
	// **事务模式** 将命令入队到命令缓冲队列中
	if c != nil && c.IsTransaction() {
		return EnqueueCmd(c, redisCommand)
	}
	// ** 普通模式 **
	return db.execNormalCommand(c, redisCommand)
}

multi 开启事务

事务的执行是基于当前的socket连接,所以设定开启事务,其实就是设定socket连接为开始事务状态(设定为true)

// 开启事务
func StartMulti(c abstract.Connection) protocol.Reply {
	if c.IsTransaction() {
		return protocol.NewGenericErrReply("multi is already start,do not repeat it")
	}
	// 设定开启
	c.SetTransaction(true)
	return protocol.NewOkReply()
}

// socket连接
func (k *KeepConnection) SetTransaction(val bool) {
	if !val { // 取消事务模式,清空队列和watch key
		k.queue = nil
		k.watchKey = nil
		k.txErrors = nil
	}
	// 开启事务状态
	k.trx.Store(val)
}

一旦开启事务以后,后续的redis命令不再执行直接,而是会缓存起来。最后在执行exec命令的时候一次性批量执行。

//... 省略
// **事务模式** 将命令入队到命令缓冲队列中
	if c != nil && c.IsTransaction() {
		return EnqueueCmd(c, redisCommand)
	}
//... 省略

discard 取消事务

可以看到和上面的开启是一个相反的操作,设置为false

// 取消事务
func DiscardMulti(c abstract.Connection) protocol.Reply {
	if !c.IsTransaction() {
		return protocol.NewGenericErrReply("DISCARD without MULTI")
	}
	// 取消开启
	c.SetTransaction(false)
	return protocol.NewOkReply()
}

watch监视

这个命令平时应该用的不多。含义:监视key,希望在执行命令的时候没有发生过变化。例如我出门之前,我在门上做了一个标记,如果在我出门之后,门口的标记没有变动过,说明在我出门的这个期间没有人来过。否则就认定有人来过我家。我就赶快报警,不进门了。那这个标记的实现方案就是利用版本号的机制,针对key记录一个值,执行命令的时候,比对下该值(版本号)有没有变化即可。

实际变更版本号的代码,继续往下看就可以看到。

// 监视 key [key...]
func Watch(db *DB, conn abstract.Connection, args [][]byte) protocol.Reply {
	iflen(args) < 1 {
		return protocol.NewArgNumErrReply("WATCH")
	}
	if conn.IsTransaction() {
		return protocol.NewGenericErrReply("WATCH inside MULTI is not allowed")
	}
	watching := conn.GetWatchKey()
	for _, bkey := range args {
		key := string(bkey)
		watching[key] = db.GetVersion(key) // 保存当前key的版本号(利用版本号机制判断key是否有变化)
	}
	return protocol.NewOkReply()
}

Unwatch 取消监视

// 清空watch key
func UnWatch(db *DB, conn abstract.Connection) protocol.Reply {
	conn.CleanWatchKey()
	return protocol.NewOkReply()
}

Exec 执行事务

批量执行命令,这里我自己设计了一个rb参数(标准里面没有)表示如果执行错误,自动回滚。默认情况下,命令会全部执行,出现错误不会回滚

// 执行事务  exec rb
func ExecMulti(db *DB, conn abstract.Connection, args [][]byte) protocol.Reply {
	// 说明当前不是【事务模式】
	if !conn.IsTransaction() {
		return protocol.NewGenericErrReply("EXEC without MULTI")
	}
	// 执行完,自动退出事务模式
	defer conn.SetTransaction(false)
	// 如果在入队的时候,就有格式错误,直接返回
	iflen(conn.GetTxErrors()) > 0 {
		return protocol.NewGenericErrReply("EXECABORT Transaction discarded because of previous errors.")
	}
	// 是否自动回滚(这里是自定义的一个参数,标准redis中没有)
	isRollBack := false
	iflen(args) > 0 && strings.ToUpper(string(args[0])) == "RB" { // 有rb参数,说明要自动回滚
		isRollBack = true
	}
	// 获取所有的待执行命令
	cmdLines := conn.GetQueuedCmdLine()
	return db.execMulti(conn, cmdLines, isRollBack)
}

前面就是一个基本的状态判断,最后真正执行的函数是db.execMulti(conn, cmdLines, isRollBack)代码如下:

  • 在代码执行前,将所有命令的key全部都提取出来,分成【读key】【写key】。例如set key value这里的key就是写key. get key这里的key就是读key。【写key】就是对数据会进行变更,【读key】只是读,不会对值发生变更。
  • 如果有watch key看下版本号是否发生改变
  • 接下来最重要的来了,就是对所有的key上锁(原子性),保证所有的命令一起执行(读key上读锁,写key上写锁)
  • 锁上完以后,就执行所有的命令。如果需要回滚,那么在命令执行之前,将原始的数据通过db.GetUndoLog(cmdLine)函数记录原始数据的回滚命令。
  • 如果需要执行回滚命令(就【倒序】执行所有的回滚命令)。还记得上面有对key加锁吧,所以这里是可以直接执行回滚(因为锁保证原子性)
  • 最后对【写key】更新版本号(表示发生了状态变更),这里就和前面的记录版本号对应起来了
// 执行事务:本质就是一堆命令一起执行, isRollback 表示出错是否回滚
func (db *DB) execMulti(conn abstract.Connection, cmdLines []CmdLine, isRollback bool) protocol.Reply {
	// 命令的执行结果
	results := make([]protocol.Reply, len(cmdLines))
	versionKeys := make([][]string, len(cmdLines))
	var writeKeys []string
	var readKeys []string
	for idx, cmdLine := range cmdLines {
		cmdName := strings.ToLower(string(cmdLine[0]))
		cmd, ok := commandCenter[cmdName]
		if !ok {
			// 这里正常不会执行
			continue
		}
		keyFunc := cmd.keyFunc
		readKs, writeKs := keyFunc(cmdLine[1:])
		// 读写key
		readKeys = append(readKeys, readKs...)
		writeKeys = append(writeKeys, writeKs...)
		// 写key需要 变更版本号
		versionKeys[idx] = append(versionKeys[idx], writeKs...)
	}
	watchingKey := conn.GetWatchKey()
	if isWatchingChanged(db, watchingKey) { // 判断watch key是否发生了变更
		return protocol.NewEmptyMultiBulkReply()
	}
	// 所有key上锁(原子性)
	db.RWLock(readKeys, writeKeys)
	defer db.RWUnLock(readKeys, writeKeys)
	undoCmdLines := [][]CmdLine{}
	aborted := false
	for idx, cmdLine := range cmdLines {
		// 生成回滚命令
		if isRollback {
			undoCmdLines = append(undoCmdLines, db.GetUndoLog(cmdLine))
		}
		// 执行命令
		reply := db.execWithLock(cmdLine)
		if protocol.IsErrReply(reply) { // 执行出错
			if isRollback { // 需要回滚
				undoCmdLines = undoCmdLines[:len(undoCmdLines)-1] // 命令执行失败(不用回滚),剔除最后一个回滚命令
				aborted = true
				break
			}
		}
		// 执行结果
		results[idx] = reply
	}
	// 中断,执行回滚
	if aborted {
		size := len(undoCmdLines)
		// 倒序执行回滚指令(完成回滚)
		for i := size - 1; i >= 0; i-- {
			curCmdLines := undoCmdLines[i]
			iflen(curCmdLines) == 0 {
				continue
			}
			for _, cmdLine := range curCmdLines {
				db.execWithLock(cmdLine)
			}
		}
		return protocol.NewGenericErrReply("EXECABORT Transaction discarded because of previous errors.")
	}
	// 执行到这里,说明命令执行完成(可能全部成功,也可能部分成功)
	for idx, keys := range versionKeys {
		if !protocol.IsErrReply(results[idx]) { // 针对执行成功的命令(写命令),变更版本号
			db.addVersion(keys...)
		}
	}
	// 将多个命令执行的结果,进行合并返回
	mixReply := protocol.NewMixReply()
	mixReply.Append(results...)
	return mixReply
}

效果演示(无回滚)

效果演示(有回滚)

因为set key 1将key的内存对象设定为字符串; 而zadd key 1 member操作的内存对象是有序集合; 内存对象不对,所以不能执行。这就是出错的原因。

项目代码地址: https://github.com/gofish2020/easyredis 

以上就是Golang实现Redis事务深入探究的详细内容,更多关于Golang Redis事务的资料请关注脚本之家其它相关文章!

相关文章

  • Golang import 导入包语法及一些特殊用法详解

    Golang import 导入包语法及一些特殊用法详解

    这篇文章主要介绍了Golang import 导入包语法及一些特殊用法,需要的朋友可以参考下
    2020-02-02
  • Golang开发gRPC服务入门介绍

    Golang开发gRPC服务入门介绍

    这篇文章主要介绍了Golang开发gRPC服务,Golang开发gRPC应用程序的套路也已经很清晰,这篇文章就来做一个简单的介绍,算是入门,需要的朋友可以参考下
    2022-04-04
  • Go依赖注入DI工具wire使用详解(golang常用库包)

    Go依赖注入DI工具wire使用详解(golang常用库包)

    依赖注入是指程序运行过程中,如果需要调用另一个对象协助时,无须在代码中创建被调用者,而是依赖于外部的注入,本文结合示例代码给大家介绍Go依赖注入DI工具wire使用,感兴趣的朋友一起看看吧
    2022-04-04
  • 关于Go语言中特有的设计模式与实现方式讲解

    关于Go语言中特有的设计模式与实现方式讲解

    虽然Go语言没有像其他语言那样明确的设计模式,但在实践中,开发者们仍然发现了一些在Go语言中特别适用的设计模式和实现方式,本文就来和大家一一进行讲解
    2023-05-05
  • Go语言中println和fmt.Println区别

    Go语言中println和fmt.Println区别

    本文主要介绍了Go语言中println和fmt.Println区别,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-07-07
  • Golang利用Recover进行错误处理

    Golang利用Recover进行错误处理

    Golang 中的 recover 是一个鲜为人知但非常有趣和强大的功能,这篇文章小编就来带大家深入了解一下在Golang中是如何利用Recover进行错误处理吧
    2023-12-12
  • Go语言如何实现线程安全的Map

    Go语言如何实现线程安全的Map

    Go语言内置的map虽然高效,但并不是线程安全的,若在多线程环境中直接操作map,可能会引发并发写入的错误,下面我们就来看看如何实现线程安全的Map吧
    2024-11-11
  • 一文详解Go语言io.LimitedReader类型

    一文详解Go语言io.LimitedReader类型

    这篇文章主要为大家介绍了Go语言io.LimitedReader类型示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-07-07
  • golang连接kafka消费进ES操作

    golang连接kafka消费进ES操作

    这篇文章主要介绍了golang连接kafka消费进ES操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-12-12
  • Golang实现自己的Redis(有序集合跳表)实例探究

    Golang实现自己的Redis(有序集合跳表)实例探究

    这篇文章主要为大家介绍了Golang实现自己的Redis(有序集合跳表)实例探究,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2024-01-01

最新评论