Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka
2026-03-06 10:11:25 +00:00
parent f8cc73c80c
commit 319fb81011
2 changed files with 69 additions and 94 deletions

View File

@@ -17,6 +17,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/util/zeropool"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
)
@@ -43,6 +45,8 @@ type fakeRW2Receiver struct {
receivedExpectedSamples int
receivedRequests int
done chan struct{}
reqPool zeropool.Pool[writev2.Request]
}
func newFakeRW2Receiver() *fakeRW2Receiver {
@@ -63,8 +67,14 @@ func (r *fakeRW2Receiver) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusBadRequest)
return
}
rwReq := r.reqPool.Get()
defer func() {
newRWReq := writev2.Request{}
newRWReq.Symbols = rwReq.Symbols[:0]
newRWReq.Timeseries = rwReq.Timeseries[:0]
r.reqPool.Put(newRWReq)
}()
var rwReq writev2.Request
if err := rwReq.Unmarshal(decoded); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
@@ -117,8 +127,8 @@ func (r *fakeRW2Receiver) wait(b *testing.B, timeout time.Duration) {
select {
case <-b.Context().Done():
case <-done:
b.ReportMetric(float64(r.receivedRequests), "recv-requests")
b.ReportMetric(float64(r.receivedExpectedSamples), "recv-samples")
b.ReportMetric(float64(r.receivedRequests), "recv_requests/op")
b.ReportMetric(float64(r.receivedExpectedSamples), "recv_samples/op")
case <-time.After(timeout):
r.mu.Lock()
close(done)
@@ -140,7 +150,12 @@ func (r *fakeRW2Receiver) wait(b *testing.B, timeout time.Duration) {
/*
export bench=e2erw && go test ./cmd/prometheus/... \
-run '^$' -bench '^BenchmarkE2EScrapeAndRemoteWriteNoChurn' \
-benchtime 50x -count 7 -cpu 2 -timeout 999m -benchmem \
-benchtime 20x -count 6 -cpu 4 -timeout 999m -benchmem \
| tee ${bench}.txt
export bench=e2erwp && go test ./cmd/prometheus/... \
-run '^$' -bench '^BenchmarkE2EScrapeAndRemoteWriteNoChurn$' \
-benchtime 30x -count 1 -cpu 2 -timeout 999m -memprofile=${bench}.mem.pprof \
| tee ${bench}.txt
*/
func BenchmarkE2EScrapeAndRemoteWriteNoChurn(b *testing.B) {
@@ -254,9 +269,9 @@ scrape_configs:
// Wait until RW2 endpoint receives all metrics.
rw.wait(b, 2*time.Minute)
toReport := map[string]map[string]float64{
"prometheus_wal_watcher_reads_total": {},
"prometheus_wal_watcher_notifications_total": {},
toReport := map[string]float64{
"prometheus_wal_watcher_reads_total": 0,
"prometheus_wal_watcher_notifications_total": 0,
}
b.ReportAllocs()
b.ResetTimer()
@@ -273,62 +288,25 @@ scrape_configs:
}
}
reads: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "wal_watcher",
Name: "reads_total",
Help: "Number of WAL reads attempted; triggered by notifications, timeouts or segment changes.",
},
),
notifications: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "wal_watcher",
Name: "notifications_total",
Help: "The number of WAL write notifications that the Watcher received in total; skipped or not.",
},
),
// Notify signals the watcher that new WAL writes are available to read.
// It is non-blocking: the send on readNotify is attempted with a default
// branch, so callers (e.g. appender commits) never stall on the watcher.
func (w *Watcher) Notify() {
	w.notifications.Inc()
	select {
	case w.readNotify <- struct{}{}:
	default:
		// We don't need a buffered channel or any buffering since, for each
		// notification the watcher receives, it reads until EOF — a pending
		// notification therefore already covers this write; count it as skipped.
		w.notificationsSkipped.Inc()
	}
}
func reportMetrics(b *testing.B, g prometheus.Gatherer, counters map[string]map[string]float64) {
func reportMetrics(b *testing.B, g prometheus.Gatherer, counters map[string]float64) {
got, err := g.Gather()
require.NoError(b, err)
ref := strings.Builder{}
for _, m := range got {
mf, ok := counters[m.GetName()]
v, ok := counters[m.GetName()]
if !ok {
continue
}
for _, metric := range m.Metric {
ref.Reset()
ref.WriteString(m.GetName())
ref.WriteString("{")
for _, label := range metric.GetLabel() {
ref.WriteString(label.GetName())
ref.WriteString(label.GetValue())
ref.WriteString(",")
}
ref.WriteString("}/op")
refStr := ref.String()
v := mf[refStr]
delta := metric.GetCounter().GetValue() - v
counters[m.GetName()][refStr] = metric.GetCounter().GetValue()
b.ReportMetric(delta, ref.String())
reportName := strings.TrimPrefix(m.GetName(), "prometheus_")
if len(m.Metric) == 0 {
b.ReportMetric(0, reportName)
continue
}
if len(m.Metric) != 1 {
b.Fatalf("expected 1 metric, got %d", len(m.Metric))
}
delta := m.Metric[0].GetCounter().GetValue() - v
counters[m.GetName()] = m.Metric[0].GetCounter().GetValue()
b.ReportMetric(delta, reportName)
}
}

View File

@@ -463,7 +463,7 @@ func TestGrowth(t *testing.T) {
fmt.Println(float64(sizes[3]-sizes[2]) / 5.0)
}
func syntheticSegment(t testing.TB, appendCase testwal.RecordsCase, appends int, compress compression.Type) (string, []int) {
func syntheticSegment(t testing.TB, appendCase testwal.RecordsCase, appends int, compress compression.Type) (string, []int64) {
var (
now = time.Now()
dir = t.TempDir()
@@ -483,12 +483,12 @@ func syntheticSegment(t testing.TB, appendCase testwal.RecordsCase, appends int,
_ = w.Close()
})
offsets := make([]int, 0, appends)
offsets := make([]int64, 0, appends)
for range appends {
_ = PopulateTest(t, w, appendRecords, nil)
_, off, err := w.LastSegmentAndOffset()
require.NoError(t, err)
offsets = append(offsets, off)
offsets = append(offsets, int64(off))
}
require.NoError(t, w.Close())
@@ -505,46 +505,34 @@ func syntheticSegment(t testing.TB, appendCase testwal.RecordsCase, appends int,
-run '^$' -bench '^BenchmarkWatcher_ReadSegment' \
-benchtime 10x -count 6 -cpu 2 -timeout 999m \
| tee ${bench}.txt
export bench=watcherReadp1 && go test ./tsdb/wlog/... \
-run '^$' -bench '^BenchmarkWatcher_ReadSegment/case=pr18062/compression=snappy$' \
-benchtime 30x -count 1 -cpu 2 -timeout 999m -cpuprofile=${bench}.cpu.pprof -memprofile=${bench}.mem.pprof \
| tee ${bench}.txt
export bench=watcherReadp2 && go test ./tsdb/wlog/... \
-run '^$' -bench '^BenchmarkWatcher_ReadSegment/case=main18062/compression=snappy$' \
-benchtime 30x -count 1 -cpu 2 -timeout 999m -cpuprofile=${bench}.cpu.pprof -memprofile=${bench}.mem.pprof \
| tee ${bench}.txt
*/
func BenchmarkWatcher_ReadSegment(b *testing.B) {
synthSeg, synthOffsets := syntheticSegment(b, testwal.RecordsCase{
Name: "1000samples",
// The exact shape is wrong.
Series: 100, // 100 series, 100 metadata
SamplesPerSeries: 2, // 200 samples overall
HistogramsPerSeries: 2, // 200 histograms
ExemplarsPerSeries: 1, // 100 exemplars
}, 1000, compression.Snappy)
offsetsBuf := make([]int64, 0, 60e3)
for _, tc := range []struct {
name string
segmentPath string
offsets []int // If empty, a single record is a single offset.
name string
segmentAndOffsetFn func(b *testing.B) (string, []int64)
}{
{
name: "data=pr18062/compression=snappy",
segmentPath: prombenchSegment,
name: "data=pr18062/compression=snappy",
segmentAndOffsetFn: func(b *testing.B) (string, []int64) { return prombenchSegment, nil },
},
{
name: "data=main18062/compression=snappy",
segmentPath: prombenchSegment2,
name: "data=main18062/compression=snappy",
segmentAndOffsetFn: func(b *testing.B) (string, []int64) { return prombenchSegment2, nil },
},
{
name: "data=synth5Rec/compression=snappy",
segmentPath: synthSeg,
offsets: synthOffsets,
name: "data=synth5Rec/compression=snappy",
segmentAndOffsetFn: func(b *testing.B) (string, []int64) {
return syntheticSegment(b, testwal.RecordsCase{
Name: "1000samples",
// The exact shape is wrong, but it's fine, it's better to rely on exact shape
// from prod data.
Series: 100, // 100 series, 100 metadata
SamplesPerSeries: 2, // 200 samples overall
HistogramsPerSeries: 2, // 200 histograms
ExemplarsPerSeries: 1, // 100 exemplars
}, 10e3, compression.Snappy)
},
},
} {
b.Run(tc.name, func(b *testing.B) {
@@ -557,7 +545,8 @@ func BenchmarkWatcher_ReadSegment(b *testing.B) {
// only starts reading data after that time.
watcher.SetStartTime(timestamp.Time(math.MinInt64))
segment, err := OpenReadSegment(tc.segmentPath)
segmentPath, _ := tc.segmentAndOffsetFn(b)
segment, err := OpenReadSegment(segmentPath)
require.NoError(b, err)
b.Cleanup(func() {
_ = segment.Close()
@@ -578,6 +567,7 @@ func BenchmarkWatcher_ReadSegment(b *testing.B) {
b.StartTimer()
require.NoError(b, watcher.readAndHandleError(segReader, 0, true, int64(math.MaxInt64)))
b.ReportMetric(1.0, "reads/op")
b.ReportMetric(float64(segReader.Offset()), "readBytes/op")
b.ReportMetric(float64(wt.sampleAppends), "sampleAppends/op")
}
@@ -591,7 +581,12 @@ func BenchmarkWatcher_ReadSegment(b *testing.B) {
// only starts reading data after that time.
watcher.SetStartTime(timestamp.Time(math.MinInt64))
segment, err := OpenReadSegment(tc.segmentPath)
segmentPath, offsets := tc.segmentAndOffsetFn(b)
if len(offsets) > 0 {
offsetsBuf = offsets
}
segment, err := OpenReadSegment(segmentPath)
require.NoError(b, err)
b.Cleanup(func() {
_ = segment.Close()
@@ -600,9 +595,10 @@ func BenchmarkWatcher_ReadSegment(b *testing.B) {
segReader := NewLiveReader(watcher.logger, watcher.readerMetrics, segment)
// As per https://docs.google.com/document/d/1efVAMcEw7-R_KatHHcobcFBlNsre-DoThVHI8AO2SDQ/edit?tab=t.0
// We could assume in a worst case watcher will tail segment scrape by scrape (appender commit Notifies watcher).
if len(tc.offsets) == 0 {
// Find offsets of all records (samples for now as we know input has 100% samples).
// We should expect a read on every second concurrent append. This case assumes a read per commit, which
// represents the possible worst-case scenario — good enough for optimization needs.
if segmentPath == prombenchSegment || segmentPath == prombenchSegment2 {
// Find offsets of all records (samples for now, since the known prombench input contains 100% samples).
// Pack all offsets to read and we will iterate through all. It's naive, but effective (~50k elems).
offsetsBuf = offsetsBuf[:0]
for segReader.Next() {
@@ -629,6 +625,7 @@ func BenchmarkWatcher_ReadSegment(b *testing.B) {
limitReader.N = offsetsBuf[i]
require.NoError(b, watcher.readAndHandleError(segReader, 0, true, int64(math.MaxInt64)))
}
b.ReportMetric(float64(len(offsetsBuf)), "reads/op")
b.ReportMetric(float64(segReader.Offset()), "readBytes/op")
b.ReportMetric(float64(wt.sampleAppends), "sampleAppends/op")
}