Golang 实现 Redis系列(六)如何实现 pipeline 模式的 redis 客户端

 更新时间:2021年07月11日 09:27:20   作者:Finley  
pipeline 模式的 redis 客户端需要有两个后台协程负责 tcp 通信,调用方通过 channel 向后台协程发送指令,并阻塞等待直到收到响应,本文是使用 golang 实现 redis 系列的第六篇, 将介绍如何实现一个 Pipeline 模式的 Redis 客户端。

本文的完整代码在github.com/hdt3213/godis/redis/client

通常 TCP 客户端的通信模式都是阻塞式的: 客户端发送请求 -> 等待服务端响应 -> 发送下一个请求。因为需要等待网络传输数据,完成一次请求循环需要等待较多时间。

我们能否不等待服务端响应直接发送下一条请求呢?答案是肯定的。

TCP 作为全双工协议可以同时进行上行和下行通信,不必担心客户端和服务端同时发包会导致冲突。

p.s. 打电话的时候两个人同时讲话就会冲突听不清,只能轮流讲。这种通信方式称为半双工。广播只能由电台发送到收音机不能反向传输,这种方式称为单工。

我们为每一个 tcp 连接分配了一个 goroutine 可以保证先收到的请求先先回复。另一个方面,tcp 协议会保证数据流的有序性,同一个 tcp 连接上先发送的请求服务端先接收,先回复的响应客户端先收到。因此我们不必担心混淆响应所对应的请求。

这种在服务端未响应时客户端继续向服务端发送请求的模式称为 Pipeline 模式。因为减少等待网络传输的时间,Pipeline 模式可以极大的提高吞吐量,减少所需使用的 tcp 链接数。

pipeline 模式的 redis 客户端需要有两个后台协程程负责 tcp 通信,调用方通过 channel 向后台协程发送指令,并阻塞等待直到收到响应,这是一个典型的异步编程模式。

我们先来定义 client 的结构:

type Client struct {
    conn        net.Conn // 与服务端的 tcp 连接
    pendingReqs chan *Request // 等待发送的请求
    waitingReqs chan *Request // 等待服务器响应的请求
    ticker      *time.Ticker // 用于触发心跳包的计时器
    addr        string

    ctx        context.Context
    cancelFunc context.CancelFunc
    writing    *sync.WaitGroup // 有请求正在处理不能立即停止,用于实现 graceful shutdown
}

type Request struct {
    id        uint64 // 请求id
    args      [][]byte // 上行参数
    reply     redis.Reply // 收到的返回值
    heartbeat bool // 标记是否是心跳请求
    waiting   *wait.Wait // 调用协程发送请求后通过 waitgroup 等待请求异步处理完成
    err       error
}

调用者将请求发送给后台协程,并通过 wait group 等待异步处理完成:

func (client *Client) Send(args [][]byte) redis.Reply {
	request := &request{
		args:      args,
		heartbeat: false,
		waiting:   &wait.Wait{},
	}
	request.waiting.Add(1)
	client.working.Add(1)
	defer client.working.Done()
	client.pendingReqs <- request // 请求入队
	timeout := request.waiting.WaitWithTimeout(maxWait) // 等待响应或者超时
	if timeout {
		return reply.MakeErrReply("server time out")
	}
	if request.err != nil {
		return reply.MakeErrReply("request failed")
	}
	return request.reply
}

client 的核心部分是后台的读写协程。先从写协程开始:

// 写协程入口
func (client *Client) handleWrite() {
	for req := range client.pendingReqs {
		client.doRequest(req)
	}
}

// 发送请求
func (client *Client) doRequest(req *request) {
	if req == nil || len(req.args) == 0 {
		return
	}
    // 序列化请求
	re := reply.MakeMultiBulkReply(req.args)
	bytes := re.ToBytes()
	_, err := client.conn.Write(bytes)
	i := 0
    // 失败重试
	for err != nil && i < 3 {
		err = client.handleConnectionError(err)
		if err == nil {
			_, err = client.conn.Write(bytes)
		}
		i++
	}
	if err == nil {
        // 发送成功等待服务器响应
		client.waitingReqs <- req
	} else {
		req.err = err
		req.waiting.Done()
	}
}

读协程是我们熟悉的协议解析器模板, 不熟悉的朋友可以到解析Redis Cluster原理了解更多。

// 收到服务端的响应
func (client *Client) finishRequest(reply redis.Reply) {
	defer func() {
		if err := recover(); err != nil {
			debug.PrintStack()
			logger.Error(err)
		}
	}()
	request := <-client.waitingReqs
	if request == nil {
		return
	}
	request.reply = reply
	if request.waiting != nil {
		request.waiting.Done()
	}
}

// 读协程是个 RESP 协议解析器
func (client *Client) handleRead() error {
	ch := parser.ParseStream(client.conn)
	for payload := range ch {
		if payload.Err != nil {
			client.finishRequest(reply.MakeErrReply(payload.Err.Error()))
			continue
		}
		client.finishRequest(payload.Data)
	}
	return nil
}

