mirror of
https://github.com/prometheus/prometheus
synced 2026-04-20 22:41:05 +08:00
tsdb/wlog[PERF]: optimize WAL watcher reads (up to 540x less B/op; 13000x less allocs/op) (#18250)
Some checks failed
buf.build / lint and publish (push) Has been cancelled
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Compliance testing (push) Has been cancelled
CI / Build Prometheus for common architectures (0) (push) Has been cancelled
CI / Build Prometheus for common architectures (1) (push) Has been cancelled
CI / Build Prometheus for common architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (0) (push) Has been cancelled
CI / Build Prometheus for all architectures (1) (push) Has been cancelled
CI / Build Prometheus for all architectures (10) (push) Has been cancelled
CI / Build Prometheus for all architectures (11) (push) Has been cancelled
CI / Build Prometheus for all architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (3) (push) Has been cancelled
CI / Build Prometheus for all architectures (4) (push) Has been cancelled
CI / Build Prometheus for all architectures (5) (push) Has been cancelled
CI / Build Prometheus for all architectures (6) (push) Has been cancelled
CI / Build Prometheus for all architectures (7) (push) Has been cancelled
CI / Build Prometheus for all architectures (8) (push) Has been cancelled
CI / Build Prometheus for all architectures (9) (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
Scorecards supply-chain security / Scorecards analysis (push) Has been cancelled
Sync repo files / repo_sync (push) Has been cancelled
Stale Check / stale (push) Has been cancelled
Lock Threads / action (push) Has been cancelled
Some checks failed
buf.build / lint and publish (push) Has been cancelled
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Compliance testing (push) Has been cancelled
CI / Build Prometheus for common architectures (0) (push) Has been cancelled
CI / Build Prometheus for common architectures (1) (push) Has been cancelled
CI / Build Prometheus for common architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (0) (push) Has been cancelled
CI / Build Prometheus for all architectures (1) (push) Has been cancelled
CI / Build Prometheus for all architectures (10) (push) Has been cancelled
CI / Build Prometheus for all architectures (11) (push) Has been cancelled
CI / Build Prometheus for all architectures (2) (push) Has been cancelled
CI / Build Prometheus for all architectures (3) (push) Has been cancelled
CI / Build Prometheus for all architectures (4) (push) Has been cancelled
CI / Build Prometheus for all architectures (5) (push) Has been cancelled
CI / Build Prometheus for all architectures (6) (push) Has been cancelled
CI / Build Prometheus for all architectures (7) (push) Has been cancelled
CI / Build Prometheus for all architectures (8) (push) Has been cancelled
CI / Build Prometheus for all architectures (9) (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
Scorecards supply-chain security / Scorecards analysis (push) Has been cancelled
Sync repo files / repo_sync (push) Has been cancelled
Stale Check / stale (push) Has been cancelled
Lock Threads / action (push) Has been cancelled
See the detailed analysis https://docs.google.com/document/d/1efVAMcEw7-R_KatHHcobcFBlNsre-DoThVHI8AO2SDQ/edit?tab=t.0 I ran extensive benchmarks using synthetic data as well as real WAL segments pulled from the prombench runs. All benchmarks are here https://github.com/prometheus/prometheus/compare/bwplotka/wal-reuse?expand=1 * optimization(tsdb/wlog): reuse Ref* buffers across WAL watchers' reads Signed-off-by: bwplotka <bwplotka@gmail.com> * optimization(tsdb/wlog): avoid expensive error wraps Signed-off-by: bwplotka <bwplotka@gmail.com> * optimization(tsdb/wlog): reuse array for filtering Signed-off-by: bwplotka <bwplotka@gmail.com> * fmt Signed-off-by: bwplotka <bwplotka@gmail.com> * lint fix Signed-off-by: bwplotka <bwplotka@gmail.com> * tsdb/record: add test for clear() on histograms Signed-off-by: bwplotka <bwplotka@gmail.com> * updated WriteTo with what's currently expected Signed-off-by: bwplotka <bwplotka@gmail.com> --------- Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
committed by
GitHub
parent
89b3ad45a8
commit
a73202012b
@@ -488,6 +488,7 @@ func NewQueueManager(
|
||||
enableNativeHistogramRemoteWrite bool,
|
||||
enableTypeAndUnitLabels bool,
|
||||
protoMsg remoteapi.WriteMessageType,
|
||||
recordBuf *record.BuffersPool,
|
||||
) *QueueManager {
|
||||
if logger == nil {
|
||||
logger = promslog.NewNopLogger()
|
||||
@@ -537,7 +538,7 @@ func NewQueueManager(
|
||||
|
||||
walMetadata := t.protoMsg != remoteapi.WriteV1MessageType
|
||||
|
||||
t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite, walMetadata)
|
||||
t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite, walMetadata, recordBuf)
|
||||
|
||||
// The current MetadataWatcher implementation is mutually exclusive
|
||||
// with the new approach, which stores metadata as WAL records and
|
||||
|
||||
@@ -302,7 +302,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro
|
||||
func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg remoteapi.WriteMessageType) *QueueManager {
|
||||
dir := t.TempDir()
|
||||
metrics := newQueueManagerMetrics(nil, "", "")
|
||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, protoMsg)
|
||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, protoMsg, record.NewBuffersPool())
|
||||
|
||||
return m
|
||||
}
|
||||
@@ -770,7 +770,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
|
||||
}
|
||||
)
|
||||
|
||||
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType)
|
||||
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType, nil)
|
||||
m.StoreSeries(recs.Series, 0)
|
||||
|
||||
// Attempt to samples while the manager is running. We immediately stop the
|
||||
@@ -1346,7 +1346,7 @@ func BenchmarkStoreSeries(b *testing.B) {
|
||||
mcfg := config.DefaultMetadataConfig
|
||||
metrics := newQueueManagerMetrics(nil, "", "")
|
||||
|
||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType)
|
||||
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType, record.NewBuffersPool())
|
||||
m.externalLabels = tc.externalLabels
|
||||
m.relabelConfigs = tc.relabelConfigs
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ import (
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/metadata"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/record"
|
||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
)
|
||||
|
||||
@@ -72,6 +73,8 @@ type WriteStorage struct {
|
||||
scraper ReadyScrapeManager
|
||||
quit chan struct{}
|
||||
|
||||
recordBuf *record.BuffersPool
|
||||
|
||||
// For timestampTracker.
|
||||
highestTimestamp *maxTimestamp
|
||||
enableTypeAndUnitLabels bool
|
||||
@@ -102,6 +105,7 @@ func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string,
|
||||
Help: "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch. Initialized to 0 when no data has been received yet. Deprecated, check prometheus_remote_storage_queue_highest_timestamp_seconds which is more accurate.",
|
||||
}),
|
||||
},
|
||||
recordBuf: record.NewBuffersPool(),
|
||||
enableTypeAndUnitLabels: enableTypeAndUnitLabels,
|
||||
}
|
||||
if reg != nil {
|
||||
@@ -215,6 +219,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
|
||||
rwConf.SendNativeHistograms,
|
||||
rws.enableTypeAndUnitLabels,
|
||||
rwConf.ProtobufMessage,
|
||||
rws.recordBuf,
|
||||
)
|
||||
// Keep track of which queues are new so we know which to start.
|
||||
newHashes = append(newHashes, hash)
|
||||
|
||||
13
tsdb/head.go
13
tsdb/head.go
@@ -79,12 +79,13 @@ type Head struct {
|
||||
// This should be typecasted to chunks.ChunkDiskMapperRef after loading.
|
||||
minOOOMmapRef atomic.Uint64
|
||||
|
||||
metrics *headMetrics
|
||||
opts *HeadOptions
|
||||
wal, wbl *wlog.WL
|
||||
exemplarMetrics *ExemplarMetrics
|
||||
exemplars ExemplarStorage
|
||||
logger *slog.Logger
|
||||
metrics *headMetrics
|
||||
opts *HeadOptions
|
||||
wal, wbl *wlog.WL
|
||||
exemplarMetrics *ExemplarMetrics
|
||||
exemplars ExemplarStorage
|
||||
logger *slog.Logger
|
||||
// TODO(bwplotka): Consider using record.Pools that's reused with WAL watchers.
|
||||
refSeriesPool zeropool.Pool[[]record.RefSeries]
|
||||
floatsPool zeropool.Pool[[]record.RefSample]
|
||||
exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef]
|
||||
|
||||
115
tsdb/record/buffers.go
Normal file
115
tsdb/record/buffers.go
Normal file
@@ -0,0 +1,115 @@
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package record
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/util/zeropool"
|
||||
)
|
||||
|
||||
// BuffersPool offers pool of zero-ed record buffers.
|
||||
type BuffersPool struct {
|
||||
series zeropool.Pool[[]RefSeries]
|
||||
samples zeropool.Pool[[]RefSample]
|
||||
exemplars zeropool.Pool[[]RefExemplar]
|
||||
histograms zeropool.Pool[[]RefHistogramSample]
|
||||
floatHistograms zeropool.Pool[[]RefFloatHistogramSample]
|
||||
metadata zeropool.Pool[[]RefMetadata]
|
||||
}
|
||||
|
||||
// NewBuffersPool returns a new BuffersPool object.
|
||||
func NewBuffersPool() *BuffersPool {
|
||||
return &BuffersPool{}
|
||||
}
|
||||
|
||||
func (p *BuffersPool) GetRefSeries(capacity int) []RefSeries {
|
||||
b := p.series.Get()
|
||||
if b == nil {
|
||||
return make([]RefSeries, 0, capacity)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (p *BuffersPool) PutRefSeries(b []RefSeries) {
|
||||
for i := range b { // Zero out to avoid retaining label data.
|
||||
b[i].Labels = labels.EmptyLabels()
|
||||
}
|
||||
p.series.Put(b[:0])
|
||||
}
|
||||
|
||||
func (p *BuffersPool) GetSamples(capacity int) []RefSample {
|
||||
b := p.samples.Get()
|
||||
if b == nil {
|
||||
return make([]RefSample, 0, capacity)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (p *BuffersPool) PutSamples(b []RefSample) {
|
||||
p.samples.Put(b[:0])
|
||||
}
|
||||
|
||||
func (p *BuffersPool) GetExemplars(capacity int) []RefExemplar {
|
||||
b := p.exemplars.Get()
|
||||
if b == nil {
|
||||
return make([]RefExemplar, 0, capacity)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (p *BuffersPool) PutExemplars(b []RefExemplar) {
|
||||
for i := range b { // Zero out to avoid retaining label data.
|
||||
b[i].Labels = labels.EmptyLabels()
|
||||
}
|
||||
p.exemplars.Put(b[:0])
|
||||
}
|
||||
|
||||
func (p *BuffersPool) GetHistograms(capacity int) []RefHistogramSample {
|
||||
b := p.histograms.Get()
|
||||
if b == nil {
|
||||
return make([]RefHistogramSample, 0, capacity)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (p *BuffersPool) PutHistograms(b []RefHistogramSample) {
|
||||
clear(b)
|
||||
p.histograms.Put(b[:0])
|
||||
}
|
||||
|
||||
func (p *BuffersPool) GetFloatHistograms(capacity int) []RefFloatHistogramSample {
|
||||
b := p.floatHistograms.Get()
|
||||
if b == nil {
|
||||
return make([]RefFloatHistogramSample, 0, capacity)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (p *BuffersPool) PutFloatHistograms(b []RefFloatHistogramSample) {
|
||||
clear(b)
|
||||
p.floatHistograms.Put(b[:0])
|
||||
}
|
||||
|
||||
func (p *BuffersPool) GetMetadata(capacity int) []RefMetadata {
|
||||
b := p.metadata.Get()
|
||||
if b == nil {
|
||||
return make([]RefMetadata, 0, capacity)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (p *BuffersPool) PutMetadata(b []RefMetadata) {
|
||||
clear(b)
|
||||
p.metadata.Put(b[:0])
|
||||
}
|
||||
50
tsdb/record/buffers_test.go
Normal file
50
tsdb/record/buffers_test.go
Normal file
@@ -0,0 +1,50 @@
|
||||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package record
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
)
|
||||
|
||||
func TestBuffersPool_PtrClear(t *testing.T) {
|
||||
pool := NewBuffersPool()
|
||||
|
||||
h := pool.GetHistograms(1)
|
||||
h = append(h, RefHistogramSample{
|
||||
H: &histogram.Histogram{Schema: 1244124},
|
||||
})
|
||||
pool.PutHistograms(h)
|
||||
|
||||
h2 := pool.GetHistograms(1)
|
||||
require.Empty(t, h2)
|
||||
require.Equal(t, 1, cap(h2))
|
||||
h2 = h2[:1] // extend to capacity to check previously stored item
|
||||
require.Nil(t, h2[0].H)
|
||||
|
||||
fh := pool.GetFloatHistograms(1)
|
||||
fh = append(fh, RefFloatHistogramSample{
|
||||
FH: &histogram.FloatHistogram{Schema: 1244521},
|
||||
})
|
||||
pool.PutFloatHistograms(fh)
|
||||
|
||||
fh2 := pool.GetFloatHistograms(1)
|
||||
require.Empty(t, fh2)
|
||||
require.Equal(t, 1, cap(fh2))
|
||||
fh2 = fh2[:1] // extend to capacity
|
||||
require.Nil(t, fh2[0].FH)
|
||||
}
|
||||
@@ -45,11 +45,15 @@ var (
|
||||
)
|
||||
|
||||
// WriteTo is an interface used by the Watcher to send the samples it's read
|
||||
// from the WAL on to somewhere else. Functions will be called concurrently
|
||||
// and it is left to the implementer to make sure they are safe.
|
||||
// from the WAL on to somewhere else.
|
||||
//
|
||||
// Implementations must:
|
||||
// * Ensure it's safe for concurrent goroutine use.
|
||||
// * Ensure slices are not reused after method calls.
|
||||
type WriteTo interface {
|
||||
// Append and AppendExemplar should block until the samples are fully accepted,
|
||||
// whether enqueued in memory or successfully written to it's final destination.
|
||||
// Append and all the rest Append* methods should block until
|
||||
// the samples are fully accepted e.g. enqueued in memory.
|
||||
//
|
||||
// Once returned, the WAL Watcher will not attempt to pass that data again.
|
||||
Append([]record.RefSample) bool
|
||||
AppendExemplars([]record.RefExemplar) bool
|
||||
@@ -60,9 +64,10 @@ type WriteTo interface {
|
||||
|
||||
// UpdateSeriesSegment and SeriesReset are intended for
|
||||
// garbage-collection:
|
||||
// First we call UpdateSeriesSegment on all current series.
|
||||
// * First we call UpdateSeriesSegment on all current series.
|
||||
// * Then SeriesReset is called.
|
||||
UpdateSeriesSegment([]record.RefSeries, int)
|
||||
// Then SeriesReset is called to allow the deletion of all series
|
||||
// SeriesReset is called to allow the deletion of all series
|
||||
// created in a segment lower than the argument.
|
||||
SeriesReset(int)
|
||||
}
|
||||
@@ -85,6 +90,7 @@ type WatcherMetrics struct {
|
||||
type Watcher struct {
|
||||
name string
|
||||
writer WriteTo
|
||||
recordBuf *record.BuffersPool
|
||||
logger *slog.Logger
|
||||
walDir string
|
||||
lastCheckpoint string
|
||||
@@ -187,12 +193,25 @@ func (m *WatcherMetrics) Unregister() {
|
||||
}
|
||||
|
||||
// NewWatcher creates a new WAL watcher for a given WriteTo.
|
||||
func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger *slog.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms, sendMetadata bool) *Watcher {
|
||||
func NewWatcher(
|
||||
metrics *WatcherMetrics,
|
||||
readerMetrics *LiveReaderMetrics,
|
||||
logger *slog.Logger,
|
||||
name string,
|
||||
writer WriteTo,
|
||||
dir string,
|
||||
sendExemplars, sendHistograms, sendMetadata bool,
|
||||
recordBuf *record.BuffersPool,
|
||||
) *Watcher {
|
||||
if logger == nil {
|
||||
logger = promslog.NewNopLogger()
|
||||
}
|
||||
if recordBuf == nil {
|
||||
recordBuf = record.NewBuffersPool()
|
||||
}
|
||||
return &Watcher{
|
||||
logger: logger,
|
||||
recordBuf: recordBuf,
|
||||
writer: writer,
|
||||
metrics: metrics,
|
||||
readerMetrics: readerMetrics,
|
||||
@@ -492,19 +511,24 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
|
||||
|
||||
// Read from a segment and pass the details to w.writer.
|
||||
// Also used with readCheckpoint - implements segmentReadFn.
|
||||
// TODO(bwplotka): Rename tail to !onlySeries; extremely confusing and easy to miss.
|
||||
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
||||
var (
|
||||
dec = record.NewDecoder(labels.NewSymbolTable(), w.logger) // One table per WAL segment means it won't grow indefinitely.
|
||||
series []record.RefSeries
|
||||
samples []record.RefSample
|
||||
samplesToSend []record.RefSample
|
||||
exemplars []record.RefExemplar
|
||||
histograms []record.RefHistogramSample
|
||||
histogramsToSend []record.RefHistogramSample
|
||||
floatHistograms []record.RefFloatHistogramSample
|
||||
floatHistogramsToSend []record.RefFloatHistogramSample
|
||||
metadata []record.RefMetadata
|
||||
)
|
||||
series := w.recordBuf.GetRefSeries(512)
|
||||
samples := w.recordBuf.GetSamples(512)
|
||||
exemplars := w.recordBuf.GetExemplars(512)
|
||||
histograms := w.recordBuf.GetHistograms(512)
|
||||
floatHistograms := w.recordBuf.GetFloatHistograms(512)
|
||||
metadata := w.recordBuf.GetMetadata(512)
|
||||
defer func() {
|
||||
w.recordBuf.PutRefSeries(series)
|
||||
w.recordBuf.PutSamples(samples)
|
||||
w.recordBuf.PutExemplars(exemplars)
|
||||
w.recordBuf.PutHistograms(histograms)
|
||||
w.recordBuf.PutFloatHistograms(floatHistograms)
|
||||
w.recordBuf.PutMetadata(metadata)
|
||||
}()
|
||||
|
||||
dec := record.NewDecoder(labels.NewSymbolTable(), w.logger) // One table per WAL segment means it won't grow indefinitely.
|
||||
for r.Next() && !isClosed(w.quit) {
|
||||
var err error
|
||||
rec := r.Record()
|
||||
@@ -530,6 +554,9 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
||||
w.recordDecodeFailsMetric.Inc()
|
||||
return err
|
||||
}
|
||||
// Reuse the underlying array for efficiency.
|
||||
// It's valid to do, because we override elements that we no longer need to read when filtering.
|
||||
samplesToSend := samples[:0]
|
||||
for _, s := range samples {
|
||||
if s.T > w.startTimestamp {
|
||||
if !w.sendSamples {
|
||||
@@ -542,7 +569,6 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
||||
}
|
||||
if len(samplesToSend) > 0 {
|
||||
w.writer.Append(samplesToSend)
|
||||
samplesToSend = samplesToSend[:0]
|
||||
}
|
||||
|
||||
case record.Exemplars:
|
||||
@@ -575,6 +601,9 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
||||
w.recordDecodeFailsMetric.Inc()
|
||||
return err
|
||||
}
|
||||
// Reuse the underlying array for efficiency.
|
||||
// It's valid to do, because we override elements that we no longer need to read when filtering.
|
||||
histogramsToSend := histograms[:0]
|
||||
for _, h := range histograms {
|
||||
if h.T > w.startTimestamp {
|
||||
if !w.sendSamples {
|
||||
@@ -587,7 +616,6 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
||||
}
|
||||
if len(histogramsToSend) > 0 {
|
||||
w.writer.AppendHistograms(histogramsToSend)
|
||||
histogramsToSend = histogramsToSend[:0]
|
||||
}
|
||||
|
||||
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
|
||||
@@ -603,6 +631,9 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
||||
w.recordDecodeFailsMetric.Inc()
|
||||
return err
|
||||
}
|
||||
// Reuse the underlying array for efficiency.
|
||||
// It's valid to do, because we override elements that we no longer need to read when filtering.
|
||||
floatHistogramsToSend := floatHistograms[:0]
|
||||
for _, fh := range floatHistograms {
|
||||
if fh.T > w.startTimestamp {
|
||||
if !w.sendSamples {
|
||||
@@ -615,7 +646,6 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
||||
}
|
||||
if len(floatHistogramsToSend) > 0 {
|
||||
w.writer.AppendFloatHistograms(floatHistogramsToSend)
|
||||
floatHistogramsToSend = floatHistogramsToSend[:0]
|
||||
}
|
||||
|
||||
case record.Metadata:
|
||||
@@ -637,19 +667,18 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
||||
// We're not interested in other types of records.
|
||||
}
|
||||
}
|
||||
if err := r.Err(); err != nil {
|
||||
return fmt.Errorf("segment %d: %w", segmentNum, err)
|
||||
}
|
||||
return nil
|
||||
// NOTE: r.Err == io.EOF is a common case when tailing.
|
||||
// Don't wrap error, callers are expected to handle EOF and wrap accordingly.
|
||||
return r.Err()
|
||||
}
|
||||
|
||||
// Go through all series in a segment updating the segmentNum, so we can delete older series.
|
||||
// Used with readCheckpoint - implements segmentReadFn.
|
||||
func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error {
|
||||
var (
|
||||
dec = record.NewDecoder(labels.NewSymbolTable(), w.logger) // Needed for decoding; labels do not outlive this function.
|
||||
series []record.RefSeries
|
||||
)
|
||||
series := w.recordBuf.GetRefSeries(512)
|
||||
defer w.recordBuf.PutRefSeries(series)
|
||||
|
||||
dec := record.NewDecoder(labels.NewSymbolTable(), w.logger) // Needed for decoding; labels do not outlive this function.
|
||||
for r.Next() && !isClosed(w.quit) {
|
||||
rec := r.Record()
|
||||
w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc()
|
||||
@@ -671,10 +700,9 @@ func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error
|
||||
// We're only interested in series.
|
||||
}
|
||||
}
|
||||
if err := r.Err(); err != nil {
|
||||
return fmt.Errorf("segment %d: %w", segmentNum, err)
|
||||
}
|
||||
return nil
|
||||
// NOTE: r.Err == io.EOF is a common case when tailing.
|
||||
// Don't wrap error, callers are expected to handle EOF and wrap accordingly.
|
||||
return r.Err()
|
||||
}
|
||||
|
||||
func (w *Watcher) SetStartTime(t time.Time) {
|
||||
@@ -712,7 +740,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err
|
||||
err = readFn(w, r, index, false)
|
||||
sr.Close()
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
return fmt.Errorf("readSegment: %w", err)
|
||||
return fmt.Errorf("readSegment %d: %w", index, err)
|
||||
}
|
||||
|
||||
if r.Offset() != size {
|
||||
|
||||
@@ -253,7 +253,7 @@ func TestWatcher_Tail(t *testing.T) {
|
||||
|
||||
// Start watcher to that reads into a mock.
|
||||
wt := newWriteToMock(0)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, dir, true, true, true)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, dir, true, true, true, nil)
|
||||
// Update the time because we just created samples around "now" time and watcher
|
||||
// only starts watching after that time.
|
||||
watcher.SetStartTime(now)
|
||||
@@ -386,7 +386,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
wt := newWriteToMock(0)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
|
||||
go watcher.Start()
|
||||
|
||||
expected := seriesCount
|
||||
@@ -475,7 +475,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
overwriteReadTimeout(t, time.Second)
|
||||
wt := newWriteToMock(0)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
|
||||
go watcher.Start()
|
||||
|
||||
expected := seriesCount * 2
|
||||
@@ -547,7 +547,7 @@ func TestReadCheckpoint(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
wt := newWriteToMock(0)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
|
||||
go watcher.Start()
|
||||
|
||||
expectedSeries := seriesCount
|
||||
@@ -616,7 +616,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
|
||||
}
|
||||
|
||||
wt := newWriteToMock(0)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
|
||||
watcher.MaxSegment = -1
|
||||
|
||||
// Set the Watcher's metrics so they're not nil pointers.
|
||||
@@ -689,7 +689,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
|
||||
|
||||
overwriteReadTimeout(t, time.Second)
|
||||
wt := newWriteToMock(0)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
|
||||
watcher.MaxSegment = -1
|
||||
go watcher.Start()
|
||||
|
||||
@@ -769,7 +769,7 @@ func TestRun_StartupTime(t *testing.T) {
|
||||
require.NoError(t, w.Close())
|
||||
|
||||
wt := newWriteToMock(0)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
|
||||
watcher.MaxSegment = segments
|
||||
|
||||
watcher.SetMetrics()
|
||||
@@ -840,7 +840,7 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
|
||||
|
||||
// Set up the watcher and run it in the background.
|
||||
wt := newWriteToMock(time.Millisecond)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
|
||||
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false, nil)
|
||||
watcher.SetMetrics()
|
||||
watcher.MaxSegment = segmentsToRead
|
||||
|
||||
|
||||
Reference in New Issue
Block a user