Mirror of https://github.com/prometheus/prometheus, synced 2026-04-22 07:46:30 +08:00
* Adding scrape on shutdown
Signed-off-by: avilevy <avilevy@google.com>
* scrape: replace skipOffsetting to make the test offset deterministic instead of skipping it entirely
Signed-off-by: avilevy <avilevy@google.com>
* renamed calculateScrapeOffset to getScrapeOffset
Signed-off-by: avilevy <avilevy@google.com>
* discovery: Add skipStartupWait to bypass initial discovery delay
In short-lived environments like agent mode or serverless, the
Prometheus process may only execute for a few seconds. Waiting for
the default 5-second `updatert` ticker before sending the first
target groups means the process could terminate before collecting
any metrics at all.
This commit adds a `skipStartupWait` option to the Discovery Manager
to bypass this initial delay. When enabled, the sender uses an
unthrottled startup loop that instantly forwards all triggers. This
ensures both the initial empty update from `ApplyConfig` and the
first real targets from discoverers are passed downstream immediately.
After the first ticker interval elapses, the sender cleanly breaks out
of the startup phase, resets the ticker, and resumes standard
operations.
Signed-off-by: avilevy <avilevy@google.com>
* scrape: Bypass initial reload delay for ScrapeOnShutdown
In short-lived environments like agent mode or serverless, the default
5-second `DiscoveryReloadInterval` can cause the process to terminate
before the scrape manager has a chance to process targets and collect
any metrics.
Because the discovery manager sends an initial empty update upon
configuration followed rapidly by the actual targets, simply waiting
for a single reload trigger is insufficient: the real targets would
still get trapped behind the ticker delay.
This commit introduces an unthrottled startup loop in the `reloader`
when `ScrapeOnShutdown` is enabled. It processes all incoming
`triggerReload` signals immediately during the first interval. Once
the initial tick fires, the `reloader` resets the ticker and falls
back into its standard throttled loop, ensuring short-lived processes
can discover and scrape targets instantly.
Signed-off-by: avilevy <avilevy@google.com>
* test(scrape): refactor time-based manager tests to use synctest
Addresses PR feedback to remove flaky, time-based sleeping in the scrape manager tests.
Adds TestManager_InitialScrapeOffset and TestManager_ScrapeOnShutdown, which use the testing/synctest package, completely eliminating real-world time.Sleep delays and making the assertions deterministic.
- Replaced httptest.Server with net.Pipe and a custom startFakeHTTPServer helper to ensure all network I/O remains durably blocked inside the synctest bubble.
- Leveraged the skipOffsetting option to eliminate random scrape jitter, making the time-travel math exact and predictable.
- Using skipOffsetting also safely bypasses the global singleflight DNS lookup in setOffsetSeed, which previously caused cross-bubble panics in synctest.
- Extracted shared boilerplate into a setupSynctestManager helper to keep the test cases highly readable and data-driven.
Signed-off-by: avilevy <avilevy@google.com>
* Clarify use cases in InitialScrapeOffset comment
Signed-off-by: avilevy <avilevy@google.com>
* test(scrape): use httptest for mock server to respect context cancellation
- Replaced manual HTTP string formatting over `net.Pipe` with `httptest.NewUnstartedServer`.
- Implemented an in-memory `pipeListener` to allow the server to handle `net.Pipe` connections directly. This preserves `synctest` time isolation without opening real OS ports.
- Added explicit `r.Context().Done()` handling in the mock HTTP handler to properly simulate aborted requests and scrape timeouts.
- Validates that the request context remains active and is not prematurely cancelled during `ScrapeOnShutdown` scenarios.
- Renamed `skipOffsetting` to `skipJitterOffsetting`.
- Addressed other PR comments.
Signed-off-by: avilevy <avilevy@google.com>
* tmp
Signed-off-by: bwplotka <bwplotka@gmail.com>
* exp2
Signed-off-by: bwplotka <bwplotka@gmail.com>
* fix
Signed-off-by: bwplotka <bwplotka@gmail.com>
* scrape: fix scrapeOnShutdown context bug and refactor test helpers
The scrapeOnShutdown feature was failing during manager shutdown because
the scrape pool context was being cancelled before the final shutdown
scrapes could execute. Fix this by delaying context cancellation
in scrapePool.stop() until after all scrape loops have stopped.
In addition:
- Added test cases to verify scrapeOnShutdown works with InitialScrapeOffset.
- Refactored network test helper functions from manager_test.go to
helpers_test.go.
- Addressed other comments.
Signed-off-by: avilevy <avilevy@google.com>
* Update scrape/scrape.go
Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com>
* feat(discovery): add SkipInitialWait to bypass initial startup delay
This adds a SkipInitialWait option to the discovery Manager, allowing consumers sensitive to startup latency to receive the first batch of discovered targets immediately instead of waiting for the updatert ticker.
To support this without breaking the immediate dropped target notifications introduced in #13147, ApplyConfig now uses a keep flag to only trigger immediate downstream syncs for obsolete or updated providers. This prevents sending premature empty target groups for brand-new providers on initial startup.
Additionally, the scrape manager's reloader loop is updated to process the initial triggerReload immediately, ensuring the end-to-end pipeline processes initial targets without artificial delays.
Signed-off-by: avilevy <avilevy@google.com>
* scrape: Add TestManagerReloader and refactor discovery triggerSync
Adds a new TestManagerReloader test suite using synctest to assert
behavior of target updates, discovery reload ticker intervals, and
ScrapeOnShutdown flags.
Updates setupSynctestManager to allow skipping initial config setup by
passing an interval of 0.
Also renames the 'keep' variable to 'triggerSync' in ApplyConfig inside
discovery/manager.go for clarity, and adds a descriptive comment.
Signed-off-by: avilevy <avilevy@google.com>
* feat(discovery,scrape): rename startup wait options and add DiscoveryReloadOnStartup
- discovery: Rename `SkipInitialWait` to `SkipStartupWait` for clarity.
- discovery: Pass `context.Context` to `flushUpdates` to handle cancellation and avoid leaks.
- scrape: Add `DiscoveryReloadOnStartup` to `Options` to decouple startup discovery from `ScrapeOnShutdown`.
- tests: Refactor `TestTargetSetTargetGroupsPresentOnStartup` and `TestManagerReloader` to use table-driven tests and `synctest` for better stability and coverage.
Signed-off-by: avilevy <avilevy@google.com>
* feat(discovery,scrape): importing changes proposed in 043d710
- Refactor sender to use exponential backoff
- Replaces `time.NewTicker` in `sender()` with an exponential backoff
to prevent panics on non-positive intervals and better throttle updates.
- Removes obsolete `skipStartupWait` logic.
- Refactors `setupSynctestManager` to use an explicit `initConfig` argument
Signed-off-by: avilevy <avilevy@google.com>
* fix: updating go mod
Signed-off-by: avilevy <avilevy@google.com>
* fixing merge
Signed-off-by: avilevy <avilevy@google.com>
* fixing issue: 2 variables but NewTestMetrics returns 1 value
Signed-off-by: avilevy <avilevy@google.com>
* Update discovery/manager.go
Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com>
* Refactor setupSynctestManager initConfig into a separate function
Signed-off-by: avilevy <avilevy@google.com>
---------
Signed-off-by: avilevy <avilevy@google.com>
Signed-off-by: bwplotka <bwplotka@gmail.com>
Signed-off-by: avilevy18 <105948922+avilevy18@users.noreply.github.com>
Co-authored-by: bwplotka <bwplotka@gmail.com>
378 lines
11 KiB
Go
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scrape

