一文详解GO如何实现Redis的AOF持久化

 更新时间:2023年03月27日 09:38:11   作者:CSGOPHER  
这篇文章主要为大家详细介绍了GO如何实现Redis的AOF持久化的,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以了解一下

GO实现Redis的AOF持久化

将用户发来的指令以RESP协议的形式存储在本地的AOF文件,重启Redis后执行此文件恢复数据

https://github.com/csgopher/go-redis

本文涉及以下文件:redis.conf:配置文件

aof:实现aof

redis.conf

appendonly yes
appendfilename appendonly.aof

aof/aof.go

type CmdLine = [][]byte

const (
   aofQueueSize = 1 << 16
)

type payload struct {
   cmdLine CmdLine
   dbIndex int
}

type AofHandler struct {
   db          databaseface.Database
   aofChan     chan *payload
   aofFile     *os.File
   aofFilename string
   currentDB   int
}

func NewAOFHandler(db databaseface.Database) (*AofHandler, error) {
   handler := &AofHandler{}
   handler.aofFilename = config.Properties.AppendFilename
   handler.db = db
   handler.LoadAof()
   aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
   if err != nil {
      return nil, err
   }
   handler.aofFile = aofFile
   handler.aofChan = make(chan *payload, aofQueueSize)
   go func() {
      handler.handleAof()
   }()
   return handler, nil
}

func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {
   if config.Properties.AppendOnly && handler.aofChan != nil {
      handler.aofChan <- &payload{
         cmdLine: cmdLine,
         dbIndex: dbIndex,
      }
   }
}

func (handler *AofHandler) handleAof() {
   handler.currentDB = 0
   for p := range handler.aofChan {
      if p.dbIndex != handler.currentDB {
         // select db
         data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
         _, err := handler.aofFile.Write(data)
         if err != nil {
            logger.Warn(err)
            continue
         }
         handler.currentDB = p.dbIndex
      }
      data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()
      _, err := handler.aofFile.Write(data)
      if err != nil {
         logger.Warn(err)
      }
   }
}

func (handler *AofHandler) LoadAof() {
   file, err := os.Open(handler.aofFilename)
   if err != nil {
      logger.Warn(err)
      return
   }
   defer file.Close()
   ch := parser.ParseStream(file)
   fakeConn := &connection.Connection{}
   for p := range ch {
      if p.Err != nil {
         if p.Err == io.EOF {
            break
         }
         logger.Error("parse error: " + p.Err.Error())
         continue
      }
      if p.Data == nil {
         logger.Error("empty payload")
         continue
      }
      r, ok := p.Data.(*reply.MultiBulkReply)
      if !ok {
         logger.Error("require multi bulk reply")
         continue
      }
      ret := handler.db.Exec(fakeConn, r.Args)
      if reply.IsErrorReply(ret) {
         logger.Error("exec err", err)
      }
   }
}
  • AofHandler:1.从管道中接收数据 2.写入AOF文件
  • AddAof:用户的指令包装成payload放入管道
  • handleAof:将管道中的payload写入磁盘
  • LoadAof:重启Redis后加载aof文件

database/database.go

type Database struct {
   dbSet []*DB
   aofHandler *aof.AofHandler
}

func NewDatabase() *Database {
   mdb := &Database{}
   if config.Properties.Databases == 0 {
      config.Properties.Databases = 16
   }
   mdb.dbSet = make([]*DB, config.Properties.Databases)
   for i := range mdb.dbSet {
      singleDB := makeDB()
      singleDB.index = i
      mdb.dbSet[i] = singleDB
   }
   if config.Properties.AppendOnly {
      aofHandler, err := aof.NewAOFHandler(mdb)
      if err != nil {
         panic(err)
      }
      mdb.aofHandler = aofHandler
      for _, db := range mdb.dbSet {
         singleDB := db
         singleDB.addAof = func(line CmdLine) {
            mdb.aofHandler.AddAof(singleDB.index, line)
         }
      }
   }
   return mdb
}

将AOF加入到database里

使用singleDB的原因:因为在循环中获取返回变量的地址都完全相同,因此当我们想要访问数组中元素所在的地址时,不应该直接获取 range 返回的变量地址 db,而应该使用 singleDB := db

database/db.go

type DB struct {
   index int
   data   dict.Dict
   addAof func(CmdLine)
}

func makeDB() *DB {
	db := &DB{
		data:   dict.MakeSyncDict(),
		addAof: func(line CmdLine) {},
	}
	return db
}

