* Adding scrape on shutdown
Signed-off-by: avilevy <avilevy@google.com>
* scrape: replace skipOffsetting to make the test offset deterministic instead of skipping it entirely
Signed-off-by: avilevy <avilevy@google.com>
* renamed calculateScrapeOffset to getScrapeOffset
Signed-off-by: avilevy <avilevy@google.com>
* discovery: Add skipStartupWait to bypass initial discovery delay
In short-lived environments like agent mode or serverless, the
Prometheus process may only execute for a few seconds. Waiting for
the default 5-second `updatert` ticker before sending the first
target groups means the process could terminate before collecting
any metrics at all.
This commit adds a `skipStartupWait` option to the Discovery Manager
to bypass this initial delay. When enabled, the sender uses an
unthrottled startup loop that instantly forwards all triggers. This
ensures both the initial empty update from `ApplyConfig` and the
first real targets from discoverers are passed downstream immediately.
After the first ticker interval elapses, the sender cleanly breaks out
of the startup phase, resets the ticker, and resumes standard
operations.
Signed-off-by: avilevy <avilevy@google.com>
* scrape: Bypass initial reload delay for ScrapeOnShutdown
In short-lived environments like agent mode or serverless, the default
5-second `DiscoveryReloadInterval` can cause the process to terminate
before the scrape manager has a chance to process targets and collect
any metrics.
Because the discovery manager sends an initial empty update upon
configuration followed rapidly by the actual targets, simply waiting
for a single reload trigger is insufficient—the real targets would
still get trapped behind the ticker delay.
This commit introduces an unthrottled startup loop in the `reloader`
when `ScrapeOnShutdown` is enabled. It processes all incoming
`triggerReload` signals immediately during the first interval. Once
the initial tick fires, the `reloader` resets the ticker and falls
back into its standard throttled loop, ensuring short-lived processes
can discover and scrape targets instantly.
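Roughly, that startup phase has the following shape. This is an illustrative sketch only, with placeholder names, not the actual scrape/scrape.go patch; it assumes nothing beyond the standard context and time packages:

    package scrape // sketch, not the real package layout

    import (
    	"context"
    	"time"
    )

    // reloader acts on every triggerReload immediately until the first tick,
    // then resets the ticker and falls back to the normal throttled loop.
    func reloader(ctx context.Context, triggerReload <-chan struct{}, interval time.Duration, reload func()) {
    	ticker := time.NewTicker(interval)
    	defer ticker.Stop()

    startup:
    	for {
    		select {
    		case <-ctx.Done():
    			return
    		case <-triggerReload:
    			reload() // initial targets are processed without waiting for the ticker
    		case <-ticker.C:
    			ticker.Reset(interval)
    			break startup
    		}
    	}

    	for {
    		select {
    		case <-ctx.Done():
    			return
    		case <-ticker.C:
    			select {
    			case <-triggerReload:
    				reload()
    			default:
    			}
    		}
    	}
    }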
Signed-off-by: avilevy <avilevy@google.com>
* test(scrape): refactor time-based manager tests to use synctest
Addresses PR feedback to remove flaky, time-based sleeping in the scrape manager tests.
Add TestManager_InitialScrapeOffset and TestManager_ScrapeOnShutdown, built on the testing/synctest package, eliminating real-world time.Sleep delays and making the assertions fully deterministic (a minimal sketch of the synctest pattern follows the list below).
- Replaced httptest.Server with net.Pipe and a custom startFakeHTTPServer helper to ensure all network I/O remains durably blocked inside the synctest bubble.
- Leveraged the skipOffsetting option to eliminate random scrape jitter, making the time-travel math exact and predictable.
- Using skipOffsetting also safely bypasses the global singleflight DNS lookup in setOffsetSeed, which previously caused cross-bubble panics in synctest.
- Extracted shared boilerplate into a setupSynctestManager helper to keep the test cases highly readable and data-driven.
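For reference, the general shape of such a synctest-based test, as a minimal hedched-as-hypothetical sketch (not one of the actual manager tests; only the pattern is shown):

    package scrape_test // sketch only

    import (
    	"testing"
    	"testing/synctest"
    	"time"
    )

    // Inside the synctest bubble, time.Sleep advances a fake clock as soon as all
    // goroutines in the bubble are durably blocked, so timing assertions are exact.
    func TestSynctestSketch(t *testing.T) {
    	synctest.Test(t, func(t *testing.T) {
    		start := time.Now()
    		time.Sleep(5 * time.Second) // returns "instantly" in wall-clock terms
    		synctest.Wait()             // let any other goroutines in the bubble settle
    		if time.Since(start) != 5*time.Second {
    			t.Fatalf("expected exactly 5s of virtual time, got %v", time.Since(start))
    		}
    	})
    }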
Signed-off-by: avilevy <avilevy@google.com>
* Clarify use cases in InitialScrapeOffset comment
Signed-off-by: avilevy <avilevy@google.com>
* test(scrape): use httptest for mock server to respect context cancellation
- Replaced manual HTTP string formatting over `net.Pipe` with `httptest.NewUnstartedServer`.
- Implemented an in-memory `pipeListener` to allow the server to handle `net.Pipe` connections directly. This preserves `synctest` time isolation without opening real OS ports.
- Added explicit `r.Context().Done()` handling in the mock HTTP handler to properly simulate aborted requests and scrape timeouts.
- Validates that the request context remains active and is not prematurely cancelled during `ScrapeOnShutdown` scenarios.
- Renamed `skipOffsetting` to `skipJitterOffsetting`.
- Addressed other PR comments.
Signed-off-by: avilevy <avilevy@google.com>
* tmp
Signed-off-by: bwplotka <bwplotka@gmail.com>
* exp2
Signed-off-by: bwplotka <bwplotka@gmail.com>
* fix
Signed-off-by: bwplotka <bwplotka@gmail.com>
* scrape: fix scrapeOnShutdown context bug and refactor test helpers
The scrapeOnShutdown feature was failing during manager shutdown because
the scrape pool context was being cancelled before the final shutdown
scrapes could execute. Fix this by delaying context cancellation
in scrapePool.stop() until after all scrape loops have stopped.
In addition:
- Added test cases to verify scrapeOnShutdown works with InitialScrapeOffset.
- Refactored network test helper functions from manager_test.go to
helpers_test.go.
- Addressed other comments.
Signed-off-by: avilevy <avilevy@google.com>
* Update scrape/scrape.go
Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com>
* feat(discovery): add SkipInitialWait to bypass initial startup delay
This adds a SkipInitialWait option to the discovery Manager, allowing consumers sensitive to startup latency to receive the first batch of discovered targets immediately instead of waiting for the updatert ticker.
To support this without breaking the immediate dropped target notifications introduced in #13147, ApplyConfig now uses a keep flag to only trigger immediate downstream syncs for obsolete or updated providers. This prevents sending premature empty target groups for brand-new providers on initial startup.
Additionally, the scrape manager's reloader loop is updated to process the initial triggerReload immediately, ensuring the end-to-end pipeline processes initial targets without artificial delays.
Signed-off-by: avilevy <avilevy@google.com>
* scrape: Add TestManagerReloader and refactor discovery triggerSync
Adds a new TestManagerReloader test suite using synctest to assert
behavior of target updates, discovery reload ticker intervals, and
ScrapeOnShutdown flags.
Updates setupSynctestManager to allow skipping initial config setup by
passing an interval of 0.
Also renames the 'keep' variable to 'triggerSync' in ApplyConfig inside
discovery/manager.go for clarity, and adds a descriptive comment.
Signed-off-by: avilevy <avilevy@google.com>
* feat(discovery,scrape): rename startup wait options and add DiscoveryReloadOnStartup
- discovery: Rename `SkipInitialWait` to `SkipStartupWait` for clarity.
- discovery: Pass `context.Context` to `flushUpdates` to handle cancellation and avoid leaks.
- scrape: Add `DiscoveryReloadOnStartup` to `Options` to decouple startup discovery from `ScrapeOnShutdown`.
- tests: Refactor `TestTargetSetTargetGroupsPresentOnStartup` and `TestManagerReloader` to use table-driven tests and `synctest` for better stability and coverage.
Signed-off-by: avilevy <avilevy@google.com>
* feat(discovery,scrape): importing changes proposed in 043d710
- Refactor the sender to use exponential backoff: replace `time.NewTicker` in `sender()` with an exponential backoff to avoid panics on non-positive intervals and to throttle updates more gracefully (roughly as sketched below).
- Remove the obsolete `skipStartupWait` logic.
- Refactor `setupSynctestManager` to take an explicit `initConfig` argument.
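A rough sketch of that backoff shape (hypothetical names and starting delay; the real sender() in discovery/manager.go differs in detail):

    package discovery // sketch only

    import (
    	"context"
    	"time"
    )

    // sendLoop throttles downstream updates with exponential backoff instead of a
    // fixed ticker, so a zero or negative interval can never reach time.NewTicker.
    func sendLoop(ctx context.Context, trigger <-chan struct{}, maxWait time.Duration, send func()) {
    	wait := time.Millisecond // hypothetical starting delay
    	for {
    		select {
    		case <-ctx.Done():
    			return
    		case <-trigger:
    			send()
    			// Double the delay, capped at maxWait, before looking at the next trigger.
    			if wait *= 2; wait > maxWait {
    				wait = maxWait
    			}
    			select {
    			case <-ctx.Done():
    				return
    			case <-time.After(wait):
    			}
    		}
    	}
    }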
Signed-off-by: avilevy <avilevy@google.com>
* fix: updating go mod
Signed-off-by: avilevy <avilevy@google.com>
* fixing merge
Signed-off-by: avilevy <avilevy@google.com>
* fixing issue: 2 variables but NewTestMetrics returns 1 value
Signed-off-by: avilevy <avilevy@google.com>
* Update discovery/manager.go
Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com>
* Refactor setupSynctestManager initConfig into a separate function
Signed-off-by: avilevy <avilevy@google.com>
---------
Signed-off-by: avilevy <avilevy@google.com>
Signed-off-by: bwplotka <bwplotka@gmail.com>
Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com>
Co-authored-by: bwplotka <bwplotka@gmail.com>
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package discovery

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strconv"
	"sync"
	"testing"
	"testing/synctest"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/prometheus/common/model"
	"github.com/prometheus/common/promslog"
	"github.com/stretchr/testify/require"

	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/util/testutil"
)

func TestMain(m *testing.M) {
	testutil.TolerantVerifyLeak(m)
}

func NewTestMetrics(t *testing.T, reg prometheus.Registerer) *SDMetrics {
	refreshMetrics := NewRefreshMetrics(reg)
	mechanismMetrics, err := RegisterSDMetrics(reg, refreshMetrics)
	require.NoError(t, err)
	return &SDMetrics{
		MechanismMetrics: mechanismMetrics,
		RefreshManager:   refreshMetrics,
	}
}
// TestTargetUpdatesOrder checks that the target updates are received in the expected order.
func TestTargetUpdatesOrder(t *testing.T) {
	// The order in which the updates are sent is determined by the interval passed to the mock discovery adapter.
	// The final targets array is ordered alphabetically by the name of the discoverer.
	// For example, discoverer "A" with targets "t2,t3" and discoverer "B" with targets "t1,t2" will result in "t2,t3,t1,t2" after the merge.
	testCases := []struct {
		title           string
		updates         map[string][]update
		expectedTargets [][]*targetgroup.Group
	}{
		{
			title: "Single TP no updates",
			updates: map[string][]update{
				"tp1": {},
			},
			expectedTargets: nil,
		},
		{
			title: "Multiple TPs no updates",
			updates: map[string][]update{
				"tp1": {},
				"tp2": {},
				"tp3": {},
			},
			expectedTargets: nil,
		},
		{
			title: "Single TP empty initials",
			updates: map[string][]update{
				"tp1": {
					{
						targetGroups: []targetgroup.Group{},
						interval:     5 * time.Millisecond,
					},
				},
			},
			expectedTargets: [][]*targetgroup.Group{
				{},
			},
		},
		{
			title: "Multiple TPs empty initials",
			updates: map[string][]update{
				"tp1": {
					{
						targetGroups: []targetgroup.Group{},
						interval:     5 * time.Millisecond,
					},
				},
				"tp2": {
					{
						targetGroups: []targetgroup.Group{},
						interval:     200 * time.Millisecond,
					},
				},
				"tp3": {
					{
						targetGroups: []targetgroup.Group{},
						interval:     100 * time.Millisecond,
					},
				},
			},
			expectedTargets: [][]*targetgroup.Group{
				{},
				{},
				{},
			},
		},
		{
			title: "Single TP initials only",
			updates: map[string][]update{
				"tp1": {
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp1_group1",
								Targets: []model.LabelSet{{"__instance__": "1"}},
							},
							{
								Source:  "tp1_group2",
								Targets: []model.LabelSet{{"__instance__": "2"}},
							},
						},
					},
				},
			},
			expectedTargets: [][]*targetgroup.Group{
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "1"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "2"}},
					},
				},
			},
		},
		{
			title: "Multiple TPs initials only",
			updates: map[string][]update{
				"tp1": {
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp1_group1",
								Targets: []model.LabelSet{{"__instance__": "1"}},
							},
							{
								Source:  "tp1_group2",
								Targets: []model.LabelSet{{"__instance__": "2"}},
							},
						},
					},
				},
				"tp2": {
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp2_group1",
								Targets: []model.LabelSet{{"__instance__": "3"}},
							},
						},
						interval: 10 * time.Millisecond,
					},
				},
			},
			expectedTargets: [][]*targetgroup.Group{
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "1"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "2"}},
					},
				}, {
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "1"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "2"}},
					},
					{
						Source:  "tp2_group1",
						Targets: []model.LabelSet{{"__instance__": "3"}},
					},
				},
			},
		},
		{
			title: "Single TP initials followed by empty updates",
			updates: map[string][]update{
				"tp1": {
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp1_group1",
								Targets: []model.LabelSet{{"__instance__": "1"}},
							},
							{
								Source:  "tp1_group2",
								Targets: []model.LabelSet{{"__instance__": "2"}},
							},
						},
						interval: 0,
					},
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp1_group1",
								Targets: []model.LabelSet{},
							},
							{
								Source:  "tp1_group2",
								Targets: []model.LabelSet{},
							},
						},
						interval: 10 * time.Millisecond,
					},
				},
			},
			expectedTargets: [][]*targetgroup.Group{
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "1"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "2"}},
					},
				},
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{},
					},
				},
			},
		},
		{
			title: "Single TP initials and new groups",
			updates: map[string][]update{
				"tp1": {
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp1_group1",
								Targets: []model.LabelSet{{"__instance__": "1"}},
							},
							{
								Source:  "tp1_group2",
								Targets: []model.LabelSet{{"__instance__": "2"}},
							},
						},
						interval: 0,
					},
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp1_group1",
								Targets: []model.LabelSet{{"__instance__": "3"}},
							},
							{
								Source:  "tp1_group2",
								Targets: []model.LabelSet{{"__instance__": "4"}},
							},
							{
								Source:  "tp1_group3",
								Targets: []model.LabelSet{{"__instance__": "1"}},
							},
						},
						interval: 10 * time.Millisecond,
					},
				},
			},
			expectedTargets: [][]*targetgroup.Group{
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "1"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "2"}},
					},
				},
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "3"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "4"}},
					},
					{
						Source:  "tp1_group3",
						Targets: []model.LabelSet{{"__instance__": "1"}},
					},
				},
			},
		},
		{
			title: "Multiple TPs initials and new groups",
			updates: map[string][]update{
				"tp1": {
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp1_group1",
								Targets: []model.LabelSet{{"__instance__": "1"}},
							},
							{
								Source:  "tp1_group2",
								Targets: []model.LabelSet{{"__instance__": "2"}},
							},
						},
						interval: 10 * time.Millisecond,
					},
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp1_group3",
								Targets: []model.LabelSet{{"__instance__": "3"}},
							},
							{
								Source:  "tp1_group4",
								Targets: []model.LabelSet{{"__instance__": "4"}},
							},
						},
						interval: 500 * time.Millisecond,
					},
				},
				"tp2": {
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp2_group1",
								Targets: []model.LabelSet{{"__instance__": "5"}},
							},
							{
								Source:  "tp2_group2",
								Targets: []model.LabelSet{{"__instance__": "6"}},
							},
						},
						interval: 100 * time.Millisecond,
					},
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp2_group3",
								Targets: []model.LabelSet{{"__instance__": "7"}},
							},
							{
								Source:  "tp2_group4",
								Targets: []model.LabelSet{{"__instance__": "8"}},
							},
						},
						interval: 10 * time.Millisecond,
					},
				},
			},
			expectedTargets: [][]*targetgroup.Group{
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "1"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "2"}},
					},
				},
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "1"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "2"}},
					},
					{
						Source:  "tp2_group1",
						Targets: []model.LabelSet{{"__instance__": "5"}},
					},
					{
						Source:  "tp2_group2",
						Targets: []model.LabelSet{{"__instance__": "6"}},
					},
				},
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "1"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "2"}},
					},
					{
						Source:  "tp2_group1",
						Targets: []model.LabelSet{{"__instance__": "5"}},
					},
					{
						Source:  "tp2_group2",
						Targets: []model.LabelSet{{"__instance__": "6"}},
					},
					{
						Source:  "tp2_group3",
						Targets: []model.LabelSet{{"__instance__": "7"}},
					},
					{
						Source:  "tp2_group4",
						Targets: []model.LabelSet{{"__instance__": "8"}},
					},
				},
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "1"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "2"}},
					},
					{
						Source:  "tp1_group3",
						Targets: []model.LabelSet{{"__instance__": "3"}},
					},
					{
						Source:  "tp1_group4",
						Targets: []model.LabelSet{{"__instance__": "4"}},
					},
					{
						Source:  "tp2_group1",
						Targets: []model.LabelSet{{"__instance__": "5"}},
					},
					{
						Source:  "tp2_group2",
						Targets: []model.LabelSet{{"__instance__": "6"}},
					},
					{
						Source:  "tp2_group3",
						Targets: []model.LabelSet{{"__instance__": "7"}},
					},
					{
						Source:  "tp2_group4",
						Targets: []model.LabelSet{{"__instance__": "8"}},
					},
				},
			},
		},
		{
			title: "One TP initials arrive after other TP updates.",
			updates: map[string][]update{
				"tp1": {
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp1_group1",
								Targets: []model.LabelSet{{"__instance__": "1"}},
							},
							{
								Source:  "tp1_group2",
								Targets: []model.LabelSet{{"__instance__": "2"}},
							},
						},
						interval: 10 * time.Millisecond,
					},
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp1_group1",
								Targets: []model.LabelSet{{"__instance__": "3"}},
							},
							{
								Source:  "tp1_group2",
								Targets: []model.LabelSet{{"__instance__": "4"}},
							},
						},
						interval: 150 * time.Millisecond,
					},
				},
				"tp2": {
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp2_group1",
								Targets: []model.LabelSet{{"__instance__": "5"}},
							},
							{
								Source:  "tp2_group2",
								Targets: []model.LabelSet{{"__instance__": "6"}},
							},
						},
						interval: 200 * time.Millisecond,
					},
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp2_group1",
								Targets: []model.LabelSet{{"__instance__": "7"}},
							},
							{
								Source:  "tp2_group2",
								Targets: []model.LabelSet{{"__instance__": "8"}},
							},
						},
						interval: 100 * time.Millisecond,
					},
				},
			},
			expectedTargets: [][]*targetgroup.Group{
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "1"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "2"}},
					},
				},
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "3"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "4"}},
					},
				},
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "3"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "4"}},
					},
					{
						Source:  "tp2_group1",
						Targets: []model.LabelSet{{"__instance__": "5"}},
					},
					{
						Source:  "tp2_group2",
						Targets: []model.LabelSet{{"__instance__": "6"}},
					},
				},
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "3"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "4"}},
					},
					{
						Source:  "tp2_group1",
						Targets: []model.LabelSet{{"__instance__": "7"}},
					},
					{
						Source:  "tp2_group2",
						Targets: []model.LabelSet{{"__instance__": "8"}},
					},
				},
			},
		},

		{
			title: "Single TP empty update in between",
			updates: map[string][]update{
				"tp1": {
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp1_group1",
								Targets: []model.LabelSet{{"__instance__": "1"}},
							},
							{
								Source:  "tp1_group2",
								Targets: []model.LabelSet{{"__instance__": "2"}},
							},
						},
						interval: 30 * time.Millisecond,
					},
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp1_group1",
								Targets: []model.LabelSet{},
							},
							{
								Source:  "tp1_group2",
								Targets: []model.LabelSet{},
							},
						},
						interval: 10 * time.Millisecond,
					},
					{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tp1_group1",
								Targets: []model.LabelSet{{"__instance__": "3"}},
							},
							{
								Source:  "tp1_group2",
								Targets: []model.LabelSet{{"__instance__": "4"}},
							},
						},
						interval: 300 * time.Millisecond,
					},
				},
			},
			expectedTargets: [][]*targetgroup.Group{
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "1"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "2"}},
					},
				},
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{},
					},
				},
				{
					{
						Source:  "tp1_group1",
						Targets: []model.LabelSet{{"__instance__": "3"}},
					},
					{
						Source:  "tp1_group2",
						Targets: []model.LabelSet{{"__instance__": "4"}},
					},
				},
			},
		},
	}

	for i, tc := range testCases {
		t.Run(tc.title, func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			reg := prometheus.NewRegistry()
			sdMetrics := NewTestMetrics(t, reg)

			discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
			require.NotNil(t, discoveryManager)
			discoveryManager.updatert = 100 * time.Millisecond

			var totalUpdatesCount int
			for _, up := range tc.updates {
				if len(up) > 0 {
					totalUpdatesCount += len(up)
				}
			}
			provUpdates := make(chan []*targetgroup.Group, totalUpdatesCount)

			for _, up := range tc.updates {
				go newMockDiscoveryProvider(up...).Run(ctx, provUpdates)
			}

			for x := 0; x < totalUpdatesCount; x++ {
				select {
				case <-ctx.Done():
					t.Fatalf("%d: no update arrived within the timeout limit", x)
				case tgs := <-provUpdates:
					discoveryManager.updateGroup(poolKey{setName: strconv.Itoa(i), provider: tc.title}, tgs)
					for _, got := range discoveryManager.allGroups() {
						assertEqualGroups(t, got, tc.expectedTargets[x])
					}
				}
			}
		})
	}
}
func assertEqualGroups(t *testing.T, got, expected []*targetgroup.Group) {
	t.Helper()

	// Need to sort by the group's source as the received order is not guaranteed.
	sort.Sort(byGroupSource(got))
	sort.Sort(byGroupSource(expected))

	require.Equal(t, expected, got)
}
func staticConfig(addrs ...string) StaticConfig {
	var cfg StaticConfig
	for i, addr := range addrs {
		cfg = append(cfg, &targetgroup.Group{
			Source: strconv.Itoa(i),
			Targets: []model.LabelSet{
				{model.AddressLabel: model.LabelValue(addr)},
			},
		})
	}
	return cfg
}

func verifySyncedPresence(t *testing.T, tGroups map[string][]*targetgroup.Group, key, label string, present bool) {
	t.Helper()
	if _, ok := tGroups[key]; !ok {
		t.Fatalf("'%s' should be present in Group map keys: %v", key, tGroups)
	}
	match := false
	var mergedTargets string
	for _, targetGroups := range tGroups[key] {
		for _, l := range targetGroups.Targets {
			mergedTargets = mergedTargets + " " + l.String()
			if l.String() == label {
				match = true
			}
		}
	}
	if match != present {
		msg := ""
		if !present {
			msg = "not"
		}
		t.Fatalf("%q should %s be present in Group labels: %q", label, msg, mergedTargets)
	}
}

func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) {
	t.Helper()
	_, ok := tSets[poolKey]
	require.True(t, ok, "'%s' should be present in Pool keys: %v", poolKey, tSets)

	match := false
	var mergedTargets string
	for _, targetGroup := range tSets[poolKey] {
		for _, l := range targetGroup.Targets {
			mergedTargets = mergedTargets + " " + l.String()
			if l.String() == label {
				match = true
			}
		}
	}
	if present {
		require.Truef(t, match, "%q must be present in Targets labels: %q", label, mergedTargets)
	} else {
		require.Falsef(t, match, "%q must be absent in Targets labels: %q", label, mergedTargets)
	}
}

func pk(provider, setName string, n int) poolKey {
	return poolKey{
		setName:  setName,
		provider: fmt.Sprintf("%s/%d", provider, n),
	}
}
func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) {
	testCases := []struct {
		name            string
		updatert        time.Duration
		readTimeout     time.Duration
		expectedTargets int
	}{
		{
			name:        "startup wait with long interval times out",
			updatert:    100 * time.Hour,
			readTimeout: 10 * time.Millisecond,
		},
		{
			name:            "startup wait with short interval succeeds",
			updatert:        10 * time.Millisecond,
			readTimeout:     300 * time.Millisecond,
			expectedTargets: 1,
		},
		{
			name:            "skip startup wait",
			updatert:        100 * time.Hour,
			readTimeout:     300 * time.Millisecond,
			expectedTargets: 1,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			synctest.Test(t, func(t *testing.T) {
				ctx := t.Context()

				reg := prometheus.NewRegistry()
				sdMetrics := NewTestMetrics(t, reg)

				opts := make([]func(*Manager), 0)
				discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, opts...)
				require.NotNil(t, discoveryManager)

				discoveryManager.updatert = tc.updatert
				go discoveryManager.Run()

				c := map[string]Configs{
					"prometheus": {
						staticConfig("foo:9090"),
					},
				}
				discoveryManager.ApplyConfig(c)

				synctest.Wait()

				timeout := time.After(tc.readTimeout)
				var lastSyncedTargets map[string][]*targetgroup.Group
			testFor:
				for {
					select {
					case <-timeout:
						break testFor
					case lastSyncedTargets = <-discoveryManager.SyncCh():
					}
				}

				if tc.expectedTargets == 0 {
					require.Nil(t, lastSyncedTargets)
					return
				}

				require.Len(t, lastSyncedTargets, 1)
				require.Len(t, lastSyncedTargets["prometheus"], tc.expectedTargets)
				verifySyncedPresence(t, lastSyncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)

				p := pk("static", "prometheus", 0)
				verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
				require.Len(t, discoveryManager.targets, 1)
			})
		})
	}
}
func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) {
	ctx := t.Context()

	reg := prometheus.NewRegistry()
	sdMetrics := NewTestMetrics(t, reg)

	discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond
	go discoveryManager.Run()

	c := map[string]Configs{
		"prometheus": {
			staticConfig("foo:9090"),
		},
	}
	discoveryManager.ApplyConfig(c)

	syncedTargets := <-discoveryManager.SyncCh()
	require.Len(t, syncedTargets, 1)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets["prometheus"], 1)
	p := pk("static", "prometheus", 0)
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
	require.Len(t, discoveryManager.targets, 1)

	discoveryManager.ApplyConfig(c)

	syncedTargets = <-discoveryManager.SyncCh()
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
	require.Len(t, discoveryManager.targets, 1)
	require.Len(t, syncedTargets, 1)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets["prometheus"], 1)
}

func TestTargetSetTargetGroupsPresentOnConfigRename(t *testing.T) {
	ctx := t.Context()

	reg := prometheus.NewRegistry()
	sdMetrics := NewTestMetrics(t, reg)

	discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond
	go discoveryManager.Run()

	c := map[string]Configs{
		"prometheus": {
			staticConfig("foo:9090"),
		},
	}
	discoveryManager.ApplyConfig(c)

	syncedTargets := <-discoveryManager.SyncCh()
	require.Len(t, syncedTargets, 1)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets["prometheus"], 1)
	p := pk("static", "prometheus", 0)
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
	require.Len(t, discoveryManager.targets, 1)

	c["prometheus2"] = c["prometheus"]
	delete(c, "prometheus")
	discoveryManager.ApplyConfig(c)

	syncedTargets = <-discoveryManager.SyncCh()
	p = pk("static", "prometheus2", 0)
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
	require.Len(t, discoveryManager.targets, 1)
	require.Len(t, syncedTargets, 1)
	verifySyncedPresence(t, syncedTargets, "prometheus2", "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets["prometheus2"], 1)
}

func TestTargetSetTargetGroupsPresentOnConfigDuplicateAndDeleteOriginal(t *testing.T) {
	ctx := t.Context()

	reg := prometheus.NewRegistry()
	sdMetrics := NewTestMetrics(t, reg)

	discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond
	go discoveryManager.Run()

	c := map[string]Configs{
		"prometheus": {
			staticConfig("foo:9090"),
		},
	}
	discoveryManager.ApplyConfig(c)
	<-discoveryManager.SyncCh()

	c["prometheus2"] = c["prometheus"]
	discoveryManager.ApplyConfig(c)
	syncedTargets := <-discoveryManager.SyncCh()
	require.Len(t, syncedTargets, 2)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets["prometheus"], 1)
	verifySyncedPresence(t, syncedTargets, "prometheus2", "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets["prometheus2"], 1)
	p := pk("static", "prometheus", 0)
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
	require.Len(t, discoveryManager.targets, 2)

	delete(c, "prometheus")
	discoveryManager.ApplyConfig(c)
	syncedTargets = <-discoveryManager.SyncCh()
	p = pk("static", "prometheus2", 0)
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
	require.Len(t, discoveryManager.targets, 1)
	require.Len(t, syncedTargets, 1)
	verifySyncedPresence(t, syncedTargets, "prometheus2", "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets["prometheus2"], 1)
}

func TestTargetSetTargetGroupsPresentOnConfigChange(t *testing.T) {
	ctx := t.Context()

	reg := prometheus.NewRegistry()
	sdMetrics := NewTestMetrics(t, reg)

	discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond
	go discoveryManager.Run()

	c := map[string]Configs{
		"prometheus": {
			staticConfig("foo:9090"),
		},
	}
	discoveryManager.ApplyConfig(c)

	syncedTargets := <-discoveryManager.SyncCh()
	require.Len(t, syncedTargets, 1)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets["prometheus"], 1)

	var mu sync.Mutex
	c["prometheus2"] = Configs{
		lockStaticConfig{
			mu:     &mu,
			config: staticConfig("bar:9090"),
		},
	}
	mu.Lock()
	discoveryManager.ApplyConfig(c)

	// Original targets should be present as soon as possible.
	// An empty list should be sent for prometheus2 to drop any stale targets.
	syncedTargets = <-discoveryManager.SyncCh()
	mu.Unlock()
	require.Len(t, syncedTargets, 2)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets["prometheus"], 1)
	require.Empty(t, syncedTargets["prometheus2"])

	// prometheus2 configs should be ready on second sync.
	syncedTargets = <-discoveryManager.SyncCh()
	require.Len(t, syncedTargets, 2)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets["prometheus"], 1)
	verifySyncedPresence(t, syncedTargets, "prometheus2", "{__address__=\"bar:9090\"}", true)
	require.Len(t, syncedTargets["prometheus2"], 1)

	p := pk("static", "prometheus", 0)
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
	p = pk("lockstatic", "prometheus2", 1)
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"bar:9090\"}", true)
	require.Len(t, discoveryManager.targets, 2)

	// Delete part of config and ensure only original targets exist.
	delete(c, "prometheus2")
	discoveryManager.ApplyConfig(c)
	syncedTargets = <-discoveryManager.SyncCh()
	require.Len(t, discoveryManager.targets, 1)
	verifyPresence(t, discoveryManager.targets, pk("static", "prometheus", 0), "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets, 1)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets["prometheus"], 1)
}

func TestTargetSetRecreatesTargetGroupsOnConfigChange(t *testing.T) {
	ctx := t.Context()

	reg := prometheus.NewRegistry()
	sdMetrics := NewTestMetrics(t, reg)

	discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond
	go discoveryManager.Run()

	c := map[string]Configs{
		"prometheus": {
			staticConfig("foo:9090", "bar:9090"),
		},
	}
	discoveryManager.ApplyConfig(c)

	syncedTargets := <-discoveryManager.SyncCh()
	p := pk("static", "prometheus", 0)
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"bar:9090\"}", true)
	require.Len(t, discoveryManager.targets, 1)
	require.Len(t, syncedTargets, 1)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"bar:9090\"}", true)
	require.Len(t, syncedTargets["prometheus"], 2)

	c["prometheus"] = Configs{
		staticConfig("foo:9090"),
	}
	discoveryManager.ApplyConfig(c)
	syncedTargets = <-discoveryManager.SyncCh()
	require.Len(t, discoveryManager.targets, 1)
	p = pk("static", "prometheus", 1)
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"bar:9090\"}", false)
	require.Len(t, discoveryManager.targets, 1)
	require.Len(t, syncedTargets, 1)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets["prometheus"], 1)
}
func TestDiscovererConfigs(t *testing.T) {
	ctx := t.Context()

	reg := prometheus.NewRegistry()
	sdMetrics := NewTestMetrics(t, reg)

	discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond
	go discoveryManager.Run()

	c := map[string]Configs{
		"prometheus": {
			staticConfig("foo:9090", "bar:9090"),
			staticConfig("baz:9090"),
		},
	}
	discoveryManager.ApplyConfig(c)

	syncedTargets := <-discoveryManager.SyncCh()
	p := pk("static", "prometheus", 0)
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"bar:9090\"}", true)
	p = pk("static", "prometheus", 1)
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"baz:9090\"}", true)
	require.Len(t, discoveryManager.targets, 2)
	require.Len(t, syncedTargets, 1)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"bar:9090\"}", true)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"baz:9090\"}", true)
	require.Len(t, syncedTargets["prometheus"], 3)
}

// TestTargetSetRecreatesEmptyStaticConfigs ensures that reloading a config file after
// removing all targets from the static_configs cleans the corresponding targetGroups entries to avoid leaks and sends an empty update.
// The update is required to signal the consumers that the previous targets should be dropped.
func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) {
	ctx := t.Context()

	reg := prometheus.NewRegistry()
	sdMetrics := NewTestMetrics(t, reg)

	discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond
	go discoveryManager.Run()

	c := map[string]Configs{
		"prometheus": {
			staticConfig("foo:9090"),
		},
	}
	discoveryManager.ApplyConfig(c)

	syncedTargets := <-discoveryManager.SyncCh()
	p := pk("static", "prometheus", 0)
	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets, 1)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets["prometheus"], 1)

	c["prometheus"] = Configs{
		StaticConfig{{}},
	}
	discoveryManager.ApplyConfig(c)

	syncedTargets = <-discoveryManager.SyncCh()
	require.Len(t, discoveryManager.targets, 1)
	p = pk("static", "prometheus", 1)
	targetGroups, ok := discoveryManager.targets[p]
	require.True(t, ok, "'%v' should be present in targets", p)
	// Otherwise the targetGroups will leak, see https://github.com/prometheus/prometheus/issues/12436.
	require.Empty(t, targetGroups, "'%v' should no longer have any associated target groups", p)
	require.Len(t, syncedTargets, 1, "an update with no targetGroups should still be sent.")
	require.Empty(t, syncedTargets["prometheus"])
}

func TestIdenticalConfigurationsAreCoalesced(t *testing.T) {
	ctx := t.Context()

	reg := prometheus.NewRegistry()
	sdMetrics := NewTestMetrics(t, reg)

	discoveryManager := NewManager(ctx, nil, reg, sdMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond
	go discoveryManager.Run()

	c := map[string]Configs{
		"prometheus": {
			staticConfig("foo:9090"),
		},
		"prometheus2": {
			staticConfig("foo:9090"),
		},
	}
	discoveryManager.ApplyConfig(c)

	syncedTargets := <-discoveryManager.SyncCh()
	verifyPresence(t, discoveryManager.targets, pk("static", "prometheus", 0), "{__address__=\"foo:9090\"}", true)
	verifyPresence(t, discoveryManager.targets, pk("static", "prometheus2", 0), "{__address__=\"foo:9090\"}", true)
	require.Len(t, discoveryManager.providers, 1, "Invalid number of providers.")
	require.Len(t, syncedTargets, 2)
	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets["prometheus"], 1)
	verifySyncedPresence(t, syncedTargets, "prometheus2", "{__address__=\"foo:9090\"}", true)
	require.Len(t, syncedTargets["prometheus2"], 1)
}

func TestApplyConfigDoesNotModifyStaticTargets(t *testing.T) {
	originalConfig := Configs{
		staticConfig("foo:9090", "bar:9090", "baz:9090"),
	}
	processedConfig := Configs{
		staticConfig("foo:9090", "bar:9090", "baz:9090"),
	}
	ctx := t.Context()

	reg := prometheus.NewRegistry()
	sdMetrics := NewTestMetrics(t, reg)

	discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond
	go discoveryManager.Run()

	cfgs := map[string]Configs{
		"prometheus": processedConfig,
	}
	discoveryManager.ApplyConfig(cfgs)
	<-discoveryManager.SyncCh()

	for _, cfg := range cfgs {
		require.Equal(t, originalConfig, cfg)
	}
}

type errorConfig struct{ err error }

func (errorConfig) Name() string                                          { return "error" }
func (e errorConfig) NewDiscoverer(DiscovererOptions) (Discoverer, error) { return nil, e.err }

// NewDiscovererMetrics implements discovery.Config.
func (errorConfig) NewDiscovererMetrics(prometheus.Registerer, RefreshMetricsInstantiator) DiscovererMetrics {
	return &NoopDiscovererMetrics{}
}

type lockStaticConfig struct {
	mu     *sync.Mutex
	config StaticConfig
}

// NewDiscovererMetrics implements discovery.Config.
func (lockStaticConfig) NewDiscovererMetrics(prometheus.Registerer, RefreshMetricsInstantiator) DiscovererMetrics {
	return &NoopDiscovererMetrics{}
}

func (lockStaticConfig) Name() string { return "lockstatic" }
func (s lockStaticConfig) NewDiscoverer(DiscovererOptions) (Discoverer, error) {
	return (lockStaticDiscoverer)(s), nil
}

type lockStaticDiscoverer lockStaticConfig

func (s lockStaticDiscoverer) Run(ctx context.Context, up chan<- []*targetgroup.Group) {
	// TODO: existing implementation closes up chan, but documentation explicitly forbids it...?
	defer close(up)
	s.mu.Lock()
	defer s.mu.Unlock()
	select {
	case <-ctx.Done():
	case up <- s.config:
	}
}

func TestGaugeFailedConfigs(t *testing.T) {
	ctx := t.Context()

	reg := prometheus.NewRegistry()
	sdMetrics := NewTestMetrics(t, reg)

	discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond
	go discoveryManager.Run()

	c := map[string]Configs{
		"prometheus": {
			errorConfig{errors.New("tests error 0")},
			errorConfig{errors.New("tests error 1")},
			errorConfig{errors.New("tests error 2")},
		},
	}
	discoveryManager.ApplyConfig(c)
	<-discoveryManager.SyncCh()

	failedCount := client_testutil.ToFloat64(discoveryManager.metrics.FailedConfigs)
	require.Equal(t, 3.0, failedCount, "Expected to have 3 failed configs.")

	c["prometheus"] = Configs{
		staticConfig("foo:9090"),
	}
	discoveryManager.ApplyConfig(c)
	<-discoveryManager.SyncCh()

	failedCount = client_testutil.ToFloat64(discoveryManager.metrics.FailedConfigs)
	require.Equal(t, 0.0, failedCount, "Expected to get no failed config.")
}
func TestCoordinationWithReceiver(t *testing.T) {
	updateDelay := 100 * time.Millisecond

	type expect struct {
		delay time.Duration
		tgs   map[string][]*targetgroup.Group
	}

	testCases := []struct {
		title     string
		providers map[string]Discoverer
		expected  []expect
	}{
		{
			title: "Receiver should get all updates even when one provider closes its channel",
			providers: map[string]Discoverer{
				"once1": &onceProvider{
					tgs: []*targetgroup.Group{
						{
							Source:  "tg1",
							Targets: []model.LabelSet{{"__instance__": "1"}},
						},
					},
				},
				"mock1": newMockDiscoveryProvider(
					update{
						interval: 2 * updateDelay,
						targetGroups: []targetgroup.Group{
							{
								Source:  "tg2",
								Targets: []model.LabelSet{{"__instance__": "2"}},
							},
						},
					},
				),
			},
			expected: []expect{
				{
					tgs: map[string][]*targetgroup.Group{
						"once1": {
							{
								Source:  "tg1",
								Targets: []model.LabelSet{{"__instance__": "1"}},
							},
						},
						"mock1": {},
					},
				},
				{
					tgs: map[string][]*targetgroup.Group{
						"once1": {
							{
								Source:  "tg1",
								Targets: []model.LabelSet{{"__instance__": "1"}},
							},
						},
						"mock1": {
							{
								Source:  "tg2",
								Targets: []model.LabelSet{{"__instance__": "2"}},
							},
						},
					},
				},
			},
		},
		{
			title: "Receiver should get all updates even when the channel is blocked",
			providers: map[string]Discoverer{
				"mock1": newMockDiscoveryProvider(
					update{
						targetGroups: []targetgroup.Group{
							{
								Source:  "tg1",
								Targets: []model.LabelSet{{"__instance__": "1"}},
							},
						},
					},
					update{
						interval: 4 * updateDelay,
						targetGroups: []targetgroup.Group{
							{
								Source:  "tg2",
								Targets: []model.LabelSet{{"__instance__": "2"}},
							},
						},
					},
				),
			},
			expected: []expect{
				{
					delay: 2 * updateDelay,
					tgs: map[string][]*targetgroup.Group{
						"mock1": {
							{
								Source:  "tg1",
								Targets: []model.LabelSet{{"__instance__": "1"}},
							},
						},
					},
				},
				{
					delay: 4 * updateDelay,
					tgs: map[string][]*targetgroup.Group{
						"mock1": {
							{
								Source:  "tg1",
								Targets: []model.LabelSet{{"__instance__": "1"}},
							},
							{
								Source:  "tg2",
								Targets: []model.LabelSet{{"__instance__": "2"}},
							},
						},
					},
				},
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.title, func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			reg := prometheus.NewRegistry()
			sdMetrics := NewTestMetrics(t, reg)

			mgr := NewManager(ctx, nil, reg, sdMetrics)
			require.NotNil(t, mgr)
			mgr.updatert = updateDelay
			go mgr.Run()

			for name, p := range tc.providers {
				mgr.StartCustomProvider(ctx, name, p)
			}

			for i, expected := range tc.expected {
				time.Sleep(expected.delay)
				select {
				case <-ctx.Done():
					t.Fatalf("step %d: no update received in the expected timeframe", i)
				case tgs, ok := <-mgr.SyncCh():
					require.True(t, ok, "step %d: discovery manager channel is closed", i)
					require.Len(t, tgs, len(expected.tgs), "step %d: targets mismatch", i)

					for k := range expected.tgs {
						_, ok := tgs[k]
						require.True(t, ok, "step %d: target group not found: %s", i, k)
						assertEqualGroups(t, tgs[k], expected.tgs[k])
					}
				}
			}
		})
	}
}
type update struct {
	targetGroups []targetgroup.Group
	interval     time.Duration
}

type mockdiscoveryProvider struct {
	updates []update
}

func newMockDiscoveryProvider(updates ...update) mockdiscoveryProvider {
	tp := mockdiscoveryProvider{
		updates: updates,
	}
	return tp
}

func (tp mockdiscoveryProvider) Run(ctx context.Context, upCh chan<- []*targetgroup.Group) {
	for _, u := range tp.updates {
		if u.interval > 0 {
			select {
			case <-ctx.Done():
				return
			case <-time.After(u.interval):
			}
		}
		tgs := make([]*targetgroup.Group, len(u.targetGroups))
		for i := range u.targetGroups {
			tgs[i] = &u.targetGroups[i]
		}
		select {
		case <-ctx.Done():
			return
		case upCh <- tgs:
		}
	}
	<-ctx.Done()
}

// byGroupSource implements sort.Interface so we can sort by the Source field.
type byGroupSource []*targetgroup.Group

func (a byGroupSource) Len() int           { return len(a) }
func (a byGroupSource) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a byGroupSource) Less(i, j int) bool { return a[i].Source < a[j].Source }

// onceProvider sends updates once (if any) and closes the update channel.
type onceProvider struct {
	tgs []*targetgroup.Group
}

func (o onceProvider) Run(_ context.Context, ch chan<- []*targetgroup.Group) {
	if len(o.tgs) > 0 {
		ch <- o.tgs
	}
	close(ch)
}
// TestTargetSetTargetGroupsUpdateDuringApplyConfig is used to detect races when
// ApplyConfig happens at the same time as targets update.
func TestTargetSetTargetGroupsUpdateDuringApplyConfig(t *testing.T) {
	ctx := t.Context()

	reg := prometheus.NewRegistry()
	sdMetrics := NewTestMetrics(t, reg)

	discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond
	go discoveryManager.Run()

	td := newTestDiscoverer()

	c := map[string]Configs{
		"prometheus": {
			td,
		},
	}
	discoveryManager.ApplyConfig(c)

	var wg sync.WaitGroup
	wg.Add(2000)

	start := make(chan struct{})
	for range 1000 {
		go func() {
			<-start
			td.update([]*targetgroup.Group{
				{
					Targets: []model.LabelSet{
						{model.AddressLabel: model.LabelValue("127.0.0.1:9090")},
					},
				},
			})
			wg.Done()
		}()
	}

	for i := range 1000 {
		go func(i int) {
			<-start
			c := map[string]Configs{
				fmt.Sprintf("prometheus-%d", i): {
					td,
				},
			}
			discoveryManager.ApplyConfig(c)
			wg.Done()
		}(i)
	}

	close(start)
	wg.Wait()
}

// testDiscoverer is a config and a discoverer that can adjust targets with a
// simple function.
type testDiscoverer struct {
	up    chan<- []*targetgroup.Group
	ready chan struct{}
}

func newTestDiscoverer() *testDiscoverer {
	return &testDiscoverer{
		ready: make(chan struct{}),
	}
}

// NewDiscovererMetrics implements discovery.Config.
func (*testDiscoverer) NewDiscovererMetrics(prometheus.Registerer, RefreshMetricsInstantiator) DiscovererMetrics {
	return &NoopDiscovererMetrics{}
}

// Name implements Config.
func (*testDiscoverer) Name() string {
	return "test"
}

// NewDiscoverer implements Config.
func (t *testDiscoverer) NewDiscoverer(DiscovererOptions) (Discoverer, error) {
	return t, nil
}

// Run implements Discoverer.
func (t *testDiscoverer) Run(ctx context.Context, up chan<- []*targetgroup.Group) {
	t.up = up
	close(t.ready)
	<-ctx.Done()
}

func (t *testDiscoverer) update(tgs []*targetgroup.Group) {
	<-t.ready
	t.up <- tgs
}

func TestUnregisterMetrics(t *testing.T) {
	reg := prometheus.NewRegistry()
	// Check that all metrics can be unregistered, allowing a second manager to be created.
	for range 2 {
		ctx, cancel := context.WithCancel(context.Background())

		sdMetrics := NewTestMetrics(t, reg)

		discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
		// discoveryManager will be nil if there was an error configuring metrics.
		require.NotNil(t, discoveryManager)
		// Unregister all metrics.
		discoveryManager.UnregisterMetrics()
		for _, sdMetric := range sdMetrics.MechanismMetrics {
			sdMetric.Unregister()
		}
		sdMetrics.RefreshManager.Unregister()
		cancel()
	}
}
// Refresh and discovery metrics should be deleted for providers that are removed.
func TestMetricsCleanupAfterConfigReload(t *testing.T) {
	ctx := t.Context()

	reg := prometheus.NewRegistry()
	sdMetrics := NewTestMetrics(t, reg)

	discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond
	go discoveryManager.Run()

	c := map[string]Configs{
		"prometheus": {
			staticConfig("foo:9090", "bar:9090"),
		},
		"other": {
			staticConfig("baz:9090"),
		},
	}
	discoveryManager.ApplyConfig(c)
	<-discoveryManager.SyncCh()

	// Manually instantiate refresh metrics to make them visible.
	sdMetrics.RefreshManager.Instantiate("static", "prometheus").Failures.Add(0)
	sdMetrics.RefreshManager.Instantiate("static", "other").Failures.Add(0)

	count, err := client_testutil.GatherAndCount(reg, "prometheus_sd_discovered_targets")
	require.NoError(t, err)
	require.Equal(t, 2, count)

	count, err = client_testutil.GatherAndCount(reg, "prometheus_sd_refresh_failures_total")
	require.NoError(t, err)
	require.Equal(t, 2, count)

	// Simulate a config refresh.
	delete(c, "prometheus")
	discoveryManager.ApplyConfig(c)
	<-discoveryManager.SyncCh()

	// Ensure we still have metrics for the remaining provider.
	count, err = client_testutil.GatherAndCount(reg, "prometheus_sd_discovered_targets")
	require.NoError(t, err)
	require.Equal(t, 1, count)

	count, err = client_testutil.GatherAndCount(reg, "prometheus_sd_refresh_failures_total")
	require.NoError(t, err)
	require.Equal(t, 1, count)
}

// Calling ApplyConfig() that removes providers at the same time as shutting down
// the manager should not hang.
func TestConfigReloadAndShutdownRace(t *testing.T) {
	reg := prometheus.NewRegistry()
	sdMetrics := NewTestMetrics(t, reg)

	mgrCtx, mgrCancel := context.WithCancel(context.Background())
	discoveryManager := NewManager(mgrCtx, promslog.NewNopLogger(), reg, sdMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond

	var wgDiscovery sync.WaitGroup
	wgDiscovery.Go(func() {
		discoveryManager.Run()
	})
	time.Sleep(time.Millisecond * 200)

	var wgBg sync.WaitGroup
	updateChan := discoveryManager.SyncCh()
	wgBg.Add(1)
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		defer wgBg.Done()
		select {
		case <-ctx.Done():
			return
		case <-updateChan:
		}
	}()

	c := map[string]Configs{
		"prometheus": {staticConfig("bar:9090")},
	}
	discoveryManager.ApplyConfig(c)

	delete(c, "prometheus")
	wgBg.Go(func() {
		discoveryManager.ApplyConfig(c)
	})
	mgrCancel()
	wgDiscovery.Wait()

	cancel()
	wgBg.Wait()
}

func TestGaugeLastUpdateTimestamp(t *testing.T) {
	ctx := t.Context()

	reg := prometheus.NewRegistry()
	sdMetrics := NewTestMetrics(t, reg)

	discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond
	go discoveryManager.Run()

	c := map[string]Configs{
		"prometheus": {
			staticConfig("foo:9090"),
		},
	}
	discoveryManager.ApplyConfig(c)

	before := time.Now()
	<-discoveryManager.SyncCh()
	after := time.Now()

	ts := client_testutil.ToFloat64(discoveryManager.metrics.LastUpdated.WithLabelValues("prometheus"))
	require.GreaterOrEqual(t, ts, float64(before.UnixNano())/1e9, "last update timestamp should be >= time before sync")
	require.LessOrEqual(t, ts, float64(after.UnixNano())/1e9, "last update timestamp should be <= time after sync")
}