5、RPC框架解析:gRPC服务发现

5.1、简介

​ 集群服务中的节点ip不会永远不变,如果变化了怎么办?1台机器处理不过来,扩容了3台,如何让新增的3台快速投入使用,流量如何很快的流入新增的机器。集群中出现有问题的机器,如何下线呢?解决这些问题需要引入一个新的概念,服务发现

​ 服务发现需要解决以下问题:

  • 服务IP与端口的确定方式。
  • 服务注册与发现。
  • 服务下线
  • 服务健康监测。
  • 节点加入或退出,如何通知订阅者变化。
  • 查看应用的订阅列表,发布列表,以及订阅节点。

当A节点访问B集群的机器,这次访问究竟应该访问哪个节点?基于什么策略如何确定的?这又涉及到另一个概念,负载均衡,它是提供高可用服务的关键基础组件,核心作用就是将大量请求以合理的方式分配到多个执行单元上去执行,达到最优化的资源使用方式,避免多个单元同时发生过载。

比如我们从注册中心查询到10个可用节点,通过负载均衡器,可以将我们的10个请求,以一种策略打散到不同的可执行节点上,保障可用性。

关于基于etcd的服务发现与负载均衡算法可以先了解下。

rpc_demo

5.2、实践

5.2.1、etcd安装

etcd下载地址:etcd

1
2
3
4
5
6
7
8
9
10
11
cd etcd-v3.4.7-darwin-amd64
./etcd

2020-04-20 20:11:59.612972 I | etcdmain: etcd Version: 3.4.7
2020-04-20 20:11:59.613083 I | etcdmain: Git SHA: e694b7bb0
2020-04-20 20:11:59.613091 I | etcdmain: Go Version: go1.12.17
2020-04-20 20:11:59.613098 I | etcdmain: Go OS/Arch: darwin/amd64
2020-04-20 20:11:59.613105 I | etcdmain: setting maximum number of CPUs to 8, total number of available CPUs is 8
2020-04-20 20:11:59.613129 N | etcdmain: failed to detect default host (default host not supported on darwin_amd64)
2020-04-20 20:11:59.613142 W | etcdmain: no data-dir provided, using default data-dir ./default.etcd
2020-04-20 20:11:59.613231 N | etcdmain: the server is already initialized as member before, starting as etcd member...

5.2.1、Load balancing

可以看下grpc关于负载均衡的demo

负载均衡算法中介绍了两种负载均衡模式,客户端模式服务端模式,gRPC官网推荐的是客户端模式,既进程内负载均衡。

客户端从注册中心拉取所有注册的服务,然后让问server B。

5.2.2、Builder

首先实现Builder接口,Builder的作用是创建一个解析器,该解析器将用于监听目标注册中心server节点的变化。

1
2
3
4
5
6
7
8
9
10
type Builder interface {
// Build creates a new resolver for the given target.
//
// gRPC dial calls Build synchronously, and fails if the returned error is
// not nil.
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
// Scheme returns the scheme supported by this resolver.
// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
Scheme() string
}

接着实现Resolver接口,Resolver监听目标节点的地址的变化更新。

1
2
3
4
5
6
7
8
9
type Resolver interface {
// ResolveNow will be called by gRPC to try to resolve the target name
// again. It's just a hint, resolver can ignore this if it's not necessary.
//
// It could be called multiple times concurrently.
ResolveNow(ResolveNowOptions)
// Close closes the resolver.
Close()
}

rpc请求的时候,会调用Builder获取Resolver解析器。那么我们在Builder中实现与etcd客户端的初始化,并且开启解析器根据servicename读取当前配置的ip:port。

实现ETCDResolverBuilder,这里连接etcd有个小坑,就是连接超时并不提示错误,只有真正执行指令的时候才会报错,这里需要通过cli.Status主动检查下连接状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
const (
ExampleScheme = "test"
)

func (e *ETCDResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
once.Do(func() {
// 设置etcd链接配置
clientConfig := clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
}
cli, _ = clientv3.New(clientConfig)
// 连接状态检查
timeoutCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, err := cli.Status(timeoutCtx, clientConfig.Endpoints[0])
if err != nil {
panic(err)
}
})

// 初始化ETCDResolver解析器
r := &ETCDResolver{
target: target,
cc: cc,
}
key := fmt.Sprintf("/%s/%s/", target.Scheme, target.Endpoint)
// 开启解析
r.start(key)
return r, nil
}

func (e *ETCDResolverBuilder) Scheme() string {
return ExampleScheme
}

5.2.3、Resolver

实现ETCDResolver解析器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type ETCDResolver struct {
target resolver.Target
cc resolver.ClientConn
addrStores map[string][]string
}

