三、Prometheus源码分析之:数据存储

3.1、简介

​ 之前的文章分别介绍了prometheuss是如何采集数据以及如何对采集目标做服务发现的,本篇文章分析prometheus是如何将采集后的数据做保存的。

一、Prometheus源码分析之:数据采集) 描述了通过scrapeLoop的run方法采集指标数据。之后scrapeLoop调用append进行指标的保存。

3.2、scrapeCache

​ 真正存储指标的是storage.Appender,在scrape与storage之间有一层缓存。缓存主要的作用是过滤错误的指标。

1
2
3
4
5
6
7
8
9
10
11
type scrapeCache struct {
iter uint64 // scrape批次
successfulCount int // 成功保存的元数据数
series map[string]*cacheEntry // 缓存解析的相关数据
droppedSeries map[string]*uint64 // 缓存无效指标
seriesCur map[uint64]labels.Labels // 本次采集指标
seriesPrev map[uint64]labels.Labels // 上次采集指标

metaMtx sync.Mutex
metadata map[string]*metaEntry
}

创建scrapeCache,调用newScrapeLoop,初始化scrapeLoop,会判断scrapeCache是否为空,如果为nil,调用newScrapeCache对cache进行初始化。

1
2
3
if cache == nil {
cache = newScrapeCache()
}

newScrapeCache()如下:

1
2
3
4
5
6
7
8
9
func newScrapeCache() *scrapeCache {
return &scrapeCache{
series: map[string]*cacheEntry{},
droppedSeries: map[string]*uint64{},
seriesCur: map[uint64]labels.Labels{},
seriesPrev: map[uint64]labels.Labels{},
metadata: map[string]*metaEntry{},
}
}

scrapeCache 方法介绍,这里简介各个fun的作用,详细代码不做注解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 根据met信息,获取对应的cacheEntry
func (c *scrapeCache) get(met string) (*cacheEntry, bool)
// 根据met创建cacheEntry节点
func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash uint64)
// 添加无效指标,met作为key
func (c *scrapeCache) addDropped(met string)
// 根据met,检查该指标是否有效
func (c *scrapeCache) getDropped(met string) bool
// 添加当前采集指标
func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels)
// 检查指标状态
func (c *scrapeCache) forEachStale(f func(labels.Labels) bool)
// 缓存清理
func (c *scrapeCache) iterDone(flushCache bool)

3.3、append

分析scrapeLoop.append是如何实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
func (sl *scrapeLoop)  append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
var (
// 获取指标存储组件
app = sl.appender()
// 获取解析组件
p = textparse.New(b, contentType)
defTime = timestamp.FromTime(ts)
numOutOfOrder = 0
numDuplicates = 0
numOutOfBounds = 0
)
var sampleLimitErr error

