diff --git a/tsdb/head.go b/tsdb/head.go index 4f4e4febfc..724437f453 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -379,6 +379,8 @@ type headMetrics struct { snapshotReplayErrorTotal prometheus.Counter // Will be either 0 or 1. oooHistogram prometheus.Histogram mmapChunksTotal prometheus.Counter + walReplayUnknownRefsTotal *prometheus.CounterVec + wblReplayUnknownRefsTotal *prometheus.CounterVec } const ( @@ -510,6 +512,14 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_mmap_chunks_total", Help: "Total number of chunks that were memory-mapped.", }), + walReplayUnknownRefsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_replay_unknown_refs_total", + Help: "Total number of unknown series references encountered during WAL replay.", + }, []string{"type"}), + wblReplayUnknownRefsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wbl_replay_unknown_refs_total", + Help: "Total number of unknown series references encountered during WBL replay.", + }, []string{"type"}), } if r != nil { @@ -577,6 +587,8 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { } return float64(val) }), + m.walReplayUnknownRefsTotal, + m.wblReplayUnknownRefsTotal, ) } return m diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 5fff859021..161ead50b4 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -284,159 +284,176 @@ func BenchmarkLoadWLs(b *testing.B) { exemplarsPercentages := []float64{0, 0.5, 1, 5} lastExemplarsPerSeries := -1 for _, c := range cases { - for _, p := range exemplarsPercentages { - exemplarsPerSeries := int(math.RoundToEven(float64(c.samplesPerSeries) * p / 100)) - // For tests with low samplesPerSeries we could end up testing with 0 exemplarsPerSeries - // multiple times without this check. - if exemplarsPerSeries == lastExemplarsPerSeries { - continue - } - lastExemplarsPerSeries = exemplarsPerSeries - b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d,oooSeriesPct=%.3f,oooSamplesPct=%.3f,oooCapMax=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT, c.oooSeriesPct, c.oooSamplesPct, c.oooCapMax), - func(b *testing.B) { - dir := b.TempDir() + missingSeriesPercentages := []float64{0, 0.1} + for _, missingSeriesPct := range missingSeriesPercentages { + for _, p := range exemplarsPercentages { + exemplarsPerSeries := int(math.RoundToEven(float64(c.samplesPerSeries) * p / 100)) + // For tests with low samplesPerSeries we could end up testing with 0 exemplarsPerSeries + // multiple times without this check. + if exemplarsPerSeries == lastExemplarsPerSeries { + continue + } + lastExemplarsPerSeries = exemplarsPerSeries + b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d,oooSeriesPct=%.3f,oooSamplesPct=%.3f,oooCapMax=%d,missingSeriesPct=%.3f", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT, c.oooSeriesPct, c.oooSamplesPct, c.oooCapMax, missingSeriesPct), + func(b *testing.B) { + dir := b.TempDir() - wal, err := wlog.New(nil, nil, dir, compression.None) - require.NoError(b, err) - var wbl *wlog.WL - if c.oooSeriesPct != 0 { - wbl, err = wlog.New(nil, nil, dir, compression.None) + wal, err := wlog.New(nil, nil, dir, compression.None) require.NoError(b, err) - } - - // Write series. - refSeries := make([]record.RefSeries, 0, c.seriesPerBatch) - for k := 0; k < c.batches; k++ { - refSeries = refSeries[:0] - for i := k * c.seriesPerBatch; i < (k+1)*c.seriesPerBatch; i++ { - lbls := make(map[string]string, labelsPerSeries) - lbls[defaultLabelName] = strconv.Itoa(i) - for j := 1; len(lbls) < labelsPerSeries; j++ { - lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j) - } - refSeries = append(refSeries, record.RefSeries{Ref: chunks.HeadSeriesRef(i) * 101, Labels: labels.FromMap(lbls)}) + var wbl *wlog.WL + if c.oooSeriesPct != 0 { + wbl, err = wlog.New(nil, nil, dir, compression.None) + require.NoError(b, err) } - populateTestWL(b, wal, []interface{}{refSeries}) - } - // Write samples. - refSamples := make([]record.RefSample, 0, c.seriesPerBatch) - - oooSeriesPerBatch := int(float64(c.seriesPerBatch) * c.oooSeriesPct) - oooSamplesPerSeries := int(float64(c.samplesPerSeries) * c.oooSamplesPct) - - for i := 0; i < c.samplesPerSeries; i++ { - for j := 0; j < c.batches; j++ { - refSamples = refSamples[:0] - - k := j * c.seriesPerBatch - // Skip appending the first oooSamplesPerSeries samples for the series in the batch that - // should have OOO samples. OOO samples are appended after all the in-order samples. - if i < oooSamplesPerSeries { - k += oooSeriesPerBatch - } - for ; k < (j+1)*c.seriesPerBatch; k++ { - refSamples = append(refSamples, record.RefSample{ - Ref: chunks.HeadSeriesRef(k) * 101, - T: int64(i) * 10, - V: float64(i) * 100, - }) - } - populateTestWL(b, wal, []interface{}{refSamples}) - } - } - - // Write mmapped chunks. - if c.mmappedChunkT != 0 { - chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) - require.NoError(b, err) - cOpts := chunkOpts{ - chunkDiskMapper: chunkDiskMapper, - chunkRange: c.mmappedChunkT, - samplesPerChunk: DefaultSamplesPerChunk, - } - for k := 0; k < c.batches*c.seriesPerBatch; k++ { - // Create one mmapped chunk per series, with one sample at the given time. - s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled) - s.append(c.mmappedChunkT, 42, 0, cOpts) - // There's only one head chunk because only a single sample is appended. mmapChunks() - // ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with - // the sample at c.mmappedChunkT is mmapped. - s.cutNewHeadChunk(c.mmappedChunkT, chunkenc.EncXOR, c.mmappedChunkT) - s.mmapChunks(chunkDiskMapper) - } - require.NoError(b, chunkDiskMapper.Close()) - } - - // Write exemplars. - refExemplars := make([]record.RefExemplar, 0, c.seriesPerBatch) - for i := 0; i < exemplarsPerSeries; i++ { - for j := 0; j < c.batches; j++ { - refExemplars = refExemplars[:0] - for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ { - refExemplars = append(refExemplars, record.RefExemplar{ - Ref: chunks.HeadSeriesRef(k) * 101, - T: int64(i) * 10, - V: float64(i) * 100, - Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)), - }) - } - populateTestWL(b, wal, []interface{}{refExemplars}) - } - } - - // Write OOO samples and mmap markers. - refMarkers := make([]record.RefMmapMarker, 0, oooSeriesPerBatch) - refSamples = make([]record.RefSample, 0, oooSeriesPerBatch) - for i := 0; i < oooSamplesPerSeries; i++ { - shouldAddMarkers := c.oooCapMax != 0 && i != 0 && int64(i)%c.oooCapMax == 0 - - for j := 0; j < c.batches; j++ { - refSamples = refSamples[:0] - if shouldAddMarkers { - refMarkers = refMarkers[:0] - } - for k := j * c.seriesPerBatch; k < (j*c.seriesPerBatch)+oooSeriesPerBatch; k++ { - ref := chunks.HeadSeriesRef(k) * 101 - if shouldAddMarkers { - // loadWBL() checks that the marker's MmapRef is less than or equal to the ref - // for the last mmap chunk. Setting MmapRef to 0 to always pass that check. - refMarkers = append(refMarkers, record.RefMmapMarker{Ref: ref, MmapRef: 0}) + // Write series. + refSeries := make([]record.RefSeries, 0, c.seriesPerBatch) + for k := 0; k < c.batches; k++ { + refSeries = refSeries[:0] + for i := k * c.seriesPerBatch; i < (k+1)*c.seriesPerBatch; i++ { + lbls := make(map[string]string, labelsPerSeries) + lbls[defaultLabelName] = strconv.Itoa(i) + for j := 1; len(lbls) < labelsPerSeries; j++ { + lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j) } - refSamples = append(refSamples, record.RefSample{ - Ref: ref, - T: int64(i) * 10, - V: float64(i) * 100, - }) + refSeries = append(refSeries, record.RefSeries{Ref: chunks.HeadSeriesRef(i) * 101, Labels: labels.FromMap(lbls)}) } - if shouldAddMarkers { - populateTestWL(b, wbl, []interface{}{refMarkers}) + + writeSeries := refSeries + if missingSeriesPct > 0 { + newWriteSeries := make([]record.RefSeries, 0, int(float64(len(refSeries))*(1.0-missingSeriesPct))) + keepRatio := 1.0 - missingSeriesPct + // Keep approximately every 1/keepRatio series. + for i, s := range refSeries { + if int(float64(i)*keepRatio) != int(float64(i+1)*keepRatio) { + newWriteSeries = append(newWriteSeries, s) + } + } + writeSeries = newWriteSeries } - populateTestWL(b, wal, []interface{}{refSamples}) - populateTestWL(b, wbl, []interface{}{refSamples}) - } - } - b.ResetTimer() - - // Load the WAL. - for i := 0; i < b.N; i++ { - opts := DefaultHeadOptions() - opts.ChunkRange = 1000 - opts.ChunkDirRoot = dir - if c.oooCapMax > 0 { - opts.OutOfOrderCapMax.Store(c.oooCapMax) + populateTestWL(b, wal, []interface{}{writeSeries}) } - h, err := NewHead(nil, nil, wal, wbl, opts, nil) - require.NoError(b, err) - h.Init(0) - } - b.StopTimer() - wal.Close() - if wbl != nil { - wbl.Close() - } - }) + + // Write samples. + refSamples := make([]record.RefSample, 0, c.seriesPerBatch) + + oooSeriesPerBatch := int(float64(c.seriesPerBatch) * c.oooSeriesPct) + oooSamplesPerSeries := int(float64(c.samplesPerSeries) * c.oooSamplesPct) + + for i := 0; i < c.samplesPerSeries; i++ { + for j := 0; j < c.batches; j++ { + refSamples = refSamples[:0] + + k := j * c.seriesPerBatch + // Skip appending the first oooSamplesPerSeries samples for the series in the batch that + // should have OOO samples. OOO samples are appended after all the in-order samples. + if i < oooSamplesPerSeries { + k += oooSeriesPerBatch + } + for ; k < (j+1)*c.seriesPerBatch; k++ { + refSamples = append(refSamples, record.RefSample{ + Ref: chunks.HeadSeriesRef(k) * 101, + T: int64(i) * 10, + V: float64(i) * 100, + }) + } + populateTestWL(b, wal, []interface{}{refSamples}) + } + } + + // Write mmapped chunks. + if c.mmappedChunkT != 0 { + chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) + require.NoError(b, err) + cOpts := chunkOpts{ + chunkDiskMapper: chunkDiskMapper, + chunkRange: c.mmappedChunkT, + samplesPerChunk: DefaultSamplesPerChunk, + } + for k := 0; k < c.batches*c.seriesPerBatch; k++ { + // Create one mmapped chunk per series, with one sample at the given time. + s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled) + s.append(c.mmappedChunkT, 42, 0, cOpts) + // There's only one head chunk because only a single sample is appended. mmapChunks() + // ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with + // the sample at c.mmappedChunkT is mmapped. + s.cutNewHeadChunk(c.mmappedChunkT, chunkenc.EncXOR, c.mmappedChunkT) + s.mmapChunks(chunkDiskMapper) + } + require.NoError(b, chunkDiskMapper.Close()) + } + + // Write exemplars. + refExemplars := make([]record.RefExemplar, 0, c.seriesPerBatch) + for i := 0; i < exemplarsPerSeries; i++ { + for j := 0; j < c.batches; j++ { + refExemplars = refExemplars[:0] + for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ { + refExemplars = append(refExemplars, record.RefExemplar{ + Ref: chunks.HeadSeriesRef(k) * 101, + T: int64(i) * 10, + V: float64(i) * 100, + Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)), + }) + } + populateTestWL(b, wal, []interface{}{refExemplars}) + } + } + + // Write OOO samples and mmap markers. + refMarkers := make([]record.RefMmapMarker, 0, oooSeriesPerBatch) + refSamples = make([]record.RefSample, 0, oooSeriesPerBatch) + for i := 0; i < oooSamplesPerSeries; i++ { + shouldAddMarkers := c.oooCapMax != 0 && i != 0 && int64(i)%c.oooCapMax == 0 + + for j := 0; j < c.batches; j++ { + refSamples = refSamples[:0] + if shouldAddMarkers { + refMarkers = refMarkers[:0] + } + for k := j * c.seriesPerBatch; k < (j*c.seriesPerBatch)+oooSeriesPerBatch; k++ { + ref := chunks.HeadSeriesRef(k) * 101 + if shouldAddMarkers { + // loadWBL() checks that the marker's MmapRef is less than or equal to the ref + // for the last mmap chunk. Setting MmapRef to 0 to always pass that check. + refMarkers = append(refMarkers, record.RefMmapMarker{Ref: ref, MmapRef: 0}) + } + refSamples = append(refSamples, record.RefSample{ + Ref: ref, + T: int64(i) * 10, + V: float64(i) * 100, + }) + } + if shouldAddMarkers { + populateTestWL(b, wbl, []interface{}{refMarkers}) + } + populateTestWL(b, wal, []interface{}{refSamples}) + populateTestWL(b, wbl, []interface{}{refSamples}) + } + } + + b.ResetTimer() + + // Load the WAL. + for i := 0; i < b.N; i++ { + opts := DefaultHeadOptions() + opts.ChunkRange = 1000 + opts.ChunkDirRoot = dir + if c.oooCapMax > 0 { + opts.OutOfOrderCapMax.Store(c.oooCapMax) + } + h, err := NewHead(nil, nil, wal, wbl, opts, nil) + require.NoError(b, err) + h.Init(0) + } + b.StopTimer() + wal.Close() + if wbl != nil { + wbl.Close() + } + }) + } } } } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 48263c653a..4de091c291 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -16,6 +16,7 @@ package tsdb import ( "errors" "fmt" + "maps" "math" "os" "path/filepath" @@ -26,6 +27,8 @@ import ( "go.uber.org/atomic" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -50,13 +53,39 @@ type histogramRecord struct { fh *histogram.FloatHistogram } +type seriesRefSet struct { + refs map[chunks.HeadSeriesRef]struct{} + mtx sync.Mutex +} + +func (s *seriesRefSet) merge(other map[chunks.HeadSeriesRef]struct{}) { + s.mtx.Lock() + defer s.mtx.Unlock() + maps.Copy(s.refs, other) +} + +func (s *seriesRefSet) count() int { + s.mtx.Lock() + defer s.mtx.Unlock() + return len(s.refs) +} + +func counterAddNonZero(v *prometheus.CounterVec, value float64, lvs ...string) { + if value > 0 { + v.WithLabelValues(lvs...).Add(value) + } +} + func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk, lastSegment int) (err error) { - // Track number of samples that referenced a series we don't know about + // Track number of missing series records that were referenced by other records. + unknownSeriesRefs := &seriesRefSet{refs: make(map[chunks.HeadSeriesRef]struct{}), mtx: sync.Mutex{}} + // Track number of different records that referenced a series we don't know about // for error reporting. - var unknownRefs atomic.Uint64 + var unknownSampleRefs atomic.Uint64 var unknownExemplarRefs atomic.Uint64 var unknownHistogramRefs atomic.Uint64 var unknownMetadataRefs atomic.Uint64 + var unknownTombstoneRefs atomic.Uint64 // Track number of series records that had overlapping m-map chunks. var mmapOverlappingChunks atomic.Uint64 @@ -91,8 +120,9 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch processors[i].setup() go func(wp *walSubsetProcessor) { - unknown, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks) - unknownRefs.Add(unknown) + missingSeries, unknownSamples, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks) + unknownSeriesRefs.merge(missingSeries) + unknownSampleRefs.Add(unknownSamples) mmapOverlappingChunks.Add(overlapping) unknownHistogramRefs.Add(unknownHistograms) wg.Done() @@ -102,12 +132,14 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch wg.Add(1) exemplarsInput = make(chan record.RefExemplar, 300) go func(input <-chan record.RefExemplar) { + missingSeries := make(map[chunks.HeadSeriesRef]struct{}) var err error defer wg.Done() for e := range input { ms := h.series.getByID(e.Ref) if ms == nil { unknownExemplarRefs.Inc() + missingSeries[e.Ref] = struct{}{} continue } @@ -121,6 +153,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch h.logger.Warn("Unexpected error when replaying WAL on exemplar record", "err", err) } } + unknownSeriesRefs.merge(missingSeries) }(exemplarsInput) go func() { @@ -220,6 +253,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch }() // The records are always replayed from the oldest to the newest. + missingSeries := make(map[chunks.HeadSeriesRef]struct{}) Outer: for d := range decoded { switch v := d.(type) { @@ -287,7 +321,8 @@ Outer: continue } if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil { - unknownRefs.Inc() + unknownTombstoneRefs.Inc() + missingSeries[chunks.HeadSeriesRef(s.Ref)] = struct{}{} continue } h.tombstones.AddInterval(s.Ref, itv) @@ -376,6 +411,7 @@ Outer: s := h.series.getByID(m.Ref) if s == nil { unknownMetadataRefs.Inc() + missingSeries[m.Ref] = struct{}{} continue } s.meta = &metadata.Metadata{ @@ -389,6 +425,7 @@ Outer: panic(fmt.Errorf("unexpected decoded type: %T", d)) } } + unknownSeriesRefs.merge(missingSeries) if decodeErr != nil { return decodeErr @@ -411,14 +448,23 @@ Outer: return fmt.Errorf("read records: %w", err) } - if unknownRefs.Load()+unknownExemplarRefs.Load()+unknownHistogramRefs.Load()+unknownMetadataRefs.Load() > 0 { + if unknownSampleRefs.Load()+unknownExemplarRefs.Load()+unknownHistogramRefs.Load()+unknownMetadataRefs.Load()+unknownTombstoneRefs.Load() > 0 { h.logger.Warn( "Unknown series references", - "samples", unknownRefs.Load(), + "series", unknownSeriesRefs.count(), + "samples", unknownSampleRefs.Load(), "exemplars", unknownExemplarRefs.Load(), "histograms", unknownHistogramRefs.Load(), "metadata", unknownMetadataRefs.Load(), + "tombstones", unknownTombstoneRefs.Load(), ) + + counterAddNonZero(h.metrics.walReplayUnknownRefsTotal, float64(unknownSeriesRefs.count()), "series") + counterAddNonZero(h.metrics.walReplayUnknownRefsTotal, float64(unknownSampleRefs.Load()), "samples") + counterAddNonZero(h.metrics.walReplayUnknownRefsTotal, float64(unknownExemplarRefs.Load()), "exemplars") + counterAddNonZero(h.metrics.walReplayUnknownRefsTotal, float64(unknownHistogramRefs.Load()), "histograms") + counterAddNonZero(h.metrics.walReplayUnknownRefsTotal, float64(unknownMetadataRefs.Load()), "metadata") + counterAddNonZero(h.metrics.walReplayUnknownRefsTotal, float64(unknownTombstoneRefs.Load()), "tombstones") } if count := mmapOverlappingChunks.Load(); count > 0 { h.logger.Info("Overlapping m-map chunks on duplicate series records", "count", count) @@ -548,10 +594,13 @@ func (wp *walSubsetProcessor) reuseHistogramBuf() []histogramRecord { // processWALSamples adds the samples it receives to the head and passes // the buffer received to an output channel for reuse. // Samples before the minValidTime timestamp are discarded. -func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (unknownRefs, unknownHistogramRefs, mmapOverlappingChunks uint64) { +func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (map[chunks.HeadSeriesRef]struct{}, uint64, uint64, uint64) { defer close(wp.output) defer close(wp.histogramsOutput) + missingSeries := make(map[chunks.HeadSeriesRef]struct{}) + var unknownSampleRefs, unknownHistogramRefs, mmapOverlappingChunks uint64 + minValidTime := h.minValidTime.Load() mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) appendChunkOpts := chunkOpts{ @@ -573,7 +622,8 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp for _, s := range in.samples { ms := h.series.getByID(s.Ref) if ms == nil { - unknownRefs++ + unknownSampleRefs++ + missingSeries[s.Ref] = struct{}{} continue } if s.T <= ms.mmMaxTime { @@ -603,6 +653,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp ms := h.series.getByID(s.ref) if ms == nil { unknownHistogramRefs++ + missingSeries[s.ref] = struct{}{} continue } if s.t <= ms.mmMaxTime { @@ -633,13 +684,15 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp } h.updateMinMaxTime(mint, maxt) - return unknownRefs, unknownHistogramRefs, mmapOverlappingChunks + return missingSeries, unknownSampleRefs, unknownHistogramRefs, mmapOverlappingChunks } func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) { - // Track number of samples, histogram samples, m-map markers, that referenced a series we don't know about + // Track number of missing series records that were referenced by other records. + unknownSeriesRefs := &seriesRefSet{refs: make(map[chunks.HeadSeriesRef]struct{}), mtx: sync.Mutex{}} + // Track number of samples, histogram samples, and m-map markers that referenced a series we don't know about // for error reporting. - var unknownRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64 + var unknownSampleRefs, unknownHistogramRefs, mmapMarkerUnknownRefs atomic.Uint64 lastSeq, lastOff := lastMmapRef.Unpack() // Start workers that each process samples for a partition of the series ID space. @@ -673,8 +726,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch processors[i].setup() go func(wp *wblSubsetProcessor) { - unknown, unknownHistograms := wp.processWBLSamples(h) - unknownRefs.Add(unknown) + missingSeries, unknownSamples, unknownHistograms := wp.processWBLSamples(h) + unknownSeriesRefs.merge(missingSeries) + unknownSampleRefs.Add(unknownSamples) unknownHistogramRefs.Add(unknownHistograms) wg.Done() }(&processors[i]) @@ -742,6 +796,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch }() // The records are always replayed from the oldest to the newest. + missingSeries := make(map[chunks.HeadSeriesRef]struct{}) for d := range decodedCh { switch v := d.(type) { case []record.RefSample: @@ -794,6 +849,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch ms := h.series.getByID(rm.Ref) if ms == nil { mmapMarkerUnknownRefs.Inc() + missingSeries[rm.Ref] = struct{}{} continue } idx := uint64(ms.ref) % uint64(concurrency) @@ -867,6 +923,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch panic(fmt.Errorf("unexpected decodedCh type: %T", d)) } } + unknownSeriesRefs.merge(missingSeries) if decodeErr != nil { return decodeErr @@ -882,9 +939,21 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch return fmt.Errorf("read records: %w", err) } - if unknownRefs.Load() > 0 || mmapMarkerUnknownRefs.Load() > 0 { - h.logger.Warn("Unknown series references for ooo WAL replay", "samples", unknownRefs.Load(), "mmap_markers", mmapMarkerUnknownRefs.Load()) + if unknownSampleRefs.Load()+unknownHistogramRefs.Load()+mmapMarkerUnknownRefs.Load() > 0 { + h.logger.Warn( + "Unknown series references for ooo WAL replay", + "series", unknownSeriesRefs.count(), + "samples", unknownSampleRefs.Load(), + "histograms", unknownHistogramRefs.Load(), + "mmap_markers", mmapMarkerUnknownRefs.Load(), + ) + + counterAddNonZero(h.metrics.wblReplayUnknownRefsTotal, float64(unknownSeriesRefs.count()), "series") + counterAddNonZero(h.metrics.wblReplayUnknownRefsTotal, float64(unknownSampleRefs.Load()), "samples") + counterAddNonZero(h.metrics.wblReplayUnknownRefsTotal, float64(unknownHistogramRefs.Load()), "histograms") + counterAddNonZero(h.metrics.wblReplayUnknownRefsTotal, float64(mmapMarkerUnknownRefs.Load()), "mmap_markers") } + return nil } @@ -952,10 +1021,13 @@ func (wp *wblSubsetProcessor) reuseHistogramBuf() []histogramRecord { // processWBLSamples adds the samples it receives to the head and passes // the buffer received to an output channel for reuse. -func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHistogramRefs uint64) { +func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesRef]struct{}, uint64, uint64) { defer close(wp.output) defer close(wp.histogramsOutput) + missingSeries := make(map[chunks.HeadSeriesRef]struct{}) + var unknownSampleRefs, unknownHistogramRefs uint64 + oooCapMax := h.opts.OutOfOrderCapMax.Load() // We don't check for minValidTime for ooo samples. mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) @@ -972,7 +1044,8 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi for _, s := range in.samples { ms := h.series.getByID(s.Ref) if ms == nil { - unknownRefs++ + unknownSampleRefs++ + missingSeries[s.Ref] = struct{}{} continue } ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger) @@ -997,6 +1070,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi ms := h.series.getByID(s.ref) if ms == nil { unknownHistogramRefs++ + missingSeries[s.ref] = struct{}{} continue } var chunkCreated bool @@ -1027,7 +1101,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi h.updateMinOOOMaxOOOTime(mint, maxt) - return unknownRefs, unknownHistogramRefs + return missingSeries, unknownSampleRefs, unknownHistogramRefs } const (