import (
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"net"
	"net/http"
	"net/http/httptest"
	"sync"
	"testing"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
	config_util "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/prometheus/common/promslog"
	"github.com/stretchr/testify/require"
	"go.yaml.in/yaml/v2"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/util/pool"
	"github.com/prometheus/prometheus/util/teststorage"
)

// For readability.
type sample = teststorage.Sample

type compatAppendable interface {
	storage.Appendable
	storage.AppendableV2
}

func withCtx(ctx context.Context) func(sl *scrapeLoop) {
	return func(sl *scrapeLoop) {
		sl.ctx = ctx
	}
}

func withAppendable(app compatAppendable, appV2 bool) func(sl *scrapeLoop) {
	return func(sl *scrapeLoop) {
		sa := selectAppendable(app, appV2)
		sl.appendable = sa.V1()
		sl.appendableV2 = sa.V2()
	}
}

// newTestScrapeLoop creates the initial scrape loop for all tests.
// It returns the scrapeLoop and a mock scraper you can customize.
//
// It's recommended to use withXYZ functions for simple option customizations, e.g.:
//
//	sl, _ := newTestScrapeLoop(t, withCtx(customCtx))
//
// However, when changing more than one scrapeLoop option it's more readable to have one explicit opt function:
//
//	ctx, cancel := context.WithCancel(t.Context())
//	appTest := teststorage.NewAppendable()
//	sl, scraper := newTestScrapeLoop(t, func(sl *scrapeLoop) {
//		sl.ctx = ctx
//		sl.appendableV2 = appTest
//		// Since we're writing samples directly below we need to provide a protocol fallback.
//		sl.fallbackScrapeProtocol = "text/plain"
//	})
//
// NOTE: Try NOT to add more parameters to this function. Try NOT to add more
// newTestScrapeLoop-like constructors. It should be flexible enough with scrapeLoop
// used for initial options.
func newTestScrapeLoop(t testing.TB, opts ...func(sl *scrapeLoop)) (_ *scrapeLoop, scraper *testScraper) {
	metrics := newTestScrapeMetrics(t)
	sl := &scrapeLoop{
		stopped: make(chan struct{}),

		l:     promslog.NewNopLogger(),
		cache: newScrapeCache(metrics),

		interval:            10 * time.Millisecond,
		timeout:             1 * time.Hour,
		sampleMutator:       nopMutator,
		reportSampleMutator: nopMutator,
		buffers:             pool.New(1e3, 1e6, 3, func(sz int) any { return make([]byte, 0, sz) }),
		metrics:             metrics,
		maxSchema:           histogram.ExponentialSchemaMax,
		honorTimestamps:     true,
		enableCompression:   true,
		validationScheme:    model.UTF8Validation,
		symbolTable:         labels.NewSymbolTable(),
		// Tests assume those features are enabled, unless explicitly turned off.
		appendMetadataToWAL: true,
		parseST:             true,
	}
	for _, o := range opts {
		o(sl)
	}

	if sl.appendable != nil && sl.appendableV2 != nil {
		t.Fatal("select the appendable to use, both were passed, likely a bug")
	}

	// Validate user opts for convenience.
	require.Nil(t, sl.parentCtx, "newTestScrapeLoop does not support injecting non-nil parent context")
	require.Nil(t, sl.appenderCtx, "newTestScrapeLoop does not support injecting non-nil appender context")
	require.Nil(t, sl.cancel, "newTestScrapeLoop does not support injecting custom cancel function")
	require.Nil(t, sl.scraper, "newTestScrapeLoop does not support injecting scraper, it's mocked, use the returned scraper")

	rootCtx := t.Context()
	// Use sl.ctx for context injection.
	// True contexts (sl.appenderCtx, sl.parentCtx, sl.ctx) are populated from it.
	if sl.ctx != nil {
		rootCtx = sl.ctx
	}
	ctx, cancel := context.WithCancel(rootCtx)
	sl.ctx = ctx
	sl.cancel = cancel
	sl.appenderCtx = rootCtx
	sl.parentCtx = rootCtx

	scraper = &testScraper{}
	sl.scraper = scraper
	return sl, scraper
}

