RPC 技术分享

RPC 介绍

远程过程调用(Remote Procedure Call 缩写 RPC)是一个计算机通信协议,通俗来讲就是调用远程服务上的一个函数,在调用时将对象名、函数名、参数等传递给远程服务器,服务器将处理结果返回给客户端,客户端调用 server 端提供的接口就像是调用本地的函数一样。 RPC 的消息可以通过 TCP、UDP 或者 HTTP 等传输。

RPC 调用流程

rpc-procedure.jpeg

虽然说,远程过程调用并不需要我们关心如何编解码,如何通信,但是最基本的,如果一个方法需要支持远程过程调用,需要满足一定的约束和规范。 不同 RPC 框架的约束和规范是不同的,如果使用 Golang 的标准库 net/rpc,方法需要长这个样子:

1
func (t *T) MethodName(argType T1, replyType *T2) error

即需要满足以下 5 个条件:

  • 方法类型(T)是导出的(首字母大写)
  • 方法名(MethodName)是导出的
  • 方法有 2 个参数(argType T1, replyType *T2),均为导出/内置类型
  • 方法的第 2 个参数一个指针(replyType *T2)
  • 方法的返回值类型是 error

RPC 示例

我们使用 golang 提供 net/rpc 包,我们先构造一个HelloService类型

1
2
3
4
5
6
type HelloService struct {}

func (p *HelloService) Hello(request string, reply *string) error {
    *reply = "hello:" + request
    return nil
}

其中 Hello 方法必须满足 Go 语言的 RPC 规则:方法只能有两个可序列化的参数,其中第二个参数是指针类型,并且返回一个 error 类型,同时必须是公开的方法。

然后就可以将 HelloService 类型的对象注册为一个 RPC 服务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func main() {
    rpc.RegisterName("HelloService", new(HelloService))

    listener, err := net.Listen("tcp", ":1234")
    if err != nil {
        log.Fatal("ListenTCP error:", err)
    }

    conn, err := listener.Accept()
    if err != nil {
        log.Fatal("Accept error:", err)
    }

    rpc.ServeConn(conn)
}

其中 rpc.Register 函数调用会将对象类型中所有满足 RPC 规则的对象方法注册为 RPC 函数,所有注册的方法会放在 HelloService 服务空间之下,通过 rpc.ServeConn 函数在该 TCP 链接上为对方提供 RPC 服务。

下面是客户端请求 HelloService 服务的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
package main

func main() {
    client, err := rpc.Dial("tcp", "localhost:1234")
    if err != nil {
        log.Fatal("dialing:", err)
    }

    var reply string
    err = client.Call("HelloService.Hello", "hello", &reply)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println(reply)
}

首先是通过rpc.Dial连接 RPC 服务,然后通过client.Call调用具体的 RPC 方法。在调用client.Call时,需要指明调用的 RPC 服务名字和方法名字,第二和第三个参数分别我们定义 RPC 方法的两个参数。

Protobuf

ProtobufProtocol Buffers的简称,它是 Google 公司开发的一种Interface Definition Language(IDL)数据描述语言,Protobuf是一套类似 Json 或者 XML 的数据传输格式和规范。它非常轻便高效,很适合做数据存储或 RPC 数据交换格式。

protobuf 与 go 语言常见数据类型映射表:

protobufgolang
boolbool
stringstring
bytes[]byte
int32int32
int64int64
uint32uint32
uint64uint64
floatfloat32
doublefloat64

复合类型映射表:

(1) 数组类型

等价 go 的 []string

1
2
3
message HelloRequest {
    repeated string name = 1}

(2) 嵌套类型

等价 go 的[]User

1
2
3
4
5
6
7
message User {
    string name = 1;
}

message HelloRequest {
    repeated User users = 1;
}

(3) map

等价 go 的 map[string]string

1
2
3
message HelloRequest {
    map<string, string> names = 1;
}

简单示例

  • 首先安装 protobuf 代码工具
