From caa250a29c0dbdf926f4988cae92fedd8771644e Mon Sep 17 00:00:00 2001 From: Ridwan Sharif Date: Tue, 17 Mar 2026 17:43:14 +0000 Subject: [PATCH 1/4] scrape: reset ticker to align target scrape times with offset and intervals Signed-off-by: Ridwan Sharif --- scrape/scrape.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/scrape/scrape.go b/scrape/scrape.go index 55d0eaf70b..2866a7fa61 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1282,6 +1282,9 @@ func (sl *scrapeLoop) run(errc chan<- error) { } } + // Reset the ticker so target scrape times are aligned to the offset+intervals. + ticker.Reset(sl.interval) + for { select { case <-sl.ctx.Done(): From 695db71c68252646586ba1d90c6f35bb850cc8f2 Mon Sep 17 00:00:00 2001 From: Ridwan Sharif Date: Tue, 17 Mar 2026 18:10:10 +0000 Subject: [PATCH 2/4] scrape: add test for distribution of scrapes Signed-off-by: Ridwan Sharif --- scrape/scrape_test.go | 106 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 63547869be..432230219b 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -24,6 +24,7 @@ import ( "log/slog" "maps" "math" + "net" "net/http" "net/http/httptest" "net/url" @@ -51,6 +52,7 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.uber.org/atomic" "go.uber.org/goleak" + "go.yaml.in/yaml/v2" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" @@ -69,6 +71,7 @@ import ( "github.com/prometheus/prometheus/util/pool" "github.com/prometheus/prometheus/util/teststorage" "github.com/prometheus/prometheus/util/testutil" + "github.com/prometheus/prometheus/util/testutil/synctest" ) func TestMain(m *testing.M) { @@ -6786,3 +6789,106 @@ func TestScrapePoolSetScrapeFailureLoggerRace(t *testing.T) { wg.Wait() } + +func TestScrapeOffsetDistribution(t *testing.T) { + interval := 5 * time.Second + + synctest.Test(t, func(t *testing.T) { + startTime := time.Now() + + listener := newPipeListener() + + var mu sync.Mutex + scrapeTimes := make(map[string][]time.Duration) + + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + select { + case <-r.Context().Done(): + return + default: + mu.Lock() + target := r.URL.Path + scrapeTimes[target] = append(scrapeTimes[target], time.Since(startTime)) + mu.Unlock() + + w.Header().Set("Content-Type", "text/plain; version=0.0.4") + fmt.Fprintln(w, "expected_metric 1") + } + }) + + srv := httptest.NewUnstartedServer(handler) + srv.Listener = listener + srv.Start() + t.Cleanup(srv.Close) + + app := teststorage.NewAppendable() + opts := &Options{ + HTTPClientOptions: []config_util.HTTPClientOption{ + config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) { + srvConn, cliConn := net.Pipe() + select { + case listener.conns <- srvConn: + return cliConn, nil + case <-listener.closed: + return nil, net.ErrClosed + case <-ctx.Done(): + return nil, ctx.Err() + } + }), + }, + } + scrapeManager, err := NewManager(opts, promslog.NewNopLogger(), nil, app, nil, prometheus.NewRegistry()) + require.NoError(t, err) + + var targets []model.LabelSet + for i := range 5 { + targets = append(targets, model.LabelSet{ + model.SchemeLabel: "http", + model.AddressLabel: model.LabelValue(fmt.Sprintf("target-%d.local", i)), + model.MetricsPathLabel: model.LabelValue(fmt.Sprintf("/metrics/%d", i)), + }) + } + + scrapeManager.updateTsets(map[string][]*targetgroup.Group{ + "test": {{Targets: targets}}, + }) + + cfg := &config.Config{ + GlobalConfig: config.GlobalConfig{ + ScrapeInterval: model.Duration(interval), + ScrapeTimeout: model.Duration(interval), + ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto}, + }, + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, + } + cfgText, err := yaml.Marshal(*cfg) + require.NoError(t, err) + cfg = loadConfiguration(t, string(cfgText)) + require.NoError(t, scrapeManager.ApplyConfig(cfg)) + + scrapeManager.reload() + + time.Sleep(22 * time.Second) + synctest.Wait() + + scrapeManager.Stop() + + maxScrapes := 0 + for _, times := range scrapeTimes { + if len(times) > maxScrapes { + maxScrapes = len(times) + } + } + require.Positive(t, maxScrapes, "Expected at least one scrape") + + for i := 0; i < maxScrapes; i++ { + uniqueTimes := make(map[time.Duration]struct{}) + for _, times := range scrapeTimes { + if i < len(times) { + uniqueTimes[times[i]] = struct{}{} + } + } + require.Greater(t, len(uniqueTimes), 2, "Expected targets to be scraped at staggered offsets rather than simultaneously at scrape index %d", i) + } + }) +} From 8e8cd480cbe90da79d07c80ec81a4dd6adffc63f Mon Sep 17 00:00:00 2001 From: Ridwan Sharif Date: Tue, 17 Mar 2026 20:15:56 +0000 Subject: [PATCH 3/4] scrape: Introduce an `offsetSeed` option for deterministic scrape offset calculation and utilize it in tests Signed-off-by: Ridwan Sharif --- scrape/manager.go | 8 +++++++- scrape/scrape_test.go | 14 ++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/scrape/manager.go b/scrape/manager.go index e632b015d7..7a4a4463d9 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -149,8 +149,9 @@ type Options struct { // because of an early startup scrape. InitialScrapeOffset time.Duration - // private option for testability. + // private options for testability. skipJitterOffsetting bool + offsetSeed uint64 } // Manager maintains a set of scrape pools and manages start/stop cycles @@ -269,6 +270,11 @@ func (m *Manager) reload() { // setOffsetSeed calculates a global offsetSeed per server relying on extra label set. func (m *Manager) setOffsetSeed(labels labels.Labels) error { + if m.opts.offsetSeed != 0 { + m.offsetSeed = m.opts.offsetSeed + return nil + } + h := fnv.New64a() hostname, err := osutil.GetFQDN() if err != nil { diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 432230219b..9cb3adff45 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -6823,6 +6823,7 @@ func TestScrapeOffsetDistribution(t *testing.T) { app := teststorage.NewAppendable() opts := &Options{ + offsetSeed: 1, HTTPClientOptions: []config_util.HTTPClientOption{ config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) { srvConn, cliConn := net.Pipe() @@ -6868,20 +6869,13 @@ func TestScrapeOffsetDistribution(t *testing.T) { scrapeManager.reload() - time.Sleep(22 * time.Second) + numScrapes := 4 + time.Sleep((time.Duration(numScrapes) * interval) + time.Second) synctest.Wait() scrapeManager.Stop() - maxScrapes := 0 - for _, times := range scrapeTimes { - if len(times) > maxScrapes { - maxScrapes = len(times) - } - } - require.Positive(t, maxScrapes, "Expected at least one scrape") - - for i := 0; i < maxScrapes; i++ { + for i := range numScrapes { uniqueTimes := make(map[time.Duration]struct{}) for _, times := range scrapeTimes { if i < len(times) { From 101ae7338030e5ee2e6b78fc60ebf4916e211f2b Mon Sep 17 00:00:00 2001 From: Ridwan Sharif Date: Fri, 20 Mar 2026 05:58:18 +0000 Subject: [PATCH 4/4] scrape: address comments on PR Signed-off-by: Ridwan Sharif --- scrape/manager.go | 8 +------- scrape/scrape.go | 6 +++--- scrape/scrape_test.go | 2 +- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/scrape/manager.go b/scrape/manager.go index 7a4a4463d9..e632b015d7 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -149,9 +149,8 @@ type Options struct { // because of an early startup scrape. InitialScrapeOffset time.Duration - // private options for testability. + // private option for testability. skipJitterOffsetting bool - offsetSeed uint64 } // Manager maintains a set of scrape pools and manages start/stop cycles @@ -270,11 +269,6 @@ func (m *Manager) reload() { // setOffsetSeed calculates a global offsetSeed per server relying on extra label set. func (m *Manager) setOffsetSeed(labels labels.Labels) error { - if m.opts.offsetSeed != 0 { - m.offsetSeed = m.opts.offsetSeed - return nil - } - h := fnv.New64a() hostname, err := osutil.GetFQDN() if err != nil { diff --git a/scrape/scrape.go b/scrape/scrape.go index 2866a7fa61..a0c5da10d6 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1253,9 +1253,8 @@ func (sl *scrapeLoop) getScrapeOffset() time.Duration { func (sl *scrapeLoop) run(errc chan<- error) { var ( - last time.Time - alignedScrapeTime = time.Now().Round(0) - ticker = time.NewTicker(sl.interval) + last time.Time + ticker = time.NewTicker(sl.interval) ) defer func() { if sl.scrapeOnShutdown { @@ -1284,6 +1283,7 @@ func (sl *scrapeLoop) run(errc chan<- error) { // Reset the ticker so target scrape times are aligned to the offset+intervals. ticker.Reset(sl.interval) + alignedScrapeTime := time.Now().Round(0) for { select { diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 9cb3adff45..37807cea98 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -6823,7 +6823,6 @@ func TestScrapeOffsetDistribution(t *testing.T) { app := teststorage.NewAppendable() opts := &Options{ - offsetSeed: 1, HTTPClientOptions: []config_util.HTTPClientOption{ config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) { srvConn, cliConn := net.Pipe() @@ -6839,6 +6838,7 @@ func TestScrapeOffsetDistribution(t *testing.T) { }, } scrapeManager, err := NewManager(opts, promslog.NewNopLogger(), nil, app, nil, prometheus.NewRegistry()) + scrapeManager.offsetSeed = 1 // Set a fixed offset seed for deterministic testing. require.NoError(t, err) var targets []model.LabelSet