func newTestScrapePool(t *testing.T, app compatAppendable, appV2 bool, injectNewLoop func(options scrapeLoopOptions) loop) *scrapePool {
	sa := selectAppendable(app, appV2)
	return &scrapePool{
		ctx:     t.Context(),
		cancel:  func() {},
		logger:  promslog.NewNopLogger(),
		config:  &config.ScrapeConfig{},
		options: &Options{},
		client:  http.DefaultClient,

		activeTargets:     map[uint64]*Target{},
		loops:             map[uint64]loop{},
		injectTestNewLoop: injectNewLoop,

		appendable: sa.V1(), appendableV2: sa.V2(),

		symbolTable: labels.NewSymbolTable(),
		metrics:     newTestScrapeMetrics(t),
	}
}

// protoMarshalDelimited marshals a MetricFamily into delimited
// Prometheus proto exposition format bytes (known as `encoding=delimited`).
//
// See also https://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers
func protoMarshalDelimited(t *testing.T, mf *dto.MetricFamily) []byte {
	t.Helper()

	protoBuf, err := proto.Marshal(mf)
	require.NoError(t, err)

	varintBuf := make([]byte, binary.MaxVarintLen32)
	varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf)))

	buf := &bytes.Buffer{}
	buf.Write(varintBuf[:varintLength])
	buf.Write(protoBuf)
	return buf.Bytes()
}

type selectedAppendable struct {
	useV2 bool
	app   compatAppendable
}

// V1 returns Appendable if V1 is selected, otherwise nil.
func (s selectedAppendable) V1() storage.Appendable {
	if s.useV2 {
		return nil
	}
	return s.app
}

// V2 returns AppendableV2 if V2 is selected, otherwise nil.
func (s selectedAppendable) V2() storage.AppendableV2 {
	if !s.useV2 {
		return nil
	}
	return s.app
}

// selectAppendable allows specifying which appendable callers should use when the struct
// implements both. This is how all callers make the decision: if one appendable is nil, they
// take the other. selectAppendable allows injecting nil into e.g. storage.AppendableV2 when appV2 is false.
func selectAppendable(app compatAppendable, appV2 bool) selectedAppendable {
	s := selectedAppendable{
		app:   app,
		useV2: appV2,
	}
	return s
}

