二、Prometheus源码分析之:服务发现

​ 上一篇文章一、Prometheus源码分析之:数据采集),分析promentheus是如何采集target的指标数据的,接下来分析下,Prometheus的服务发现能力。在1.3版本后,服务发现能力成为独立模块。

2.1、简介

​ Prometheus通过pull拉取target数据,在上一篇文章中,target是通过静态文件配置的,难道后面增加一个target就需要手动配置,然后重新加载配置吗?其实加载target还支持一种方式:动态发现。

​ Prometheus目前支持以下平台的动态发现能力:

  • 容器编排系统:kubernetes
  • 云平台:EC2、Azure、OpenStack
  • 服务发现:DNS、Zookeeper、Consul 等。

2.2、加载配置

​ ServiceDiscoveryConfig配置结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
type ServiceDiscoveryConfig struct {
// 静态服务发现配置
StaticConfigs []*targetgroup.Group `yaml:"static_configs,omitempty"`
// DNS服务发现配置
DNSSDConfigs []*dns.SDConfig `yaml:"dns_sd_configs,omitempty"`
// 配置文件服务发现配置
FileSDConfigs []*file.SDConfig `yaml:"file_sd_configs,omitempty"`
// Consul服务发现配置
ConsulSDConfigs []*consul.SDConfig `yaml:"consul_sd_configs,omitempty"`
// zookeeper Serverset 服务发现配置
ServersetSDConfigs []*zookeeper.ServersetSDConfig `yaml:"serverset_sd_configs,omitempty"`
// zookeeper Nerve 服务发现配置
NerveSDConfigs []*zookeeper.NerveSDConfig `yaml:"nerve_sd_configs,omitempty"`
// 根据Marathon API 服务发现配置
MarathonSDConfigs []*marathon.SDConfig `yaml:"marathon_sd_configs,omitempty"`
// 根据Kubernetes API 服务发现配置
KubernetesSDConfigs []*kubernetes.SDConfig `yaml:"kubernetes_sd_configs,omitempty"`
// GCE 服务发现配置
GCESDConfigs []*gce.SDConfig `yaml:"gce_sd_configs,omitempty"`
// EC2服务发现配置
EC2SDConfigs []*ec2.SDConfig `yaml:"ec2_sd_configs,omitempty"`
// Openstack 服务发现配置
OpenstackSDConfigs []*openstack.SDConfig `yaml:"openstack_sd_configs,omitempty"`
// Azure 服务发现配置
AzureSDConfigs []*azure.SDConfig `yaml:"azure_sd_configs,omitempty"`
// Triton 服务发现配置
TritonSDConfigs []*triton.SDConfig `yaml:"triton_sd_configs,omitempty"`
}

​ 在Prometheus初始化过程中,通过执行discoveryManagerScrape.ApplyConfig进行服务发现相关配置的加载。

​ 移除目前正在运行的providers,根据新的provided 配置,启动新的providers。。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
// 加锁
m.mtx.Lock()
// 函数结束后 解锁
defer m.mtx.Unlock()
// 遍历已存在target
for pk := range m.targets {
if _, ok := cfg[pk.setName]; !ok {
// 删除标签
discoveredTargets.DeleteLabelValues(m.name, pk.setName)
}
}
// 取消所有Discoverer
m.cancelDiscoverers()
for name, scfg := range cfg {
// 根据scfg,注册服务发现实例
m.registerProviders(scfg, name)
// 设置标签
discoveredTargets.WithLabelValues(m.name, name).Set(0)
}
for _, prov := range m.providers {
// 启动服务发现实例
m.startProvider(m.ctx, prov)
}

return nil
}

2.2.1、注册Providers

其中m.registerProviders的主要作用就是根据cfg(配置)注册所有provider实例,保存在m.providers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setName string) {
// 标签
var added bool
// 加载Providers的add方法
add := func(cfg interface{}, newDiscoverer func() (Discoverer, error)) {
// 读取cfg类型
t := reflect.TypeOf(cfg).String()
for _, p := range m.providers {
// 检查该cfg是否加载过
if reflect.DeepEqual(cfg, p.config) {
// 如果加载过,记录该job
p.subs = append(p.subs, setName)
// 变更标签状态
added = true
// 跳出
return
}
}
// 创建一个Discoverer实例
d, err := newDiscoverer()
if err != nil {
level.Error(m.logger).Log("msg", "Cannot create service discovery", "err", err, "type", t)
failedConfigs.WithLabelValues(m.name).Inc()
return
}
// 创建一个provider
provider := provider{
// 生成provider名称
name: fmt.Sprintf("%s/%d", t, len(m.providers)),
// 关联对应的Discoverer实例(比如DNS、zk等)
d: d,
// 关联配置
config: cfg,
// 关联job
subs: []string{setName},
}
// 添加该provider到m.providers队列中
m.providers = append(m.providers, &provider)
// 更新标签
added = true
}
// 遍历DNS配置,生成该Discoverer
for _, c := range cfg.DNSSDConfigs {
add(c, func() (Discoverer, error) {
return dns.NewDiscovery(*c, log.With(m.logger, "discovery", "dns")), nil
})
}
.
.
.
.
.
.
// 类似配置遍历省略,感兴趣可以阅读源码查看
}