1
2
brew install protobuf
protoc --version

定义 hello.proto 类型

[src/proto/hello.proto]

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
syntax = "proto3";

option go_package = ".;rpcs";

message Info {
  string name = 1;
}

service HelloService {
  rpc Hello (Info) returns (Info);
}

开头的 syntax 语句表示采用 proto3 的语法,go_package 定义 server 包名.

生成对应的 pb.go 文件

1
protoc --go_out=src/server src/proto/hello.proto

重新定义

1
2
3
4
5
6
type HelloService struct{}

func (p *HelloService) Hello(request *Info, reply *Info) error {
    reply.Name = "hello:" + request.GetName()
    return nil
}

生成 grpc 对应的代码

1
protoc --go_out=plugins=grpc:src/server src/proto/hello.proto

gRPC 框架

google 开源的 RPC 框架,支持 Python、Golang、Java 等众多开发语言。

grpc 优点:

  • 性能好,比 json 编解码数读快几十倍。
  • 代码生成方便,使用 proto 工具自动生成对应语言代码。
  • 支持多种流传输方式,支持一元 RPC、服务端流式 RPC、客户端流式 RPC、双流向 RPC 共 4 中传输流。
  • 有超时和取消处理机制,客户端和服务端在截止时间后对取消事件进行相关处理。

简单示例

创建服务端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func main() {
  address, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", 8080))
	if err != nil {
		log.Fatal(err)
	}

	inbound, err := net.ListenTCP("tcp", address)
	if err != nil {
		log.Fatal(err)
	}

	s := grpc.NewServer()
	listener := new(types.SimpleServer)
	rpcs.RegisterSimpleServer(s, listener)
	_ = s.Serve(inbound)
}

创建客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
  conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%d", 8080), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}

	c := rpcs.NewSimpleClient(conn)

	in := bufio.NewReader(os.Stdin)
	for {
		line, _, err := in.ReadLine()
		if err != nil {
			log.Fatal(err)
		}
		reply, err := c.GetLine(context.Background(), &rpcs.SimpleRequest{Data: string(line)})
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("Reply: %v, Data: %v", reply, reply.Data)
	}
}

测试连接

在 grpc 中,一共有 4 种调用方式:

  • 一元 RPC: 称为单次 RPC,也就是一问一答 RPC 请求,是最基础最常用的调用方式。
  • 服务端流式 RPC: 是一个单向流,客户端发起一次普通 RPC 请求,服务端通过流式返回数据集。
  • 客户端流式 RPC: 是一个单向流,客户端通过流式发送数据集,服务端回复一次普通 RPC 请求。
  • 双向流式 RPC: 由客户端以流式发起请求,服务端同样以流式方式响应请求。一定有客户端发起,但交互方式(谁先谁后、一次发多少、相应多少、什么时候关闭)则由程序编写的方式来控制。

gRPC 调用方法

首先定义 proto 文件

[greeter.proto]

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
syntax = "proto3";

option go_package = ".;rpcs";

