alternative

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka
2026-03-30 11:06:23 +01:00
parent 0f38319b92
commit 043d710282
2 changed files with 33 additions and 59 deletions

View File

@@ -22,6 +22,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/cenkalti/backoff/v5"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config" "github.com/prometheus/common/config"
"github.com/prometheus/common/promslog" "github.com/prometheus/common/promslog"
@@ -95,7 +96,7 @@ func NewManager(ctx context.Context, logger *slog.Logger, registerer prometheus.
targets: make(map[poolKey]map[string]*targetgroup.Group), targets: make(map[poolKey]map[string]*targetgroup.Group),
ctx: ctx, ctx: ctx,
updatert: 5 * time.Second, updatert: 5 * time.Second,
triggerSend: make(chan struct{}, 1), triggerSend: make(chan struct{}, 1), // At least one element to ensure we can do a delayed read.
registerer: registerer, registerer: registerer,
sdMetrics: sdMetrics, sdMetrics: sdMetrics,
} }
@@ -158,17 +159,6 @@ func FeatureRegistry(fr features.Collector) func(*Manager) {
} }
} }
// SkipStartupWait configures the manager to skip the initial wait on startup.
// This is useful for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver
// which are sensitive to startup latencies.
func SkipStartupWait() func(*Manager) {
return func(m *Manager) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.skipStartupWait = true
}
}
// Manager maintains a set of discovery providers and sends each update to a map channel. // Manager maintains a set of discovery providers and sends each update to a map channel.
// Targets are grouped by the target set name. // Targets are grouped by the target set name.
type Manager struct { type Manager struct {
@@ -206,11 +196,6 @@ type Manager struct {
// featureRegistry is used to track which service discovery providers are configured. // featureRegistry is used to track which service discovery providers are configured.
featureRegistry features.Collector featureRegistry features.Collector
// skipStartupWait allows the discovery manager to skip the initial wait before sending updates
// to the channel. This is useful for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver
// which are sensitive to startup latencies.
skipStartupWait bool
} }
// Providers returns the currently configured SD providers. // Providers returns the currently configured SD providers.
@@ -253,8 +238,6 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
newProviders []*Provider newProviders []*Provider
// triggerSync shows if we should trigger send to notify downstream of changes.
triggerSync bool
) )
for _, prov := range m.providers { for _, prov := range m.providers {
// Cancel obsolete providers if it has no new subs and it has a cancel function. // Cancel obsolete providers if it has no new subs and it has a cancel function.
@@ -269,7 +252,6 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
prov.cancel() prov.cancel()
prov.mu.RUnlock() prov.mu.RUnlock()
triggerSync = true // Trigger send to notify downstream of dropped targets
continue continue
} }
prov.mu.RUnlock() prov.mu.RUnlock()
@@ -281,7 +263,6 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
m.targetsMtx.Lock() m.targetsMtx.Lock()
for s := range prov.subs { for s := range prov.subs {
triggerSync = true // Trigger send because this is an existing provider (reload)
refTargets = m.targets[poolKey{s, prov.name}] refTargets = m.targets[poolKey{s, prov.name}]
// Remove obsolete subs' targets. // Remove obsolete subs' targets.
if _, ok := prov.newSubs[s]; !ok { if _, ok := prov.newSubs[s]; !ok {
@@ -314,7 +295,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
// See https://github.com/prometheus/prometheus/pull/8639 for details. // See https://github.com/prometheus/prometheus/pull/8639 for details.
// This also helps making the downstream managers drop stale targets as soon as possible. // This also helps making the downstream managers drop stale targets as soon as possible.
// See https://github.com/prometheus/prometheus/pull/13147 for details. // See https://github.com/prometheus/prometheus/pull/13147 for details.
if triggerSync { if len(m.providers) > 0 {
select { select {
case m.triggerSend <- struct{}{}: case m.triggerSend <- struct{}{}:
default: default:
@@ -402,59 +383,47 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ
} }
} }
func (m *Manager) flushUpdates(ctx context.Context, timeout <-chan time.Time) {
m.metrics.SentUpdates.Inc()
select {
case m.syncCh <- m.allGroups():
case <-timeout:
m.metrics.DelayedUpdates.Inc()
m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle")
select {
case m.triggerSend <- struct{}{}:
case <-ctx.Done():
return
default:
}
}
}
func (m *Manager) sender() { func (m *Manager) sender() {
ticker := time.NewTicker(m.updatert)
defer func() { defer func() {
ticker.Stop()
close(m.syncCh) close(m.syncCh)
}() }()
if m.skipStartupWait { // Some discoverers send updates too often, so we throttle these with a backoff interval that
select { // increases the interval up to the m.updatert delay.
case <-m.triggerSend: lastSent := time.Now().Add(-1 * m.updatert)
m.flushUpdates(m.ctx, ticker.C) b := &backoff.ExponentialBackOff{
case <-m.ctx.Done(): InitialInterval: 100 * time.Millisecond,
return RandomizationFactor: backoff.DefaultRandomizationFactor,
} Multiplier: backoff.DefaultMultiplier,
ticker.Reset(m.updatert) MaxInterval: m.updatert,
} }
for { for {
select { select {
case <-m.ctx.Done(): case <-m.ctx.Done():
return return
case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker. case <-time.After(b.NextBackOff()):
select { select {
case <-m.triggerSend: case <-m.triggerSend:
m.metrics.SentUpdates.Inc() m.metrics.SentUpdates.Inc()
select { select {
case m.syncCh <- m.allGroups(): case m.syncCh <- m.allGroups():
lastSent = time.Now()
default: default:
m.metrics.DelayedUpdates.Inc() m.metrics.DelayedUpdates.Inc()
m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle") m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle")
// Ensure we don't miss this update.
select { select {
case m.triggerSend <- struct{}{}: case m.triggerSend <- struct{}{}:
default: default:
} }
} }
default: default:
} }
if time.Since(lastSent) > m.updatert {
b.Reset() // Nothing happened for a while, start again from low interval for prompt updates.
}
} }
} }
} }

View File

@@ -799,14 +799,14 @@ func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) {
{ {
name: "startup wait with short interval succeeds", name: "startup wait with short interval succeeds",
updatert: 10 * time.Millisecond, updatert: 10 * time.Millisecond,
readTimeout: 100 * time.Millisecond, readTimeout: 300 * time.Millisecond,
expectedTargets: 1, expectedTargets: 1,
}, },
{ {
name: "skip startup wait", name: "skip startup wait",
skipInitialWait: true, skipInitialWait: true,
updatert: 100 * time.Hour, updatert: 100 * time.Hour,
readTimeout: 100 * time.Millisecond, readTimeout: 300 * time.Millisecond,
expectedTargets: 1, expectedTargets: 1,
}, },
} }
@@ -839,20 +839,25 @@ func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) {
synctest.Wait() synctest.Wait()
var syncedTargets map[string][]*targetgroup.Group timeout := time.After(tc.readTimeout)
var lastSyncedTargets map[string][]*targetgroup.Group
testFor:
for {
select { select {
case syncedTargets = <-discoveryManager.SyncCh(): case <-timeout:
case <-time.After(tc.readTimeout): break testFor
case lastSyncedTargets = <-discoveryManager.SyncCh():
}
} }
if tc.expectedTargets == 0 { if tc.expectedTargets == 0 {
require.Nil(t, syncedTargets) require.Nil(t, lastSyncedTargets)
return return
} }
require.Len(t, syncedTargets, 1) require.Len(t, lastSyncedTargets, 1)
require.Len(t, syncedTargets["prometheus"], tc.expectedTargets) require.Len(t, lastSyncedTargets["prometheus"], tc.expectedTargets)
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) verifySyncedPresence(t, lastSyncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
p := pk("static", "prometheus", 0) p := pk("static", "prometheus", 0)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)