// grcp再次尝试解析的时候调用,如果不需要实现特殊逻辑,可以不用考虑
func (e *ETCDResolver) ResolveNow(o resolver.ResolveNowOptions) {
fmt.Println("ResolveNow")
}
func (e *ETCDResolver) Close() {
fmt.Println("close")
}

func (e *ETCDResolver) start(serviceName string) {
var addrList []resolver.Address
// 前缀查询
if result, err := cli.Get(context.Background(), serviceName, clientv3.WithPrefix()); err == nil {
for i := range result.Kvs {
addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(result.Kvs[i].Key), serviceName)})
}
}
// 更新目标服务地址
e.cc.UpdateState(resolver.State{Addresses: addrList})
}

5.2.4、服务注册

通过etcd客户端的put方法可以直接注册服务。更新NewGreeter方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package logic

import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/ldaysjun/rpc_learn/protobuf/helloworld"
"time"
)

const (
ServiceName = "demo.hello.world"
Scheme = "test"
)

type greeter struct {
cli *clientv3.Client
}

func NewGreeter() (helloworld.GreeterServer, error) {
// 初始化etcd客户端
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout:time.Second * 3,
})
imp := &greeter{
cli: cli,
}
// 注册服务
imp.ServiceRegister(Scheme,ServiceName,"localhost:50052")
return imp, nil
}

// 注册服务到etcd
func (g *greeter) ServiceRegister(scheme,serviceName, addr string) {
key := fmt.Sprintf("/%s/%s/%s",scheme,ServiceName,addr)
// 执行put方法
timeoutCtx, _ := context.WithTimeout(context.Background(), 1*time.Second)
_,err := g.cli.Put(timeoutCtx,key,addr,clientv3.WithPrevKV())
if err != nil {
// 注册失败直接panic
panic(err)
}
}

5.2.5、服务变更

​ 服务变更是很常见的现象,比如服务下线,服务扩容上线更多新的节点。这些节点如何有效并且及时的同步到客户端呢?

​ etcd提供了watch的能力,watch的作用就是在新服务改变(重新注册,下线等)后, 告知各个服务执行相应的逻辑。所以只需要在解析器中使用watch方法,根据对应的变更来调用UpdateState更新客户端解析器维护的节点列表即可,实现如下:

5.2.5.1、新服务上线

通过etcd客户端的watch方法,收到通知后,如果是PUT操作,则做目标服务节点的更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (e *ETCDResolver) watch(serviceName string, addrList []resolver.Address) {
addrMapper := make(map[string]int)
for _, addr := range addrList {
addrMapper[addr.Addr] = 1
}
// 前缀查询
ch := cli.Watch(context.TODO(), serviceName, clientv3.WithPrefix())
for {
select {
case c := <-ch:
// 读取时间
for _, event := range c.Events {
addr := string(event.Kv.Value)
switch event.Type {
// 新增节点
case mvccpb.PUT:
// 判断该节点是否存在,不存在则更新该节点
if _, ok := addrMapper[addr]; !ok {
addrList = append(addrList, resolver.Address{Addr: addr})
e.cc.UpdateState(resolver.State{Addresses: addrList})
addrMapper[addr] = 1
}
case mvccpb.DELETE:
}
}
}
}
}

5.2.5.2、服务下线

服务下线需要主动去注册中心注销自己的节点信息,并且我们把注册操作移到main函数中处理,注销实现如下:

1
2
3
4
5
6
7
8
9
10
11
// 从etcd注销服务
func (g *greeter) ServiceLogout(scheme, serviceName, addr string) {
key := fmt.Sprintf("/%s/%s/%s", scheme, ServiceName, addr)
// 执行put方法
timeoutCtx, _ := context.WithTimeout(context.Background(), 1*time.Second)
_, err := g.cli.Delete(timeoutCtx, key)
if err != nil {
// 注册失败直接panic
panic(err)
}
}

main函数监听退出事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import (
"github.com/ldaysjun/rpc_learn/protobuf/helloworld"
"github.com/ldaysjun/rpc_learn/rpc_demo/internal/logic"
"google.golang.org/grpc"
"log"
"net"
"os"
"os/signal"
"syscall"
)

const addr = "127.0.0.1:50052"

func main() {
lis, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
return
}
s := grpc.NewServer()
imp, err := logic.NewGreeter()
if err != nil {
panic(err)
}
// 注册服务
imp.ServiceRegister(logic.Scheme,logic.ServiceName, addr)
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
go func() {
s := <-ch
if i, ok := s.(syscall.Signal); ok {
// 退出注销服务
imp.ServiceLogout(logic.Scheme,logic.ServiceName,addr)
os.Exit(int(i))
} else {
os.Exit(0)
}
}()