service Greeter {
  // 一元RPC
  rpc SayUnary (HelloRequest) returns (HelloReply) {}
  // 服务端流式RPC
  rpc SayServerStream (HelloRequest) returns (stream HelloReply) {}
  // 客户端流式RPC
  rpc SayClientStream (stream HelloRequest) returns (HelloReply) {}
  // 双向流式RPC
  rpc SayBidirectionalStream (stream HelloRequest) returns (stream HelloReply) {}
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

生成 gRPC 调用代码

1
protoc src/proto/greeter.proto --go_out=plugins=grpc:src/server

服务端代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

type GreeterServer struct{}

func (g *GreeterServer) SayUnary(ctx context.Context, r *rpcs.HelloRequest) (*rpcs.HelloReply, error) {
	fmt.Println("\nSayUnary receive req: " + r.Name)
	return &rpcs.HelloReply{Message: "hello " + r.Name}, nil
}

func (g *GreeterServer) SayServerStream(r *rpcs.HelloRequest, stream rpcs.Greeter_SayServerStreamServer) error {
	var err error
	fmt.Println("\nSayServerStream receive req: " + r.Name)

	for i := 0; i < 5; i++ {
		err = stream.Send(&rpcs.HelloReply{Message: "hello " + r.Name + fmt.Sprintf(" %d", i)})
		if err != nil {
			return err
		}
		time.Sleep(time.Second)
	}

	return nil
}

func (g *GreeterServer) SayClientStream(stream rpcs.Greeter_SayClientStreamServer) error {
	values := []string{}
	defer func() {
		fmt.Println("\nSayClientStream receive req: ", values)
	}()

	for {
		// 默认的MaxReceiveMessageSize值为1024x1024x4字节
		resp, err := stream.Recv()
		if err != nil {
			// 判断是否数据流结束
			if err == io.EOF {
				break
			}
			return err
		}

		values = append(values, resp.Name)
	}

	time.Sleep(10 * time.Millisecond)
	return stream.SendAndClose(&rpcs.HelloReply{
		Message: "hello " + strings.Join(values, ","),
	})
}

func (g *GreeterServer) SayBidirectionalStream(stream rpcs.Greeter_SayBidirectionalStreamServer) error {
	sendValues := []string{}
	receiveValues := []string{}

	defer func() {
		fmt.Println("SayBidirectionalStream send req: ", sendValues)
		fmt.Println("SayBidirectionalStream receive req: ", receiveValues)
	}()

	for {
		resp, err := stream.Recv()
		if err != nil {
			// 判断是否数据流结束
			if err == io.EOF {
				break
			}
			return err
		}

		err = stream.Send(&rpcs.HelloReply{Message: "hello " + resp.Name})
		if err != nil {
			return err
		}

		sendValues = append(sendValues, resp.Name)
		receiveValues = append(receiveValues, resp.Name)
	}

	return nil
}

客户端代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

func SayUnary(client rpcs.GreeterClient) error {
	resp, err := client.SayUnary(context.Background(), &rpcs.HelloRequest{Name: "CallUnary.0"})
	if err != nil {
		return err
	}

	fmt.Println("resp:", resp.Message)
	return nil
}

func SayServerStream(client rpcs.GreeterClient) error {
	stream, err := client.SayServerStream(context.Background(), &rpcs.HelloRequest{Name: "CallServerStream.0"})
	if err != nil {
		return err
	}

	for {
		// 默认的MaxReceiveMessageSize值为1024x1024x4字节
		resp, err := stream.Recv()
		if err != nil {
			// 判断是否数据流结束
			if err == io.EOF {
				break
			}
			return err
		}

		fmt.Println("resp:", resp.Message)
	}

	time.Sleep(10 * time.Millisecond)
	return stream.CloseSend()
}

func SayClientStream(client rpcs.GreeterClient) error {
	stream, err := client.SayClientStream(context.Background())
	if err != nil {
		return err
	}

	names := []string{"CallClientStream:1", "CallClientStream:2", "CallClientStream:3"}
	for _, name := range names {
		err := stream.Send(&rpcs.HelloRequest{Name: name})
		if err != nil {
			return err
		}
		time.Sleep(time.Second)
	}

	resp, err := stream.CloseAndRecv()
	if err != nil {
		return err
	}
	fmt.Println("resp:", resp.Message)

	return nil
}

func SayBidirectionalStream(client rpcs.GreeterClient) error {
	stream, err := client.SayBidirectionalStream(context.Background())
	if err != nil {
		return err
	}

	names := []string{"CallBidirectionalStream:1", "CallBidirectionalStream:2", "CallBidirectionalStream:3", "CallBidirectionalStream:4"}
	for _, name := range names {
		err := stream.Send(&rpcs.HelloRequest{Name: name})
		if err != nil {
			return err
		}

		resp, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				break
			}
			return err
		}

		fmt.Println("resp:", resp.Message)
	}

	time.Sleep(10 * time.Millisecond)
	return stream.CloseSend()
}

