diff --git a/scrape/scrape.go b/scrape/scrape.go
index b68d2db1d9..9b37a356cf 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -1259,9 +1259,8 @@ func (sl *scrapeLoop) getScrapeOffset() time.Duration {
 
 func (sl *scrapeLoop) run(errc chan<- error) {
 	var (
-		last              time.Time
-		alignedScrapeTime = time.Now().Round(0)
-		ticker            = time.NewTicker(sl.interval)
+		last   time.Time
+		ticker = time.NewTicker(sl.interval)
 	)
 	defer func() {
 		if sl.scrapeOnShutdown {
@@ -1288,6 +1287,10 @@ func (sl *scrapeLoop) run(errc chan<- error) {
 		}
 	}
 
+	// Reset the ticker so target scrape times are aligned to the offset+intervals.
+	ticker.Reset(sl.interval)
+	alignedScrapeTime := time.Now().Round(0)
+
 	for {
 		select {
 		case <-sl.ctx.Done():
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index 4657d99b9f..d5cd765a5a 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -24,6 +24,7 @@ import (
 	"log/slog"
 	"maps"
 	"math"
+	"net"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
@@ -51,6 +52,7 @@ import (
 	sdktrace "go.opentelemetry.io/otel/sdk/trace"
 	"go.uber.org/atomic"
 	"go.uber.org/goleak"
+	"go.yaml.in/yaml/v2"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery"
@@ -69,6 +71,7 @@ import (
 	"github.com/prometheus/prometheus/util/pool"
 	"github.com/prometheus/prometheus/util/teststorage"
 	"github.com/prometheus/prometheus/util/testutil"
+	"github.com/prometheus/prometheus/util/testutil/synctest"
 )
 
 func TestMain(m *testing.M) {
@@ -6856,3 +6859,100 @@ func TestScrapePoolSetScrapeFailureLoggerRace(t *testing.T) {
 
 	wg.Wait()
 }
+
+func TestScrapeOffsetDistribution(t *testing.T) {
+	interval := 5 * time.Second
+
+	synctest.Test(t, func(t *testing.T) {
+		startTime := time.Now()
+
+		listener := newPipeListener()
+
+		var mu sync.Mutex
+		scrapeTimes := make(map[string][]time.Duration)
+
+		handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			select {
+			case <-r.Context().Done():
+				return
+			default:
+				mu.Lock()
+				target := r.URL.Path
+				scrapeTimes[target] = append(scrapeTimes[target], time.Since(startTime))
+				mu.Unlock()
+
+				w.Header().Set("Content-Type", "text/plain; version=0.0.4")
+				fmt.Fprintln(w, "expected_metric 1")
+			}
+		})
+
+		srv := httptest.NewUnstartedServer(handler)
+		srv.Listener = listener
+		srv.Start()
+		t.Cleanup(srv.Close)
+
+		app := teststorage.NewAppendable()
+		opts := &Options{
+			HTTPClientOptions: []config_util.HTTPClientOption{
+				config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) {
+					srvConn, cliConn := net.Pipe()
+					select {
+					case listener.conns <- srvConn:
+						return cliConn, nil
+					case <-listener.closed:
+						return nil, net.ErrClosed
+					case <-ctx.Done():
+						return nil, ctx.Err()
+					}
+				}),
+			},
+		}
+		scrapeManager, err := NewManager(opts, promslog.NewNopLogger(), nil, app, nil, prometheus.NewRegistry())
+		scrapeManager.offsetSeed = 1 // Set a fixed offset seed for deterministic testing.
+		require.NoError(t, err)
+
+		var targets []model.LabelSet
+		for i := range 5 {
+			targets = append(targets, model.LabelSet{
+				model.SchemeLabel:      "http",
+				model.AddressLabel:     model.LabelValue(fmt.Sprintf("target-%d.local", i)),
+				model.MetricsPathLabel: model.LabelValue(fmt.Sprintf("/metrics/%d", i)),
+			})
+		}
+
+		scrapeManager.updateTsets(map[string][]*targetgroup.Group{
+			"test": {{Targets: targets}},
+		})
+
+		cfg := &config.Config{
+			GlobalConfig: config.GlobalConfig{
+				ScrapeInterval:  model.Duration(interval),
+				ScrapeTimeout:   model.Duration(interval),
+				ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
+			},
+			ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
+		}
+		cfgText, err := yaml.Marshal(*cfg)
+		require.NoError(t, err)
+		cfg = loadConfiguration(t, string(cfgText))
+		require.NoError(t, scrapeManager.ApplyConfig(cfg))
+
+		scrapeManager.reload()
+
+		numScrapes := 4
+		time.Sleep((time.Duration(numScrapes) * interval) + time.Second)
+		synctest.Wait()
+
+		scrapeManager.Stop()
+
+		for i := range numScrapes {
+			uniqueTimes := make(map[time.Duration]struct{})
+			for _, times := range scrapeTimes {
+				if i < len(times) {
+					uniqueTimes[times[i]] = struct{}{}
+				}
+			}
+			require.Greater(t, len(uniqueTimes), 2, "Expected targets to be scraped at staggered offsets rather than simultaneously at scrape index %d", i)
+		}
+	})
+}