最后编写 client 的构造器和启动异步协程的代码:

func MakeClient(addr string) (*Client, error) {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return nil, err
    }
    ctx, cancel := context.WithCancel(context.Background())
    return &Client{
        addr:        addr,
        conn:        conn,
        sendingReqs: make(chan *Request, chanSize),
        waitingReqs: make(chan *Request, chanSize),
        ctx:         ctx,
        cancelFunc:  cancel,
        writing:     &sync.WaitGroup{},
    }, nil
}

func (client *Client) Start() {
    client.ticker = time.NewTicker(10 * time.Second)
    go client.handleWrite()
    go func() {
        err := client.handleRead()
        logger.Warn(err)
    }()
    go client.heartbeat()
}

关闭 client 的时候记得等待请求完成:

func (client *Client) Close() {
    // 先阻止新请求进入队列
    close(client.sendingReqs)

    // 等待处理中的请求完成
    client.writing.Wait()

    // 释放资源
    _ = client.conn.Close() // 关闭与服务端的连接,连接关闭后读协程会退出
    client.cancelFunc() // 使用 context 关闭读协程
    close(client.waitingReqs) // 关闭队列
}

测试一下:

func TestClient(t *testing.T) {
    client, err := MakeClient("localhost:6379")
    if err != nil {
        t.Error(err)
    }
    client.Start()

    result = client.Send([][]byte{
        []byte("SET"),
        []byte("a"),
        []byte("a"),
    })
    if statusRet, ok := result.(*reply.StatusReply); ok {
        if statusRet.Status != "OK" {
            t.Error("`set` failed, result: " + statusRet.Status)
        }
    }

    result = client.Send([][]byte{
        []byte("GET"),
        []byte("a"),
    })
    if bulkRet, ok := result.(*reply.BulkReply); ok {
        if string(bulkRet.Arg) != "a" {
            t.Error("`get` failed, result: " + string(bulkRet.Arg))
        }
    }
}

Keep working, we will find a way out.This is Finley, welcome to join us.

到此这篇关于Golang 实现 Redis系列(六)如何实现 pipeline 模式的 redis 客户端的文章就介绍到这了,更多相关Golang实现pipeline模式的redis客户端内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • golang中的string与其他格式数据的转换方法详解

    golang中的string与其他格式数据的转换方法详解

    这篇文章主要介绍了golang中的string与其他格式数据的转换方法,文章通过代码示例介绍的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下
    2023-10-10
  • Go语言实现Snowflake雪花算法

    Go语言实现Snowflake雪花算法

    雪花算法产生的背景当然是twitter高并发环境下对唯一ID生成的需求,得益于twitter内部牛的技术,雪花算法能够流传于至今并且被广泛使用,本文就详细的介绍一下,感兴趣的可以了解一下
    2021-06-06
  • Go语言字符串及strings和strconv包使用实例

    Go语言字符串及strings和strconv包使用实例

    字符串是工作中最常用的,值得我们专门的练习一下,下面这篇文章主要给大家介绍了关于Go语言字符串及strings和strconv包使用的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-06-06
  • GO语言常用的文件读取方式

    GO语言常用的文件读取方式

    这篇文章主要介绍了GO语言常用的文件读取方式,涉及一次性读取、分块读取与逐行读取等方法,是非常实用的技巧,需要的朋友可以参考下
    2014-12-12
  • Go语言中如何确保Cookie数据的安全传输

    Go语言中如何确保Cookie数据的安全传输

    这篇文章主要介绍了Go语言中如何确保Cookie数据的安全传输,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-03-03
  • Go如何优雅的使用字节池示例详解

    Go如何优雅的使用字节池示例详解

    在编程开发中,我们经常会需要频繁创建和销毁同类对象的情形,这样的操作很可能会对性能造成影响,这时常用的优化手段就是使用对象池(object pool),这篇文章主要给大家介绍了关于Go如何优雅的使用字节池的相关资料,需要的朋友可以参考下
    2022-08-08
  • 利用Go语言实现在终端绘制小兔子

    利用Go语言实现在终端绘制小兔子

    这篇文章主要为大家详细介绍了如何利用Go语言实现在终端绘制小兔子来给大家拜个早年,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一下
    2023-01-01
  • go语言之go(goroutine)控制异步详解

    go语言之go(goroutine)控制异步详解

    Go语言通过goroutine实现并发,允许异步执行函数,但单独使用会引发顺序问题,需结合WaitGroup确保主线程等待所有并发任务完成后再输出结果,从而正确同步执行流程
    2025-07-07
  • 基于gin的golang web开发之认证利器jwt

    基于gin的golang web开发之认证利器jwt

    这篇文章主要介绍了基于gin的golang web开发之认证利器jwt,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-12-12
  • Golang实现延迟调用的项目实践

    Golang实现延迟调用的项目实践

    本文主要介绍了Golang实现延迟调用的项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2025-02-02

最新评论