一、Prometheus源码分析之:数据采集

第一次接触Prometheus就被深深吸引,决定阅读源码,了解它的设计方式。首先分析的是pull metrics部分,讨论Prometheus是如何从目标点采集数据的。

1.1、简介

​ Prometheus采集数据使用pull模式,通过HTTP协议去采集指标,只要应用系统能够提供HTTP接口就可以接入监控系统。

​ 拉取目标称之为scrape,一个scrape一般对应一个进程。如下为scrape相关的配置。

配置文件:

1
2
3
4
5
6
7
8
scrape_interval:     15s
scrape_configs:
- job_name: 'test_server_name'
static_configs:
- targets: ['localhost:8886']
labels:
project: 'test_server'
environment: 'test'

该配置描述:每15秒去拉取一次上报数据,拉取目标为localhost:8886。

1.2、源码分析

通过解析Prometheus采集目标数据源码,进一步了解Prometheus是如何完成pull metrics工作的。

1.2.1、读取配置

ScrapeConfig的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type ScrapeConfig struct {
// 作业名称
JobName string `yaml:"job_name"`
// 同名lable,是否覆盖处理
HonorLabels bool `yaml:"honor_labels,omitempty"`
HonorTimestamps bool `yaml:"honor_timestamps"`
// 采集目标url参数
Params url.Values `yaml:"params,omitempty"`
// 采集周期
ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"`
// 采集超时时间
ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"`
// 目标 URl path
MetricsPath string `yaml:"metrics_path,omitempty"`
Scheme string `yaml:"scheme,omitempty"`
SampleLimit uint `yaml:"sample_limit,omitempty"`
// 服务发现配置
ServiceDiscoveryConfig sd_config.ServiceDiscoveryConfig `yaml:",inline"`
// 客户端http client 配置
HTTPClientConfig config_util.HTTPClientConfig `yaml:",inline"`
// 目标重置规则
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
// 指标重置规则
MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty"`
}

读取配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (m *Manager) ApplyConfig(cfg *config.Config) error {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
// 初始化map结构,用于保存配置
c := make(map[string]*config.ScrapeConfig)
for _, scfg := range cfg.ScrapeConfigs {
// 配置读取维度
c[scfg.JobName] = scfg
}
m.scrapeConfigs = c
// 设置 所有时间序列和警告与外部通信时用的外部标签 external_labels
if err := m.setJitterSeed(cfg.GlobalConfig.ExternalLabels); err != nil {
return err
}

// 如果配置已经更改,清理历史配置,重新加载到池子中
var failed bool
for name, sp := range m.scrapePools {
// 如果当前job不存在,则删除
if cfg, ok := m.scrapeConfigs[name]; !ok {
sp.stop()
delete(m.scrapePools, name)
} else if !reflect.DeepEqual(sp.config, cfg) {
// 如果配置变更,重新启动reload,进行加载
err := sp.reload(cfg)
if err != nil {
level.Error(m.logger).Log("msg", "error reloading scrape pool", "err", err, "scrape_pool", name)
failed = true
}
}
}
// 失败 return
if failed {
return errors.New("failed to apply the new configuration")
}
return nil
}

Prometheus 中,将任意一个独立的数据源(target)称之为实例(instance)。包含相同类型的实例的集合称之为作业(job),从读取配置中,我们也能看到,以job为key。所以注意job在业务侧的使用。

1.2.2、Scrape Manager

​ 添加Scrape Manager 到 run.Group启动。reloadReady.C的作用是当Manager接收到一组数据采集目标(target)的时候,他需要为每个job读取有效的配置。因此这里等待所有配置加载完成,进行下一步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
g.Add(
func() error {
// 当所有配置都准备好
<-reloadReady.C
// 启动scrapeManager
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
func(err error) {
// 失败处理
level.Info(logger).Log("msg", "Stopping scrape manager...")
scrapeManager.Stop()
},
)

1.2.3、加载Targets

加载targets,如果targets更新,会触发重新加载,reloader的加载发生在后台,所以并不会影响target的更新,(配置文件中配置的target是依赖discoveryManagerScrape.ApplyConfig(c)进行加载的,后面分析target服务发现的时候详细分析)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
go m.reloader()
for {
select {
// 触发重新加载目标。添加新增
case ts := <-tsets:
m.updateTsets(ts)

select {
// 关闭 Scrape Manager 处理信号
case m.triggerReload <- struct{}{}:
default:
}

case <-m.graceShut:
return nil
}
}
}

顺着Run继续阅读,reload为每一组tatget生成一个对应的scrape pool管理targets集合,scrapePool结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type scrapePool struct {
appendable Appendable
logger log.Logger
// 读写锁
mtx sync.RWMutex
// Scrape 配置
config *config.ScrapeConfig
// http client
client *http.Client

// 正在运行的target
activeTargets map[uint64]*Target
// 无效的target
droppedTargets []*Target
// 所有运行的loop
loops map[uint64]loop
// 取消
cancel context.CancelFunc
// 创建loop
newLoop func(scrapeLoopOptions) loop
}

