mirror of
https://github.com/prometheus/prometheus
synced 2026-04-30 14:50:25 +08:00
Merge pull request #16166 from pr00se/track-missing-series
TSDB: Track count of unknown series referenced during WAL/WBL replay
This commit is contained in:
12
tsdb/head.go
12
tsdb/head.go
@@ -379,6 +379,8 @@ type headMetrics struct {
|
||||
snapshotReplayErrorTotal prometheus.Counter // Will be either 0 or 1.
|
||||
oooHistogram prometheus.Histogram
|
||||
mmapChunksTotal prometheus.Counter
|
||||
walReplayUnknownRefsTotal *prometheus.CounterVec
|
||||
wblReplayUnknownRefsTotal *prometheus.CounterVec
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -510,6 +512,14 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
|
||||
Name: "prometheus_tsdb_mmap_chunks_total",
|
||||
Help: "Total number of chunks that were memory-mapped.",
|
||||
}),
|
||||
walReplayUnknownRefsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "prometheus_tsdb_wal_replay_unknown_refs_total",
|
||||
Help: "Total number of unknown series references encountered during WAL replay.",
|
||||
}, []string{"type"}),
|
||||
wblReplayUnknownRefsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "prometheus_tsdb_wbl_replay_unknown_refs_total",
|
||||
Help: "Total number of unknown series references encountered during WBL replay.",
|
||||
}, []string{"type"}),
|
||||
}
|
||||
|
||||
if r != nil {
|
||||
@@ -577,6 +587,8 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
|
||||
}
|
||||
return float64(val)
|
||||
}),
|
||||
m.walReplayUnknownRefsTotal,
|
||||
m.wblReplayUnknownRefsTotal,
|
||||
)
|
||||
}
|
||||
return m
|
||||
|
||||
@@ -284,159 +284,176 @@ func BenchmarkLoadWLs(b *testing.B) {
|
||||
exemplarsPercentages := []float64{0, 0.5, 1, 5}
|
||||
lastExemplarsPerSeries := -1
|
||||
for _, c := range cases {
|
||||
for _, p := range exemplarsPercentages {
|
||||
exemplarsPerSeries := int(math.RoundToEven(float64(c.samplesPerSeries) * p / 100))
|
||||
// For tests with low samplesPerSeries we could end up testing with 0 exemplarsPerSeries
|
||||
// multiple times without this check.
|
||||
if exemplarsPerSeries == lastExemplarsPerSeries {
|
||||
continue
|
||||
}
|
||||
lastExemplarsPerSeries = exemplarsPerSeries
|
||||
b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d,oooSeriesPct=%.3f,oooSamplesPct=%.3f,oooCapMax=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT, c.oooSeriesPct, c.oooSamplesPct, c.oooCapMax),
|
||||
func(b *testing.B) {
|
||||
dir := b.TempDir()
|
||||
missingSeriesPercentages := []float64{0, 0.1}
|
||||
for _, missingSeriesPct := range missingSeriesPercentages {
|
||||
for _, p := range exemplarsPercentages {
|
||||
exemplarsPerSeries := int(math.RoundToEven(float64(c.samplesPerSeries) * p / 100))
|
||||
// For tests with low samplesPerSeries we could end up testing with 0 exemplarsPerSeries
|
||||
// multiple times without this check.
|
||||
if exemplarsPerSeries == lastExemplarsPerSeries {
|
||||
continue
|
||||
}
|
||||
lastExemplarsPerSeries = exemplarsPerSeries
|
||||
b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d,oooSeriesPct=%.3f,oooSamplesPct=%.3f,oooCapMax=%d,missingSeriesPct=%.3f", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT, c.oooSeriesPct, c.oooSamplesPct, c.oooCapMax, missingSeriesPct),
|
||||
func(b *testing.B) {
|
||||
dir := b.TempDir()
|
||||
|
||||
wal, err := wlog.New(nil, nil, dir, compression.None)
|
||||
require.NoError(b, err)
|
||||
var wbl *wlog.WL
|
||||
if c.oooSeriesPct != 0 {
|
||||
wbl, err = wlog.New(nil, nil, dir, compression.None)
|
||||
wal, err := wlog.New(nil, nil, dir, compression.None)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
|
||||
// Write series.
|
||||
refSeries := make([]record.RefSeries, 0, c.seriesPerBatch)
|
||||
for k := 0; k < c.batches; k++ {
|
||||
refSeries = refSeries[:0]
|
||||
for i := k * c.seriesPerBatch; i < (k+1)*c.seriesPerBatch; i++ {
|
||||
lbls := make(map[string]string, labelsPerSeries)
|
||||
lbls[defaultLabelName] = strconv.Itoa(i)
|
||||
for j := 1; len(lbls) < labelsPerSeries; j++ {
|
||||
lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j)
|
||||
}
|
||||
refSeries = append(refSeries, record.RefSeries{Ref: chunks.HeadSeriesRef(i) * 101, Labels: labels.FromMap(lbls)})
|
||||
var wbl *wlog.WL
|
||||
if c.oooSeriesPct != 0 {
|
||||
wbl, err = wlog.New(nil, nil, dir, compression.None)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
populateTestWL(b, wal, []interface{}{refSeries})
|
||||
}
|
||||
|
||||
// Write samples.
|
||||
refSamples := make([]record.RefSample, 0, c.seriesPerBatch)
|
||||
|
||||
oooSeriesPerBatch := int(float64(c.seriesPerBatch) * c.oooSeriesPct)
|
||||
oooSamplesPerSeries := int(float64(c.samplesPerSeries) * c.oooSamplesPct)
|
||||
|
||||
for i := 0; i < c.samplesPerSeries; i++ {
|
||||
for j := 0; j < c.batches; j++ {
|
||||
refSamples = refSamples[:0]
|
||||
|
||||
k := j * c.seriesPerBatch
|
||||
// Skip appending the first oooSamplesPerSeries samples for the series in the batch that
|
||||
// should have OOO samples. OOO samples are appended after all the in-order samples.
|
||||
if i < oooSamplesPerSeries {
|
||||
k += oooSeriesPerBatch
|
||||
}
|
||||
for ; k < (j+1)*c.seriesPerBatch; k++ {
|
||||
refSamples = append(refSamples, record.RefSample{
|
||||
Ref: chunks.HeadSeriesRef(k) * 101,
|
||||
T: int64(i) * 10,
|
||||
V: float64(i) * 100,
|
||||
})
|
||||
}
|
||||
populateTestWL(b, wal, []interface{}{refSamples})
|
||||
}
|
||||
}
|
||||
|
||||
// Write mmapped chunks.
|
||||
if c.mmappedChunkT != 0 {
|
||||
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
|
||||
require.NoError(b, err)
|
||||
cOpts := chunkOpts{
|
||||
chunkDiskMapper: chunkDiskMapper,
|
||||
chunkRange: c.mmappedChunkT,
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
|
||||
// Create one mmapped chunk per series, with one sample at the given time.
|
||||
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled)
|
||||
s.append(c.mmappedChunkT, 42, 0, cOpts)
|
||||
// There's only one head chunk because only a single sample is appended. mmapChunks()
|
||||
// ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with
|
||||
// the sample at c.mmappedChunkT is mmapped.
|
||||
s.cutNewHeadChunk(c.mmappedChunkT, chunkenc.EncXOR, c.mmappedChunkT)
|
||||
s.mmapChunks(chunkDiskMapper)
|
||||
}
|
||||
require.NoError(b, chunkDiskMapper.Close())
|
||||
}
|
||||
|
||||
// Write exemplars.
|
||||
refExemplars := make([]record.RefExemplar, 0, c.seriesPerBatch)
|
||||
for i := 0; i < exemplarsPerSeries; i++ {
|
||||
for j := 0; j < c.batches; j++ {
|
||||
refExemplars = refExemplars[:0]
|
||||
for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ {
|
||||
refExemplars = append(refExemplars, record.RefExemplar{
|
||||
Ref: chunks.HeadSeriesRef(k) * 101,
|
||||
T: int64(i) * 10,
|
||||
V: float64(i) * 100,
|
||||
Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)),
|
||||
})
|
||||
}
|
||||
populateTestWL(b, wal, []interface{}{refExemplars})
|
||||
}
|
||||
}
|
||||
|
||||
// Write OOO samples and mmap markers.
|
||||
refMarkers := make([]record.RefMmapMarker, 0, oooSeriesPerBatch)
|
||||
refSamples = make([]record.RefSample, 0, oooSeriesPerBatch)
|
||||
for i := 0; i < oooSamplesPerSeries; i++ {
|
||||
shouldAddMarkers := c.oooCapMax != 0 && i != 0 && int64(i)%c.oooCapMax == 0
|
||||
|
||||
for j := 0; j < c.batches; j++ {
|
||||
refSamples = refSamples[:0]
|
||||
if shouldAddMarkers {
|
||||
refMarkers = refMarkers[:0]
|
||||
}
|
||||
for k := j * c.seriesPerBatch; k < (j*c.seriesPerBatch)+oooSeriesPerBatch; k++ {
|
||||
ref := chunks.HeadSeriesRef(k) * 101
|
||||
if shouldAddMarkers {
|
||||
// loadWBL() checks that the marker's MmapRef is less than or equal to the ref
|
||||
// for the last mmap chunk. Setting MmapRef to 0 to always pass that check.
|
||||
refMarkers = append(refMarkers, record.RefMmapMarker{Ref: ref, MmapRef: 0})
|
||||
// Write series.
|
||||
refSeries := make([]record.RefSeries, 0, c.seriesPerBatch)
|
||||
for k := 0; k < c.batches; k++ {
|
||||
refSeries = refSeries[:0]
|
||||
for i := k * c.seriesPerBatch; i < (k+1)*c.seriesPerBatch; i++ {
|
||||
lbls := make(map[string]string, labelsPerSeries)
|
||||
lbls[defaultLabelName] = strconv.Itoa(i)
|
||||
for j := 1; len(lbls) < labelsPerSeries; j++ {
|
||||
lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j)
|
||||
}
|
||||
refSamples = append(refSamples, record.RefSample{
|
||||
Ref: ref,
|
||||
T: int64(i) * 10,
|
||||
V: float64(i) * 100,
|
||||
})
|
||||
refSeries = append(refSeries, record.RefSeries{Ref: chunks.HeadSeriesRef(i) * 101, Labels: labels.FromMap(lbls)})
|
||||
}
|
||||
if shouldAddMarkers {
|
||||
populateTestWL(b, wbl, []interface{}{refMarkers})
|
||||
|
||||
writeSeries := refSeries
|
||||
if missingSeriesPct > 0 {
|
||||
newWriteSeries := make([]record.RefSeries, 0, int(float64(len(refSeries))*(1.0-missingSeriesPct)))
|
||||
keepRatio := 1.0 - missingSeriesPct
|
||||
// Keep approximately every 1/keepRatio series.
|
||||
for i, s := range refSeries {
|
||||
if int(float64(i)*keepRatio) != int(float64(i+1)*keepRatio) {
|
||||
newWriteSeries = append(newWriteSeries, s)
|
||||
}
|
||||
}
|
||||
writeSeries = newWriteSeries
|
||||
}
|
||||
populateTestWL(b, wal, []interface{}{refSamples})
|
||||
populateTestWL(b, wbl, []interface{}{refSamples})
|
||||
}
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
// Load the WAL.
|
||||
for i := 0; i < b.N; i++ {
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkRange = 1000
|
||||
opts.ChunkDirRoot = dir
|
||||
if c.oooCapMax > 0 {
|
||||
opts.OutOfOrderCapMax.Store(c.oooCapMax)
|
||||
populateTestWL(b, wal, []interface{}{writeSeries})
|
||||
}
|
||||
h, err := NewHead(nil, nil, wal, wbl, opts, nil)
|
||||
require.NoError(b, err)
|
||||
h.Init(0)
|
||||
}
|
||||
b.StopTimer()
|
||||
wal.Close()
|
||||
if wbl != nil {
|
||||
wbl.Close()
|
||||
}
|
||||
})
|
||||
|
||||
// Write samples.
|
||||
refSamples := make([]record.RefSample, 0, c.seriesPerBatch)
|
||||
|
||||
oooSeriesPerBatch := int(float64(c.seriesPerBatch) * c.oooSeriesPct)
|
||||
oooSamplesPerSeries := int(float64(c.samplesPerSeries) * c.oooSamplesPct)
|
||||
|
||||
for i := 0; i < c.samplesPerSeries; i++ {
|
||||
for j := 0; j < c.batches; j++ {
|
||||
refSamples = refSamples[:0]
|
||||
|
||||
k := j * c.seriesPerBatch
|
||||
// Skip appending the first oooSamplesPerSeries samples for the series in the batch that
|
||||
// should have OOO samples. OOO samples are appended after all the in-order samples.
|
||||
if i < oooSamplesPerSeries {
|
||||
k += oooSeriesPerBatch
|
||||
}
|
||||
for ; k < (j+1)*c.seriesPerBatch; k++ {
|
||||
refSamples = append(refSamples, record.RefSample{
|
||||
Ref: chunks.HeadSeriesRef(k) * 101,
|
||||
T: int64(i) * 10,
|
||||
V: float64(i) * 100,
|
||||
})
|
||||
}
|
||||
populateTestWL(b, wal, []interface{}{refSamples})
|
||||
}
|
||||
}
|
||||
|
||||
// Write mmapped chunks.
|
||||
if c.mmappedChunkT != 0 {
|
||||
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
|
||||
require.NoError(b, err)
|
||||
cOpts := chunkOpts{
|
||||
chunkDiskMapper: chunkDiskMapper,
|
||||
chunkRange: c.mmappedChunkT,
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
|
||||
// Create one mmapped chunk per series, with one sample at the given time.
|
||||
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled)
|
||||
s.append(c.mmappedChunkT, 42, 0, cOpts)
|
||||
// There's only one head chunk because only a single sample is appended. mmapChunks()
|
||||
// ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with
|
||||
// the sample at c.mmappedChunkT is mmapped.
|
||||
s.cutNewHeadChunk(c.mmappedChunkT, chunkenc.EncXOR, c.mmappedChunkT)
|
||||
s.mmapChunks(chunkDiskMapper)
|
||||
}
|
||||
require.NoError(b, chunkDiskMapper.Close())
|
||||
}
|
||||
|
||||
// Write exemplars.
|
||||
refExemplars := make([]record.RefExemplar, 0, c.seriesPerBatch)
|
||||
for i := 0; i < exemplarsPerSeries; i++ {
|
||||
for j := 0; j < c.batches; j++ {
|
||||
refExemplars = refExemplars[:0]
|
||||
for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ {
|
||||
refExemplars = append(refExemplars, record.RefExemplar{
|
||||
Ref: chunks.HeadSeriesRef(k) * 101,
|
||||
T: int64(i) * 10,
|
||||
V: float64(i) * 100,
|
||||
Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)),
|
||||
})
|
||||
}
|
||||
populateTestWL(b, wal, []interface{}{refExemplars})
|
||||
}
|
||||
}
|
||||
|
||||
// Write OOO samples and mmap markers.
|
||||
refMarkers := make([]record.RefMmapMarker, 0, oooSeriesPerBatch)
|
||||
refSamples = make([]record.RefSample, 0, oooSeriesPerBatch)
|
||||
for i := 0; i < oooSamplesPerSeries; i++ {
|
||||
shouldAddMarkers := c.oooCapMax != 0 && i != 0 && int64(i)%c.oooCapMax == 0
|
||||
|
||||
for j := 0; j < c.batches; j++ {
|
||||
refSamples = refSamples[:0]
|
||||
if shouldAddMarkers {
|
||||
refMarkers = refMarkers[:0]
|
||||
}
|
||||
for k := j * c.seriesPerBatch; k < (j*c.seriesPerBatch)+oooSeriesPerBatch; k++ {
|
||||
ref := chunks.HeadSeriesRef(k) * 101
|
||||
if shouldAddMarkers {
|
||||
// loadWBL() checks that the marker's MmapRef is less than or equal to the ref
|
||||
// for the last mmap chunk. Setting MmapRef to 0 to always pass that check.
|
||||
refMarkers = append(refMarkers, record.RefMmapMarker{Ref: ref, MmapRef: 0})
|
||||
}
|
||||
refSamples = append(refSamples, record.RefSample{
|
||||
Ref: ref,
|
||||
T: int64(i) * 10,
|
||||
V: float64(i) * 100,
|
||||
})
|
||||
}
|
||||
if shouldAddMarkers {
|
||||
populateTestWL(b, wbl, []interface{}{refMarkers})
|
||||
}
|
||||
populateTestWL(b, wal, []interface{}{refSamples})
|
||||
populateTestWL(b, wbl, []interface{}{refSamples})
|
||||
}
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
// Load the WAL.
|
||||
for i := 0; i < b.N; i++ {
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkRange = 1000
|
||||
opts.ChunkDirRoot = dir
|
||||
if c.oooCapMax > 0 {
|
||||
opts.OutOfOrderCapMax.Store(c.oooCapMax)
|
||||
}
|
||||
h, err := NewHead(nil, nil, wal, wbl, opts, nil)
|
||||
require.NoError(b, err)
|
||||
h.Init(0)
|
||||
}
|
||||
b.StopTimer()
|
||||
wal.Close()
|
||||
if wbl != nil {
|
||||
wbl.Close()
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
112
tsdb/head_wal.go
112
tsdb/head_wal.go
@@ -16,6 +16,7 @@ package tsdb
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"maps"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -26,6 +27,8 @@ import (
|
||||
|
||||
"go.uber.org/atomic"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
@@ -50,13 +53,39 @@ type histogramRecord struct {
|
||||
fh *histogram.FloatHistogram
|
||||
}
|
||||
|
||||
type seriesRefSet struct {
|
||||
refs map[chunks.HeadSeriesRef]struct{}
|
||||
mtx sync.Mutex
|
||||
}
|
||||
|
||||
func (s *seriesRefSet) merge(other map[chunks.HeadSeriesRef]struct{}) {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
maps.Copy(s.refs, other)
|
||||
}
|
||||
|
||||
func (s *seriesRefSet) count() int {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
return len(s.refs)
|
||||
}
|
||||
|
||||
func counterAddNonZero(v *prometheus.CounterVec, value float64, lvs ...string) {
|
||||
if value > 0 {
|
||||
v.WithLabelValues(lvs...).Add(value)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk, lastSegment int) (err error) {
|
||||
// Track number of samples that referenced a series we don't know about
|
||||
// Track number of missing series records that were referenced by other records.
|
||||
unknownSeriesRefs := &seriesRefSet{refs: make(map[chunks.HeadSeriesRef]struct{}), mtx: sync.Mutex{}}
|
||||
// Track number of different records that referenced a series we don't know about
|
||||
// for error reporting.
|
||||
var unknownRefs atomic.Uint64
|
||||
var unknownSampleRefs atomic.Uint64
|
||||
var unknownExemplarRefs atomic.Uint64
|
||||
var unknownHistogramRefs atomic.Uint64
|
||||
var unknownMetadataRefs atomic.Uint64
|
||||
var unknownTombstoneRefs atomic.Uint64
|
||||
// Track number of series records that had overlapping m-map chunks.
|
||||
var mmapOverlappingChunks atomic.Uint64
|
||||
|
||||
@@ -91,8 +120,9 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||
processors[i].setup()
|
||||
|
||||
go func(wp *walSubsetProcessor) {
|
||||
unknown, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks)
|
||||
unknownRefs.Add(unknown)
|
||||
missingSeries, unknownSamples, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks)
|
||||
unknownSeriesRefs.merge(missingSeries)
|
||||
unknownSampleRefs.Add(unknownSamples)
|
||||
mmapOverlappingChunks.Add(overlapping)
|
||||
unknownHistogramRefs.Add(unknownHistograms)
|
||||
wg.Done()
|
||||
@@ -102,12 +132,14 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||
wg.Add(1)
|
||||
exemplarsInput = make(chan record.RefExemplar, 300)
|
||||
go func(input <-chan record.RefExemplar) {
|
||||
missingSeries := make(map[chunks.HeadSeriesRef]struct{})
|
||||
var err error
|
||||
defer wg.Done()
|
||||
for e := range input {
|
||||
ms := h.series.getByID(e.Ref)
|
||||
if ms == nil {
|
||||
unknownExemplarRefs.Inc()
|
||||
missingSeries[e.Ref] = struct{}{}
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -121,6 +153,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||
h.logger.Warn("Unexpected error when replaying WAL on exemplar record", "err", err)
|
||||
}
|
||||
}
|
||||
unknownSeriesRefs.merge(missingSeries)
|
||||
}(exemplarsInput)
|
||||
|
||||
go func() {
|
||||
@@ -220,6 +253,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||
}()
|
||||
|
||||
// The records are always replayed from the oldest to the newest.
|
||||
missingSeries := make(map[chunks.HeadSeriesRef]struct{})
|
||||
Outer:
|
||||
for d := range decoded {
|
||||
switch v := d.(type) {
|
||||
@@ -287,7 +321,8 @@ Outer:
|
||||
continue
|
||||
}
|
||||
if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil {
|
||||
unknownRefs.Inc()
|
||||
unknownTombstoneRefs.Inc()
|
||||
missingSeries[chunks.HeadSeriesRef(s.Ref)] = struct{}{}
|
||||
continue
|
||||
}
|
||||
h.tombstones.AddInterval(s.Ref, itv)
|
||||
@@ -376,6 +411,7 @@ Outer:
|
||||
s := h.series.getByID(m.Ref)
|
||||
if s == nil {
|
||||
unknownMetadataRefs.Inc()
|
||||
missingSeries[m.Ref] = struct{}{}
|
||||
continue
|
||||
}
|
||||
s.meta = &metadata.Metadata{
|
||||
@@ -389,6 +425,7 @@ Outer:
|
||||
panic(fmt.Errorf("unexpected decoded type: %T", d))
|
||||
}
|
||||
}
|
||||
unknownSeriesRefs.merge(missingSeries)
|
||||
|
||||
if decodeErr != nil {
|
||||
return decodeErr
|
||||
@@ -411,14 +448,23 @@ Outer:
|
||||
return fmt.Errorf("read records: %w", err)
|
||||
}
|
||||
|
||||
if unknownRefs.Load()+unknownExemplarRefs.Load()+unknownHistogramRefs.Load()+unknownMetadataRefs.Load() > 0 {
|
||||
if unknownSampleRefs.Load()+unknownExemplarRefs.Load()+unknownHistogramRefs.Load()+unknownMetadataRefs.Load()+unknownTombstoneRefs.Load() > 0 {
|
||||
h.logger.Warn(
|
||||
"Unknown series references",
|
||||
"samples", unknownRefs.Load(),
|
||||
"series", unknownSeriesRefs.count(),
|
||||
"samples", unknownSampleRefs.Load(),
|
||||
"exemplars", unknownExemplarRefs.Load(),
|
||||
"histograms", unknownHistogramRefs.Load(),
|
||||
"metadata", unknownMetadataRefs.Load(),
|
||||
"tombstones", unknownTombstoneRefs.Load(),
|
||||
)
|
||||
|
||||
counterAddNonZero(h.metrics.walReplayUnknownRefsTotal, float64(unknownSeriesRefs.count()), "series")
|
||||
counterAddNonZero(h.metrics.walReplayUnknownRefsTotal, float64(unknownSampleRefs.Load()), "samples")
|
||||
counterAddNonZero(h.metrics.walReplayUnknownRefsTotal, float64(unknownExemplarRefs.Load()), "exemplars")
|
||||
counterAddNonZero(h.metrics.walReplayUnknownRefsTotal, float64(unknownHistogramRefs.Load()), "histograms")
|
||||
counterAddNonZero(h.metrics.walReplayUnknownRefsTotal, float64(unknownMetadataRefs.Load()), "metadata")
|
||||
counterAddNonZero(h.metrics.walReplayUnknownRefsTotal, float64(unknownTombstoneRefs.Load()), "tombstones")
|
||||
}
|
||||
if count := mmapOverlappingChunks.Load(); count > 0 {
|
||||
h.logger.Info("Overlapping m-map chunks on duplicate series records", "count", count)
|
||||
@@ -548,10 +594,13 @@ func (wp *walSubsetProcessor) reuseHistogramBuf() []histogramRecord {
|
||||
// processWALSamples adds the samples it receives to the head and passes
|
||||
// the buffer received to an output channel for reuse.
|
||||
// Samples before the minValidTime timestamp are discarded.
|
||||
func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (unknownRefs, unknownHistogramRefs, mmapOverlappingChunks uint64) {
|
||||
func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (map[chunks.HeadSeriesRef]struct{}, uint64, uint64, uint64) {
|
||||
defer close(wp.output)
|
||||
defer close(wp.histogramsOutput)
|
||||
|
||||
missingSeries := make(map[chunks.HeadSeriesRef]struct{})
|
||||
var unknownSampleRefs, unknownHistogramRefs, mmapOverlappingChunks uint64
|
||||
|
||||
minValidTime := h.minValidTime.Load()
|
||||
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
|
||||
appendChunkOpts := chunkOpts{
|
||||
@@ -573,7 +622,8 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
||||
for _, s := range in.samples {
|
||||
ms := h.series.getByID(s.Ref)
|
||||
if ms == nil {
|
||||
unknownRefs++
|
||||
unknownSampleRefs++
|
||||
missingSeries[s.Ref] = struct{}{}
|
||||
continue
|
||||
}
|
||||
if s.T <= ms.mmMaxTime {
|
||||
@@ -603,6 +653,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
||||
ms := h.series.getByID(s.ref)
|
||||
if ms == nil {
|
||||
unknownHistogramRefs++
|
||||
missingSeries[s.ref] = struct{}{}
|
||||
continue
|
||||
}
|
||||
if s.t <= ms.mmMaxTime {
|
||||
@@ -633,13 +684,15 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
||||
}
|
||||
h.updateMinMaxTime(mint, maxt)
|
||||
|
||||
return unknownRefs, unknownHistogramRefs, mmapOverlappingChunks
|
||||
return missingSeries, unknownSampleRefs, unknownHistogramRefs, mmapOverlappingChunks
|
||||
}
|
||||
|
||||
func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
|
||||
// Track number of samples, histogram samples, m-map markers, that referenced a series we don't know about
|
||||
// Track number of missing series records that were referenced by other records.
|
||||
unknownSeriesRefs := &seriesRefSet{refs: make(map[chunks.HeadSeriesRef]struct{}), mtx: sync.Mutex{}}
|
||||
// Track number of samples, histogram samples, and m-map markers that referenced a series we don't know about
|
||||
// for error reporting.
|
||||
var unknownRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64
|
||||
var unknownSampleRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64
|
||||
|
||||
lastSeq, lastOff := lastMmapRef.Unpack()
|
||||
// Start workers that each process samples for a partition of the series ID space.
|
||||
@@ -673,8 +726,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||
processors[i].setup()
|
||||
|
||||
go func(wp *wblSubsetProcessor) {
|
||||
unknown, unknownHistograms := wp.processWBLSamples(h)
|
||||
unknownRefs.Add(unknown)
|
||||
missingSeries, unknownSamples, unknownHistograms := wp.processWBLSamples(h)
|
||||
unknownSeriesRefs.merge(missingSeries)
|
||||
unknownSampleRefs.Add(unknownSamples)
|
||||
unknownHistogramRefs.Add(unknownHistograms)
|
||||
wg.Done()
|
||||
}(&processors[i])
|
||||
@@ -742,6 +796,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||
}()
|
||||
|
||||
// The records are always replayed from the oldest to the newest.
|
||||
missingSeries := make(map[chunks.HeadSeriesRef]struct{})
|
||||
for d := range decodedCh {
|
||||
switch v := d.(type) {
|
||||
case []record.RefSample:
|
||||
@@ -794,6 +849,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||
ms := h.series.getByID(rm.Ref)
|
||||
if ms == nil {
|
||||
mmapMarkerUnknownRefs.Inc()
|
||||
missingSeries[rm.Ref] = struct{}{}
|
||||
continue
|
||||
}
|
||||
idx := uint64(ms.ref) % uint64(concurrency)
|
||||
@@ -867,6 +923,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||
panic(fmt.Errorf("unexpected decodedCh type: %T", d))
|
||||
}
|
||||
}
|
||||
unknownSeriesRefs.merge(missingSeries)
|
||||
|
||||
if decodeErr != nil {
|
||||
return decodeErr
|
||||
@@ -882,9 +939,21 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
|
||||
return fmt.Errorf("read records: %w", err)
|
||||
}
|
||||
|
||||
if unknownRefs.Load() > 0 || mmapMarkerUnknownRefs.Load() > 0 {
|
||||
h.logger.Warn("Unknown series references for ooo WAL replay", "samples", unknownRefs.Load(), "mmap_markers", mmapMarkerUnknownRefs.Load())
|
||||
if unknownSampleRefs.Load()+unknownHistogramRefs.Load()+mmapMarkerUnknownRefs.Load() > 0 {
|
||||
h.logger.Warn(
|
||||
"Unknown series references for ooo WAL replay",
|
||||
"series", unknownSeriesRefs.count(),
|
||||
"samples", unknownSampleRefs.Load(),
|
||||
"histograms", unknownHistogramRefs.Load(),
|
||||
"mmap_markers", mmapMarkerUnknownRefs.Load(),
|
||||
)
|
||||
|
||||
counterAddNonZero(h.metrics.wblReplayUnknownRefsTotal, float64(unknownSeriesRefs.count()), "series")
|
||||
counterAddNonZero(h.metrics.wblReplayUnknownRefsTotal, float64(unknownSampleRefs.Load()), "samples")
|
||||
counterAddNonZero(h.metrics.wblReplayUnknownRefsTotal, float64(unknownHistogramRefs.Load()), "histograms")
|
||||
counterAddNonZero(h.metrics.wblReplayUnknownRefsTotal, float64(mmapMarkerUnknownRefs.Load()), "mmap_markers")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -952,10 +1021,13 @@ func (wp *wblSubsetProcessor) reuseHistogramBuf() []histogramRecord {
|
||||
|
||||
// processWBLSamples adds the samples it receives to the head and passes
|
||||
// the buffer received to an output channel for reuse.
|
||||
func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHistogramRefs uint64) {
|
||||
func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesRef]struct{}, uint64, uint64) {
|
||||
defer close(wp.output)
|
||||
defer close(wp.histogramsOutput)
|
||||
|
||||
missingSeries := make(map[chunks.HeadSeriesRef]struct{})
|
||||
var unknownSampleRefs, unknownHistogramRefs uint64
|
||||
|
||||
oooCapMax := h.opts.OutOfOrderCapMax.Load()
|
||||
// We don't check for minValidTime for ooo samples.
|
||||
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
|
||||
@@ -972,7 +1044,8 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi
|
||||
for _, s := range in.samples {
|
||||
ms := h.series.getByID(s.Ref)
|
||||
if ms == nil {
|
||||
unknownRefs++
|
||||
unknownSampleRefs++
|
||||
missingSeries[s.Ref] = struct{}{}
|
||||
continue
|
||||
}
|
||||
ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger)
|
||||
@@ -997,6 +1070,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi
|
||||
ms := h.series.getByID(s.ref)
|
||||
if ms == nil {
|
||||
unknownHistogramRefs++
|
||||
missingSeries[s.ref] = struct{}{}
|
||||
continue
|
||||
}
|
||||
var chunkCreated bool
|
||||
@@ -1027,7 +1101,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi
|
||||
|
||||
h.updateMinOOOMaxOOOTime(mint, maxt)
|
||||
|
||||
return unknownRefs, unknownHistogramRefs
|
||||
return missingSeries, unknownSampleRefs, unknownHistogramRefs
|
||||
}
|
||||
|
||||
const (
|
||||
|
||||
Reference in New Issue
Block a user