Go语言结合grpc和protobuf实现去中心化的聊天室

 更新时间:2024年03月10日 10:26:40   作者:AlpsMonaco  
这篇文章主要为大家详细介绍了Go语言如何结合grpc和protobuf实现去中心化的聊天室,文中的示例代码讲解详细,有需要的小伙伴可以跟随小编一起学习一下

介绍

传统的聊天室主要是基于c/s架构,需要有一个服务端完成各个客户端的聊天转发。今天我们使用golang+grpc+protobuf,设计一个去中心化、局域网自发现的聊天客户端。

完整代码地址在 github.com/AlpsMonaco/proximity-chat

模块

协议

我们先定义proto消息格式 message/message.proto

syntax = "proto3";

option go_package = "proximity-chat/message";

package message;

service Chat {
    rpc NewNode (stream NodeRequest) returns (stream NodeReply){ }
}

message NodeRequest {
    string msg = 1;
}

message NodeReply {
    string msg = 1;
}

聊天软件一般需要全双工保证时效性,所以这边使用了 stream NodeRequeststream NodeReply。 这边消息只有两个,请求和回复直接透传string就行。

执行

protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative message\message.proto

会在相同目录下生成相关的go代码文件。在文件 message_grpc.pb.go 中会包含rpc的interface

type ChatServer interface {
	NewNode(Chat_NewNodeServer) error
	mustEmbedUnimplementedChatServer()
}

我们需要实现这个接口中的 NewNode 服务。

交互

在 service/message.go 中实现 NewNode(Chat_NewNodeServer) error

type MessageWriter interface {
	Write(string)
}

type Message struct {
	Writer MessageWriter
	message.UnimplementedChatServer
}
...
func (m *Message) NewNode(ss message.Chat_NewNodeServer) error {
	head, err := ss.Recv()
	if err != nil {
		m.Writer.Write(fmt.Sprint(err))
		return err
	}
	addr := head.GetMsg()
	if controller.IsChatNodeExist(addr) {
		return nil
	}
	if !controller.AddChatNode(&ServerChatNode{s: ss}, addr) {
		return nil
	}
	err = ss.Send(&message.NodeReply{Msg: "ok"})
	if err != nil {
		return err
	}
	m.Writer.Write("new node " + addr + " has joined")
	for {
		msg, err := ss.Recv()
		if err != nil {
			controller.RemoveNode(addr)
			fmt.Println(err)
			return err
		}
		m.Writer.Write(msg.GetMsg())
	}
}

由于是去中心化,所以没有客户端服务端的概念,我们将它称为一个节点 node。在同一个局域网内,node监听的ip+port做唯一key,用于避免重复进入聊天室。

上面的代码中 controller 模块主要是用来控制和管理断点的,后续会讲。

整体流程是先接收其他node发来的 ip+port ,判断是否已经加入过这个端点,如果没加入过就用controller绑定节点,进行后续的聊天请求,否则中止交互。

控制

在 controller/node.go ,我们使用map和读写锁来维护node的唯一性。

package controller

import (
	"sync"
)

type ChatNode interface {
	SendChatMsg(string) error
	RecvChatMsg() (string, error)
}

var nodeMap map[string]ChatNode = make(map[string]ChatNode)
var nodeMapLock sync.RWMutex

func AddChatNode(node ChatNode, addr string) bool {
	nodeMapLock.Lock()
	defer nodeMapLock.Unlock()
	_, ok := nodeMap[addr]
	if !ok {
		nodeMap[addr] = node
		return true
	}
	return false
}

func RemoveNode(addr string) {
	nodeMapLock.Lock()
	defer nodeMapLock.Unlock()
	delete(nodeMap, addr)
}

func IsChatNodeExist(addr string) bool {
	nodeMapLock.RLock()
	defer nodeMapLock.RUnlock()
	_, ok := nodeMap[addr]
	return ok
}

func Publish(s string) {
	nodeMapLock.RLock()
	defer nodeMapLock.RUnlock()
	for _, n := range nodeMap {
		n.SendChatMsg(s)
	}
}

发现

discover/discover.go 下定义如何发现相同网段上的其他服务。

这边使用 ipnetgen 库来获取相同网段下的所有IP。定期去遍历其他网段上的相同服务。 将自己的监听ip+端口发送给其他node,若返回'ok'则建立通讯。