由于分数据库db引用不到aof,所以添加一个addAof匿名函数,在NewDatabase中用这个匿名函数调用AddAof

database/keys.go

func execDel(db *DB, args [][]byte) resp.Reply {
   ......
   if deleted > 0 {
      db.addAof(utils.ToCmdLine2("del", args...))
   }
   return reply.MakeIntReply(int64(deleted))
}

func execFlushDB(db *DB, args [][]byte) resp.Reply {
	db.Flush()
	db.addAof(utils.ToCmdLine2("flushdb", args...))
	return &reply.OkReply{}
}

func execRename(db *DB, args [][]byte) resp.Reply {
	......
	db.addAof(utils.ToCmdLine2("rename", args...))
	return &reply.OkReply{}
}

func execRenameNx(db *DB, args [][]byte) resp.Reply {
	......
	db.addAof(utils.ToCmdLine2("renamenx", args...))
	return reply.MakeIntReply(1)
}

database/string.go

func execSet(db *DB, args [][]byte) resp.Reply {
   ......
   db.addAof(utils.ToCmdLine2("set", args...))
   return &reply.OkReply{}
}

func execSetNX(db *DB, args [][]byte) resp.Reply {
   ......
   db.addAof(utils.ToCmdLine2("setnx", args...))
   return reply.MakeIntReply(int64(result))
}

func execGetSet(db *DB, args [][]byte) resp.Reply {
   key := string(args[0])
   value := args[1]

   entity, exists := db.GetEntity(key)
   db.PutEntity(key, &database.DataEntity{Data: value})
   db.addAof(utils.ToCmdLine2("getset", args...))
   ......
}

添加addAof方法

测试命令

*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n

到此这篇关于一文详解GO如何实现Redis的AOF持久化的文章就介绍到这了,更多相关GO实现Redis AOF持久化内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Go语言os包用法详解

    Go语言os包用法详解

    本文主要介绍了Go语言os包用法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-04-04
  • Go string转int,int64,int32及注意事项说明

    Go string转int,int64,int32及注意事项说明

    这篇文章主要介绍了Go string转int,int64,int32及注意事项说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-07-07
  • Go语言流程控制语句

    Go语言流程控制语句

    这篇文章介绍了Go语言流程控制语句的用法,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-07-07
  • Go中阻塞以及非阻塞操作实现(Goroutine和main Goroutine)

    Go中阻塞以及非阻塞操作实现(Goroutine和main Goroutine)

    本文主要介绍了Go中阻塞以及非阻塞操作实现(Goroutine和main Goroutine),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-05-05
  • Golang实现将视频按照时间维度剪切的工具

    Golang实现将视频按照时间维度剪切的工具

    这篇文章主要为大家详细介绍了如何利用Golang实现将视频按照时间维度进行剪切,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起了解一下
    2022-12-12
  • Golang实现常见的限流算法的示例代码

    Golang实现常见的限流算法的示例代码

    限流是项目中经常需要使用到的一种工具,一般用于限制用户的请求的频率,也可以避免瞬间流量过大导致系统崩溃,或者稳定消息处理速率,本文主要介绍了使用Go实现常见的限流算法,希望对大家有所帮助
    2023-04-04
  • golang压缩与解压缩文件的示例代码

    golang压缩与解压缩文件的示例代码

    这篇文章主要给大家介绍了golang压缩与解压缩文件,文中通过代码示例给大家介绍的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下
    2024-02-02
  • Golang共享变量如何解决问题

    Golang共享变量如何解决问题

    协程之间的通信只能够通过通道。但是我们习惯于共享变量,而且很多时候使用共享变量能让代码更简洁。那么Golang共享变量如何解决问题,感兴趣的可以了解一下
    2021-12-12
  • golang 四则运算计算器yacc归约手写实现

    golang 四则运算计算器yacc归约手写实现

    这篇文章主要为大家介绍了golang 四则运算 计算器 yacc 归约的手写实现,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-07-07
  • GO语言并发编程之互斥锁、读写锁详解

    GO语言并发编程之互斥锁、读写锁详解

    这篇文章主要介绍了GO语言并发编程之互斥锁、读写锁详解,本文是GO并发编程实战一书的样章,详细讲解了互斥锁、读写锁,然后给出了一个完整示例,需要的朋友可以参考下
    2014-11-11

最新评论