7、RPC框架解析:gRPC连接池

7.1、简介

​ 连接池很多同学都肯定接触过,也都使用过。连接池的主要作用就是连接复用,每次访问从池子里获取已经创建好的连接,这样可以减少创建,回收等一系列的操作所产生的的资源消耗,可以极大的提升服务的性能。我们通过分析go-mico的gRPC连接池的实现来总结连接池设计的方法论。

7.2、gRPC的传输基于什么协议?

gRPC是基于HTTP/2的,为什么是HTTP/2,大家可以直接Google,有很多博主的回答都很优秀。总结下来主要是:

  • 连接非阻塞的,多路复用的。
  • 头部压缩,基于二进制协议的传输协议。
  • stream概念。
  • 最重要的另一个原因就是,通用。移动终端设备也是最先使用HTTP/2。

7.3 、gRPC连接池的实现

连接池的实现,基本上都是这4个method

  • getConn 获取连接
  • release 释放连接
  • removeConn 移除连接
  • addConnAfter 添加连接

7.3.1 getConn 获取一条连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) {
// 获取当前时间戳
now := time.Now().Unix()
// 加锁
p.Lock()
// 根据addr获取streamsPool
sp, ok := p.conns[addr]
// 如果该streamsPool不存在,则创建一个新的
if !ok {
// 创建streamsPool
sp = &streamsPool{head: &poolConn{}, busy: &poolConn{}, count: 0, idle: 0}
// 保存到pool.conns中
p.conns[addr] = sp
}
// 从头部开始遍历conn,先取链表头部的下一个
conn := sp.head.next
// 开始遍历
for conn != nil {
// 获取当前连接状态
switch conn.GetState() {
// Connecting 表示当前连接正在连接中
case connectivity.Connecting:
// 不符合要求,切换下一个conn
conn = conn.next
continue
// Shutdown 标识该连接已经关闭
case connectivity.Shutdown:
// 不符合要求,切换下一个conn
next := conn.next
// 如果该链接中streams为0,则表示可以移除
if conn.streams == 0 {
// 移除该链接
removeConn(conn)
// 空闲数减1
sp.idle--
}
// 移动
conn = next
continue
// TransientFailure 标识该连接出现故障,但是有机会恢复
case connectivity.TransientFailure:
// 不符合要求,切换下一个conn
next := conn.next
// 如果streams数为0,则移除该链接
if conn.streams == 0 {
// 移除连接
removeConn(conn)
// 关闭
conn.ClientConn.Close()
// 空闲数减1
sp.idle--
}
// 移动切换
conn = next
continue
// Ready 表示已经就绪准备好的连接
case connectivity.Ready:
// Idle 表示空闲连接,也可以直接使用
case connectivity.Idle:
}
// 判断该连接是否过期,当前时间-创建时间>最大活跃周期,则过期,执行回收逻辑,移动conn,执行切换
if now-conn.created > p.ttl {
// 标记下一位
next := conn.next
// 如果streams为0,执行删除
if conn.streams == 0 {
// 执行连接的移除
removeConn(conn)
// 关闭链接
conn.ClientConn.Close()
// 最大空闲减1
sp.idle--
}
// 切换链接
conn = next
continue
}
// 判断是够该连接是否过载
if conn.streams >= p.maxStreams {
// 标记下一位连接
next := conn.next
// 移除连接
removeConn(conn)
// 添加该连接到busy队列中
addConnAfter(conn, sp.busy)
// 切换连接
conn = next
continue
}
// 如果当前连接的streams为0,但是将被使用,所以需要将空闲队列做减1操作
if conn.streams == 0 {
sp.idle--
}
// streams加1,被使用
conn.streams++
// 解锁
p.Unlock()
// 返回该连接
return conn, nil
}
p.Unlock()

// 通过遍历没有发现可以使用的连接,则创建一个新连接
cc, err := grpc.Dial(addr, opts...)
if err != nil {
return nil, err
}
// 连接初始化
conn = &poolConn{cc, nil, addr, p, sp, 1, time.Now().Unix(), nil, nil, false}

// 加锁
p.Lock()
// 如果当前streamsPool中的连接数量<最大值,则将该连接加入队列,后续复用
if sp.count < p.size {
addConnAfter(conn, sp.head)
}
// 解锁
p.Unlock()

return conn, nil
}

7.3.1 removeConn 移除连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 重置pre连接
if conn.pre != nil {
conn.pre.next = conn.next
}
// 重置next连接
if conn.next != nil {
conn.next.pre = conn.pre
}
// 链表置空相关操作
conn.pre = nil
conn.next = nil
conn.in = false
// streamsPool 连接数减1
conn.sp.count--
return

###7.3.2 addConnAfter 添加conn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func addConnAfter(conn *poolConn, after *poolConn) {
// 链表相关操作
conn.next = after.next
// 修改前置指针
conn.pre = after
if after.next != nil {
after.next.pre = conn
}
// 移动
after.next = conn
conn.in = true
// streamsPool 加1
conn.sp.count++
return
}