mirror of
https://github.com/prometheus/prometheus
synced 2026-04-20 22:41:05 +08:00
* Adding scrape on shutdown
Signed-off-by: avilevy <avilevy@google.com>
* scrape: replace skipOffsetting to make the test offset deterministic instead of skipping it entirely
Signed-off-by: avilevy <avilevy@google.com>
* renamed calculateScrapeOffset to getScrapeOffset
Signed-off-by: avilevy <avilevy@google.com>
* discovery: Add skipStartupWait to bypass initial discovery delay
In short-lived environments like agent mode or serverless, the
Prometheus process may only execute for a few seconds. Waiting for
the default 5-second `updatert` ticker before sending the first
target groups means the process could terminate before collecting
any metrics at all.
This commit adds a `skipStartupWait` option to the Discovery Manager
to bypass this initial delay. When enabled, the sender uses an
unthrottled startup loop that instantly forwards all triggers. This
ensures both the initial empty update from `ApplyConfig` and the
first real targets from discoverers are passed downstream immediately.
After the first ticker interval elapses, the sender cleanly breaks out
of the startup phase, resets the ticker, and resumes standard
operations.
Signed-off-by: avilevy <avilevy@google.com>
* scrape: Bypass initial reload delay for ScrapeOnShutdown
In short-lived environments like agent mode or serverless, the default
5-second `DiscoveryReloadInterval` can cause the process to terminate
before the scrape manager has a chance to process targets and collect
any metrics.
Because the discovery manager sends an initial empty update upon
configuration followed rapidly by the actual targets, simply waiting
for a single reload trigger is insufficient; the real targets would
still get trapped behind the ticker delay.
This commit introduces an unthrottled startup loop in the `reloader`
when `ScrapeOnShutdown` is enabled. It processes all incoming
`triggerReload` signals immediately during the first interval. Once
the initial tick fires, the `reloader` resets the ticker and falls
back into its standard throttled loop, ensuring short-lived processes
can discover and scrape targets instantly.
Signed-off-by: avilevy <avilevy@google.com>
* test(scrape): refactor time-based manager tests to use synctest
Addresses PR feedback to remove flaky, time-based sleeping in the scrape manager tests.
Adds TestManager_InitialScrapeOffset and TestManager_ScrapeOnShutdown using the testing/synctest package, eliminating real-world time.Sleep delays and making the assertions fully deterministic.
- Replaced httptest.Server with net.Pipe and a custom startFakeHTTPServer helper to ensure all network I/O remains durably blocked inside the synctest bubble.
- Leveraged the skipOffsetting option to eliminate random scrape jitter, making the time-travel math exact and predictable.
- Using skipOffsetting also safely bypasses the global singleflight DNS lookup in setOffsetSeed, which previously caused cross-bubble panics in synctest.
- Extracted shared boilerplate into a setupSynctestManager helper to keep the test cases highly readable and data-driven.
Signed-off-by: avilevy <avilevy@google.com>
* Clarify use cases in InitialScrapeOffset comment
Signed-off-by: avilevy <avilevy@google.com>
* test(scrape): use httptest for mock server to respect context cancellation
- Replaced manual HTTP string formatting over `net.Pipe` with `httptest.NewUnstartedServer`.
- Implemented an in-memory `pipeListener` to allow the server to handle `net.Pipe` connections directly. This preserves `synctest` time isolation without opening real OS ports.
- Added explicit `r.Context().Done()` handling in the mock HTTP handler to properly simulate aborted requests and scrape timeouts.
- Validates that the request context remains active and is not prematurely cancelled during `ScrapeOnShutdown` scenarios.
- Renamed `skipOffsetting` to `skipJitterOffsetting`.
- Addressed other PR comments.
Signed-off-by: avilevy <avilevy@google.com>
* tmp
Signed-off-by: bwplotka <bwplotka@gmail.com>
* exp2
Signed-off-by: bwplotka <bwplotka@gmail.com>
* fix
Signed-off-by: bwplotka <bwplotka@gmail.com>
* scrape: fix scrapeOnShutdown context bug and refactor test helpers
The scrapeOnShutdown feature was failing during manager shutdown because
the scrape pool context was being cancelled before the final shutdown
scrapes could execute. Fix this by delaying context cancellation
in scrapePool.stop() until after all scrape loops have stopped.
In addition:
- Added test cases to verify scrapeOnShutdown works with InitialScrapeOffset.
- Refactored network test helper functions from manager_test.go to
helpers_test.go.
- Addressed other comments.
Signed-off-by: avilevy <avilevy@google.com>
* Update scrape/scrape.go
Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com>
* feat(discovery): add SkipInitialWait to bypass initial startup delay
This adds a SkipInitialWait option to the discovery Manager, allowing consumers sensitive to startup latency to receive the first batch of discovered targets immediately instead of waiting for the updatert ticker.
To support this without breaking the immediate dropped target notifications introduced in #13147, ApplyConfig now uses a keep flag to only trigger immediate downstream syncs for obsolete or updated providers. This prevents sending premature empty target groups for brand-new providers on initial startup.
Additionally, the scrape manager's reloader loop is updated to process the initial triggerReload immediately, ensuring the end-to-end pipeline processes initial targets without artificial delays.
Signed-off-by: avilevy <avilevy@google.com>
* scrape: Add TestManagerReloader and refactor discovery triggerSync
Adds a new TestManagerReloader test suite using synctest to assert
behavior of target updates, discovery reload ticker intervals, and
ScrapeOnShutdown flags.
Updates setupSynctestManager to allow skipping initial config setup by
passing an interval of 0.
Also renames the 'keep' variable to 'triggerSync' in ApplyConfig inside
discovery/manager.go for clarity, and adds a descriptive comment.
Signed-off-by: avilevy <avilevy@google.com>
* feat(discovery,scrape): rename startup wait options and add DiscoveryReloadOnStartup
- discovery: Rename `SkipInitialWait` to `SkipStartupWait` for clarity.
- discovery: Pass `context.Context` to `flushUpdates` to handle cancellation and avoid leaks.
- scrape: Add `DiscoveryReloadOnStartup` to `Options` to decouple startup discovery from `ScrapeOnShutdown`.
- tests: Refactor `TestTargetSetTargetGroupsPresentOnStartup` and `TestManagerReloader` to use table-driven tests and `synctest` for better stability and coverage.
Signed-off-by: avilevy <avilevy@google.com>
* feat(discovery,scrape): importing changes proposed in 043d710
- Refactor sender to use exponential backoff
- Replaces `time.NewTicker` in `sender()` with an exponential backoff
to prevent panics on non-positive intervals and better throttle updates.
- Removes obsolete `skipStartupWait` logic.
- Refactors `setupSynctestManager` to use an explicit `initConfig` argument
Signed-off-by: avilevy <avilevy@google.com>
* fix: updating go mod
Signed-off-by: avilevy <avilevy@google.com>
* fixing merge
Signed-off-by: avilevy <avilevy@google.com>
* fixing issue: 2 variables but NewTestMetrics returns 1 value
Signed-off-by: avilevy <avilevy@google.com>
* Update discovery/manager.go
Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com>
* Refactor setupSynctestManager initConfig into a separate function
Signed-off-by: avilevy <avilevy@google.com>
---------
Signed-off-by: avilevy <avilevy@google.com>
Signed-off-by: bwplotka <bwplotka@gmail.com>
Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com>
Co-authored-by: bwplotka <bwplotka@gmail.com>
533 lines
17 KiB
Go
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scrape