启动服务端服务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func main() {
  s := grpc.NewServer()
	rpcs.RegisterGreeterServer(s, new(types.GreeterServer))

	reflection.Register(s)

	list, err := net.Listen("tcp", "127.0.0.1:8080")
	if err != nil {
		panic(err)
	}
	err = s.Serve(list)
	if err != nil {
		panic(err)
	}
}

启动客户端服务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func main() {
  conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}

	client := rpcs.NewGreeterClient(conn)

	fmt.Println("一元RPC调用示例:SayUnary")
	if err := rpcs.SayUnary(client); err != nil {
		panic(err)
	}
	fmt.Println("")

	fmt.Println("服务端流式RPC:SayServerStream")
	if err := rpcs.SayServerStream(client); err != nil {
		panic(err)
	}
	fmt.Println("")

	fmt.Println("客户端流式RPC:SayClientStream")
	if err := rpcs.SayClientStream(client); err != nil {
		panic(err)
	}
	fmt.Println("")

	fmt.Println("双向流式RPC:SayBidirectionalStream")
	if err := rpcs.SayBidirectionalStream(client); err != nil {
		panic(err)
	}
}

可以看到,使用 gRpc 使用流式调用可以更灵活的支持如数据实时查询同步能业务场景。

grpc 拦截器

grpc 拦截器(Interceptor)可以在每一个 RPC 方法的前面或后面做统一的特殊处理,并且不直接侵入业务代码, 例如鉴权校验、超时控制、日志记录、链路跟踪等。

拦截器的类型分为两种:

  • 一元拦截器(UnaryInterceptor):拦截和处理一元 RPC 调用。
  • 流拦截器(StreamInterceptor):拦截和处理流式 RPC 调用。

创建一个一元拦截器方法

1
2
3
4
func UnaryServerLoggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	resp, err := handler(ctx, req)
	return resp, err
}

使用方法

1
2
3
4
5
6
7
func main(){
  opts := []grpc.ServerOption{
		grpc.UnaryInterceptor(middlewares.UnaryServerLoggingInterceptor),
	}

	s := grpc.NewServer(opts...)
}

go-grpc-middleware 拦截库

因为 grpc 拦截器类型不能重复,当需要多个拦截器时,借助 go-grpc-middleware 库来实现

安装库

1
go get -u github.com/grpc-ecosystem/go-grpc-middleware

定义两个一元服务端拦截器

1
2
3
4
5
6
7
8
9
func UnaryServerInterceptor1(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	resp, err := handler(ctx, req)
	return resp, err
}

func UnaryServerInterceptor2(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	resp, err := handler(ctx, req)
	return resp, err
}

使用方法

1
2
3
4
5
6
7
func main() {
  opts := []grpc.ServerOption{
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(middlewares.UnaryServerInterceptor1, middlewares.UnaryServerInterceptor2)),
	}

	s := grpc.NewServer(opts...)
}

proto 编译引用外部包问题

调试 grpc 工具

grpcurl

grpcurl 是一个命令行工具,可让您与 gRPC 服务器进行交互,基本上是 curl 针对 gRPC 服务器的。

grpcurl

1
2
brew install grpcurl
grpcurl --version
  • 查看 grpc 服务提供哪些接口列表
1
grpcurl -plaintext 127.0.0.1:8080 list
  • 查看 grpc 服务提供接口的方法列表
1
grpcurl -plaintext 127.0.0.1:8080 list Greeter
  • 查看 grpc 服务提供接口的方法详情
1
grpcurl -plaintext 127.0.0.1:8080 describe Greeter
  • 调用方法
1
2
grpcurl -plaintext -d '{"name":"grpc"}' 127.0.0.1:8080 Greeter.SayUnary
grpcurl -plaintext -d @ 127.0.0.1:8080 Greeter.SayUnary

参考