Go语言Grpc Stream的实现

 更新时间:2022年06月20日 09:40:16   作者:范闲  
本文主要介绍了Go语言Grpc Stream的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

Stream Grpc

在我们单次投递的数据量很大的时候,比如传输一个二进制文件的时候,数据包过大,会造成瞬时传输压力。或者接收方接收到数据后,需要对数据做一系列的处理工作,

比如:数据过滤 -> 数据格式转换 -> 数据求和 ,这种场景非常适合使用stream grpc,

Stream Grpc演示

syntax = "proto3";

package book_stream;

option go_package = "/book_stream";

service HelloStreamService {
  rpc BookListStream(BookListStreamRequest) returns (stream BookListStreamResponse){};
  rpc CreateBookStream(stream CreateBookStreamRequest) returns (CreateBookStreamResponse){}
  rpc FindBookByIdStream(stream FindBookByIdStreamRequest) returns (stream FindBookByIdStreamResponse){}
}

message BookListStreamRequest{
}

message BookListStreamResponse{
  BookPoint book = 1;
}

message CreateBookStreamRequest{
  BookPoint book = 1;
}

message CreateBookStreamResponse{
  repeated BookIdPoint idx = 1;
}

message FindBookByIdStreamRequest{
  BookIdPoint idx = 1;
}
message FindBookByIdStreamResponse{
  BookPoint book = 1;
}

message BookIdPoint{
  uint64 idx = 1;
}

message BookPoint{
  uint64 idx = 1;
  string name = 2;
  float price = 3;
  string author = 4;
}

运行protoc --go_out=plugins=grpc:. *.proto生成脚手架文件

  • BookListStream服务端流式RPC
  • CreateBookStream客户端流式RPC
  • FindBookByIdStream双向流式RPC

注意,这里只是用作方便演示使用,演示方法都不是线程安全的

服务端server

var port = 8888

func main() {
   server := grpc.NewServer()
   book_stream.RegisterHelloStreamServiceServer(server, new(HelloStreamServiceImpl))
   lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
   if err != nil {
      panic(err)
   }
   if err := server.Serve(lis); err != nil {
      panic(err)
   }
}

客户端

func main() {
   var port = 8888
   conn, err := grpc.Dial(fmt.Sprintf(":%d", port), grpc.WithInsecure())
   if err != nil {
      panic(err)
   }
   defer conn.Close()
   client := book_stream.NewHelloStreamServiceClient(conn)

   ctx := context.Background()
   if err := createBookStream(ctx, client); err != nil {
      panic(err)
   }
   if err := printBookList(ctx, client); err != nil {
      panic(err)
   }
   if err := getBookListById(ctx, client); err != nil {
      panic(err)
   }
}

BookListStream

服务器端流式 RPC,显然是单向流,并代指 Server 为 Stream 而 Client 为普通 RPC 请求

简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。

server端实现

var bookStore = map[uint64]book_stream.BookPoint{
   1: {
      Idx:    1,
      Author: "程子",
      Price:  9.9,
      Name:   "游戏思维",
   },
   2: {
      Idx:    2,
      Author: "丁锐",
      Price:  9.9,
      Name:   "活出必要的锋芒",
   },
}


type HelloStreamServiceImpl struct{}

func (HelloStreamServiceImpl) BookListStream(_ *book_stream.BookListStreamRequest, streamServer book_stream.HelloStreamService_BookListStreamServer) error {
   for idx, bookPoint := range bookStore {
      err := streamServer.Send(&book_stream.BookListStreamResponse{Book: &book_stream.BookPoint{
         Idx:    idx,
         Name:   bookPoint.Name,
         Price:  bookPoint.GetPrice(),
         Author: bookPoint.Author,
      }})
      if err != nil {
         return err
      }
   }
   return nil
}

客户端实现

func printBookList(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
   req := &book_stream.BookListStreamRequest{}
   listStream, err := client.BookListStream(ctx, req)
   if err != nil {
      return err
   }
   for true {
      resp, err := listStream.Recv()
      if err != nil {
         if err == io.EOF {
            return nil
         }
         return err
      }
      fmt.Printf("%v\n", *resp.Book)
   }
   return nil
}

CreateBookStream

客户端流式 RPC,单向流,客户端通过流式发起多次 RPC 请求给服务端,服务端发起一次响应给客户端

server端实现

func (HelloStreamServiceImpl) CreateBookStream(server book_stream.HelloStreamService_CreateBookStreamServer) error {
   var resList []*book_stream.BookIdPoint
   for {
      resp, err := server.Recv()
      if err == io.EOF {
         return server.SendAndClose(&book_stream.CreateBookStreamResponse{Idx: resList})
      }
      if err != nil {
         return err
      }
      bookStore[resp.Book.Idx] = *resp.Book
      resList = append(resList, &book_stream.BookIdPoint{Idx: resp.Book.Idx})
   }
}

