mirror of
https://github.com/prometheus/prometheus
synced 2026-04-30 14:50:25 +08:00
@@ -22,6 +22,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/cenkalti/backoff/v5"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/common/config"
|
"github.com/prometheus/common/config"
|
||||||
"github.com/prometheus/common/promslog"
|
"github.com/prometheus/common/promslog"
|
||||||
@@ -95,7 +96,7 @@ func NewManager(ctx context.Context, logger *slog.Logger, registerer prometheus.
|
|||||||
targets: make(map[poolKey]map[string]*targetgroup.Group),
|
targets: make(map[poolKey]map[string]*targetgroup.Group),
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
updatert: 5 * time.Second,
|
updatert: 5 * time.Second,
|
||||||
triggerSend: make(chan struct{}, 1),
|
triggerSend: make(chan struct{}, 1), // At least one element to ensure we can do a delayed read.
|
||||||
registerer: registerer,
|
registerer: registerer,
|
||||||
sdMetrics: sdMetrics,
|
sdMetrics: sdMetrics,
|
||||||
}
|
}
|
||||||
@@ -158,17 +159,6 @@ func FeatureRegistry(fr features.Collector) func(*Manager) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SkipStartupWait configures the manager to skip the initial wait on startup.
|
|
||||||
// This is useful for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver
|
|
||||||
// which are sensitive to startup latencies.
|
|
||||||
func SkipStartupWait() func(*Manager) {
|
|
||||||
return func(m *Manager) {
|
|
||||||
m.mtx.Lock()
|
|
||||||
defer m.mtx.Unlock()
|
|
||||||
m.skipStartupWait = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Manager maintains a set of discovery providers and sends each update to a map channel.
|
// Manager maintains a set of discovery providers and sends each update to a map channel.
|
||||||
// Targets are grouped by the target set name.
|
// Targets are grouped by the target set name.
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
@@ -206,11 +196,6 @@ type Manager struct {
|
|||||||
|
|
||||||
// featureRegistry is used to track which service discovery providers are configured.
|
// featureRegistry is used to track which service discovery providers are configured.
|
||||||
featureRegistry features.Collector
|
featureRegistry features.Collector
|
||||||
|
|
||||||
// skipStartupWait allows the discovery manager to skip the initial wait before sending updates
|
|
||||||
// to the channel. This is useful for Prometheus in agent mode or serverless flavours of OTel's prometheusreceiver
|
|
||||||
// which are sensitive to startup latencies.
|
|
||||||
skipStartupWait bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Providers returns the currently configured SD providers.
|
// Providers returns the currently configured SD providers.
|
||||||
@@ -253,8 +238,6 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
|
|||||||
var (
|
var (
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
newProviders []*Provider
|
newProviders []*Provider
|
||||||
// triggerSync shows if we should trigger send to notify downstream of changes.
|
|
||||||
triggerSync bool
|
|
||||||
)
|
)
|
||||||
for _, prov := range m.providers {
|
for _, prov := range m.providers {
|
||||||
// Cancel an obsolete provider if it has no new subs and it has a cancel function.
|
// Cancel an obsolete provider if it has no new subs and it has a cancel function.
|
||||||
@@ -269,7 +252,6 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
|
|||||||
|
|
||||||
prov.cancel()
|
prov.cancel()
|
||||||
prov.mu.RUnlock()
|
prov.mu.RUnlock()
|
||||||
triggerSync = true // Trigger send to notify downstream of dropped targets
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
prov.mu.RUnlock()
|
prov.mu.RUnlock()
|
||||||
@@ -281,7 +263,6 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
|
|||||||
|
|
||||||
m.targetsMtx.Lock()
|
m.targetsMtx.Lock()
|
||||||
for s := range prov.subs {
|
for s := range prov.subs {
|
||||||
triggerSync = true // Trigger send because this is an existing provider (reload)
|
|
||||||
refTargets = m.targets[poolKey{s, prov.name}]
|
refTargets = m.targets[poolKey{s, prov.name}]
|
||||||
// Remove obsolete subs' targets.
|
// Remove obsolete subs' targets.
|
||||||
if _, ok := prov.newSubs[s]; !ok {
|
if _, ok := prov.newSubs[s]; !ok {
|
||||||
@@ -314,7 +295,7 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
|
|||||||
// See https://github.com/prometheus/prometheus/pull/8639 for details.
|
// See https://github.com/prometheus/prometheus/pull/8639 for details.
|
||||||
// This also helps the downstream managers drop stale targets as soon as possible.
|
// This also helps the downstream managers drop stale targets as soon as possible.
|
||||||
// See https://github.com/prometheus/prometheus/pull/13147 for details.
|
// See https://github.com/prometheus/prometheus/pull/13147 for details.
|
||||||
if triggerSync {
|
if len(m.providers) > 0 {
|
||||||
select {
|
select {
|
||||||
case m.triggerSend <- struct{}{}:
|
case m.triggerSend <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
@@ -402,59 +383,47 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) flushUpdates(ctx context.Context, timeout <-chan time.Time) {
|
|
||||||
m.metrics.SentUpdates.Inc()
|
|
||||||
select {
|
|
||||||
case m.syncCh <- m.allGroups():
|
|
||||||
case <-timeout:
|
|
||||||
m.metrics.DelayedUpdates.Inc()
|
|
||||||
m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle")
|
|
||||||
select {
|
|
||||||
case m.triggerSend <- struct{}{}:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Manager) sender() {
|
func (m *Manager) sender() {
|
||||||
ticker := time.NewTicker(m.updatert)
|
|
||||||
defer func() {
|
defer func() {
|
||||||
ticker.Stop()
|
|
||||||
close(m.syncCh)
|
close(m.syncCh)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if m.skipStartupWait {
|
// Some discoverers send updates too often, so we throttle these with a backoff interval that
|
||||||
select {
|
// increases up to the m.updatert delay.
|
||||||
case <-m.triggerSend:
|
lastSent := time.Now().Add(-1 * m.updatert)
|
||||||
m.flushUpdates(m.ctx, ticker.C)
|
b := &backoff.ExponentialBackOff{
|
||||||
case <-m.ctx.Done():
|
InitialInterval: 100 * time.Millisecond,
|
||||||
return
|
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||||
}
|
Multiplier: backoff.DefaultMultiplier,
|
||||||
ticker.Reset(m.updatert)
|
MaxInterval: m.updatert,
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-m.ctx.Done():
|
case <-m.ctx.Done():
|
||||||
return
|
return
|
||||||
case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker.
|
case <-time.After(b.NextBackOff()):
|
||||||
select {
|
select {
|
||||||
case <-m.triggerSend:
|
case <-m.triggerSend:
|
||||||
m.metrics.SentUpdates.Inc()
|
m.metrics.SentUpdates.Inc()
|
||||||
select {
|
select {
|
||||||
case m.syncCh <- m.allGroups():
|
case m.syncCh <- m.allGroups():
|
||||||
|
lastSent = time.Now()
|
||||||
default:
|
default:
|
||||||
m.metrics.DelayedUpdates.Inc()
|
m.metrics.DelayedUpdates.Inc()
|
||||||
m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle")
|
m.logger.Debug("Discovery receiver's channel was full so will retry the next cycle")
|
||||||
|
// Ensure we don't miss this update.
|
||||||
select {
|
select {
|
||||||
case m.triggerSend <- struct{}{}:
|
case m.triggerSend <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
if time.Since(lastSent) > m.updatert {
|
||||||
|
b.Reset() // Nothing happened for a while, start again from low interval for prompt updates.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -799,14 +799,14 @@ func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "startup wait with short interval succeeds",
|
name: "startup wait with short interval succeeds",
|
||||||
updatert: 10 * time.Millisecond,
|
updatert: 10 * time.Millisecond,
|
||||||
readTimeout: 100 * time.Millisecond,
|
readTimeout: 300 * time.Millisecond,
|
||||||
expectedTargets: 1,
|
expectedTargets: 1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "skip startup wait",
|
name: "skip startup wait",
|
||||||
skipInitialWait: true,
|
skipInitialWait: true,
|
||||||
updatert: 100 * time.Hour,
|
updatert: 100 * time.Hour,
|
||||||
readTimeout: 100 * time.Millisecond,
|
readTimeout: 300 * time.Millisecond,
|
||||||
expectedTargets: 1,
|
expectedTargets: 1,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -839,20 +839,25 @@ func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) {
|
|||||||
|
|
||||||
synctest.Wait()
|
synctest.Wait()
|
||||||
|
|
||||||
var syncedTargets map[string][]*targetgroup.Group
|
timeout := time.After(tc.readTimeout)
|
||||||
select {
|
var lastSyncedTargets map[string][]*targetgroup.Group
|
||||||
case syncedTargets = <-discoveryManager.SyncCh():
|
testFor:
|
||||||
case <-time.After(tc.readTimeout):
|
for {
|
||||||
|
select {
|
||||||
|
case <-timeout:
|
||||||
|
break testFor
|
||||||
|
case lastSyncedTargets = <-discoveryManager.SyncCh():
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if tc.expectedTargets == 0 {
|
if tc.expectedTargets == 0 {
|
||||||
require.Nil(t, syncedTargets)
|
require.Nil(t, lastSyncedTargets)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Len(t, syncedTargets, 1)
|
require.Len(t, lastSyncedTargets, 1)
|
||||||
require.Len(t, syncedTargets["prometheus"], tc.expectedTargets)
|
require.Len(t, lastSyncedTargets["prometheus"], tc.expectedTargets)
|
||||||
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
|
verifySyncedPresence(t, lastSyncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
|
||||||
|
|
||||||
p := pk("static", "prometheus", 0)
|
p := pk("static", "prometheus", 0)
|
||||||
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
|
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
|
||||||
|
|||||||
Reference in New Issue
Block a user