import (
	"errors"
	"fmt"
	"hash/fnv"
	"log/slog"
	"reflect"
	"runtime"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/prometheus/common/promslog"
	"go.uber.org/atomic"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/util/features"
	"github.com/prometheus/prometheus/util/logging"
	"github.com/prometheus/prometheus/util/osutil"
	"github.com/prometheus/prometheus/util/pool"
)

// NewManager is the Manager constructor using storage.Appendable or storage.AppendableV2.
//
// If unsure which one to use/implement, implement AppendableV2 as it significantly simplifies implementation and allows more
// (passing ST, always-on metadata, exemplars per sample).
//
// NewManager returns error if both appendable and appendableV2 are specified.
//
// Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
// storage.Appendable will be removed soon (ETA: Q2 2026).
func NewManager(
	o *Options,
	logger *slog.Logger,
	newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error),
	appendable storage.Appendable,
	appendableV2 storage.AppendableV2,
	registerer prometheus.Registerer,
) (*Manager, error) {
	if o == nil {
		o = &Options{}
	}
	if logger == nil {
		logger = promslog.NewNopLogger()
	}
	if appendable != nil && appendableV2 != nil {
		return nil, errors.New("scrape.NewManager: appendable and appendableV2 cannot be provided at the same time")
	}
	if appendable == nil && appendableV2 == nil {
		return nil, errors.New("scrape.NewManager: provide either appendable or appendableV2")
	}

	sm, err := newScrapeMetrics(registerer)
	if err != nil {
		return nil, fmt.Errorf("failed to create scrape manager due to error: %w", err)
	}

	m := &Manager{
		appendable:             appendable,
		appendableV2:           appendableV2,
		opts:                   o,
		logger:                 logger,
		newScrapeFailureLogger: newScrapeFailureLogger,
		scrapeConfigs:          make(map[string]*config.ScrapeConfig),
		scrapePools:            make(map[string]*scrapePool),
		graceShut:              make(chan struct{}),
		triggerReload:          make(chan struct{}, 1),
		metrics:                sm,
		buffers:                pool.New(1e3, 100e6, 3, func(sz int) any { return make([]byte, 0, sz) }),
	}

	m.metrics.setTargetMetadataCacheGatherer(m)

	// Register scrape features.
	if r := o.FeatureRegistry; r != nil {
		// "Extra scrape metrics" is always enabled because it moved from feature flag to config file.
		r.Enable(features.Scrape, "extra_scrape_metrics")
		r.Set(features.Scrape, "start_timestamp_zero_ingestion", o.EnableStartTimestampZeroIngestion)
		r.Set(features.Scrape, "type_and_unit_labels", o.EnableTypeAndUnitLabels)
	}

	return m, nil
}