func BeginDiscoverService() {
	minPort := config.GetConfig().GetMinPort()
	maxPort := config.GetConfig().GetMaxPort()
	if minPort > maxPort {
		minPort = maxPort
	}
	for {
		time.Sleep(time.Second)
		gen, err := ipnetgen.New(config.GetConfig().GetCIDR())
		if err != nil {
			panic(err)
		}
		for ip := gen.Next(); ip != nil; ip = gen.Next() {
			for i := minPort; i <= maxPort; i++ {
				addr := fmt.Sprintf("%s:%d", ip.String(), i)
				if addr == GetAddr() {
					continue
				}
				if controller.IsChatNodeExist(addr) {
					continue
				}
				conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
				if err != nil {
					fmt.Printf("did not connect: %v\n", err)
					continue
				}
				client := message.NewChatClient(conn)
				cli, err := client.NewNode(context.Background())
				if err != nil {
					continue
				}
				err = cli.Send(&message.NodeRequest{Msg: GetAddr()})
				if err != nil {
					writer.Write(fmt.Sprint(err))
					continue
				}
				resp, err := cli.Recv()
				if err != nil {
					cli.CloseSend()
					writer.Write(fmt.Sprint(err))
					continue
				}
				if resp.GetMsg() != "ok" {
					cli.CloseSend()
					continue
				}
				if !controller.AddChatNode(&service.ClientChatNode{C: cli}, addr) {
					cli.CloseSend()
					continue
				}
				writer.Write("discover " + addr)
				go func() {
					for {
						msg, err := cli.Recv()
						if err != nil {
							writer.Write(fmt.Sprint(err))
							controller.RemoveNode(addr)
							return
						}
						writer.Write(msg.GetMsg())
					}
				}()
			}
		}
	}
}

配置

我们定义配置的获取方式,配置文件格式为json,定义配置获取的方式 config.go 。

package config

type NetworkConfig struct {
	CIDR    string `json:"cidr"`
	MaxPort int    `json:"max_port"`
	MinPort int    `json:"min_port"`
}

func DefaultNetworkConfig() *NetworkConfig {
	return &NetworkConfig{
		"127.0.0.1/32", 4569, 4565,
	}
}

type ConstNetworkConfig struct {
	c *NetworkConfig
}

func (c *ConstNetworkConfig) GetCIDR() string { return c.c.CIDR }
func (c *ConstNetworkConfig) GetMaxPort() int { return c.c.MaxPort }
func (c *ConstNetworkConfig) GetMinPort() int { return c.c.MinPort }

var config = &ConstNetworkConfig{DefaultNetworkConfig()}

func GetConfig() *ConstNetworkConfig { return config }
func SetConfig(nc *NetworkConfig)    { config = &ConstNetworkConfig{nc} }

这边最主要定义三个字段,内网的ip网段,服务的最小到最大的端口范围。这个配置主要用于搜寻同网段同端口上的相同服务。为了方便调试我们加一个 DefaultNetworkConfig(),监听127.0.0.1上的4565~4569。 同时还加了一个 ConstNetworkConfig 类,供其他模块访问全局配置,同时保护配置不被修改。

运行实例

编译后直接运行,会在指定的端口范围内尝试监听,无需指定端口。主线程中scanf阻塞获取输入。我们直接打开三个进程,在一个终端中输入数据发送,其他两个终端都能获取聊天数据。

以上就是Go语言结合grpc和protobuf实现去中心化的聊天室的详细内容,更多关于Go聊天室的资料请关注脚本之家其它相关文章!

相关文章

  • go语言中的二维切片赋值

    go语言中的二维切片赋值

    这篇文章主要介绍了go语言中的二维切片赋值操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-04-04
  • Go使用XORM操作MySQL的陷阱盘点分析

    Go使用XORM操作MySQL的陷阱盘点分析

    在 Go 语言开发中,大家为了方便,通常会选择使用 ORM 操作数据库,比如使用 XORM 或 GORM 操作 MySQL,本文我们来介绍一下使用 XORM[3] 操作 MySQL 可能会遇到的陷阱
    2023-11-11
  • Go实现整合Logrus实现日志打印

    Go实现整合Logrus实现日志打印

    这篇文章主要介绍了Go实现整合Logrus实现日志打印,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-07-07
  • 一文掌握Go语言并发编程必备的Mutex互斥锁

    一文掌握Go语言并发编程必备的Mutex互斥锁

    Go 语言提供了 sync 包,其中包括 Mutex 互斥锁、RWMutex 读写锁等同步机制,本篇博客将着重介绍 Mutex 互斥锁的基本原理,需要的可以参考一下
    2023-04-04
  • 详解Golang中的Mutex并发原语

    详解Golang中的Mutex并发原语

    Mutex 是 Go 语言中互斥锁的实现,它是一种同步机制,用于控制多个 goroutine 之间的并发访问。本文将着重介绍 Go 的 Mutex 并发原语,希望对大家有所帮助
    2023-03-03
  • Golang打包配置文件的实现示例

    Golang打包配置文件的实现示例

    本文主要介绍了Golang打包配置文件的实现示例,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-12-12
  • go内置函数copy()的具体使用

    go内置函数copy()的具体使用

    当我们在Go语言中需要将一个切片的内容复制到另一个切片时,可以使用内置的copy()函数,本文就介绍了go内置函数copy()的具体使用,感兴趣的可以了解一下
    2023-08-08
  • Go 面向包新提案透明文件夹必要性分析

    Go 面向包新提案透明文件夹必要性分析

    这篇文章主要为大家介绍了Go 面向包新提案,透明文件夹必要性分析,看看是否合适加进 Go 特性中,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-11-11
  • Golang定时器的2种实现方法与区别

    Golang定时器的2种实现方法与区别

    这篇文章主要给大家介绍了关于Golang定时器的2种实现方法与区别的相关资料,文中通过图文介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-02-02
  • golang操作elasticsearch的实现

    golang操作elasticsearch的实现

    这篇文章主要介绍了golang操作elasticsearch,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-06-06

最新评论