diff --git a/discovery/manager.go b/discovery/manager.go index 8318ff5bd6..fa52e164f7 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/cenkalti/backoff/v5" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/promslog" @@ -101,7 +102,7 @@ func NewManager(ctx context.Context, logger *slog.Logger, registerer prometheus. targets: make(map[poolKey]map[string]*targetgroup.Group), ctx: ctx, updatert: 5 * time.Second, - triggerSend: make(chan struct{}, 1), + triggerSend: make(chan struct{}, 1), // At least one element to ensure we can do a delayed read. registerer: registerer, sdMetrics: sdMetrics, } @@ -408,24 +409,34 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ } func (m *Manager) sender() { - ticker := time.NewTicker(m.updatert) defer func() { - ticker.Stop() close(m.syncCh) }() + // Some discoverers send updates too often, so we throttle these with a backoff interval that + // increases the interval up to m.updatert delay. + lastSent := time.Now().Add(-1 * m.updatert) + b := &backoff.ExponentialBackOff{ + InitialInterval: 100 * time.Millisecond, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: m.updatert, + } + for { select { case <-m.ctx.Done(): return - case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker. + case <-time.After(b.NextBackOff()): select { case <-m.triggerSend: m.metrics.SentUpdates.Inc() select { case m.syncCh <- m.allGroups(): + lastSent = time.Now() default: m.metrics.DelayedUpdates.Inc() m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle") + // Ensure we don't miss this update. 
select { case m.triggerSend <- struct{}{}: default: @@ -433,6 +444,9 @@ func (m *Manager) sender() { } default: } + if time.Since(lastSent) > m.updatert { + b.Reset() // Nothing happened for a while, start again from low interval for prompt updates. + } } } } diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 2e8e4558f6..b98dc1dd35 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -21,6 +21,7 @@ import ( "strconv" "sync" "testing" + "testing/synctest" "time" "github.com/prometheus/client_golang/prometheus" @@ -785,6 +786,84 @@ func pk(provider, setName string, n int) poolKey { } } +func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) { + testCases := []struct { + name string + updatert time.Duration + readTimeout time.Duration + expectedTargets int + }{ + { + name: "startup wait with long interval times out", + updatert: 100 * time.Hour, + readTimeout: 10 * time.Millisecond, + }, + { + name: "startup wait with short interval succeeds", + updatert: 10 * time.Millisecond, + readTimeout: 300 * time.Millisecond, + expectedTargets: 1, + }, + { + name: "skip startup wait", + updatert: 100 * time.Hour, + readTimeout: 300 * time.Millisecond, + expectedTargets: 1, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() + + reg := prometheus.NewRegistry() + sdMetrics := NewTestMetrics(t, reg) + + opts := make([]func(*Manager), 0) + discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, opts...) 
+ require.NotNil(t, discoveryManager) + + discoveryManager.updatert = tc.updatert + go discoveryManager.Run() + + c := map[string]Configs{ + "prometheus": { + staticConfig("foo:9090"), + }, + } + discoveryManager.ApplyConfig(c) + + synctest.Wait() + + timeout := time.After(tc.readTimeout) + var lastSyncedTargets map[string][]*targetgroup.Group + testFor: + for { + select { + case <-timeout: + break testFor + case lastSyncedTargets = <-discoveryManager.SyncCh(): + } + } + + if tc.expectedTargets == 0 { + require.Nil(t, lastSyncedTargets) + return + } + + require.Len(t, lastSyncedTargets, 1) + require.Len(t, lastSyncedTargets["prometheus"], tc.expectedTargets) + verifySyncedPresence(t, lastSyncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + + p := pk("static", "prometheus", 0) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + require.Len(t, discoveryManager.targets, 1) + }) + }) + } +} + func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) { ctx := t.Context() diff --git a/go.mod b/go.mod index 4df021e21d..4080c5be26 100644 --- a/go.mod +++ b/go.mod @@ -156,7 +156,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.30.13 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.17 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cenkalti/backoff/v5 v5.0.3 // indirect + github.com/cenkalti/backoff/v5 v5.0.3 github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5 // indirect github.com/containerd/errdefs v1.0.0 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index 5119a4c66b..8d3e8f74e7 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -309,7 +309,7 @@ func startFakeHTTPServer(t *testing.T) (*pipeListener, func()) { // setupSynctestManager abstracts the boilerplate of creating a mock network, // starting the fake HTTP server, and configuring the scrape manager for synctest. 
-func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) (*Manager, *teststorage.Appendable, func()) { +func setupSynctestManager(t *testing.T, opts *Options) (*Manager, *teststorage.Appendable, func()) { t.Helper() app := teststorage.NewAppendable() @@ -345,6 +345,12 @@ func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) ( ) require.NoError(t, err) + return scrapeManager, app, cleanup +} + +func applyDefaultSynctestConfig(t *testing.T, scrapeManager *Manager, interval time.Duration) { + t.Helper() + cfg := &config.Config{ GlobalConfig: config.GlobalConfig{ ScrapeInterval: model.Duration(interval), @@ -368,6 +374,4 @@ func setupSynctestManager(t *testing.T, opts *Options, interval time.Duration) ( }) scrapeManager.reload() - - return scrapeManager, app, cleanup } diff --git a/scrape/manager.go b/scrape/manager.go index 41cfab95e9..fd5cf4460e 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -151,6 +151,13 @@ type Options struct { // NOTE: This final scrape ignores the configured scrape interval. ScrapeOnShutdown bool + // DiscoveryReloadOnStartup enables discovering targets immediately on startup as opposed + // to waiting for the interval defined in DiscoveryReloadInterval before + // initializing the scrape pools. Disabled by default. Useful for serverless + // flavors of OpenTelemetry contrib's prometheusreceiver where we're + // sensitive to startup delays. + DiscoveryReloadOnStartup bool + // InitialScrapeOffset applies an additional baseline delay before we begin // scraping targets. By default, Prometheus calculates a specific offset for // each target to spread the scraping load evenly across the server. 
Configuring @@ -225,15 +232,24 @@ func (m *Manager) UnregisterMetrics() { } func (m *Manager) reloader() { - reloadIntervalDuration := m.opts.DiscoveryReloadInterval - if reloadIntervalDuration == model.Duration(0) { - reloadIntervalDuration = model.Duration(5 * time.Second) + reloadIntervalDuration := time.Duration(m.opts.DiscoveryReloadInterval) + if reloadIntervalDuration == 0 { + reloadIntervalDuration = 5 * time.Second } - ticker := time.NewTicker(time.Duration(reloadIntervalDuration)) - + ticker := time.NewTicker(reloadIntervalDuration) defer ticker.Stop() + if m.opts.DiscoveryReloadOnStartup { + select { + case <-m.graceShut: + return + case <-m.triggerReload: + m.reload() + } + ticker.Reset(reloadIntervalDuration) + } + for { select { case <-m.graceShut: diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 1fc5b355d6..d61f0dfe62 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -1633,9 +1633,11 @@ func TestManager_InitialScrapeOffset(t *testing.T) { t.Run(tcase.name, func(t *testing.T) { synctest.Test(t, func(t *testing.T) { opts := &Options{InitialScrapeOffset: tcase.initialScrapeOffset} - scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval) + scrapeManager, app, cleanupConns := setupSynctestManager(t, opts) defer cleanupConns() + applyDefaultSynctestConfig(t, scrapeManager, interval) + // Wait for the scrape manager to block on its timers. synctest.Wait() @@ -1699,9 +1701,11 @@ func TestManager_ScrapeOnShutdown(t *testing.T) { ScrapeOnShutdown: tcase.scrapeOnShutdown, InitialScrapeOffset: tcase.initialScrapeOffset, } - scrapeManager, app, cleanupConns := setupSynctestManager(t, opts, interval) + scrapeManager, app, cleanupConns := setupSynctestManager(t, opts) defer cleanupConns() + applyDefaultSynctestConfig(t, scrapeManager, interval) + // Wait for the initial scrape to happen exactly at t=0. 
synctest.Wait() @@ -1719,3 +1723,148 @@ func TestManager_ScrapeOnShutdown(t *testing.T) { }) } } + +func TestManagerReloader(t *testing.T) { + for _, tcase := range []struct { + name string + discoveryReloadOnStartup bool + scrapeOnShutdown bool + discoveryReloadInterval time.Duration + updateTarget bool + runDuration time.Duration + expectedSamplesTotal int + }{ + { + name: "no startup reload default interval", + runDuration: 6 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "no startup reload short interval", + discoveryReloadInterval: 1 * time.Second, + runDuration: 1 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "no startup reload default interval with target update", + updateTarget: true, + runDuration: 12 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "no startup reload short interval after ticker", + discoveryReloadInterval: 1 * time.Second, + updateTarget: true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "no startup reload default interval after ticker with update", + updateTarget: true, + runDuration: 6 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "startup reload", + discoveryReloadOnStartup: true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "startup reload with target update", + discoveryReloadOnStartup: true, + updateTarget: true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "startup reload with update after ticker", + discoveryReloadOnStartup: true, + runDuration: 6 * time.Second, + expectedSamplesTotal: 1, + }, + { + name: "no startup reload with scrape on shutdown after reload", + scrapeOnShutdown: true, + runDuration: 6 * time.Second, + expectedSamplesTotal: 2, + }, + { + name: "stop before no startup reload", + scrapeOnShutdown: true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 0, + }, + { + name: "startup reload and scrape on shutdown", + discoveryReloadOnStartup: true, + scrapeOnShutdown: 
true, + runDuration: 2 * time.Second, + expectedSamplesTotal: 2, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + opts := &Options{ + DiscoveryReloadOnStartup: tcase.discoveryReloadOnStartup, + ScrapeOnShutdown: tcase.scrapeOnShutdown, + DiscoveryReloadInterval: model.Duration(tcase.discoveryReloadInterval), + } + scrapeManager, app, cleanupConns := setupSynctestManager(t, opts) + defer cleanupConns() + + cfg := &config.Config{ + GlobalConfig: config.GlobalConfig{ + ScrapeInterval: model.Duration(10 * time.Second), + ScrapeTimeout: model.Duration(10 * time.Second), + ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0}, + }, + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, + } + cfgText, err := yaml.Marshal(*cfg) + require.NoError(t, err) + cfg = loadConfiguration(t, string(cfgText)) + require.NoError(t, scrapeManager.ApplyConfig(cfg)) + + tsetsCh := make(chan map[string][]*targetgroup.Group) + go scrapeManager.Run(tsetsCh) + + // Send initial target to trigger the first reload via the normal flow. + initialTargetLabels := model.LabelSet{ + model.SchemeLabel: "http", + model.AddressLabel: "test.local", + } + tsetsCh <- map[string][]*targetgroup.Group{ + "test": {{ + Targets: []model.LabelSet{initialTargetLabels}, + }}, + } + synctest.Wait() // Wait for Run to process tsetsCh and for reloader to trigger reload. + + if tcase.updateTarget { + newTargetLabels := model.LabelSet{ + model.SchemeLabel: "http", + model.AddressLabel: "test-updated.local", + } + tsetsCh <- map[string][]*targetgroup.Group{ + "test": {{ + Source: "test", + Targets: []model.LabelSet{newTargetLabels}, + }}, + } + synctest.Wait() // Wait for Run to process tsetsCh. 
+ } + + if tcase.runDuration > 0 { + time.Sleep(tcase.runDuration) + synctest.Wait() + } + + scrapeManager.Stop() + synctest.Wait() + + require.Len(t, findSamplesForMetric(app.ResultSamples(), "expected_metric"), tcase.expectedSamplesTotal) + }) + }) + } +}