1.2.4、执行reload

m.reloade的流程也很简单,setName指我们配置中的job,如果scrapePools不存在该job,则添加,添加前也是先校验该job的配置是否存在,不存在则报错,创建scrape pool。总结看就是为每个job创建与之对应的scrape pool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (m *Manager) reload() {
//加锁
m.mtxScrape.Lock()
var wg sync.WaitGroup
for setName, groups := range m.targetSets {
//检查该scrape是否存在scrapePools,不存在则创建
if _, ok := m.scrapePools[setName]; !ok {
//读取该scrape的配置
scrapeConfig, ok := m.scrapeConfigs[setName]
if !ok {
// 未读取到该scrape的配置打印错误
level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
// 跳出
continue
}
// 创建该scrape的scrape pool
sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
if err != nil {
level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
continue
}
// 保存
m.scrapePools[setName] = sp
}

wg.Add(1)
// 并行运行,提升性能。
go func(sp *scrapePool, groups []*targetgroup.Group) {
sp.Sync(groups)
wg.Done()
}(m.scrapePools[setName], groups)

}
// 释放锁
m.mtxScrape.Unlock()
// 阻塞,等待所有pool运行完毕
wg.Wait()
}

1.2.5、创建scrape pool

scrape pool利用newLoop去为该job下的所有target生成对应的loop:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func newScrapePool(cfg *config.ScrapeConfig, app Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) {
targetScrapePools.Inc()
if logger == nil {
logger = log.NewNopLogger()
}
// 创建http client,用于执行数据抓取
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName)
if err != nil {
targetScrapePoolsFailed.Inc()
return nil, errors.Wrap(err, "error creating HTTP client")
}
// 设置buffers
buffers := pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
// 设置scrapePool的一些基础属性
ctx, cancel := context.WithCancel(context.Background())
sp := &scrapePool{
cancel: cancel,
appendable: app,
config: cfg,
client: client,
activeTargets: map[uint64]*Target{},
loops: map[uint64]loop{},
logger: logger,
}
// newLoop用于生层loop,主要处理对应的target,可以理解为,每个target对应一个loop。
sp.newLoop = func(opts scrapeLoopOptions) loop {
// Update the targets retrieval function for metadata to a new scrape cache.
cache := newScrapeCache()
opts.target.setMetadataStore(cache)

return newScrapeLoop(
ctx,
opts.scraper,
log.With(logger, "target", opts.target),
buffers,
func(l labels.Labels) labels.Labels {
return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
},
func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
func() storage.Appender {
app, err := app.Appender()
if err != nil {
panic(err)
}
return appender(app, opts.limit)
},
cache,
jitterSeed,
opts.honorTimestamps,
)
}

return sp, nil
}

1.2.6、group转化为target

scrape pool创建完成后,则通过sp.Sync执行,使用该job对应的pool遍历Group,使其转换为target

1
2
3
4
go func(sp *scrapePool, groups []*targetgroup.Group) {
sp.Sync(groups)
wg.Done()
}(m.scrapePools[setName], groups)

Sync函数解读如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
start := time.Now()

var all []*Target
// 加锁
sp.mtx.Lock()
sp.droppedTargets = []*Target{}
// 遍历所有Group
for _, tg := range tgs {
// 转化对应 targets
targets, err := targetsFromGroup(tg, sp.config)
if err != nil {
level.Error(sp.logger).Log("msg", "creating targets failed", "err", err)
continue
}
// 将所有有效targets添加到all,等待处理
for _, t := range targets {
// 检查该target的lable是否有效
if t.Labels().Len() > 0 {
// 添加到all队列中
all = append(all, t)
} else if t.DiscoveredLabels().Len() > 0 {
// 记录无效target
sp.droppedTargets = append(sp.droppedTargets, t)
}
}
}
// 解锁
sp.mtx.Unlock()
// 处理all队列,执行scarape同步操作
sp.sync(all)

targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
time.Since(start).Seconds(),
)
targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}

1.2.7、生成loop

在sync最后,调用了当前scrape pool的sync去处理all队列中的target,添加新的target,删除失效的target。实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
func (sp *scrapePool) sync(targets []*Target) {
// 加锁
sp.mtx.Lock()
defer sp.mtx.Unlock()

var (
// target 标记
uniqueTargets = map[uint64]struct{}{}
// 采集周期
interval = time.Duration(sp.config.ScrapeInterval)
// 采集超时时间
timeout = time.Duration(sp.config.ScrapeTimeout)
limit = int(sp.config.SampleLimit)
// 重复lable是否覆盖
honorLabels = sp.config.HonorLabels
honorTimestamps = sp.config.HonorTimestamps
mrc = sp.config.MetricRelabelConfigs
)
// 遍历all队列中的所有target
for _, t := range targets {
// 赋值,避免range的坑
t := t
// 生成对应的hash(对该hash算法感兴趣可以看下这里的源码)
hash := t.hash()
// 标记
uniqueTargets[hash] = struct{}{}
// 判断该taget是否已经在运行了。如果没有则运行该target对应的loop,将该loop加入activeTargets中
if _, ok := sp.activeTargets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
l := sp.newLoop(scrapeLoopOptions{
target: t,
scraper: s,
limit: limit,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
mrc: mrc,
})

sp.activeTargets[hash] = t
sp.loops[hash] = l
// 启动该loop
go l.run(interval, timeout, nil)
} else {
// 该target对应的loop已经运行,设置最新的标签信息
sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
}
}

