RPC 技术分享#
RPC 介绍#
远程过程调用(Remote Procedure Call 缩写 RPC)是一个计算机通信协议,通俗来讲就是调用远程服务上的一个函数,在调用时将对象名、函数名、参数等传递给远程服务器,服务器将处理结果返回给客户端,客户端调用 server 端提供的接口就像是调用本地的函数一样。 RPC 的消息可以通过 TCP、UDP 或者 HTTP 等传输。
RPC 调用流程#
虽然说,远程过程调用并不需要我们关心如何编解码,如何通信,但是最基本的,如果一个方法需要支持远程过程调用,需要满足一定的约束和规范。 不同 RPC 框架的约束和规范是不同的,如果使用 Golang 的标准库 net/rpc,方法需要长这个样子:
1
| func (t *T) MethodName(argType T1, replyType *T2) error
|
即需要满足以下 5 个条件:
- 方法类型(T)是导出的(首字母大写)
- 方法名(MethodName)是导出的
- 方法有 2 个参数(argType T1, replyType *T2),均为导出/内置类型
- 方法的第 2 个参数一个指针(replyType *T2)
- 方法的返回值类型是 error
RPC 示例#
我们使用 golang
提供 net/rpc
包,我们先构造一个HelloService
类型
1
2
3
4
5
6
| type HelloService struct {}
func (p *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request
return nil
}
|
其中 Hello 方法必须满足 Go 语言的 RPC 规则:方法只能有两个可序列化的参数,其中第二个参数是指针类型,并且返回一个 error 类型,同时必须是公开的方法。
然后就可以将 HelloService
类型的对象注册为一个 RPC 服务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| func main() {
rpc.RegisterName("HelloService", new(HelloService))
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
rpc.ServeConn(conn)
}
|
其中 rpc.Register
函数调用会将对象类型中所有满足 RPC 规则的对象方法注册为 RPC 函数,所有注册的方法会放在 HelloService
服务空间之下,通过 rpc.ServeConn
函数在该 TCP 链接上为对方提供 RPC 服务。
下面是客户端请求 HelloService
服务的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| package main
func main() {
client, err := rpc.Dial("tcp", "localhost:1234")
if err != nil {
log.Fatal("dialing:", err)
}
var reply string
err = client.Call("HelloService.Hello", "hello", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}
|
首先是通过rpc.Dial
连接 RPC 服务,然后通过client.Call
调用具体的 RPC 方法。在调用client.Call
时,需要指明调用的 RPC 服务名字和方法名字,第二和第三个参数分别我们定义 RPC 方法的两个参数。
Protobuf#
Protobuf
是Protocol Buffers
的简称,它是 Google 公司开发的一种Interface Definition Language(IDL)
数据描述语言,Protobuf
是一套类似 Json 或者 XML 的数据传输格式和规范。它非常轻便高效,很适合做数据存储或 RPC
数据交换格式。
protobuf 与 go 语言常见数据类型映射表:
protobuf | golang |
---|
bool | bool |
string | string |
bytes | []byte |
int32 | int32 |
int64 | int64 |
uint32 | uint32 |
uint64 | uint64 |
float | float32 |
double | float64 |
复合类型映射表:
(1) 数组类型
等价 go 的 []string
1
2
3
| message HelloRequest {
repeated string name = 1;
}
|
(2) 嵌套类型
等价 go 的[]User
1
2
3
4
5
6
7
| message User {
string name = 1;
}
message HelloRequest {
repeated User users = 1;
}
|
(3) map
等价 go 的 map[string]string
1
2
3
| message HelloRequest {
map<string, string> names = 1;
}
|
简单示例#
1
2
| brew install protobuf
protoc --version
|
定义 hello.proto
类型
[src/proto/hello.proto]
1
2
3
4
5
6
7
8
9
10
11
| syntax = "proto3";
option go_package = ".;rpcs";
message Info {
string name = 1;
}
service HelloService {
rpc Hello (Info) returns (Info);
}
|
开头的 syntax 语句表示采用 proto3 的语法,go_package 定义 server 包名.
生成对应的 pb.go
文件
1
| protoc --go_out=src/server src/proto/hello.proto
|
重新定义
1
2
3
4
5
6
| type HelloService struct{}
func (p *HelloService) Hello(request *Info, reply *Info) error {
reply.Name = "hello:" + request.GetName()
return nil
}
|
生成 grpc 对应的代码
1
| protoc --go_out=plugins=grpc:src/server src/proto/hello.proto
|
gRPC 框架#
google 开源的 RPC 框架,支持 Python、Golang、Java 等众多开发语言。
grpc 优点:
- 性能好,比 json 编解码数读快几十倍。
- 代码生成方便,使用 proto 工具自动生成对应语言代码。
- 支持多种流传输方式,支持一元 RPC、服务端流式 RPC、客户端流式 RPC、双流向 RPC 共 4 中传输流。
- 有超时和取消处理机制,客户端和服务端在截止时间后对取消事件进行相关处理。
简单示例#
创建服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| func main() {
address, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", 8080))
if err != nil {
log.Fatal(err)
}
inbound, err := net.ListenTCP("tcp", address)
if err != nil {
log.Fatal(err)
}
s := grpc.NewServer()
listener := new(types.SimpleServer)
rpcs.RegisterSimpleServer(s, listener)
_ = s.Serve(inbound)
}
|
创建客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| func main() {
conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%d", 8080), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
c := rpcs.NewSimpleClient(conn)
in := bufio.NewReader(os.Stdin)
for {
line, _, err := in.ReadLine()
if err != nil {
log.Fatal(err)
}
reply, err := c.GetLine(context.Background(), &rpcs.SimpleRequest{Data: string(line)})
if err != nil {
log.Fatal(err)
}
log.Printf("Reply: %v, Data: %v", reply, reply.Data)
}
}
|
测试连接
在 grpc 中,一共有 4 种调用方式:
- 一元 RPC: 称为单次 RPC,也就是一问一答 RPC 请求,是最基础最常用的调用方式。
- 服务端流式 RPC: 是一个单向流,客户端发起一次普通 RPC 请求,服务端通过流式返回数据集。
- 客户端流式 RPC: 是一个单向流,客户端通过流式发送数据集,服务端回复一次普通 RPC 请求。
- 双向流式 RPC: 由客户端以流式发起请求,服务端同样以流式方式响应请求。一定有客户端发起,但交互方式(谁先谁后、一次发多少、相应多少、什么时候关闭)则由程序编写的方式来控制。
gRPC 调用方法
首先定义 proto
文件
[greeter.proto]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| syntax = "proto3";
option go_package = ".;rpcs";
service Greeter {
// 一元RPC
rpc SayUnary (HelloRequest) returns (HelloReply) {}
// 服务端流式RPC
rpc SayServerStream (HelloRequest) returns (stream HelloReply) {}
// 客户端流式RPC
rpc SayClientStream (stream HelloRequest) returns (HelloReply) {}
// 双向流式RPC
rpc SayBidirectionalStream (stream HelloRequest) returns (stream HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
|
生成 gRPC 调用代码
1
| protoc src/proto/greeter.proto --go_out=plugins=grpc:src/server
|
服务端代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
type GreeterServer struct{}
func (g *GreeterServer) SayUnary(ctx context.Context, r *rpcs.HelloRequest) (*rpcs.HelloReply, error) {
fmt.Println("\nSayUnary receive req: " + r.Name)
return &rpcs.HelloReply{Message: "hello " + r.Name}, nil
}
func (g *GreeterServer) SayServerStream(r *rpcs.HelloRequest, stream rpcs.Greeter_SayServerStreamServer) error {
var err error
fmt.Println("\nSayServerStream receive req: " + r.Name)
for i := 0; i < 5; i++ {
err = stream.Send(&rpcs.HelloReply{Message: "hello " + r.Name + fmt.Sprintf(" %d", i)})
if err != nil {
return err
}
time.Sleep(time.Second)
}
return nil
}
func (g *GreeterServer) SayClientStream(stream rpcs.Greeter_SayClientStreamServer) error {
values := []string{}
defer func() {
fmt.Println("\nSayClientStream receive req: ", values)
}()
for {
// 默认的MaxReceiveMessageSize值为1024x1024x4字节
resp, err := stream.Recv()
if err != nil {
// 判断是否数据流结束
if err == io.EOF {
break
}
return err
}
values = append(values, resp.Name)
}
time.Sleep(10 * time.Millisecond)
return stream.SendAndClose(&rpcs.HelloReply{
Message: "hello " + strings.Join(values, ","),
})
}
func (g *GreeterServer) SayBidirectionalStream(stream rpcs.Greeter_SayBidirectionalStreamServer) error {
sendValues := []string{}
receiveValues := []string{}
defer func() {
fmt.Println("SayBidirectionalStream send req: ", sendValues)
fmt.Println("SayBidirectionalStream receive req: ", receiveValues)
}()
for {
resp, err := stream.Recv()
if err != nil {
// 判断是否数据流结束
if err == io.EOF {
break
}
return err
}
err = stream.Send(&rpcs.HelloReply{Message: "hello " + resp.Name})
if err != nil {
return err
}
sendValues = append(sendValues, resp.Name)
receiveValues = append(receiveValues, resp.Name)
}
return nil
}
|
客户端代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
func SayUnary(client rpcs.GreeterClient) error {
resp, err := client.SayUnary(context.Background(), &rpcs.HelloRequest{Name: "CallUnary.0"})
if err != nil {
return err
}
fmt.Println("resp:", resp.Message)
return nil
}
func SayServerStream(client rpcs.GreeterClient) error {
stream, err := client.SayServerStream(context.Background(), &rpcs.HelloRequest{Name: "CallServerStream.0"})
if err != nil {
return err
}
for {
// 默认的MaxReceiveMessageSize值为1024x1024x4字节
resp, err := stream.Recv()
if err != nil {
// 判断是否数据流结束
if err == io.EOF {
break
}
return err
}
fmt.Println("resp:", resp.Message)
}
time.Sleep(10 * time.Millisecond)
return stream.CloseSend()
}
func SayClientStream(client rpcs.GreeterClient) error {
stream, err := client.SayClientStream(context.Background())
if err != nil {
return err
}
names := []string{"CallClientStream:1", "CallClientStream:2", "CallClientStream:3"}
for _, name := range names {
err := stream.Send(&rpcs.HelloRequest{Name: name})
if err != nil {
return err
}
time.Sleep(time.Second)
}
resp, err := stream.CloseAndRecv()
if err != nil {
return err
}
fmt.Println("resp:", resp.Message)
return nil
}
func SayBidirectionalStream(client rpcs.GreeterClient) error {
stream, err := client.SayBidirectionalStream(context.Background())
if err != nil {
return err
}
names := []string{"CallBidirectionalStream:1", "CallBidirectionalStream:2", "CallBidirectionalStream:3", "CallBidirectionalStream:4"}
for _, name := range names {
err := stream.Send(&rpcs.HelloRequest{Name: name})
if err != nil {
return err
}
resp, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
return err
}
fmt.Println("resp:", resp.Message)
}
time.Sleep(10 * time.Millisecond)
return stream.CloseSend()
}
|
启动服务端服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| func main() {
s := grpc.NewServer()
rpcs.RegisterGreeterServer(s, new(types.GreeterServer))
reflection.Register(s)
list, err := net.Listen("tcp", "127.0.0.1:8080")
if err != nil {
panic(err)
}
err = s.Serve(list)
if err != nil {
panic(err)
}
}
|
启动客户端服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| func main() {
conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
}
client := rpcs.NewGreeterClient(conn)
fmt.Println("一元RPC调用示例:SayUnary")
if err := rpcs.SayUnary(client); err != nil {
panic(err)
}
fmt.Println("")
fmt.Println("服务端流式RPC:SayServerStream")
if err := rpcs.SayServerStream(client); err != nil {
panic(err)
}
fmt.Println("")
fmt.Println("客户端流式RPC:SayClientStream")
if err := rpcs.SayClientStream(client); err != nil {
panic(err)
}
fmt.Println("")
fmt.Println("双向流式RPC:SayBidirectionalStream")
if err := rpcs.SayBidirectionalStream(client); err != nil {
panic(err)
}
}
|
可以看到,使用 gRpc 使用流式调用可以更灵活的支持如数据实时查询同步能业务场景。
grpc 拦截器#
grpc 拦截器(Interceptor)可以在每一个 RPC 方法的前面或后面做统一的特殊处理,并且不直接侵入业务代码, 例如鉴权校验、超时控制、日志记录、链路跟踪等。
拦截器的类型分为两种:
- 一元拦截器(UnaryInterceptor):拦截和处理一元 RPC 调用。
- 流拦截器(StreamInterceptor):拦截和处理流式 RPC 调用。
创建一个一元拦截器方法
1
2
3
4
| func UnaryServerLoggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
resp, err := handler(ctx, req)
return resp, err
}
|
使用方法
1
2
3
4
5
6
7
| func main(){
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(middlewares.UnaryServerLoggingInterceptor),
}
s := grpc.NewServer(opts...)
}
|
go-grpc-middleware 拦截库#
因为 grpc 拦截器类型不能重复,当需要多个拦截器时,借助 go-grpc-middleware
库来实现
安装库
1
| go get -u github.com/grpc-ecosystem/go-grpc-middleware
|
定义两个一元服务端拦截器
1
2
3
4
5
6
7
8
9
| func UnaryServerInterceptor1(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
resp, err := handler(ctx, req)
return resp, err
}
func UnaryServerInterceptor2(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
resp, err := handler(ctx, req)
return resp, err
}
|
使用方法
1
2
3
4
5
6
7
| func main() {
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(middlewares.UnaryServerInterceptor1, middlewares.UnaryServerInterceptor2)),
}
s := grpc.NewServer(opts...)
}
|
proto 编译引用外部包问题
调试 grpc 工具#
grpcurl#
grpcurl 是一个命令行工具,可让您与 gRPC 服务器进行交互,基本上是 curl 针对 gRPC 服务器的。
grpcurl
1
2
| brew install grpcurl
grpcurl --version
|
1
| grpcurl -plaintext 127.0.0.1:8080 list
|
1
| grpcurl -plaintext 127.0.0.1:8080 list Greeter
|
1
| grpcurl -plaintext 127.0.0.1:8080 describe Greeter
|
1
2
| grpcurl -plaintext -d '{"name":"grpc"}' 127.0.0.1:8080 Greeter.SayUnary
grpcurl -plaintext -d @ 127.0.0.1:8080 Greeter.SayUnary
|