// Options are the configuration parameters to the scrape manager.
type Options struct {
	// Option used by downstream scraper users like OpenTelemetry Collector
	// to help lookup metric metadata. Should be false for Prometheus.
	PassMetadataInContext bool
	// Option to enable appending of scraped Metadata to the TSDB/other appenders. Individual appenders
	// can decide what to do with metadata, but for practical purposes this flag exists so that metadata
	// can be written to the WAL and thus read for remote write.
	AppendMetadata bool
	// Option to increase the interval used by scrape manager to throttle target groups updates.
	DiscoveryReloadInterval model.Duration

	// Option to enable the ingestion of the created timestamp as a synthetic zero sample.
	// See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md
	//
	// NOTE: This option has no effect for AppenderV2 and will be removed with the AppenderV1
	// removal.
	EnableStartTimestampZeroIngestion bool

	// ParseST controls if ST should be parsed and appended from the scrape formats.
	// This should be by default true, but it's opt-in for OpenMetrics (OM) 1.0 reasons and might be moved
	// to OM 1.0 only flow.
	//
	// Specifically for OpenMetrics 1.0 flow, it can have some additional effects that might not be desired for non-ST users:
	//
	// * OpenMetrics 1.0 <metric>_created series will be parsed as ST instead of normal sample. Could be breaking
	//   if downstream user depends on _created metric. TODO(bwplotka): Add "preserveOMLines" hidden option?
	// * Add relatively small (but still) overhead.
	// * Can yield wrong ST values in rare edge cases (unknown metadata and metric name collisions).
	//
	// This only applies to AppenderV2 flow (Prometheus default).
	// TODO: Move this option to OM1 parser and use only on OM1 flow.
	ParseST bool

	// EnableTypeAndUnitLabels represents type-and-unit-labels feature flag.
	EnableTypeAndUnitLabels bool

	// Optional HTTP client options to use when scraping.
	HTTPClientOptions []config_util.HTTPClientOption

	// FeatureRegistry is the registry for tracking enabled/disabled features.
	FeatureRegistry features.Collector

	// ScrapeOnShutdown enables a final scrape before the manager closes. This is useful
	// for Prometheus in agent mode or OTel's prometheusreceiver when used in serverless
	// job scenarios, allowing an extra scrape for the short-living edge cases.
	//
	// NOTE: This final scrape ignores the configured scrape interval.
	ScrapeOnShutdown bool

	// DiscoveryReloadOnStartup enables discovering targets immediately on start up as opposed
	// to waiting for the interval defined in DiscoveryReloadInterval before
	// initializing the scrape pools. Disabled by default. Useful for serverless
	// flavors of OpenTelemetry contrib's prometheusreceiver where we're
	// sensitive to start up delays.
	DiscoveryReloadOnStartup bool

	// InitialScrapeOffset applies an additional baseline delay before we begin
	// scraping targets. By default, Prometheus calculates a specific offset for
	// each target to spread the scraping load evenly across the server. Configuring
	// this option adds a fixed duration to that target-specific offset. This allows
	// tuning the initial startup delay without overriding the underlying target
	// jitter, preserving proper load balancing across the scraper pools.
	//
	// Setting this offset (e.g., to 10s) is particularly useful in Prometheus
	// agent mode and OTel's prometheusreceiver when used in serverless job
	// scenarios. It helps avoid readiness races where targets might not be fully
	// initialized immediately upon startup. It also prevents capturing
	// intermediate state (such as applications crashing shortly after booting),
	// and ensures backend rate limits don't drop valuable shutdown scrapes
	// because of an early startup scrape.
	InitialScrapeOffset time.Duration

	// private option for testability.
	skipJitterOffsetting bool
}

// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups from the discovery manager.
type Manager struct {
	opts   *Options
	logger *slog.Logger

	appendable   storage.Appendable
	appendableV2 storage.AppendableV2

	graceShut chan struct{}

	offsetSeed             uint64     // Global offsetSeed seed is used to spread scrape workload across HA setup.
	mtxScrape              sync.Mutex // Guards the fields below.
	scrapeConfigs          map[string]*config.ScrapeConfig
	scrapePools            map[string]*scrapePool
	newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error)
	scrapeFailureLoggers   map[string]FailureLogger
	targetSets             map[string][]*targetgroup.Group
	buffers                *pool.Pool

	triggerReload chan struct{}

	metrics *scrapeMetrics
}

// Run receives and saves target set updates and triggers the scraping loops reloading.
// Reloading happens in the background so that it doesn't block receiving targets updates.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
	go m.reloader()
	for {
		select {
		case ts, ok := <-tsets:
			if !ok {
				break
			}
			m.updateTsets(ts)

			select {
			case m.triggerReload <- struct{}{}:
			default:
			}

		case <-m.graceShut:
			return nil
		}
	}
}

// UnregisterMetrics unregisters manager metrics.
func (m *Manager) UnregisterMetrics() {
	m.metrics.Unregister()
}

func (m *Manager) reloader() {
	reloadIntervalDuration := time.Duration(m.opts.DiscoveryReloadInterval)
	if reloadIntervalDuration == 0 {
		reloadIntervalDuration = 5 * time.Second
	}

	ticker := time.NewTicker(reloadIntervalDuration)
	defer ticker.Stop()

	if m.opts.DiscoveryReloadOnStartup {
		select {
		case <-m.graceShut:
			return
		case <-m.triggerReload:
			m.reload()
		}
		ticker.Reset(reloadIntervalDuration)
	}

	for {
		select {
		case <-m.graceShut:
			return
		case <-ticker.C:
			select {
			case <-m.triggerReload:
				m.reload()
			case <-m.graceShut:
				return
			}
		}
	}
}