客户端实现

var newBookStore = map[uint64]book_stream.BookPoint{
   3: {
      Idx:    3,
      Author: "程子1",
      Price:  9.9,
      Name:   "游戏思维1",
   },
   4: {
      Idx:    4,
      Author: "丁锐1",
      Price:  9.9,
      Name:   "活出必要的锋芒1",
   },
}

func createBookStream(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
   stream, err := client.CreateBookStream(ctx)
   if err != nil {
      return err
   }
   for _, bookPoint := range newBookStore {
      if err := stream.Send(&book_stream.CreateBookStreamRequest{
         Book: &bookPoint,
      }); err != nil {
         return err
      }
   }
   recv, err := stream.CloseAndRecv()
   if err != nil {
      return err
   }
   fmt.Println(recv.Idx)
   return nil
}

stream.SendAndClose,它是做什么用的呢?

在这段程序中,我们对每一个 Recv 都进行了处理,当发现 io.EOF (流关闭) 后,需要将最终的响应结果发送给客户端,同时关闭正在另外一侧等待的 Recv

stream.CloseAndRecv 和 stream.SendAndClose 是配套使用的流方法,

FindBookByIdStream

服务端实现

func (HelloStreamServiceImpl) FindBookByIdStream(streamServer book_stream.HelloStreamService_FindBookByIdStreamServer) error {
   for {
      resp, err := streamServer.Recv()
      if err == io.EOF {
         return nil
      }
      if err != nil {
         return err
      }
      if book, ok := bookStore[resp.Idx.Idx]; ok {
         if err := streamServer.Send(&book_stream.FindBookByIdStreamResponse{Book: &book}); err != nil {
            return err
         }
      }
   }
}

客户端实现

func getBookListById(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
   stream, err := client.FindBookByIdStream(ctx)
   if err != nil {
      return err
   }
   var findList = []uint64{1, 2}
   for _, idx := range findList {
      err := stream.Send(&book_stream.FindBookByIdStreamRequest{Idx: &book_stream.BookIdPoint{Idx: idx}})
      if err != nil {
         return err
      }
      recv, err := stream.Recv()
      if err != nil {
         return err
      }
      fmt.Printf("%v\n", recv.Book)
   }
   if err := stream.CloseSend(); err != nil {
      return err
   }
   return nil
}

到此这篇关于Go语言Grpc Stream的实现的文章就介绍到这了,更多相关Go语言Grpc Stream 内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Golang精编49面试题汇总(选择题)

    Golang精编49面试题汇总(选择题)

    这篇文章主要介绍了Golang精编49面试题汇总(选择题),本文章内容详细,具有很好的参考价值,希望对大家有所帮助,需要的朋友可以参考下
    2023-01-01
  • go等待一组协程结束的操作方式

    go等待一组协程结束的操作方式

    这篇文章主要介绍了go等待一组协程结束的操作方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-05-05
  • Golang内存管理之垃圾收集器详解

    Golang内存管理之垃圾收集器详解

    这篇文章我们主要介绍垃圾收集器的设计原理以及Golang垃圾收集器的实现原理,文中有详细的代码示例及图文介绍,感兴趣的小伙伴跟着小编一起来学习吧
    2023-06-06
  • go doudou开发单体RESTful服务快速上手教程

    go doudou开发单体RESTful服务快速上手教程

    这篇文章主要为大家介绍了go doudou开发单体RESTful服务快速上手教程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-12-12
  • golang标准库time时间包的使用

    golang标准库time时间包的使用

    时间和日期是我们编程中经常会用到的,本文主要介绍了golang标准库time时间包的使用,具有一定的参考价值,感兴趣的可以了解一下
    2023-10-10
  • 重学Go语言之如何开发RPC应用

    重学Go语言之如何开发RPC应用

    这篇文章主要为大家详细介绍了在Go语言中如何构建RPC应用,文中的示例代码讲解详细,具有一定的学习价值,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-09-09
  • Golang中定时器的陷阱详解

    Golang中定时器的陷阱详解

    这篇文章主要给大家介绍了关于Golang中定时器陷阱的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用golang具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2018-09-09
  • 一文带你了解Go语言中的类型断言和类型转换

    一文带你了解Go语言中的类型断言和类型转换

    在Go中,类型断言和类型转换是一个令人困惑的事情,他们似乎都在做同样的事情。最明显的不同点是他们具有不同的语法(variable.(type) vs type(variable) )。本文我们就来深入研究一下二者的区别
    2022-09-09
  • Windows下升级go版本过程详解

    Windows下升级go版本过程详解

    这篇文章主要为大家介绍了Windows下升级go版本过程详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-03-03
  • GoFrame glist 基础使用和自定义遍历

    GoFrame glist 基础使用和自定义遍历

    这篇文章主要为大家介绍了GoFrame glist的基础使用和自定义遍历示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-06-06

最新评论