Mirror of https://github.com/prometheus/prometheus, synced 2026-04-20 14:31:49 +08:00
Merge pull request #18314 from ridwanmsharif/scrape/fix-jitter
scrape: reset ticker to align target scrape times with offset and intervals
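The change follows the usual pattern for aligning periodic work to a per-target offset: wait out the offset first, then reset the ticker so subsequent ticks fire at offset + k*interval instead of at multiples of the ticker's creation time. Below is a minimal, self-contained Go sketch of that pattern; runAligned and scrape are illustrative names, not the scrapeLoop API.

package main

import (
	"context"
	"fmt"
	"time"
)

// runAligned calls scrape on ticks aligned to offset + k*interval.
// If the ticker kept its original phase (set when time.NewTicker was
// called), the initial offset sleep would not carry over to later
// ticks, and targets sharing an interval would all fire together again.
func runAligned(ctx context.Context, interval, offset time.Duration, scrape func(time.Time)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// Wait for the per-target offset before the first scrape.
	select {
	case <-time.After(offset):
	case <-ctx.Done():
		return
	}

	// Reset the ticker so every later tick stays aligned to the offset.
	ticker.Reset(interval)
	scrape(time.Now())

	for {
		select {
		case t := <-ticker.C:
			scrape(t)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	runAligned(ctx, time.Second, 300*time.Millisecond, func(t time.Time) {
		fmt.Println("scrape at", t.Format(time.RFC3339Nano))
	})
}

In the diff below, the same idea appears as ticker.Reset(sl.interval) right after the initial offset wait, with alignedScrapeTime captured at that point rather than when the loop variables are declared.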
scrape/scrape.go
@@ -1259,9 +1259,8 @@ func (sl *scrapeLoop) getScrapeOffset() time.Duration {
 
 func (sl *scrapeLoop) run(errc chan<- error) {
 	var (
-		last              time.Time
-		alignedScrapeTime = time.Now().Round(0)
-		ticker            = time.NewTicker(sl.interval)
+		last   time.Time
+		ticker = time.NewTicker(sl.interval)
 	)
 	defer func() {
 		if sl.scrapeOnShutdown {
@@ -1288,6 +1287,10 @@ func (sl *scrapeLoop) run(errc chan<- error) {
 		}
 	}
 
+	// Reset the ticker so target scrape times are aligned to the offset+intervals.
+	ticker.Reset(sl.interval)
+	alignedScrapeTime := time.Now().Round(0)
+
 	for {
 		select {
 		case <-sl.ctx.Done():
scrape/scrape_test.go
@@ -24,6 +24,7 @@ import (
 	"log/slog"
 	"maps"
 	"math"
+	"net"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
@@ -51,6 +52,7 @@ import (
 	sdktrace "go.opentelemetry.io/otel/sdk/trace"
 	"go.uber.org/atomic"
 	"go.uber.org/goleak"
+	"go.yaml.in/yaml/v2"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery"
@@ -69,6 +71,7 @@ import (
 	"github.com/prometheus/prometheus/util/pool"
 	"github.com/prometheus/prometheus/util/teststorage"
 	"github.com/prometheus/prometheus/util/testutil"
+	"github.com/prometheus/prometheus/util/testutil/synctest"
 )
 
 func TestMain(m *testing.M) {
@@ -6856,3 +6859,100 @@ func TestScrapePoolSetScrapeFailureLoggerRace(t *testing.T) {
 
 	wg.Wait()
 }
+
+func TestScrapeOffsetDistribution(t *testing.T) {
+	interval := 5 * time.Second
+
+	synctest.Test(t, func(t *testing.T) {
+		startTime := time.Now()
+
+		listener := newPipeListener()
+
+		var mu sync.Mutex
+		scrapeTimes := make(map[string][]time.Duration)
+
+		handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			select {
+			case <-r.Context().Done():
+				return
+			default:
+				mu.Lock()
+				target := r.URL.Path
+				scrapeTimes[target] = append(scrapeTimes[target], time.Since(startTime))
+				mu.Unlock()
+
+				w.Header().Set("Content-Type", "text/plain; version=0.0.4")
+				fmt.Fprintln(w, "expected_metric 1")
+			}
+		})
+
+		srv := httptest.NewUnstartedServer(handler)
+		srv.Listener = listener
+		srv.Start()
+		t.Cleanup(srv.Close)
+
+		app := teststorage.NewAppendable()
+		opts := &Options{
+			HTTPClientOptions: []config_util.HTTPClientOption{
+				config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) {
+					srvConn, cliConn := net.Pipe()
+					select {
+					case listener.conns <- srvConn:
+						return cliConn, nil
+					case <-listener.closed:
+						return nil, net.ErrClosed
+					case <-ctx.Done():
+						return nil, ctx.Err()
+					}
+				}),
+			},
+		}
+		scrapeManager, err := NewManager(opts, promslog.NewNopLogger(), nil, app, nil, prometheus.NewRegistry())
+		scrapeManager.offsetSeed = 1 // Set a fixed offset seed for deterministic testing.
+		require.NoError(t, err)
+
+		var targets []model.LabelSet
+		for i := range 5 {
+			targets = append(targets, model.LabelSet{
+				model.SchemeLabel:      "http",
+				model.AddressLabel:     model.LabelValue(fmt.Sprintf("target-%d.local", i)),
+				model.MetricsPathLabel: model.LabelValue(fmt.Sprintf("/metrics/%d", i)),
+			})
+		}
+
+		scrapeManager.updateTsets(map[string][]*targetgroup.Group{
+			"test": {{Targets: targets}},
+		})
+
+		cfg := &config.Config{
+			GlobalConfig: config.GlobalConfig{
+				ScrapeInterval:  model.Duration(interval),
+				ScrapeTimeout:   model.Duration(interval),
+				ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
+			},
+			ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
+		}
+		cfgText, err := yaml.Marshal(*cfg)
+		require.NoError(t, err)
+		cfg = loadConfiguration(t, string(cfgText))
+		require.NoError(t, scrapeManager.ApplyConfig(cfg))
+
+		scrapeManager.reload()
+
+		numScrapes := 4
+		time.Sleep((time.Duration(numScrapes) * interval) + time.Second)
+		synctest.Wait()
+
+		scrapeManager.Stop()
+
+		for i := range numScrapes {
+			uniqueTimes := make(map[time.Duration]struct{})
+			for _, times := range scrapeTimes {
+				if i < len(times) {
+					uniqueTimes[times[i]] = struct{}{}
+				}
+			}
+			require.Greater(t, len(uniqueTimes), 2, "Expected targets to be scraped at staggered offsets rather than simultaneously at scrape index %d", i)
+		}
+	})
+}