discovery, scrape: Use backoff interval for throttling discovery updates; add DiscoveryReloadOnStartup option for short-lived environments (#18187)

* Adding scape on shutdown

Signed-off-by: avilevy <avilevy@google.com>

* scrape: replace skipOffsetting to make the test offset deterministic instead of skipping it entirely

Signed-off-by: avilevy <avilevy@google.com>

* renamed calculateScrapeOffset to getScrapeOffset

Signed-off-by: avilevy <avilevy@google.com>

* discovery: Add skipStartupWait to bypass initial discovery delay

In short-lived environments like agent mode or serverless, the
Prometheus process may only execute for a few seconds. Waiting for
the default 5-second `updatert` ticker before sending the first
target groups means the process could terminate before collecting
any metrics at all.

This commit adds a `skipStartupWait` option to the Discovery Manager
to bypass this initial delay. When enabled, the sender uses an
unthrottled startup loop that instantly forwards all triggers. This
ensures both the initial empty update from `ApplyConfig` and the
first real targets from discoverers are passed downstream immediately.

After the first ticker interval elapses, the sender cleanly breaks out
of the startup phase, resets the ticker, and resumes standard
operations.

Signed-off-by: avilevy <avilevy@google.com>

* scrape: Bypass initial reload delay for ScrapeOnShutdown

In short-lived environments like agent mode or serverless, the default
5-second `DiscoveryReloadInterval` can cause the process to terminate
before the scrape manager has a chance to process targets and collect
any metrics.

Because the discovery manager sends an initial empty update upon
configuration followed rapidly by the actual targets, simply waiting
for a single reload trigger is insufficient—the real targets would
still get trapped behind the ticker delay.

This commit introduces an unthrottled startup loop in the `reloader`
when `ScrapeOnShutdown` is enabled. It processes all incoming
`triggerReload` signals immediately during the first interval. Once
the initial tick fires, the `reloader` resets the ticker and falls
back into its standard throttled loop, ensuring short-lived processes
can discover and scrape targets instantly.

Signed-off-by: avilevy <avilevy@google.com>

* test(scrape): refactor time-based manager tests to use synctest

Addresses PR feedback to remove flaky, time-based sleeping in the scrape manager tests.

Add TestManager_InitialScrapeOffset and TestManager_ScrapeOnShutdown to use the testing/synctest package, completely eliminating real-world time.Sleep delays and making the assertions 100% deterministic.

- Replaced httptest.Server with net.Pipe and a custom startFakeHTTPServer helper to ensure all network I/O remains durably blocked inside the synctest bubble.
- Leveraged the skipOffsetting option to eliminate random scrape jitter, making the time-travel math exact and predictable.
- Using skipOffsetting also safely bypasses the global singleflight DNS lookup in setOffsetSeed, which previously caused cross-bubble panics in synctest.
- Extracted shared boilerplate into a setupSynctestManager helper to keep the test cases highly readable and data-driven.

Signed-off-by: avilevy <avilevy@google.com>

* Clarify use cases in InitialScrapeOffset comment

Signed-off-by: avilevy <avilevy@google.com>

* test(scrape): use httptest for mock server to respect context cancellation

- Replaced manual HTTP string formatting over `net.Pipe` with `httptest.NewUnstartedServer`.
- Implemented an in-memory `pipeListener` to allow the server to handle `net.Pipe` connections directly. This preserves `synctest` time isolation without opening real OS ports.
- Added explicit `r.Context().Done()` handling in the mock HTTP handler to properly simulate aborted requests and scrape timeouts.
- Validates that the request context remains active and is not prematurely cancelled during `ScrapeOnShutdown` scenarios.
- Renamed `skipOffsetting` to `skipJitterOffsetting`.
- Addressed other PR comments.

Signed-off-by: avilevy <avilevy@google.com>

* tmp

Signed-off-by: bwplotka <bwplotka@gmail.com>

* exp2

Signed-off-by: bwplotka <bwplotka@gmail.com>

* fix

Signed-off-by: bwplotka <bwplotka@gmail.com>

* scrape: fix scrapeOnShutdown context bug and refactor test helpers
The scrapeOnShutdown feature was failing during manager shutdown because
the scrape pool context was being cancelled before the final shutdown
scrapes could execute. Fix this by delaying context cancellation
in scrapePool.stop() until after all scrape loops have stopped.
In addition:
- Added test cases to verify scrapeOnShutdown works with InitialScrapeOffset.
- Refactored network test helper functions from manager_test.go to
  helpers_test.go.
- Addressed other comments.

Signed-off-by: avilevy <avilevy@google.com>

* Update scrape/scrape.go

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com>

* feat(discovery): add SkipInitialWait to bypass initial startup delay

This adds a SkipInitialWait option to the discovery Manager, allowing consumers sensitive to startup latency to receive the first batch of discovered targets immediately instead of waiting for the updatert ticker.

To support this without breaking the immediate dropped target notifications introduced in #13147, ApplyConfig now uses a keep flag to only trigger immediate downstream syncs for obsolete or updated providers. This prevents sending premature empty target groups for brand-new providers on initial startup.

Additionally, the scrape manager's reloader loop is updated to process the initial triggerReload immediately, ensuring the end-to-end pipeline processes initial targets without artificial delays.

Signed-off-by: avilevy <avilevy@google.com>

* scrape: Add TestManagerReloader and refactor discovery triggerSync

Adds a new TestManagerReloader test suite using synctest to assert
behavior of target updates, discovery reload ticker intervals, and
ScrapeOnShutdown flags.

Updates setupSynctestManager to allow skipping initial config setup by
passing an interval of 0.

Also renames the 'keep' variable to 'triggerSync' in ApplyConfig inside
discovery/manager.go for clarity, and adds a descriptive comment.

Signed-off-by: avilevy <avilevy@google.com>

* feat(discovery,scrape): rename startup wait options and add DiscoveryReloadOnStartup

- discovery: Rename `SkipInitialWait` to `SkipStartupWait` for clarity.
- discovery: Pass `context.Context` to `flushUpdates` to handle cancellation and avoid leaks.
- scrape: Add `DiscoveryReloadOnStartup` to `Options` to decouple startup discovery from `ScrapeOnShutdown`.
- tests: Refactor `TestTargetSetTargetGroupsPresentOnStartup` and `TestManagerReloader` to use table-driven tests and `synctest` for better stability and coverage.

Signed-off-by: avilevy <avilevy@google.com>

* feat(discovery,scrape): importing changes proposed in 043d710

- Refactor sender to use exponential backoff
- Replaces `time.NewTicker` in `sender()` with an exponential backoff
  to prevent panics on non-positive intervals and better throttle updates.
- Removes obsolete `skipStartupWait` logic.
- Refactors `setupSynctestManager` to use an explicit `initConfig` argument

Signed-off-by: avilevy <avilevy@google.com>

* fix: updating go mod

Signed-off-by: avilevy <avilevy@google.com>

* fixing merge

Signed-off-by: avilevy <avilevy@google.com>

* fixing issue: 2 variables but NewTestMetrics returns 1 value

Signed-off-by: avilevy <avilevy@google.com>

* Update discovery/manager.go

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com>

* Refactor setupSynctestManager initConfig into a separate function

Signed-off-by: avilevy <avilevy@google.com>

---------

Signed-off-by: avilevy <avilevy@google.com>
Signed-off-by: bwplotka <bwplotka@gmail.com>
Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com>
Co-authored-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
avilevy18
2026-04-03 06:01:49 -04:00
committed by GitHub
parent 622d6b33f4
commit eb220862e5
6 changed files with 277 additions and 15 deletions

View File

@@ -22,6 +22,7 @@ import (
"sync"
"time"
"github.com/cenkalti/backoff/v5"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"
"github.com/prometheus/common/promslog"
@@ -101,7 +102,7 @@ func NewManager(ctx context.Context, logger *slog.Logger, registerer prometheus.
targets: make(map[poolKey]map[string]*targetgroup.Group),
ctx: ctx,
updatert: 5 * time.Second,
triggerSend: make(chan struct{}, 1),
triggerSend: make(chan struct{}, 1), // At least one element to ensure we can do a delayed read.
registerer: registerer,
sdMetrics: sdMetrics,
}
@@ -408,24 +409,34 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ
}
func (m *Manager) sender() {
ticker := time.NewTicker(m.updatert)
defer func() {
ticker.Stop()
close(m.syncCh)
}()
// Some discoverers send updates too often, so we throttle these with a backoff interval that
// increases the interval up to m.updatert delay.
lastSent := time.Now().Add(-1 * m.updatert)
b := &backoff.ExponentialBackOff{
InitialInterval: 100 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: m.updatert,
}
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker.
case <-time.After(b.NextBackOff()):
select {
case <-m.triggerSend:
m.metrics.SentUpdates.Inc()
select {
case m.syncCh <- m.allGroups():
lastSent = time.Now()
default:
m.metrics.DelayedUpdates.Inc()
m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle")
// Ensure we don't miss this update.
select {
case m.triggerSend <- struct{}{}:
default:
@@ -433,6 +444,9 @@ func (m *Manager) sender() {
}
default:
}
if time.Since(lastSent) > m.updatert {
b.Reset() // Nothing happened for a while, start again from low interval for prompt updates.
}
}
}
}

View File

@@ -21,6 +21,7 @@ import (
"strconv"
"sync"
"testing"
"testing/synctest"
"time"
"github.com/prometheus/client_golang/prometheus"
@@ -785,6 +786,84 @@ func pk(provider, setName string, n int) poolKey {
}
}
func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) {
testCases := []struct {
name string
updatert time.Duration
readTimeout time.Duration
expectedTargets int
}{
{
name: "startup wait with long interval times out",
updatert: 100 * time.Hour,
readTimeout: 10 * time.Millisecond,
},
{
name: "startup wait with short interval succeeds",
updatert: 10 * time.Millisecond,
readTimeout: 300 * time.Millisecond,
expectedTargets: 1,
},
{
name: "skip startup wait",
updatert: 100 * time.Hour,
readTimeout: 300 * time.Millisecond,
expectedTargets: 1,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
ctx := t.Context()
reg := prometheus.NewRegistry()
sdMetrics := NewTestMetrics(t, reg)
opts := make([]func(*Manager), 0)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, opts...)
require.NotNil(t, discoveryManager)
discoveryManager.updatert = tc.updatert
go discoveryManager.Run()
c := map[string]Configs{
"prometheus": {
staticConfig("foo:9090"),
},
}
discoveryManager.ApplyConfig(c)
synctest.Wait()
timeout := time.After(tc.readTimeout)
var lastSyncedTargets map[string][]*targetgroup.Group
testFor:
for {
select {
case <-timeout:
break testFor
case lastSyncedTargets = <-discoveryManager.SyncCh():
}
}
if tc.expectedTargets == 0 {
require.Nil(t, lastSyncedTargets)
return
}
require.Len(t, lastSyncedTargets, 1)
require.Len(t, lastSyncedTargets["prometheus"], tc.expectedTargets)
verifySyncedPresence(t, lastSyncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
p := pk("static", "prometheus", 0)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
require.Len(t, discoveryManager.targets, 1)
})
})
}
}
func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) {
ctx := t.Context()

2
go.mod
View File

@@ -156,7 +156,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.30.13 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.17 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cenkalti/backoff/v5 v5.0.3
github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5 // indirect
github.com/containerd/errdefs v1.0.0 // indirect
github.com/containerd/errdefs/pkg v0.3.0 // indirect

View File

@@ -309,7 +309,7 @@ func startFakeHTTPServer(t *testing.T) (*pipeListener, func()) {
// setupSynctestManager abstracts the boilerplate of creating a mock network,
// starting the fake HTTP server, and configuring the scrape manager for synctest.
func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (*Manager, *teststorage.Appendable, func()) {
func setupSynctestManager(t *testing.T, opts *Options) (*Manager, *teststorage.Appendable, func()) {
t.Helper()
app := teststorage.NewAppendable()
@@ -345,6 +345,12 @@ func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (
)
require.NoError(t, err)
return scrapeManager, app, cleanup
}
func applyDefaultSynctestConfig(t *testing.T, scrapeManager *Manager, interval time.Duration) {
t.Helper()
cfg := &config.Config{
GlobalConfig: config.GlobalConfig{
ScrapeInterval: model.Duration(interval),
@@ -368,6 +374,4 @@ func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (
})
scrapeManager.reload()
return scrapeManager, app, cleanup
}

View File

@@ -151,6 +151,13 @@ type Options struct {
// NOTE: This final scrape ignores the configured scrape interval.
ScrapeOnShutdown bool
// DiscoveryReloadOnStartup enables discovering targets immediately on start up as opposed
// to waiting for the interval defined in DiscoveryReloadInterval before
// initializing the scrape pools. Disabled by default. Useful for serverless
// flavors of OpenTelemetry contrib's prometheusreceiver where we're
// sensitive to start up delays.
DiscoveryReloadOnStartup bool
// InitialScrapeOffset applies an additional baseline delay before we begin
// scraping targets. By default, Prometheus calculates a specific offset for
// each target to spread the scraping load evenly across the server. Configuring
@@ -225,15 +232,24 @@ func (m *Manager) UnregisterMetrics() {
}
func (m *Manager) reloader() {
reloadIntervalDuration := m.opts.DiscoveryReloadInterval
if reloadIntervalDuration == model.Duration(0) {
reloadIntervalDuration = model.Duration(5 * time.Second)
reloadIntervalDuration := time.Duration(m.opts.DiscoveryReloadInterval)
if reloadIntervalDuration == 0 {
reloadIntervalDuration = 5 * time.Second
}
ticker := time.NewTicker(time.Duration(reloadIntervalDuration))
ticker := time.NewTicker(reloadIntervalDuration)
defer ticker.Stop()
if m.opts.DiscoveryReloadOnStartup {
select {
case <-m.graceShut:
return
case <-m.triggerReload:
m.reload()
}
ticker.Reset(reloadIntervalDuration)
}
for {
select {
case <-m.graceShut:

View File

@@ -1633,9 +1633,11 @@ func TestManager_InitialScrapeOffset(t *testing.T) {
t.Run(tcase.name, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
opts := &Options{InitialScrapeOffset: tcase.initialScrapeOffset}
scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval)
scrapeManager, app, cleanupConns := setupSynctestManager(t, opts)
defer cleanupConns()
applyDefaultSynctestConfig(t, scrapeManager, interval)
// Wait for the scrape manager to block on its timers.
synctest.Wait()
@@ -1699,9 +1701,11 @@ func TestManager_ScrapeOnShutdown(t *testing.T) {
ScrapeOnShutdown: tcase.scrapeOnShutdown,
InitialScrapeOffset: tcase.initialScrapeOffset,
}
scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval)
scrapeManager, app, cleanupConns := setupSynctestManager(t, opts)
defer cleanupConns()
applyDefaultSynctestConfig(t, scrapeManager, interval)
// Wait for the initial scrape to happen exactly at t=0.
synctest.Wait()
@@ -1719,3 +1723,148 @@ func TestManager_ScrapeOnShutdown(t *testing.T) {
})
}
}
func TestManagerReloader(t *testing.T) {
for _, tcase := range []struct {
name string
discoveryReloadOnStartup bool
scrapeOnShutdown bool
discoveryReloadInterval time.Duration
updateTarget bool
runDuration time.Duration
expectedSamplesTotal int
}{
{
name: "no startup reload default interval",
runDuration: 6 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "no startup reload short interval",
discoveryReloadInterval: 1 * time.Second,
runDuration: 1 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "no startup reload default interval with target update",
updateTarget: true,
runDuration: 12 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "no startup reload short interval after ticker",
discoveryReloadInterval: 1 * time.Second,
updateTarget: true,
runDuration: 2 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "no startup reload default interval after ticker with update",
updateTarget: true,
runDuration: 6 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "startup reload",
discoveryReloadOnStartup: true,
runDuration: 2 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "startup reload with target update",
discoveryReloadOnStartup: true,
updateTarget: true,
runDuration: 2 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "startup reload with update after ticker",
discoveryReloadOnStartup: true,
runDuration: 6 * time.Second,
expectedSamplesTotal: 1,
},
{
name: "no startup reload with scrape on shutdown after reload",
scrapeOnShutdown: true,
runDuration: 6 * time.Second,
expectedSamplesTotal: 2,
},
{
name: "stop before no startup reload",
scrapeOnShutdown: true,
runDuration: 2 * time.Second,
expectedSamplesTotal: 0,
},
{
name: "startup reload and scrape on shutdown",
discoveryReloadOnStartup: true,
scrapeOnShutdown: true,
runDuration: 2 * time.Second,
expectedSamplesTotal: 2,
},
} {
t.Run(tcase.name, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
opts := &Options{
DiscoveryReloadOnStartup: tcase.discoveryReloadOnStartup,
ScrapeOnShutdown: tcase.scrapeOnShutdown,
DiscoveryReloadInterval: model.Duration(tcase.discoveryReloadInterval),
}
scrapeManager, app, cleanupConns := setupSynctestManager(t, opts)
defer cleanupConns()
cfg := &config.Config{
GlobalConfig: config.GlobalConfig{
ScrapeInterval: model.Duration(10 * time.Second),
ScrapeTimeout: model.Duration(10 * time.Second),
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0},
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
}
cfgText, err := yaml.Marshal(*cfg)
require.NoError(t, err)
cfg = loadConfiguration(t, string(cfgText))
require.NoError(t, scrapeManager.ApplyConfig(cfg))
tsetsCh := make(chan map[string][]*targetgroup.Group)
go scrapeManager.Run(tsetsCh)
// Send initial target to trigger the first reload via the normal flow.
initialTargetLabels := model.LabelSet{
model.SchemeLabel: "http",
model.AddressLabel: "test.local",
}
tsetsCh <- map[string][]*targetgroup.Group{
"test": {{
Targets: []model.LabelSet{initialTargetLabels},
}},
}
synctest.Wait() // Wait for Run to process tsetsCh and for reloader to trigger reload.
if tcase.updateTarget {
newTargetLabels := model.LabelSet{
model.SchemeLabel: "http",
model.AddressLabel: "test-updated.local",
}
tsetsCh <- map[string][]*targetgroup.Group{
"test": {{
Source: "test",
Targets: []model.LabelSet{newTargetLabels},
}},
}
synctest.Wait() // Wait for Run to process tsetsCh.
}
if tcase.runDuration > 0 {
time.Sleep(tcase.runDuration)
synctest.Wait()
}
scrapeManager.Stop()
synctest.Wait()
require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamplesTotal)
})
})
}
}