helloworld.RegisterGreeterServer(s, imp)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

服务正常下线,我们要主动告知客端,让客户端在自己维护的目标服务节点中把自己踢出掉。也是通过Watch,处理DELETE事件即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (e *ETCDResolver) watch(serviceName string, addrList []resolver.Address) {
addrMapper := make(map[string]int)
for _, addr := range addrList {
addrMapper[addr.Addr] = 1
}
// 前缀查询
ch := cli.Watch(context.TODO(), serviceName, clientv3.WithPrefix())
for {
select {
case c := <-ch:
for _, event := range c.Events {
addr := strings.TrimPrefix(string(event.Kv.Key), serviceName)
switch event.Type {
// 新增服务事件
case mvccpb.PUT:
// 如果该addr不存在,则增加该节点
if _, ok := addrMapper[addr]; !ok {
addrList = append(addrList, resolver.Address{Addr: addr})
e.cc.UpdateState(resolver.State{Addresses: addrList})
addrMapper[addr] = 1
}
// 过期或下线服务事件
case mvccpb.DELETE:
if _, ok := addrMapper[addr]; ok {
for i := range addrList {
// 找到该节点,并且删除
if addrList[i].Addr == addr {
addrList[i] = addrList[len(addrList)-1]
addrList = addrList[:len(addrList)-1]
delete(addrMapper, addr)
break
}
}
// 更新服务
e.cc.UpdateState(resolver.State{Addresses: addrList})
}
}
}
}
}
}

5.2.6、健康检查

​ 正常情况下退出,我们可以主动通知etcd注销该下线的节点,但是在异常退出的情况下,可能没有执行注销操作。甚至是正常退出后,也发出了注销通知,但是因为网络环境的原因导致这个通知没有通知到etcd。这种情况下,客户端依然认为该节点可用,继续向该节点发起请求,结果显而易见肯定是超时,因为目标节点已经不存在了。为了避免这种情况出现,引入一个新的概念就是健康检查,定期检查该服务节点是否有效,如果失效了及时通知客户端去删除该节点。

​ etcd健康检查通过租约的方式实现。 用户可以在etcd中注册服务,并且对注册的服务设置key TTL,定时保持服务的心跳以达到监控健康状态的效果。需要维护一个TTL(V3 使用 lease实现),类似于心跳。申请lease租约,设置服务生存周期TTL,让key支持自动过期,在服务正常的状态下,通过KeepAlive定期去续租,避免过期。

​ 通过Grant创建一个租约,并且设置过期时间为9s,在put的时候设置该租约。使用goroutine去定期续租,租约过期时间是9s,那么KeepAlive平均会3s执行一次,这里还需要考虑一个异常场景,ka为nil,极有可能在Keepalive时Lease已经过期,所以需要进行容错处理,分配新的Lease进行重试,这种可能是因为网络原因导致的丢包,续租失败,所以重新执行put就可以

​ 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 注册服务到etcd
func (g *greeter) ServiceRegister(scheme, serviceName, addr string) {
leaseGrantResp, err := g.cli.Grant(context.TODO(), 9)
if err != nil {
log.Fatal(err)
}
key := fmt.Sprintf("/%s/%s/%s", scheme, ServiceName, addr)
// 执行put方法
timeoutCtx, _ := context.WithTimeout(context.Background(), 1*time.Second)
_, err = g.cli.Put(timeoutCtx, key, addr, clientv3.WithPrevKV(), clientv3.WithLease(leaseGrantResp.ID))
if err != nil {
// 注册失败直接panic
panic(err)
}
go func() {
for {
ch, err := g.cli.KeepAlive(context.TODO(), leaseGrantResp.ID)
if err != nil {
fmt.Println(err)
}
select {
case ka := <-ch:
// 续租前过期,重新生成新的租约,重新put
if ka == nil {
leaseGrantResp, err = g.cli.Grant(context.TODO(), 2)
if err != nil {
log.Fatal(err)
}
timeoutCtx, _ := context.WithTimeout(context.Background(), 1*time.Second)
_, err = g.cli.Put(timeoutCtx, key, addr, clientv3.WithPrevKV(), clientv3.WithLease(leaseGrantResp.ID))
if err != nil {
// 注册失败直接panic
panic(err)
}
}
fmt.Println("ttl:", ka)
}
}
}()
}

5.3、总结

​ 在rpc框架中,服务发现与注册非常重要。这里使用gRPC+etcd实现了一个demo。Etcd是Kubernetes集群中的一个十分重要的组件,用于保存集群所有的网络配置和对象的状态信息.很多公司在生产环境中都在使用Etcd,如果微服务技术栈都是使用go,很推荐etcd作为基础组件。