func (m *Manager) reload() {
	m.mtxScrape.Lock()
	var wg sync.WaitGroup
	for setName, groups := range m.targetSets {
		if _, ok := m.scrapePools[setName]; !ok {
			scrapeConfig, ok := m.scrapeConfigs[setName]
			if !ok {
				m.logger.Error("error reloading target set", "err", "invalid config id:"+setName)
				continue
			}
			m.metrics.targetScrapePools.Inc()
			sp, err := newScrapePool(scrapeConfig, m.appendable, m.appendableV2, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics)
			if err != nil {
				m.metrics.targetScrapePoolsFailed.Inc()
				m.logger.Error("error creating new scrape pool", "err", err, "scrape_pool", setName)
				continue
			}
			m.scrapePools[setName] = sp
			if l, ok := m.scrapeFailureLoggers[scrapeConfig.ScrapeFailureLogFile]; ok {
				sp.SetScrapeFailureLogger(l)
			} else {
				sp.logger.Error("No logger found. This is a bug in Prometheus that should be reported upstream.", "scrape_pool", setName)
			}
		}

		wg.Add(1)
		// Run the sync in parallel as these take a while and at high load can't catch up.
		go func(sp *scrapePool, groups []*targetgroup.Group) {
			sp.Sync(groups)
			wg.Done()
		}(m.scrapePools[setName], groups)
	}
	m.mtxScrape.Unlock()
	wg.Wait()
}

// setOffsetSeed calculates a global offsetSeed per server relying on extra label set.
func (m *Manager) setOffsetSeed(labels labels.Labels) error {
	h := fnv.New64a()
	hostname, err := osutil.GetFQDN()
	if err != nil {
		return err
	}
	if _, err := fmt.Fprintf(h, "%s%s", hostname, labels.String()); err != nil {
		return err
	}
	m.offsetSeed = h.Sum64()
	return nil
}

// Stop cancels all running scrape pools and blocks until all have exited.
func (m *Manager) Stop() {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	for _, sp := range m.scrapePools {
		sp.stop()
	}
	close(m.graceShut)
}

func (m *Manager) updateTsets(tsets map[string][]*targetgroup.Group) {
	m.mtxScrape.Lock()
	m.targetSets = tsets
	m.mtxScrape.Unlock()
}

// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
func (m *Manager) ApplyConfig(cfg *config.Config) error {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	scfgs, err := cfg.GetScrapeConfigs()
	if err != nil {
		return err
	}

	c := make(map[string]*config.ScrapeConfig)
	scrapeFailureLoggers := map[string]FailureLogger{
		"": nil, // Emptying the file name sets the scrape logger to nil.
	}
	for _, scfg := range scfgs {
		c[scfg.JobName] = scfg
		if _, ok := scrapeFailureLoggers[scfg.ScrapeFailureLogFile]; !ok {
			// We promise to reopen the file on each reload.
			var (
				logger FailureLogger
				err    error
			)
			if m.newScrapeFailureLogger != nil {
				if logger, err = m.newScrapeFailureLogger(scfg.ScrapeFailureLogFile); err != nil {
					return err
				}
			}
			scrapeFailureLoggers[scfg.ScrapeFailureLogFile] = logger
		}
	}
	m.scrapeConfigs = c

	oldScrapeFailureLoggers := m.scrapeFailureLoggers
	for _, s := range oldScrapeFailureLoggers {
		if s != nil {
			defer s.Close()
		}
	}

	m.scrapeFailureLoggers = scrapeFailureLoggers

	// Skip offset seed calculation during tests.
	// setOffsetSeed relies on osutil.GetFQDN(), which triggers a DNS lookup using
	// a global singleflight goroutine. This cross-boundary communication breaks
	// synctest's isolation bubble and causes a fatal panic.
	if m.opts.skipJitterOffsetting {
		m.offsetSeed = 0
	} else {
		if err := m.setOffsetSeed(cfg.GlobalConfig.ExternalLabels); err != nil {
			return err
		}
	}

	// Cleanup and reload pool if the configuration has changed.
	var (
		failed   atomic.Bool
		wg       sync.WaitGroup
		toDelete sync.Map // Stores the list of names of pools to delete.
	)

	// Use a buffered channel to limit reload concurrency.
	// Each scrape pool writes the channel before we start to reload it and read from it at the end.
	// This means only N pools can be reloaded at the same time.
	canReload := make(chan int, runtime.GOMAXPROCS(0))
	for poolName, pool := range m.scrapePools {
		canReload <- 1
		wg.Add(1)
		cfg, ok := m.scrapeConfigs[poolName]
		// Reload each scrape pool in a dedicated goroutine so we don't have to wait a long time
		// if we have a lot of scrape pools to update.
		go func(name string, sp *scrapePool, cfg *config.ScrapeConfig, ok bool) {
			defer func() {
				wg.Done()
				<-canReload
			}()
			if !ok {
				sp.stop()
				toDelete.Store(name, struct{}{})
				return
			}
			// Update the scrape failure logger before reloading so that
			// restartLoops captures the new logger when starting new loops.
			if l, ok := m.scrapeFailureLoggers[cfg.ScrapeFailureLogFile]; ok {
				sp.SetScrapeFailureLogger(l)
			} else {
				sp.logger.Error("No logger found. This is a bug in Prometheus that should be reported upstream.", "scrape_pool", name)
			}
			if !reflect.DeepEqual(sp.config, cfg) {
				err := sp.reload(cfg)
				if err != nil {
					m.logger.Error("error reloading scrape pool", "err", err, "scrape_pool", name)
					failed.Store(true)
				}
			}
		}(poolName, pool, cfg, ok)
	}
	wg.Wait()

	toDelete.Range(func(name, _ any) bool {
		delete(m.scrapePools, name.(string))
		return true
	})

	if failed.Load() {
		return errors.New("failed to apply the new configuration")
	}
	return nil
}