var wg sync.WaitGroup

// 停止并且移除无效的targets与对应的loops
// 遍历activeTargets正在执行的Target
for hash := range sp.activeTargets {
// 检查该hash对应的标记是否存在,放过不存在执行清除逻辑
if _, ok := uniqueTargets[hash]; !ok {
wg.Add(1)
// 异步清除
go func(l loop) {
// 停止该loop
l.stop()
// 执行完成
wg.Done()
}(sp.loops[hash])
// 从loops中删除该hash对应的loop
delete(sp.loops, hash)
// 从activeTargets中删除该hash对应的target
delete(sp.activeTargets, hash)
}
}
// 等待所有执行完成
wg.Wait()
}

1.2.8、运行loop

scrape pool对应的sync的实现中可以看到,如果该target没有运行,则启动该target对应的loop,执行l.run,通过一个goroutine来执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
// 偏移量相关设置
select {
case <-time.After(sl.scraper.offset(interval, sl.jitterSeed)):
// Continue after a scraping offset.
case <-sl.scrapeCtx.Done():
close(sl.stopped)
return
}

var last time.Time
// 根据interval设置定时器
ticker := time.NewTicker(interval)
defer ticker.Stop()

mainLoop:
for {
select {
case <-sl.ctx.Done():
close(sl.stopped)
return
case <-sl.scrapeCtx.Done():
break mainLoop
default:
}

var (
start = time.Now()
scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
)

// 记录第一次
if !last.IsZero() {
targetIntervalLength.WithLabelValues(interval.String()).Observe(
time.Since(last).Seconds(),
)
}
// 根据上次拉取数据的大小,设置buffer空间
b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
buf := bytes.NewBuffer(b)
// 读取数据,设置到buffer中
contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
// 取消,结束scrape
cancel()

if scrapeErr == nil {
b = buf.Bytes()
if len(b) > 0 {
// 记录本次Scrape大小
sl.lastScrapeSize = len(b)
}
} else {
// 错误处理
level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
if errc != nil {
errc <- scrapeErr
}
}

// 生成数据,存储指标
total, added, seriesAdded, appErr := sl.append(b, contentType, start)
if appErr != nil {
level.Warn(sl.l).Log("msg", "append failed", "err", appErr)

if _, _, _, err := sl.append([]byte{}, "", start); err != nil {
level.Warn(sl.l).Log("msg", "append failed", "err", err)
}
}
// 对象复用
sl.buffers.Put(b)

if scrapeErr == nil {
scrapeErr = appErr
}
// 上报指标,进行统计
if err := sl.report(start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
level.Warn(sl.l).Log("msg", "appending scrape report failed", "err", err)
}
// 重置时间位置
last = start

select {
case <-sl.ctx.Done():
close(sl.stopped)
return
case <-sl.scrapeCtx.Done():
break mainLoop
case <-ticker.C:
}
}

close(sl.stopped)

sl.endOfRunStaleness(last, ticker, interval)
}

1.2.9、拉取数据

依赖scrape实现数据的抓取,使用GET方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
if s.req == nil {
// 新建Http Request
req, err := http.NewRequest("GET", s.URL().String(), nil)
if err != nil {
return "", err
}
// 设置请求头
req.Header.Add("Accept", acceptHeader)
req.Header.Add("Accept-Encoding", "gzip")
req.Header.Set("User-Agent", userAgentHeader)
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", s.timeout.Seconds()))

s.req = req
}
// 发起请求
resp, err := s.client.Do(s.req.WithContext(ctx))
if err != nil {
return "", err
}
defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
// 错误处理
if resp.StatusCode != http.StatusOK {
return "", errors.Errorf("server returned HTTP status %s", resp.Status)
}
// 检查Content-Encoding
if resp.Header.Get("Content-Encoding") != "gzip" {
// copy buffer到w
_, err = io.Copy(w, resp.Body)
if err != nil {
return "", err
}
return resp.Header.Get("Content-Type"), nil
}
if s.gzipr == nil {
s.buf = bufio.NewReader(resp.Body)
s.gzipr, err = gzip.NewReader(s.buf)
if err != nil {
return "", err
}
} else {
s.buf.Reset(resp.Body)
if err = s.gzipr.Reset(s.buf); err != nil {
return "", err
}
}

_, err = io.Copy(w, s.gzipr)
s.gzipr.Close()
if err != nil {
return "", err
}
return resp.Header.Get("Content-Type"), nil
}

1.3、总结

​ 总结看每一个job有一个与之对应的scrape pool,每一个target有一个与之对应的loop,每个loop内部执 Http Get请求拉取数据。通过一些控制参数,控制采集周期以及结束等逻辑。