loop:
for {
var et textparse.Entry
// 开始遍历,遍历到EOF(字节流尾部),终止遍历
if et, err = p.Next(); err != nil {
if err == io.EOF {
err = nil
}
break
}
// 以下Entry类型跳过
switch et {
case textparse.EntryType:
sl.cache.setType(p.Type())
continue
case textparse.EntryHelp:
sl.cache.setHelp(p.Help())
continue
case textparse.EntryUnit:
sl.cache.setUnit(p.Unit())
continue
case textparse.EntryComment:
continue
default:
}
total++

t := defTime
// 获取指标label,时间戳(如果设置了),当前样本值
met, tp, v := p.Series()
// 如果设置了honorTimestamps,时间戳设置为nil
if !sl.honorTimestamps {
tp = nil
}
// 如果时间戳不为空,更新当前t
if tp != nil {
t = *tp
}
// 检查该指标值是否有效,无效则直接跳过当前处理
if sl.cache.getDropped(yoloString(met)) {
continue
}
// 根据当前met获取对应的cacheEntry结构
ce, ok := sl.cache.get(yoloString(met))
// 如果从缓存中获取,则执行指标的存储操作
if ok {
// 指标存储
switch err = app.AddFast(ce.lset, ce.ref, t, v); err {
case nil:
// 如果不带时间戳
if tp == nil {
// 存储该不带时间戳的指标到seriesCur中。
sl.cache.trackStaleness(ce.hash, ce.lset)
}
// 未找到错误,重置ok为false,执行!ok逻辑
case storage.ErrNotFound:
ok = false
// 乱序样本
case storage.ErrOutOfOrderSample:
// 乱序样本错误记录,并上报
numOutOfOrder++
level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
targetScrapeSampleOutOfOrder.Inc()
continue
// 重复样本
case storage.ErrDuplicateSampleForTimestamp:
// 重复样本错误记录,并上报
numDuplicates++
level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
targetScrapeSampleDuplicate.Inc()
continue
// 存储越界
case storage.ErrOutOfBounds:
// 存储越界错误记录,并上报
numOutOfBounds++
level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
targetScrapeSampleOutOfBounds.Inc()
continue
// 超出样本限制错误
case errSampleLimit:
// 如果我们达到上限也要继续解析输出,所以我们要上报正确的样本总量
sampleLimitErr = err
added++
continue
// 未知情况,终止loop
default:
break loop
}
}
// 在缓存中未查找到,
if !ok {
var lset labels.Labels
// 生成mets
mets := p.Metric(&lset)
// 生成hash值
hash := lset.Hash()

// 根据配置重置label set
lset = sl.sampleMutator(lset)

// 如果label set为空,则表明该mets为非法指标
if lset == nil {
// 添加mets到无效指标字典中
sl.cache.addDropped(mets)
continue
}

var ref uint64
// 存储指标
ref, err = app.Add(lset, t, v)

// 错误处理同上,不重复描述
switch err {
case nil:
// 乱序样本
case storage.ErrOutOfOrderSample:
err = nil
numOutOfOrder++
level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
targetScrapeSampleOutOfOrder.Inc()
continue
// 重复样本
case storage.ErrDuplicateSampleForTimestamp:
err = nil
numDuplicates++
level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
targetScrapeSampleDuplicate.Inc()
continue
// 存储越界
case storage.ErrOutOfBounds:
err = nil
numOutOfBounds++
level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
targetScrapeSampleOutOfBounds.Inc()
continue
// 样本限制
case errSampleLimit:
sampleLimitErr = err
added++
continue
default:
level.Debug(sl.l).Log("msg", "unexpected error", "series", string(met), "err", err)
break loop
}
if tp == nil {
// 存储该不带时间戳的指标到seriesCur中。
sl.cache.trackStaleness(hash, lset)
}
// 缓存该指标到series中
sl.cache.addRef(mets, ref, lset, hash)
seriesAdded++
}
added++
}
// 错误相关处理,不做分析。
if sampleLimitErr != nil {
if err == nil {
err = sampleLimitErr
}
targetScrapeSampleLimit.Inc()
}
if numOutOfOrder > 0 {
level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", numOutOfOrder)
}
if numDuplicates > 0 {
level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", numDuplicates)
}
if numOutOfBounds > 0 {
level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", numOutOfBounds)
}
if err == nil {
// 指标状态检查。
sl.cache.forEachStale(func(lset labels.Labels) bool {
// 标记存储中的过期指标
_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
switch err {
// 以下错误不做处理
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
err = nil
}
return err == nil
})
}
if err != nil {
// 出现错误,存储组件进行回滚
app.Rollback()
return total, added, seriesAdded, err
}
// 存储提交
if err := app.Commit(); err != nil {
return total, added, seriesAdded, err
}

// 执行缓存清理相关工作
sl.cache.iterDone(len(b) > 0)

return total, added, seriesAdded, nil
}

3.4、总结

整个存储逻辑都围绕着过滤无效指标进行。特殊点在于存储的时候指标分为有时间戳与无时间戳两种情况。

(1)有时间戳:

  • 解析指标数据通过Series()
  • 利用getDropped判断指标是否有效,无效则跳出处理
  • 通过get查找对应cacheEntry,如果找到利用app.AddFast直接存储样本值。如果未找到,使用sampleMutator进行解析重置,判断lset是否为空,为空则使用addDropped添加到无效字典中,跳出当前处理,如果有效则使用app.Add存储指标。(可以看到,通过get找到使用AddFast存储,未找到使用Add存储,感兴趣可以看下两个fun实现的区别)
  • 通过forEachStale检查指标是否过期。
  • app.Add标记过期指标
  • 调用iterDone进行相关缓存清理。

(2)无时间戳:

  • 每次存储后,如果不带时间戳都会调用trackStaleness,存储指标到seriesCur中

这里seriesCur与seriesPrev的作用就是处理指标label是否过期的。forEachStale实现如下:

1
2
3
4
5
6
7
8
9
func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
for h, lset := range c.seriesPrev {
if _, ok := c.seriesCur[h]; !ok {
if !f(lset) {
break
}
}
}
}

如果seriesPrev中的指标(label)存在于seriesPrev,则不处理,如果不存在,则说明过期。其中在iterDone中。

1
2
3
4
5
6
7
// 交换seriesPrev与seriesCur
c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev

// 清空当前指标缓存列表
for k := range c.seriesCur {
delete(c.seriesCur, k)
}

所以,每次存储处理后,都会交换seriesPrev与seriesCur,然后清空seriesCur。下次存储在做比较。如果命中过期规则,则标记该样本值为StaleNaN。