Add tests for staleness markers appended to TSDB when sample_limit is set

Signed-off-by: Lukasz Mierzwa <l.mierzwa@gmail.com>
This commit is contained in:
Lukasz Mierzwa
2025-04-14 15:42:19 +01:00
parent d902abc50d
commit 1f7a23cced

View File

@@ -5568,3 +5568,219 @@ func TestScrapeAppendWithParseError(t *testing.T) {
}
requireEqual(t, want, capp.resultFloats, "Appended samples not as expected:\n%s", capp)
}
// This test covers a case where there's a target with sample_limit set and the some of exporter samples
// changes between scrapes.
func TestScrapeLoopAppendSampleLimitWithDisappearingSeries(t *testing.T) {
const sampleLimit = 4
resApp := &collectResultAppender{}
sl := newBasicScrapeLoop(t, context.Background(), nil, func(_ context.Context) storage.Appender {
return resApp
}, 0)
sl.sampleLimit = sampleLimit
now := time.Now()
slApp := sl.appender(context.Background())
samplesScraped, samplesAfterRelabel, createdSeries, err := sl.append(
slApp,
// Start with 3 samples, all accepted.
[]byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"),
"text/plain",
now,
)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.Equal(t, 3, samplesScraped) // All on scrape.
require.Equal(t, 3, samplesAfterRelabel) // This is series after relabeling.
require.Equal(t, 3, createdSeries) // Newly added to TSDB.
want := []floatSample{
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
t: timestamp.FromTime(now),
f: 1,
},
}
requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
now = now.Add(time.Minute)
slApp = sl.appender(context.Background())
samplesScraped, samplesAfterRelabel, createdSeries, err = sl.append(
slApp,
// Start exporting 3 more samples, so we're over the limit now.
[]byte("metric_a 1\nmetric_b 1\nmetric_c 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\n"),
"text/plain",
now,
)
require.ErrorIs(t, err, errSampleLimit)
require.NoError(t, slApp.Rollback())
require.Equal(t, 6, samplesScraped)
require.Equal(t, 6, samplesAfterRelabel)
require.Equal(t, 1, createdSeries) // We've added one series before hitting the limit.
requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
sl.cache.iterDone(false)
now = now.Add(time.Minute)
slApp = sl.appender(context.Background())
samplesScraped, samplesAfterRelabel, createdSeries, err = sl.append(
slApp,
// Remove all samples except first 2.
[]byte("metric_a 1\nmetric_b 1\n"),
"text/plain",
now,
)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.Equal(t, 2, samplesScraped)
require.Equal(t, 2, samplesAfterRelabel)
require.Equal(t, 0, createdSeries)
// This is where important things happen. We should now see:
// - Appends for samples from metric_a & metric_b.
// - Append with stale markers for metric_c - this series was added during first scrape but disappeared during last scrape.
// - Append with stale marker for metric_d - this series was added during second scrape before we hit the sample_limit.
// We should NOT see:
// - Appends with stale markers for metric_e & metric_f - both over the limit during second scrape and so they never made it into TSDB.
want = append(want, []floatSample{
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
t: timestamp.FromTime(now),
f: math.Float64frombits(value.StaleNaN),
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_d"),
t: timestamp.FromTime(now),
f: math.Float64frombits(value.StaleNaN),
},
}...)
requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
}
// This test covers a case where there's a target with sample_limit set and each scrape sees a completely
// different set of samples.
func TestScrapeLoopAppendSampleLimitReplaceAllSamples(t *testing.T) {
const sampleLimit = 4
resApp := &collectResultAppender{}
sl := newBasicScrapeLoop(t, context.Background(), nil, func(_ context.Context) storage.Appender {
return resApp
}, 0)
sl.sampleLimit = sampleLimit
now := time.Now()
slApp := sl.appender(context.Background())
samplesScraped, samplesAfterRelabel, createdSeries, err := sl.append(
slApp,
// Start with 4 samples, all accepted.
[]byte("metric_a 1\nmetric_b 1\nmetric_c 1\nmetric_d 1\n"),
"text/plain",
now,
)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.Equal(t, 4, samplesScraped) // All on scrape.
require.Equal(t, 4, samplesAfterRelabel) // This is series after relabeling.
require.Equal(t, 4, createdSeries) // Newly added to TSDB.
want := []floatSample{
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_d"),
t: timestamp.FromTime(now),
f: 1,
},
}
requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
now = now.Add(time.Minute)
slApp = sl.appender(context.Background())
samplesScraped, samplesAfterRelabel, createdSeries, err = sl.append(
slApp,
// Replace all samples with new time series.
[]byte("metric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h 1\n"),
"text/plain",
now,
)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
require.Equal(t, 4, samplesScraped)
require.Equal(t, 4, samplesAfterRelabel)
require.Equal(t, 4, createdSeries)
// We replaced all samples from first scrape with new set of samples.
// We expect to see:
// - 4 appends for new samples.
// - 4 appends with staleness markers for old samples.
want = append(want, []floatSample{
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_e"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_f"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_g"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_h"),
t: timestamp.FromTime(now),
f: 1,
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_a"),
t: timestamp.FromTime(now),
f: math.Float64frombits(value.StaleNaN),
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_b"),
t: timestamp.FromTime(now),
f: math.Float64frombits(value.StaleNaN),
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_c"),
t: timestamp.FromTime(now),
f: math.Float64frombits(value.StaleNaN),
},
{
metric: labels.FromStrings(model.MetricNameLabel, "metric_d"),
t: timestamp.FromTime(now),
f: math.Float64frombits(value.StaleNaN),
},
}...)
requireEqual(t, want, resApp.resultFloats, "Appended samples not as expected:\n%s", slApp)
}