4、RPC框架解析:gRPC调用流程

对gRPC实践后,可以跟着调用链路分析下gRPC的调用流程。

这里抛几个问题:

  • gRPC调用经历了哪些过程?
  • gRPC内部调用的实现是什么样?
  • gRPC底层传输依赖什么协议?
  • client端如何发起的请求?
  • server如何接收的请求?

4.1、探索客户端

4.1.1、client的生成

以下是demo里面的client实现,生成一个client,然后发起SayHello请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main() {
client := api.NewGreeterClient()
req := &helloworld.HelloRequest{
Name: "ldaysjun",
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
rsp, err := client.SayHello(ctx, req)
if err != nil {
log.Fatalf("failed to serve: %v", err)
}

fmt.Println("rsp.message = ", rsp.Message)
}

client的构建依赖以下func。

1
2
3
4
5
6
7
8
func NewGreeterClient() helloworld.GreeterClient {
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
c := helloworld.NewGreeterClient(conn)
return c
}

helloworld.GreeterClient是通过protoc生成的stub代码。如下:

1
2
3
4
5
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type GreeterClient interface {
// Sends a greeting
SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}

具体的client是通过helloworld.NewGreeterClient(conn)生成的,他的实现如下:

1
2
3
func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
return &greeterClient{cc}
}

在桩代码中不难发现,greeterClient结构体就是GreeterClient的实现:

1
2
3
4
5
6
7
8
9
10
11
12
func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
return &greeterClient{cc}
}

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
out := new(HelloReply)
err := c.cc.Invoke(ctx, "/rpc_demo.helloworld.Greeter/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

4.1.2、SayHello的发起

执行client.SayHello(ctx, req),发起greeterClientSayHello的调用,实现如下:

1
2
3
4
5
6
7
8
9
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
out := new(HelloReply)
// 发起调用
err := c.cc.Invoke(ctx, "/rpc_demo.helloworld.Greeter/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

整体的调用流程如下:

进入Invoke,Invoke主要实现请求参数的整合与拦截器的调用,实现如下:

1
2
3
4
5
6
7
8
9
10
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
// 合并拦截器参数与请求参数
opts = combine(cc.dopts.callOptions, opts)
// 拦截器不为空,调用拦截器
if cc.dopts.unaryInt != nil {
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
// 发起调用
return invoke(ctx, method, args, reply, cc, opts...)
}

无论拦截器是否为空,最终都会调用invoke,invoke中具体实现客户端流的创建,发送数据,接收数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
// 创建客户端流
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
// 发送数据
if err := cs.SendMsg(req); err != nil {
return err
}
// 接收数据
return cs.RecvMsg(reply)
}

4.1.2.1、SendMsg发送数据

实现流程如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (cs *clientStream) SendMsg(m interface{}) (err error) {
defer func() {
// 错误处理
if err != nil && err != io.EOF {
cs.finish(err)
}
}()
// 状态判断
if cs.sentLast {
return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
}
if !cs.desc.ClientStreams {
cs.sentLast = true
}

// 序列化数据
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
if err != nil {
return err
}

// 检查发送数据长度
if len(payload) > *cs.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
}
msgBytes := data
// 发送函数创建
op := func(a *csAttempt) error {
// 发送数据
err := a.sendMsg(m, hdr, payload, data)
m, data = nil, nil
return err
}
// 执行发送,开启重试功能
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
if cs.binlog != nil && err == nil {
cs.binlog.Log(&binarylog.ClientMessage{
OnClientSide: true,
Message: msgBytes,
})
}
return
}

4.1.2.2、RecvMsg接收数据

