gRPC中拦截器的使用详解

 更新时间:2023年10月08日 09:05:05   作者:童话ing  
这篇文章主要介绍了gRPC中拦截器的使用详解,本次主要介绍在gRPC中使用拦截器,包括一元拦截器和流式拦截器,在拦截器中添加JWT认证,客户端登录之后会获得token,请求特定的API时候需要带上token才能访问,需要的朋友可以参考下

前言

本次主要介绍在gRPC中使用拦截器,包括一元拦截器和流式拦截器,在拦截器中添加JWT认证,客户端登录之后会获得token,请求特定的API时候需要带上token才能访问。由于代码中我们使用了grpc-gateway提供http服务,因此需要安装gateway的一些依赖

在本文中就简单介绍一下拦截器的使用,RPC请求分为一元RPC请求和流式RPC请求,所谓一元RPC指的就是请求和响应都是一次完成的,gRPC是基于HTTP2.0的,因此,一元RPC就可以看成客户端请求一次,服务端就响应一次。而流式RPC则是像流一样多次进行响应或者请求传输的。

相应地,拦截器分为服务端拦截和客户端拦截器,根据功能的不同又分为一元拦截器和流式拦截器。

服务端拦截器

1、一元拦截器:UnaryInterceptor

源码中写得比较清楚了, UnaryServerInterceptor 提供了一个钩子来拦截服务器上一元RPC的执行。 info 参数包含了这个RPC拦截器能操作的所有信息, handler 是服务方法实现的一个包装器,用于供拦截器中调用来处理RPC请求的逻辑。

// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.unaryInt != nil {
			panic("The unary server interceptor was already set and may not be reset.")
		}
		o.unaryInt = i
	})
}

主要看其中包含的参数 UnaryServerInterceptor :

// UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info
// contains all the information of this RPC the interceptor can operate on. And handler // is the wrapper of the service method implementation. It is the responsibility of the // interceptor to invoke handler to complete the RPC.
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

其中几个参数解释为:

  • ctx context.Context:请求上下文
  • req interface{}:RPC 方法的请求参数
  • info *UnaryServerInfo:包含了RPC 方法的所有信息
  • handler UnaryHandler:RPC 方法真正执行逻辑

2、流式拦截器:StreamInterceptor

StreamServerInterceptor 提供了一个钩子来拦截服务器上流式RPC的执行。 info 包含拦截器可以操作的RPC的所有信息。 handler 是服务方法实现。拦截器负责调用 handler 来完成RPC。

// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.streamInt != nil {
			panic("The stream server interceptor was already set and may not be reset.")
		}
		o.streamInt = i
	})
}

StreamServerInterceptor:

// StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC //on the server.info contains all the information of this RPC the interceptor can operate //on. And handler is the service method implementation. It is the responsibility of the // interceptor to invoke handler to complete the RPC.
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

3、实现服务端拦截器

下面我们进行简单的实现:

// 一元拦截器
func Unary() grpc.UnaryServerInterceptor {
	return func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler) (interface{}, error) {
		log.Print("---------> the unaryServerInterceptor: ", info.FullMethod)
		err := interceptor.authorize(ctx, info.FullMethod)//实现拦截验证的逻辑,自行实现,我这里是截取的完整代码中的,也可以参考我的gitee上的代码
		if err != nil {
			return nil, err
		}
		return handler(ctx, req)
	}
}
//stream拦截器
func Stream() grpc.StreamServerInterceptor {
	return func(
		srv interface{},
		stream grpc.ServerStream,
		info *grpc.StreamServerInfo,
		handler grpc.StreamHandler) error {
		log.Print("--------->the streamServerInterceptor: ", info.FullMethod)
		err := interceptor.authorize(stream.Context(), info.FullMethod)//实现拦截验证的逻辑,自行实现,我这里是截取的完整代码中的,也可以参考我的gitee上的代码
		if err != nil {
			return err
		}
		return handler(srv, stream)
	}
}

将上面实现的拦截器加入到Server中即可:

serverOptions := []grpc.ServerOption{
	grpc.UnaryInterceptor(Unary()),
	grpc.StreamInterceptor(Stream()),
}
grpcServer := grpc.NewServer(serverOptions...)

客户端拦截器

1、一元拦截器:WithUnaryInterceptor

// WithUnaryInterceptor returns a DialOption that specifies the interceptor for
// unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
	return newFuncDialOption(func(o *dialOptions) {
		o.unaryInt = f
	})
}

UnaryClientInterceptor

// UnaryClientInterceptor intercepts the execution of a unary RPC on the client.
// Unary interceptors can be specified as a DialOption, using
// WithUnaryInterceptor() or WithChainUnaryInterceptor(), when creating a
// ClientConn. When a unary interceptor(s) is set on a ClientConn, gRPC
// delegates all unary RPC invocations to the interceptor, and it is the
// responsibility of the interceptor to call invoker to complete the processing
// of the RPC.
//
// method is the RPC name. req and reply are the corresponding request and
// response messages. cc is the ClientConn on which the RPC was invoked. invoker
// is the handler to complete the RPC and it is the responsibility of the
// interceptor to call it. opts contain all applicable call options, including
// defaults from the ClientConn as well as per-call options.
//
// The returned error must be compatible with the status package.
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

2、流式拦截器:WithStreamInterceptor

// WithStreamInterceptor returns a DialOption that specifies the interceptor for
// streaming RPCs.
func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
	return newFuncDialOption(func(o *dialOptions) {
		o.streamInt = f
	})
}