2.2.2、启动Provider

在ApplyConfig,执行m.startProvider(m.ctx, prov)启动provider。

1
2
3
4
5
6
7
8
9
10
11
12
func (m *Manager) startProvider(ctx context.Context, p *provider) {
level.Info(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
ctx, cancel := context.WithCancel(ctx)
// 记录发现的服务
updates := make(chan []*targetgroup.Group)
// 添加取消方法
m.discoverCancel = append(m.discoverCancel, cancel)
// 执行run 每个服务发现都有自己的run方法。
go p.d.Run(ctx, updates)
// 更新发现的服务
go m.updater(ctx, p, updates)
}

这里分析DNS 服务发现对应的Run方法。需要标注下,DNS对应的Discovery其实是refresh中的Discovery的Run实现。

1
2
3
4
5
6
d.Discovery = refresh.NewDiscovery(
logger,
"dns",
time.Duration(conf.RefreshInterval),
d.refresh,
)

Run实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
// 首次进入,执行更新
tgs, err := d.refresh(ctx)
if err != nil {
level.Error(d.logger).Log("msg", "Unable to refresh target groups", "err", err.Error())
} else {
select {
case ch <- tgs:
case <-ctx.Done():
return
}
}
// 创建定时器
ticker := time.NewTicker(d.interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
// 定时执行更新,如果发现变化,通过ch发出更新信息
tgs, err := d.refresh(ctx)
if err != nil {
level.Error(d.logger).Log("msg", "Unable to refresh target groups", "err", err.Error())
continue
}

select {
// 发送 变化的targets
case ch <- tgs:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}

2.2.3、更新服务

当服务发现变化的targets时,通过updates chan进行更新。最终更新Discovery Manager的targets。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
for {

select {
case <-ctx.Done():
return
// 接收updates数据
case tgs, ok := <-updates:
receivedUpdates.WithLabelValues(m.name).Inc()
if !ok {
level.Debug(m.logger).Log("msg", "discoverer channel closed", "provider", p.name)
return
}
// 更新targets
for _, s := range p.subs {
m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
}

select {
// 发送更新通知
case m.triggerSend <- struct{}{}:
default:
}
}
}
}

2.3、启动discovery manager

加载完配置,并且完成注册、启动、更新操作后,开始执行discoveryManagerScrape.Run方法。

1
2
3
4
5
6
7
8
9
func (m *Manager) Run() error {
// 后台处理
go m.sender()
for range m.ctx.Done() {
m.cancelDiscoverers()
return m.ctx.Err()
}
return nil
}

定时执行,当接收到服务发现的更新通知,通过m.allGroups()同步服务快照信息到scrapeManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (m *Manager) sender() {
// 创建定时器
ticker := time.NewTicker(m.updatert)
defer ticker.Stop()

for {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
select {
// 检测到更新
case <-m.triggerSend:
sentUpdates.WithLabelValues(m.name).Inc()
select {
// 通过allGroups同步服务快照信息到scrapeManager
case m.syncCh <- m.allGroups():
default:
delayedUpdates.WithLabelValues(m.name).Inc()
level.Debug(m.logger).Log("msg", "discovery receiver's channel was full so will retry the next cycle")
select {
case m.triggerSend <- struct{}{}:
default:
}
}
default:
}
}
}
}

2.4、关联ScrapeManager

2.4.1、关联

在ScrapeManager在启动的时候会关联discoveryManagerScrape.SyncCh()。

1
2
3
4
5
6
7
func() error {
<-reloadReady.C
// 关联 discoveryManagerScrape 的 syncCh
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},

2.4.2、更新

更新ScrapeManager的targets

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
go m.reloader()
for {
select {
// 收到更新targets
case ts := <-tsets:
m.updateTsets(ts)

select {
case m.triggerReload <- struct{}{}:
default:
}

case <-m.graceShut:
return nil
}
}
}

执行更新

1
2
3
4
5
6
func (m *Manager) updateTsets(tsets map[string][]*targetgroup.Group) {
m.mtxScrape.Lock()
// 替换新的 tagets
m.targetSets = tsets
m.mtxScrape.Unlock()
}

2.5、总结

​ discoveryManager在加载配置的时候,顺便完成provider的注册、启动、以及discovery的自更新通知操作。discoveryManager与ScrapeManager通过discoveryManager的syncCh通道来关联同步。

​ 整个服务发现的流程很值得学习,尤其是discoveryManager支持多种服务发现的扩展配置的相关设设计很值得学习。