1
2
3
4
5
6
7
8
9
10
11
12
13
func (cs *clientStream) RecvMsg(m interface{}) error {
if cs.binlog != nil && !cs.serverHeaderBinlogged {
cs.Header()
}
var recvInfo *payloadInfo
if cs.binlog != nil {
recvInfo = &payloadInfo{}
}
err := cs.withRetry(func(a *csAttempt) error {
// 接收数据,反序列化,生成m回包
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
}

4.2、服务端探索

Server开启后,会等待连接请求,实现如下(省略其他代码),当接收到请求,开启一个goroutine处理该连接。

1
2
3
4
5
6
for {
rawConn, err := lis.Accept()
go func() {
s.handleRawConn(rawConn)
}()
}

4.2.1、handleRawConn

handleRawConn中主要设置连接超时时间,鉴权,建立HTTP2的连接。请求处理。

实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (s *Server) handleRawConn(rawConn net.Conn) {
if s.quit.HasFired() {
rawConn.Close()
return
}
// 设置超时设计
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
// 鉴权
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
if err != credentials.ErrConnDispatched {
s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
channelz.Warningf(s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
rawConn.Close()
}
rawConn.SetDeadline(time.Time{})
return
}

// 建立HTTP2连接
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
return
}
rawConn.SetDeadline(time.Time{})
if !s.addConn(st) {
return
}
// 处理请求
go func() {
s.serveStreams(st)
s.removeConn(st)
}()
}

4.2.2、serveStreams

serveStreams的处理如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close()
var wg sync.WaitGroup
// 请求处理函数
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
go func() {
defer wg.Done()
// 请求处理
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
}
tr := trace.New("grpc.Recv."+methodFamily(method), method)
return trace.NewContext(ctx, tr)
})
wg.Wait()
}

4.2.3、HandleStreams

HandleStreams循环处理客户端的请求数据,根据frame执行不同的方法。MetaHeadersFrame类型就是执行具体的业务实现。traceCtx将跟踪附加到ctx并返回新的上下文。

大致流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
defer close(t.readerDone)
// 循环执行
for {
t.controlBuf.throttle()
// 读取数据
frame, err := t.framer.fr.ReadFrame()
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
if err != nil {
if se, ok := err.(http2.StreamError); ok {
warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
t.mu.Lock()
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
if s != nil {
t.closeStream(s, true, se.Code, false)
} else {
t.controlBuf.put(&cleanupStream{
streamID: se.StreamID,
rst: true,
rstCode: se.Code,
onWrite: func() {},
})
}
continue
}
// 错误处理
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close()
return
}
warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
t.Close()
return
}
// 判断类型
switch frame := frame.(type) {
// 执行业务请求
case *http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
break
}
case *http2.DataFrame:
t.handleData(frame)
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
case *http2.SettingsFrame:
t.handleSettings(frame)
case *http2.PingFrame:
t.handlePing(frame)
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
case *http2.GoAwayFrame:
// TODO: Handle GoAway from the client appropriately.
default:
errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
}
}
}

4.2.4、handleStream

最后回调执行handleStream,它会解析函数描述,执行注册的业务接口大致执行如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) 
// 解析函数
sm := stream.Method()
if sm != "" && sm[0] == '/' {
sm = sm[1:]
}
// 解析pos
pos := strings.LastIndex(sm, "/")
if pos == -1 {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
trInfo.tr.SetError()
}
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
}
return
}
// 获取服务名
service := sm[:pos]
// 获取服务名,获取方法名
method := sm[pos+1:]
// 检查该接口是否存在
srv, knownService := s.m[service]
if knownService {
if md, ok := srv.md[method]; ok {
// 执行业务逻辑
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
// 流式方法执行
if sd, ok := srv.sd[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
}
// Unknown service, or known server unknown method.
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
return
}
var errDesc string
if !knownService {
errDesc = fmt.Sprintf("unknown service %v", service)
} else {
errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
}
if trInfo != nil {
trInfo.tr.LazyPrintf("%s", errDesc)
trInfo.tr.SetError()
}
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
}
}

4.3、总结

​ 通篇主要是讲解gRPC的请求过程,具体的比如服务的创建,连接。编解码,http2的创建,这些细节都没有详细描述,后面有机会在具体探讨。这次主要是了解大体流程。

1
"personID":"1551181497964383"