func foreachAppendable(t *testing.T, f func(t *testing.T, appV2 bool)) {
	for _, appV2 := range []bool{false, true} {
		t.Run(fmt.Sprintf("appV2=%v", appV2), func(t *testing.T) {
			f(t, appV2)
		})
	}
}

func TestSelectAppendable(t *testing.T) {
	var i int
	foreachAppendable(t, func(t *testing.T, appV2 bool) {
		defer func() { i++ }()
		switch i {
		case 0:
			require.False(t, appV2)

			s := selectAppendable(teststorage.NewAppendable(), appV2)
			require.NotNil(t, s.V1())
			require.Nil(t, s.V2())
		case 1:
			require.True(t, appV2)

			s := selectAppendable(teststorage.NewAppendable(), appV2)
			require.Nil(t, s.V1())
			require.NotNil(t, s.V2())
		default:
			t.Fatal("too many iterations")
		}
	})
}

// pipeListener is an in-memory net.Listener that connects a custom DialContext
// directly to the httptest Server without opening real OS ports.
type pipeListener struct {
	conns  chan net.Conn
	closed chan struct{}
	once   sync.Once
}

func newPipeListener() *pipeListener {
	return &pipeListener{
		conns:  make(chan net.Conn),
		closed: make(chan struct{}),
	}
}

func (l *pipeListener) Accept() (net.Conn, error) {
	select {
	case c := <-l.conns:
		return c, nil
	case <-l.closed:
		return nil, net.ErrClosed
	}
}

func (l *pipeListener) Close() error {
	l.once.Do(func() { close(l.closed) })
	return nil
}

// Dummy Addr implementation to satisfy the net.Listener interface.
type pipeAddr struct{}

func (pipeAddr) Network() string     { return "pipe" }
func (pipeAddr) String() string      { return "pipe" }
func (*pipeListener) Addr() net.Addr { return pipeAddr{} }

// startFakeHTTPServer spins up an httptest.Server bound to an in-memory
// pipeListener. It returns the listener (to be wired to a custom dialer) and a
// cleanup function to shut down the server.
func startFakeHTTPServer(t *testing.T) (*pipeListener, func()) {
	t.Helper()

	listener := newPipeListener()

	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Abort if the request context is canceled (e.g., due to a scrape timeout).
		select {
		case <-r.Context().Done():
			return
		default:
			w.Header().Set("Content-Type", "text/plain; version=0.0.4")
			fmt.Fprintln(w, "expected_metric 1")
		}
	})

	srv := httptest.NewUnstartedServer(handler)
	srv.Listener = listener

	// Background goroutines inherit the synctest bubble safely.
	srv.Start()

	return listener, srv.Close
}

// setupSynctestManager abstracts the boilerplate of creating a mock network,
// starting the fake HTTP server, and configuring the scrape manager for synctest.
func setupSynctestManager(t *testing.T, opts *Options) (*Manager, *teststorage.Appendable, func()) {
	t.Helper()
	app := teststorage.NewAppendable()

	listener, cleanup := startFakeHTTPServer(t)

	if opts == nil {
		opts = &Options{}
	}
	opts.skipJitterOffsetting = true

	// Ensure the scraper creates a new net.Pipe on every dial attempt
	// and hands the server-side connection to the mock server's listener.
	opts.HTTPClientOptions = []config_util.HTTPClientOption{
		config_util.WithDialContextFunc(func(ctx context.Context, _, _ string) (net.Conn, error) {
			srvConn, cliConn := net.Pipe()

			select {
			case listener.conns <- srvConn:
				// Give the client side to the scraper.
				return cliConn, nil
			case <-listener.closed:
				return nil, net.ErrClosed
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		}),
	}

	scrapeManager, err := NewManager(
		opts,
		promslog.New(&promslog.Config{}),
		nil, nil, app, prometheus.NewRegistry(),
	)
	require.NoError(t, err)

	return scrapeManager, app, cleanup
}

func applyDefaultSynctestConfig(t *testing.T, scrapeManager *Manager, interval time.Duration) {
	t.Helper()

	cfg := &config.Config{
		GlobalConfig: config.GlobalConfig{
			ScrapeInterval:  model.Duration(interval),
			ScrapeTimeout:   model.Duration(interval),
			ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto, config.OpenMetricsText1_0_0},
		},
		ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
	}
	cfgText, err := yaml.Marshal(*cfg)
	require.NoError(t, err)
	cfg = loadConfiguration(t, string(cfgText))
	require.NoError(t, scrapeManager.ApplyConfig(cfg))

	scrapeManager.updateTsets(map[string][]*targetgroup.Group{
		"test": {{
			Targets: []model.LabelSet{{
				model.SchemeLabel:  "http",
				model.AddressLabel: "test.local",
			}},
		}},
	})

	scrapeManager.reload()
}