StreamClientInterceptor 拦截客户端流 ClientStream 的创建,流式拦截器可以指定为一个 Dial 选项。当创建一个客户端连接时,使用 WithStreamInterceptor() 或者 WithChainStreamInterceptor() 。当一个流拦截器被设置在客户端连接中的时候,gRPC将所有流的创建都交给拦截器,拦截器调用streamer。

// StreamClientInterceptor intercepts the creation of a ClientStream. Stream
// interceptors can be specified as a DialOption, using WithStreamInterceptor()
// or WithChainStreamInterceptor(), when creating a ClientConn. When a stream
// interceptor(s) is set on the ClientConn, gRPC delegates all stream creations
// to the interceptor, and it is the responsibility of the interceptor to call
// streamer.
//
// desc contains a description of the stream. cc is the ClientConn on which the
// RPC was invoked. streamer is the handler to create a ClientStream and it is
// the responsibility of the interceptor to call it. opts contain all applicable
// call options, including defaults from the ClientConn as well as per-call
// options.
//
// StreamClientInterceptor may return a custom ClientStream to intercept all I/O
// operations. The returned error must be compatible with the status package.
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

参数解释:

  • ctx context.Context是请求上下文
  • desc *StreamDesc包含了流中描述的信息
  • cc *ClientConn是调用RPC的客户端连接
  • method string是请求的方法名
  • streamer Streamer是一个创建客户端流的处理器,流式拦截器中需要调用它。
  • opts ...CallOption包含了所有适用呼叫选项,包括来自于客户端连接的默认选项和所有的呼叫。

3、实现客户端拦截器

下面是具体实现:

func Unary() grpc.UnaryClientInterceptor {
	return func(
		ctx context.Context,
		method string,
		req, reply interface{},
		conn *grpc.ClientConn,
		invoker grpc.UnaryInvoker, //回调函数
		opts ...grpc.CallOption,
	) error {
		log.Printf("-------> unary interceptor: %s", method)
		if authMethods[method] { //如果是拦截的方法,在调用实际的rpc方法之前将token添加到context中,这个存储需要拦截的方法
			return invoker(attachToken(ctx), method, req, reply, conn, opts...) //attachToken为自己实现的客户端拦截请求之后附加token的方法
		}
		return invoker(ctx, method, req, reply, conn, opts...)
	}
}
// Stream returns a client interceptor to authenticate stream RPC
func Stream() grpc.StreamClientInterceptor {
	return func(
		ctx context.Context,
		desc *grpc.StreamDesc,
		conn *grpc.ClientConn,
		method string,
		streamer grpc.Streamer,
		opts ...grpc.CallOption,
	) (grpc.ClientStream, error) {
		log.Printf("-------> stream interceptor: %s", method)
		if interceptor.authMethods[method] {
			return streamer(attachToken(ctx), desc, conn, method, opts...)
		}
		return streamer(ctx, desc, conn, method, opts...)
	}
}

使用也比较简单,在Dial中添加即可:

conn2, err := grpc.Dial(
	Address, //地址
	transportOption, //SSL/TLS证书选择
	grpc.WithUnaryInterceptor(Unary()),
	grpc.WithStreamInterceptor(Stream()))
if err != nil {
	log.Fatal("cannot dial server: ", err)
}

到此这篇关于gRPC中拦截器的使用详解的文章就介绍到这了,更多相关gRPC中的拦截器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Golang新提案:panic 能不能加个 PanicError?

    Golang新提案:panic 能不能加个 PanicError?

    这篇文章主要为大家介绍了Golang的新提案关于panic能不能加个PanicError的问题分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-12-12
  • 读取Go项目中的配置文件的方法

    读取Go项目中的配置文件的方法

    本文主要介绍了读取Go项目中的配置文件的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-06-06
  • Golang通过SSH执行交换机操作实现

    Golang通过SSH执行交换机操作实现

    这篇文章主要介绍了Golang通过SSH执行交换机操作实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-06-06
  • golang中beego入门

    golang中beego入门

    Beego是一个基于Go语言的开源框架,用于构建Web应用程序和API,本文主要介绍了golang中beego入门,具有一定的参考价值,感兴趣的可以了解一下
    2023-12-12
  • GO语言利用K近邻算法实现小说鉴黄

    GO语言利用K近邻算法实现小说鉴黄

    本文给大家分享的是一段GO语言利用K近邻算法实现小说鉴黄的方法,本方法的鉴别的关键是关键是向量点的选择和阈值的判定,推荐给大家,有需要的小伙伴可以参考下。
    2015-03-03
  • grpcurl通过命令行访问gRPC服务

    grpcurl通过命令行访问gRPC服务

    这篇文章主要为大家介绍了grpcurl通过命令行访问gRPC服务示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-06-06
  • SingleFlight模式的Go并发编程学习

    SingleFlight模式的Go并发编程学习

    这篇文章主要为大家介绍了SingleFlight模式的Go并发编程学习,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-04-04
  • 浅谈Go Channel 高级实践

    浅谈Go Channel 高级实践

    这篇文章主要介绍了浅谈Go Channel 高级实践,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-08-08
  • Go使用Google Gemini Pro API创建简单聊天机器人

    Go使用Google Gemini Pro API创建简单聊天机器人

    这篇文章主要为大家介绍了Go使用Google Gemini Pro API创建简单聊天机器人实现过程详解,本文将通过最新的gemini go sdk来实现命令行聊天机器人
    2023-12-12
  • go语言面试如何实现自旋锁?

    go语言面试如何实现自旋锁?

    这篇文章主要为大家介绍了go语言面试中常问的如何实现自旋锁问题实例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-11-11

最新评论