// TargetsAll returns active and dropped targets grouped by job_name.
func (m *Manager) TargetsAll() map[string][]*Target {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	targets := make(map[string][]*Target, len(m.scrapePools))
	for tset, sp := range m.scrapePools {
		targets[tset] = append(sp.ActiveTargets(), sp.DroppedTargets()...)
	}
	return targets
}

// ScrapePools returns the list of all scrape pool names.
func (m *Manager) ScrapePools() []string {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	names := make([]string, 0, len(m.scrapePools))
	for name := range m.scrapePools {
		names = append(names, name)
	}
	return names
}

// TargetsActive returns the active targets currently being scraped.
func (m *Manager) TargetsActive() map[string][]*Target {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	targets := make(map[string][]*Target, len(m.scrapePools))
	for tset, sp := range m.scrapePools {
		targets[tset] = sp.ActiveTargets()
	}
	return targets
}

// TargetsDropped returns the dropped targets during relabelling, subject to KeepDroppedTargets limit.
func (m *Manager) TargetsDropped() map[string][]*Target {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	targets := make(map[string][]*Target, len(m.scrapePools))
	for tset, sp := range m.scrapePools {
		targets[tset] = sp.DroppedTargets()
	}
	return targets
}

func (m *Manager) TargetsDroppedCounts() map[string]int {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	counts := make(map[string]int, len(m.scrapePools))
	for tset, sp := range m.scrapePools {
		counts[tset] = sp.droppedTargetsCount
	}
	return counts
}

func (m *Manager) ScrapePoolConfig(scrapePool string) (*config.ScrapeConfig, error) {
	m.mtxScrape.Lock()
	defer m.mtxScrape.Unlock()

	sp, ok := m.scrapePools[scrapePool]
	if !ok {
		return nil, fmt.Errorf("scrape pool %q not found", scrapePool)
	}

	return sp.config, nil
}

// DisableEndOfRunStalenessMarkers disables the end-of-run staleness markers for the provided targets in the given
// targetSet. When the end-of-run staleness is disabled for a target, when it goes away, there will be no staleness
// markers written for its series.
func (m *Manager) DisableEndOfRunStalenessMarkers(targetSet string, targets []*Target) {
	// This avoids mutex lock contention.
	if len(targets) == 0 {
		return
	}

	// Only hold the lock to find the scrape pool.
	m.mtxScrape.Lock()
	sp, ok := m.scrapePools[targetSet]
	m.mtxScrape.Unlock()

	if ok {
		sp.disableEndOfRunStalenessMarkers(targets)
	}
}