mirror of
https://github.com/prometheus/prometheus
synced 2026-04-30 23:11:34 +08:00
Merge pull request #18102 from prometheus/bwplotka/rename-test
refactor: sed enableStStorage/enableSTStorage
This commit is contained in:
@@ -96,10 +96,10 @@ func TestCommit_AppendV2(t *testing.T) {
|
||||
numHistograms = 100
|
||||
numSeries = 8
|
||||
)
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.EnableSTStorage = enableStStorage
|
||||
opts.EnableSTStorage = enableSTStorage
|
||||
s := createTestAgentDB(t, nil, opts)
|
||||
|
||||
app := s.AppenderV2(context.TODO())
|
||||
@@ -196,7 +196,7 @@ func TestCommit_AppendV2(t *testing.T) {
|
||||
walSeriesCount += len(series)
|
||||
|
||||
case record.Samples:
|
||||
if enableStStorage {
|
||||
if enableSTStorage {
|
||||
t.Errorf("Got V1 Samples when ST enabled")
|
||||
}
|
||||
var samples []record.RefSample
|
||||
@@ -205,7 +205,7 @@ func TestCommit_AppendV2(t *testing.T) {
|
||||
walSamplesCount += len(samples)
|
||||
|
||||
case record.SamplesV2:
|
||||
if !enableStStorage {
|
||||
if !enableSTStorage {
|
||||
t.Errorf("Got V2 Samples when ST disabled")
|
||||
}
|
||||
var samples []record.RefSample
|
||||
@@ -256,9 +256,9 @@ func TestRollbackAppendV2(t *testing.T) {
|
||||
numSeries = 8
|
||||
)
|
||||
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
opts := DefaultOptions()
|
||||
opts.EnableSTStorage = enableStStorage
|
||||
opts.EnableSTStorage = enableSTStorage
|
||||
s := createTestAgentDB(t, nil, opts)
|
||||
app := s.AppenderV2(context.TODO())
|
||||
|
||||
|
||||
@@ -968,18 +968,18 @@ func TestWALReplayRaceOnSamplesLoggedBeforeSeries_AppendV2(t *testing.T) {
|
||||
|
||||
// We test both with few and many samples appended after series creation. If samples are < 120 then there's no
|
||||
// mmap-ed chunk, otherwise there's at least 1 mmap-ed chunk when replaying the WAL.
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, numSamplesAfterSeriesCreation := range []int{1, 1000} {
|
||||
for run := 1; run <= numRuns; run++ {
|
||||
t.Run(fmt.Sprintf("samples after series creation = %d, run = %d, stStorage = %v", numSamplesAfterSeriesCreation, run, enableStStorage), func(t *testing.T) {
|
||||
testWALReplayRaceOnSamplesLoggedBeforeSeriesAppendV2(t, numSamplesBeforeSeriesCreation, numSamplesAfterSeriesCreation, enableStStorage)
|
||||
t.Run(fmt.Sprintf("samples after series creation = %d, run = %d, stStorage = %v", numSamplesAfterSeriesCreation, run, enableSTStorage), func(t *testing.T) {
|
||||
testWALReplayRaceOnSamplesLoggedBeforeSeriesAppendV2(t, numSamplesBeforeSeriesCreation, numSamplesAfterSeriesCreation, enableSTStorage)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testWALReplayRaceOnSamplesLoggedBeforeSeriesAppendV2(t *testing.T, numSamplesBeforeSeriesCreation, numSamplesAfterSeriesCreation int, enableStStorage bool) {
|
||||
func testWALReplayRaceOnSamplesLoggedBeforeSeriesAppendV2(t *testing.T, numSamplesBeforeSeriesCreation, numSamplesAfterSeriesCreation int, enableSTStorage bool) {
|
||||
const numSeries = 1000
|
||||
|
||||
db := newTestDB(t)
|
||||
@@ -987,7 +987,7 @@ func testWALReplayRaceOnSamplesLoggedBeforeSeriesAppendV2(t *testing.T, numSampl
|
||||
|
||||
for seriesRef := 1; seriesRef <= numSeries; seriesRef++ {
|
||||
// Log samples before the series is logged to the WAL.
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
var samples []record.RefSample
|
||||
|
||||
for ts := range numSamplesBeforeSeriesCreation {
|
||||
@@ -1178,8 +1178,8 @@ func TestTombstoneCleanResultEmptyBlock_AppendV2(t *testing.T) {
|
||||
|
||||
func TestSizeRetention_AppendV2(t *testing.T) {
|
||||
t.Parallel()
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.OutOfOrderTimeWindow = 100
|
||||
db := newTestDB(t, withOpts(opts), withRngs(100))
|
||||
@@ -1243,7 +1243,7 @@ func TestSizeRetention_AppendV2(t *testing.T) {
|
||||
// Create a WAL checkpoint, and compare sizes.
|
||||
first, last, err := wlog.Segments(db.Head().wal.Dir())
|
||||
require.NoError(t, err)
|
||||
_, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(chunks.HeadSeriesRef) bool { return false }, 0, enableStStorage)
|
||||
_, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(chunks.HeadSeriesRef) bool { return false }, 0, enableSTStorage)
|
||||
require.NoError(t, err)
|
||||
blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
|
||||
walSize, err = db.Head().wal.Size()
|
||||
@@ -1506,15 +1506,15 @@ func TestInitializeHeadTimestamp_AppendV2(t *testing.T) {
|
||||
require.True(t, db.head.initialized())
|
||||
})
|
||||
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
t.Run("wal-only,stStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
t.Run("wal-only,stStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777))
|
||||
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), compression.None)
|
||||
require.NoError(t, err)
|
||||
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
err = w.Log(
|
||||
enc.Series([]record.RefSeries{
|
||||
{Ref: 123, Labels: labels.FromStrings("a", "1")},
|
||||
@@ -1546,8 +1546,8 @@ func TestInitializeHeadTimestamp_AppendV2(t *testing.T) {
|
||||
require.Equal(t, int64(2000), db.head.MaxTime())
|
||||
require.True(t, db.head.initialized())
|
||||
})
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
t.Run("existing-block-and-wal,stStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
t.Run("existing-block-and-wal,stStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
createBlock(t, dir, genSeries(1, 1, 1000, 6000))
|
||||
@@ -1556,7 +1556,7 @@ func TestInitializeHeadTimestamp_AppendV2(t *testing.T) {
|
||||
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), compression.None)
|
||||
require.NoError(t, err)
|
||||
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
err = w.Log(
|
||||
enc.Series([]record.RefSeries{
|
||||
{Ref: 123, Labels: labels.FromStrings("a", "1")},
|
||||
@@ -3441,8 +3441,8 @@ func TestMetadataInWAL_AppenderV2(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMetadataCheckpointingOnlyKeepsLatestEntry_AppendV2(t *testing.T) {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
numSamples := 10000
|
||||
hb, w := newTestHead(t, int64(numSamples)*10, compression.None, false)
|
||||
@@ -3519,7 +3519,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry_AppendV2(t *testing.T) {
|
||||
keep := func(id chunks.HeadSeriesRef) bool {
|
||||
return id != 3
|
||||
}
|
||||
_, err = wlog.Checkpoint(promslog.NewNopLogger(), w, first, last-1, keep, 0, enableStStorage)
|
||||
_, err = wlog.Checkpoint(promslog.NewNopLogger(), w, first, last-1, keep, 0, enableSTStorage)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Confirm there's been a checkpoint.
|
||||
|
||||
@@ -1170,25 +1170,25 @@ func TestWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T) {
|
||||
|
||||
// We test both with few and many samples appended after series creation. If samples are < 120 then there's no
|
||||
// mmap-ed chunk, otherwise there's at least 1 mmap-ed chunk when replaying the WAL.
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, numSamplesAfterSeriesCreation := range []int{1, 1000} {
|
||||
for run := 1; run <= numRuns; run++ {
|
||||
t.Run(fmt.Sprintf("samples after series creation = %d, run = %d, stStorage=%v", numSamplesAfterSeriesCreation, run, enableStStorage), func(t *testing.T) {
|
||||
testWALReplayRaceOnSamplesLoggedBeforeSeries(t, numSamplesBeforeSeriesCreation, numSamplesAfterSeriesCreation, enableStStorage)
|
||||
t.Run(fmt.Sprintf("samples after series creation = %d, run = %d, stStorage=%v", numSamplesAfterSeriesCreation, run, enableSTStorage), func(t *testing.T) {
|
||||
testWALReplayRaceOnSamplesLoggedBeforeSeries(t, numSamplesBeforeSeriesCreation, numSamplesAfterSeriesCreation, enableSTStorage)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T, numSamplesBeforeSeriesCreation, numSamplesAfterSeriesCreation int, enableStStorage bool) {
|
||||
func testWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T, numSamplesBeforeSeriesCreation, numSamplesAfterSeriesCreation int, enableSTStorage bool) {
|
||||
const numSeries = 1000
|
||||
db := newTestDB(t)
|
||||
db.DisableCompactions()
|
||||
|
||||
for seriesRef := 1; seriesRef <= numSeries; seriesRef++ {
|
||||
// Log samples before the series is logged to the WAL.
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
var samples []record.RefSample
|
||||
|
||||
for ts := range numSamplesBeforeSeriesCreation {
|
||||
@@ -1552,8 +1552,8 @@ func TestRetentionDurationMetric(t *testing.T) {
|
||||
|
||||
func TestSizeRetention(t *testing.T) {
|
||||
t.Parallel()
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.OutOfOrderTimeWindow = 100
|
||||
db := newTestDB(t, withOpts(opts), withRngs(100))
|
||||
@@ -1617,7 +1617,7 @@ func TestSizeRetention(t *testing.T) {
|
||||
// Create a WAL checkpoint, and compare sizes.
|
||||
first, last, err := wlog.Segments(db.Head().wal.Dir())
|
||||
require.NoError(t, err)
|
||||
_, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(chunks.HeadSeriesRef) bool { return false }, 0, enableStStorage)
|
||||
_, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(chunks.HeadSeriesRef) bool { return false }, 0, enableSTStorage)
|
||||
require.NoError(t, err)
|
||||
blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
|
||||
walSize, err = db.Head().wal.Size()
|
||||
@@ -2078,15 +2078,15 @@ func TestInitializeHeadTimestamp(t *testing.T) {
|
||||
require.True(t, db.head.initialized())
|
||||
})
|
||||
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
t.Run("wal-only-st-"+strconv.FormatBool(enableStStorage), func(t *testing.T) {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
t.Run("wal-only-st-"+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777))
|
||||
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), compression.None)
|
||||
require.NoError(t, err)
|
||||
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
err = w.Log(
|
||||
enc.Series([]record.RefSeries{
|
||||
{Ref: 123, Labels: labels.FromStrings("a", "1")},
|
||||
@@ -2119,8 +2119,8 @@ func TestInitializeHeadTimestamp(t *testing.T) {
|
||||
require.True(t, db.head.initialized())
|
||||
})
|
||||
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
t.Run("existing-block-and-wal,enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
t.Run("existing-block-and-wal,enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
createBlock(t, dir, genSeries(1, 1, 1000, 6000))
|
||||
@@ -2129,7 +2129,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
|
||||
w, err := wlog.New(nil, nil, path.Join(dir, "wal"), compression.None)
|
||||
require.NoError(t, err)
|
||||
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
err = w.Log(
|
||||
enc.Series([]record.RefSeries{
|
||||
{Ref: 123, Labels: labels.FromStrings("a", "1")},
|
||||
@@ -4703,8 +4703,8 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
numSamples := 10000
|
||||
hb, w := newTestHead(t, int64(numSamples)*10, compression.None, false)
|
||||
@@ -4771,7 +4771,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) {
|
||||
keep := func(id chunks.HeadSeriesRef) bool {
|
||||
return id != 3
|
||||
}
|
||||
_, err = wlog.Checkpoint(promslog.NewNopLogger(), w, first, last-1, keep, 0, enableStStorage)
|
||||
_, err = wlog.Checkpoint(promslog.NewNopLogger(), w, first, last-1, keep, 0, enableSTStorage)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Confirm there's been a checkpoint.
|
||||
|
||||
@@ -1867,8 +1867,8 @@ func TestHistogramInWALAndMmapChunk_AppenderV2(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestChunkSnapshot_AppenderV2(t *testing.T) {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
|
||||
head, _ := newTestHead(t, 120*4, compression.None, false)
|
||||
defer func() {
|
||||
head.opts.EnableMemorySnapshotOnShutdown = false
|
||||
@@ -2017,7 +2017,7 @@ func TestChunkSnapshot_AppenderV2(t *testing.T) {
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Add some tombstones.
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
for i := 1; i <= numSeries; i++ {
|
||||
ref := storage.SeriesRef(i)
|
||||
itvs := tombstones.Intervals{
|
||||
@@ -2095,7 +2095,7 @@ func TestChunkSnapshot_AppenderV2(t *testing.T) {
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Add more tombstones.
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
for i := 1; i <= numSeries; i++ {
|
||||
ref := storage.SeriesRef(i)
|
||||
itvs := tombstones.Intervals{
|
||||
|
||||
@@ -256,7 +256,7 @@ func BenchmarkLoadWLs(b *testing.B) {
|
||||
// Rough estimates of most common % of samples that have an exemplar for each scrape.
|
||||
exemplarsPercentages := []float64{0, 0.5, 1, 5}
|
||||
lastExemplarsPerSeries := -1
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, c := range cases {
|
||||
missingSeriesPercentages := []float64{0, 0.1}
|
||||
for _, missingSeriesPct := range missingSeriesPercentages {
|
||||
@@ -268,7 +268,7 @@ func BenchmarkLoadWLs(b *testing.B) {
|
||||
continue
|
||||
}
|
||||
lastExemplarsPerSeries = exemplarsPerSeries
|
||||
b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d,oooSeriesPct=%.3f,oooSamplesPct=%.3f,oooCapMax=%d,missingSeriesPct=%.3f,stStorage=%v", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT, c.oooSeriesPct, c.oooSamplesPct, c.oooCapMax, missingSeriesPct, enableStStorage),
|
||||
b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d,oooSeriesPct=%.3f,oooSamplesPct=%.3f,oooCapMax=%d,missingSeriesPct=%.3f,stStorage=%v", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT, c.oooSeriesPct, c.oooSamplesPct, c.oooCapMax, missingSeriesPct, enableSTStorage),
|
||||
func(b *testing.B) {
|
||||
dir := b.TempDir()
|
||||
|
||||
@@ -307,7 +307,7 @@ func BenchmarkLoadWLs(b *testing.B) {
|
||||
writeSeries = newWriteSeries
|
||||
}
|
||||
|
||||
buf = populateTestWL(b, wal, []any{writeSeries}, buf, enableStStorage)
|
||||
buf = populateTestWL(b, wal, []any{writeSeries}, buf, enableSTStorage)
|
||||
}
|
||||
|
||||
// Write samples.
|
||||
@@ -333,7 +333,7 @@ func BenchmarkLoadWLs(b *testing.B) {
|
||||
V: float64(i) * 100,
|
||||
})
|
||||
}
|
||||
buf = populateTestWL(b, wal, []any{refSamples}, buf, enableStStorage)
|
||||
buf = populateTestWL(b, wal, []any{refSamples}, buf, enableSTStorage)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -372,7 +372,7 @@ func BenchmarkLoadWLs(b *testing.B) {
|
||||
Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)),
|
||||
})
|
||||
}
|
||||
buf = populateTestWL(b, wal, []any{refExemplars}, buf, enableStStorage)
|
||||
buf = populateTestWL(b, wal, []any{refExemplars}, buf, enableSTStorage)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -401,10 +401,10 @@ func BenchmarkLoadWLs(b *testing.B) {
|
||||
})
|
||||
}
|
||||
if shouldAddMarkers {
|
||||
populateTestWL(b, wbl, []any{refMarkers}, buf, enableStStorage)
|
||||
populateTestWL(b, wbl, []any{refMarkers}, buf, enableSTStorage)
|
||||
}
|
||||
buf = populateTestWL(b, wal, []any{refSamples}, buf, enableStStorage)
|
||||
buf = populateTestWL(b, wbl, []any{refSamples}, buf, enableStStorage)
|
||||
buf = populateTestWL(b, wal, []any{refSamples}, buf, enableSTStorage)
|
||||
buf = populateTestWL(b, wbl, []any{refSamples}, buf, enableSTStorage)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -713,9 +713,9 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHead_ReadWAL(t *testing.T) {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableStStorage), func(t *testing.T) {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableSTStorage), func(t *testing.T) {
|
||||
entries := []any{
|
||||
[]record.RefSeries{
|
||||
{Ref: 10, Labels: labels.FromStrings("a", "1")},
|
||||
@@ -756,7 +756,7 @@ func TestHead_ReadWAL(t *testing.T) {
|
||||
|
||||
head, w := newTestHead(t, 1000, compress, false)
|
||||
|
||||
populateTestWL(t, w, entries, nil, enableStStorage)
|
||||
populateTestWL(t, w, entries, nil, enableSTStorage)
|
||||
|
||||
require.NoError(t, head.Init(math.MinInt64))
|
||||
require.Equal(t, uint64(101), head.lastSeriesID.Load())
|
||||
@@ -1103,11 +1103,11 @@ func TestHead_WALCheckpointMultiRef(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name+",stStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
|
||||
t.Run(tc.name+",stStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
|
||||
h, w := newTestHead(t, 1000, compression.None, false)
|
||||
populateTestWL(t, w, tc.walEntries, nil, enableStStorage)
|
||||
populateTestWL(t, w, tc.walEntries, nil, enableSTStorage)
|
||||
first, _, err := wlog.Segments(w.Dir())
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -1690,9 +1690,9 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableStStorage), func(t *testing.T) {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableSTStorage), func(t *testing.T) {
|
||||
entries := []any{
|
||||
[]record.RefSeries{
|
||||
{Ref: 10, Labels: labels.FromStrings("a", "1")},
|
||||
@@ -1708,7 +1708,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
|
||||
}
|
||||
head, w := newTestHead(t, 1000, compress, false)
|
||||
|
||||
populateTestWL(t, w, entries, nil, enableStStorage)
|
||||
populateTestWL(t, w, entries, nil, enableSTStorage)
|
||||
|
||||
require.NoError(t, head.Init(math.MinInt64))
|
||||
|
||||
@@ -2575,8 +2575,8 @@ func TestHead_ReturnsSortedLabelValues(t *testing.T) {
|
||||
// TestWalRepair_DecodingError ensures that a repair is run for an error
|
||||
// when decoding a record.
|
||||
func TestWalRepair_DecodingError(t *testing.T) {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
for name, test := range map[string]struct {
|
||||
corrFunc func(rec []byte) []byte // Func that applies the corruption to a record.
|
||||
rec []byte
|
||||
@@ -2609,7 +2609,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
|
||||
},
|
||||
} {
|
||||
for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} {
|
||||
t.Run(fmt.Sprintf("%s,compress=%s,stStorage=%v", name, compress, enableStStorage), func(t *testing.T) {
|
||||
t.Run(fmt.Sprintf("%s,compress=%s,stStorage=%v", name, compress, enableSTStorage), func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
// Fill the wal and corrupt it.
|
||||
@@ -2672,9 +2672,9 @@ func TestWalRepair_DecodingError(t *testing.T) {
|
||||
// TestWblRepair_DecodingError ensures that a repair is run for an error
|
||||
// when decoding a record.
|
||||
func TestWblRepair_DecodingError(t *testing.T) {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
corrFunc := func(rec []byte) []byte {
|
||||
return rec[:3]
|
||||
}
|
||||
@@ -4378,8 +4378,8 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestChunkSnapshot(t *testing.T) {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
|
||||
head, _ := newTestHead(t, 120*4, compression.None, false)
|
||||
defer func() {
|
||||
head.opts.EnableMemorySnapshotOnShutdown = false
|
||||
@@ -4525,7 +4525,7 @@ func TestChunkSnapshot(t *testing.T) {
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Add some tombstones.
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
for i := 1; i <= numSeries; i++ {
|
||||
ref := storage.SeriesRef(i)
|
||||
itvs := tombstones.Intervals{
|
||||
@@ -4599,7 +4599,7 @@ func TestChunkSnapshot(t *testing.T) {
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Add more tombstones.
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
for i := 1; i <= numSeries; i++ {
|
||||
ref := storage.SeriesRef(i)
|
||||
itvs := tombstones.Intervals{
|
||||
@@ -5392,8 +5392,8 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
||||
|
||||
// Tests https://github.com/prometheus/prometheus/issues/9725.
|
||||
func TestChunkSnapshotReplayBug(t *testing.T) {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
|
||||
require.NoError(t, err)
|
||||
@@ -5418,7 +5418,7 @@ func TestChunkSnapshotReplayBug(t *testing.T) {
|
||||
}
|
||||
// Add a sample so that the series is not garbage collected.
|
||||
samplesRec := record.RefSample{Ref: ref, T: 1000, V: 1000}
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
|
||||
rec := enc.Series([]record.RefSeries{seriesRec}, buf)
|
||||
buf = rec[:0]
|
||||
|
||||
@@ -34,7 +34,7 @@ func zeroOutSTs(samples []record.RefSample) []record.RefSample {
|
||||
}
|
||||
|
||||
func TestEncodeDecode(t *testing.T) {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, tcase := range []testrecord.RefSamplesCase{
|
||||
testrecord.Realistic1000Samples,
|
||||
testrecord.Realistic1000WithVariableSTSamples,
|
||||
@@ -45,7 +45,7 @@ func TestEncodeDecode(t *testing.T) {
|
||||
var (
|
||||
dec record.Decoder
|
||||
buf []byte
|
||||
enc = record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc = record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
)
|
||||
|
||||
s := testrecord.GenTestRefSamplesCase(t, tcase)
|
||||
@@ -55,7 +55,7 @@ func TestEncodeDecode(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
// if ST is off, we expect all STs to be zero
|
||||
expected := s
|
||||
if !enableStStorage {
|
||||
if !enableSTStorage {
|
||||
expected = zeroOutSTs(s)
|
||||
}
|
||||
|
||||
@@ -69,7 +69,7 @@ func TestEncodeDecode(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := s
|
||||
if !enableStStorage {
|
||||
if !enableSTStorage {
|
||||
expected = zeroOutSTs(s)
|
||||
}
|
||||
require.Equal(t, expected, got)
|
||||
@@ -81,7 +81,7 @@ func TestEncodeDecode(t *testing.T) {
|
||||
got, err := dec.Samples(enc.Samples(s, nil), samples)
|
||||
require.NoError(t, err)
|
||||
expected := s
|
||||
if !enableStStorage {
|
||||
if !enableSTStorage {
|
||||
expected = zeroOutSTs(s)
|
||||
}
|
||||
require.Equal(t, expected, got)
|
||||
@@ -102,7 +102,7 @@ func TestEncodeDecode(t *testing.T) {
|
||||
got, err := dec.Samples(buf, nil)
|
||||
require.NoError(t, err)
|
||||
expected := s
|
||||
if !enableStStorage {
|
||||
if !enableSTStorage {
|
||||
expected = zeroOutSTs(s)
|
||||
}
|
||||
require.Equal(t, expected, got)
|
||||
|
||||
@@ -275,10 +275,10 @@ func TestRecord_EncodeDecode(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRecord_DecodeInvalidHistogramSchema(t *testing.T) {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, schema := range []int32{-100, 100} {
|
||||
t.Run(fmt.Sprintf("schema=%d,stStorage=%v", schema, enableStStorage), func(t *testing.T) {
|
||||
enc := Encoder{EnableSTStorage: enableStStorage}
|
||||
t.Run(fmt.Sprintf("schema=%d,stStorage=%v", schema, enableSTStorage), func(t *testing.T) {
|
||||
enc := Encoder{EnableSTStorage: enableSTStorage}
|
||||
|
||||
var output bytes.Buffer
|
||||
logger := promslog.New(&promslog.Config{Writer: &output})
|
||||
@@ -312,10 +312,10 @@ func TestRecord_DecodeInvalidHistogramSchema(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRecord_DecodeInvalidFloatHistogramSchema(t *testing.T) {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, schema := range []int32{-100, 100} {
|
||||
t.Run(fmt.Sprintf("schema=%d,stStorage=%v", schema, enableStStorage), func(t *testing.T) {
|
||||
enc := Encoder{EnableSTStorage: enableStStorage}
|
||||
t.Run(fmt.Sprintf("schema=%d,stStorage=%v", schema, enableSTStorage), func(t *testing.T) {
|
||||
enc := Encoder{EnableSTStorage: enableSTStorage}
|
||||
|
||||
var output bytes.Buffer
|
||||
logger := promslog.New(&promslog.Config{Writer: &output})
|
||||
@@ -349,10 +349,10 @@ func TestRecord_DecodeInvalidFloatHistogramSchema(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRecord_DecodeTooHighResolutionHistogramSchema(t *testing.T) {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, schema := range []int32{9, 52} {
|
||||
t.Run(fmt.Sprintf("schema=%d,stStorage=%v", schema, enableStStorage), func(t *testing.T) {
|
||||
enc := Encoder{EnableSTStorage: enableStStorage}
|
||||
t.Run(fmt.Sprintf("schema=%d,stStorage=%v", schema, enableSTStorage), func(t *testing.T) {
|
||||
enc := Encoder{EnableSTStorage: enableSTStorage}
|
||||
|
||||
var output bytes.Buffer
|
||||
logger := promslog.New(&promslog.Config{Writer: &output})
|
||||
@@ -386,10 +386,10 @@ func TestRecord_DecodeTooHighResolutionHistogramSchema(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRecord_DecodeTooHighResolutionFloatHistogramSchema(t *testing.T) {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, schema := range []int32{9, 52} {
|
||||
t.Run(fmt.Sprintf("schema=%d,stStorage=%v", schema, enableStStorage), func(t *testing.T) {
|
||||
enc := Encoder{EnableSTStorage: enableStStorage}
|
||||
t.Run(fmt.Sprintf("schema=%d,stStorage=%v", schema, enableSTStorage), func(t *testing.T) {
|
||||
enc := Encoder{EnableSTStorage: enableSTStorage}
|
||||
|
||||
var output bytes.Buffer
|
||||
logger := promslog.New(&promslog.Config{Writer: &output})
|
||||
@@ -425,8 +425,8 @@ func TestRecord_DecodeTooHighResolutionFloatHistogramSchema(t *testing.T) {
|
||||
// TestRecord_Corrupted ensures that corrupted records return the correct error.
|
||||
// Bugfix check for pull/521 and pull/523.
|
||||
func TestRecord_Corrupted(t *testing.T) {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
enc := Encoder{EnableSTStorage: enableStStorage}
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
enc := Encoder{EnableSTStorage: enableSTStorage}
|
||||
dec := NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger())
|
||||
|
||||
t.Run("Test corrupted series record", func(t *testing.T) {
|
||||
@@ -784,13 +784,13 @@ func BenchmarkWAL_HistogramEncoding(b *testing.B) {
|
||||
make: initNHCBRefs,
|
||||
},
|
||||
} {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, labelCount := range []int{0, 10, 50} {
|
||||
for _, histograms := range []int{10, 100, 1000} {
|
||||
for _, buckets := range []int{0, 1, 10, 100} {
|
||||
b.Run(fmt.Sprintf("type=%s/labels=%d/histograms=%d/buckets=%d", maker.name, labelCount, histograms, buckets), func(b *testing.B) {
|
||||
series, samples, nhcbs := maker.make(labelCount, histograms, buckets)
|
||||
enc := Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := Encoder{EnableSTStorage: enableSTStorage}
|
||||
for b.Loop() {
|
||||
var buf []byte
|
||||
enc.Series(series, buf)
|
||||
|
||||
@@ -92,7 +92,7 @@ const CheckpointPrefix = "checkpoint."
|
||||
// segmented format as the original WAL itself.
|
||||
// This makes it easy to read it through the WAL package and concatenate
|
||||
// it with the original WAL.
|
||||
func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64, enableStStorage bool) (*CheckpointStats, error) {
|
||||
func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64, enableSTStorage bool) (*CheckpointStats, error) {
|
||||
stats := &CheckpointStats{}
|
||||
var sgmReader io.ReadCloser
|
||||
|
||||
@@ -156,7 +156,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
|
||||
metadata []record.RefMetadata
|
||||
st = labels.NewSymbolTable() // Needed for decoding; labels do not outlive this function.
|
||||
dec = record.NewDecoder(st, logger)
|
||||
enc = record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc = record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
buf []byte
|
||||
recs [][]byte
|
||||
|
||||
|
||||
@@ -171,12 +171,12 @@ func TestCheckpoint(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, compress := range compression.Types() {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableStStorage), func(t *testing.T) {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableSTStorage), func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
// Create a dummy segment to bump the initial number.
|
||||
seg, err := CreateSegment(dir, 100)
|
||||
require.NoError(t, err)
|
||||
@@ -295,7 +295,7 @@ func TestCheckpoint(t *testing.T) {
|
||||
|
||||
stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef) bool {
|
||||
return x%2 == 0
|
||||
}, last/2, enableStStorage)
|
||||
}, last/2, enableSTStorage)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, w.Truncate(107))
|
||||
require.NoError(t, DeleteCheckpoints(w.Dir(), 106))
|
||||
@@ -386,13 +386,13 @@ func TestCheckpoint(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) {
|
||||
// Create a new wlog with invalid data.
|
||||
dir := t.TempDir()
|
||||
w, err := NewSize(nil, nil, dir, 64*1024, compression.None)
|
||||
require.NoError(t, err)
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
require.NoError(t, w.Log(enc.Series([]record.RefSeries{
|
||||
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "2")},
|
||||
}, nil)))
|
||||
@@ -406,7 +406,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
|
||||
require.NoError(t, f.Close())
|
||||
|
||||
// Run the checkpoint and since the wlog contains corrupt data this should return an error.
|
||||
_, err = Checkpoint(promslog.NewNopLogger(), w, 0, 1, nil, 0, enableStStorage)
|
||||
_, err = Checkpoint(promslog.NewNopLogger(), w, 0, 1, nil, 0, enableSTStorage)
|
||||
require.Error(t, err)
|
||||
|
||||
// Walk the wlog dir to make sure there are no tmp folder left behind after the error.
|
||||
|
||||
@@ -145,8 +145,8 @@ func TestTailSamples(t *testing.T) {
|
||||
const exemplarsCount = 25
|
||||
const histogramsCount = 50
|
||||
for _, compress := range compression.Types() {
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableStStorage), func(t *testing.T) {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableSTStorage), func(t *testing.T) {
|
||||
now := time.Now()
|
||||
|
||||
dir := t.TempDir()
|
||||
@@ -155,7 +155,7 @@ func TestTailSamples(t *testing.T) {
|
||||
err := os.Mkdir(wdir, 0o777)
|
||||
require.NoError(t, err)
|
||||
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
@@ -294,9 +294,9 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
|
||||
const seriesCount = 10
|
||||
const samplesCount = 250
|
||||
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, compress := range compression.Types() {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableStStorage), func(t *testing.T) {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableSTStorage), func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
wdir := path.Join(dir, "wal")
|
||||
err := os.Mkdir(wdir, 0o777)
|
||||
@@ -310,7 +310,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
|
||||
|
||||
var recs [][]byte
|
||||
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
|
||||
for i := range seriesCount {
|
||||
series := enc.Series([]record.RefSeries{
|
||||
@@ -364,16 +364,16 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
|
||||
const seriesCount = 10
|
||||
const samplesCount = 250
|
||||
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, compress := range compression.Types() {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableStStorage), func(t *testing.T) {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableSTStorage), func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
wdir := path.Join(dir, "wal")
|
||||
err := os.Mkdir(wdir, 0o777)
|
||||
require.NoError(t, err)
|
||||
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
w, err := NewSize(nil, nil, wdir, segmentSize, compress)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
@@ -406,7 +406,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(chunks.HeadSeriesRef) bool { return true }, 0, enableStStorage)
|
||||
Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(chunks.HeadSeriesRef) bool { return true }, 0, enableSTStorage)
|
||||
w.Truncate(1)
|
||||
|
||||
// Write more records after checkpointing.
|
||||
@@ -454,9 +454,9 @@ func TestReadCheckpoint(t *testing.T) {
|
||||
const seriesCount = 10
|
||||
const samplesCount = 250
|
||||
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, compress := range compression.Types() {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableStStorage), func(t *testing.T) {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableSTStorage), func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
wdir := path.Join(dir, "wal")
|
||||
@@ -467,7 +467,7 @@ func TestReadCheckpoint(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, f.Close())
|
||||
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
@@ -499,7 +499,7 @@ func TestReadCheckpoint(t *testing.T) {
|
||||
}
|
||||
_, err = w.NextSegmentSync()
|
||||
require.NoError(t, err)
|
||||
_, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(chunks.HeadSeriesRef) bool { return true }, 0, enableStStorage)
|
||||
_, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(chunks.HeadSeriesRef) bool { return true }, 0, enableSTStorage)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, w.Truncate(32))
|
||||
|
||||
@@ -529,16 +529,16 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
|
||||
const seriesCount = 40
|
||||
const samplesCount = 500
|
||||
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, compress := range compression.Types() {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableStStorage), func(t *testing.T) {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableSTStorage), func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
wdir := path.Join(dir, "wal")
|
||||
err := os.Mkdir(wdir, 0o777)
|
||||
require.NoError(t, err)
|
||||
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
w, err := NewSize(nil, nil, wdir, pageSize, compress)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -603,26 +603,26 @@ func TestCheckpointSeriesReset(t *testing.T) {
|
||||
const samplesCount = 700
|
||||
testCases := []struct {
|
||||
compress compression.Type
|
||||
enableStStorage bool
|
||||
enableSTStorage bool
|
||||
segments int
|
||||
}{
|
||||
{compress: compression.None, enableStStorage: false, segments: 24},
|
||||
{compress: compression.Snappy, enableStStorage: false, segments: 23},
|
||||
{compress: compression.None, enableStStorage: true, segments: 20},
|
||||
{compress: compression.Snappy, enableStStorage: true, segments: 20},
|
||||
{compress: compression.None, enableSTStorage: false, segments: 24},
|
||||
{compress: compression.Snappy, enableSTStorage: false, segments: 23},
|
||||
{compress: compression.None, enableSTStorage: true, segments: 20},
|
||||
{compress: compression.Snappy, enableSTStorage: true, segments: 20},
|
||||
}
|
||||
|
||||
dir := t.TempDir()
|
||||
for _, tc := range testCases {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", tc.compress, tc.enableStStorage), func(t *testing.T) {
|
||||
subdir := filepath.Join(dir, fmt.Sprintf("%s-%v", tc.compress, tc.enableStStorage))
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", tc.compress, tc.enableSTStorage), func(t *testing.T) {
|
||||
subdir := filepath.Join(dir, fmt.Sprintf("%s-%v", tc.compress, tc.enableSTStorage))
|
||||
err := os.MkdirAll(subdir, 0o777)
|
||||
require.NoError(t, err)
|
||||
wdir := filepath.Join(subdir, "wal")
|
||||
err = os.MkdirAll(wdir, 0o777)
|
||||
require.NoError(t, err)
|
||||
|
||||
enc := record.Encoder{EnableSTStorage: tc.enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: tc.enableSTStorage}
|
||||
w, err := NewSize(nil, nil, wdir, segmentSize, tc.compress)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
@@ -698,16 +698,16 @@ func TestRun_StartupTime(t *testing.T) {
|
||||
const seriesCount = 40
|
||||
const samplesCount = 500
|
||||
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, compress := range compression.Types() {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableStStorage), func(t *testing.T) {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableSTStorage), func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
wdir := path.Join(dir, "wal")
|
||||
err := os.Mkdir(wdir, 0o777)
|
||||
require.NoError(t, err)
|
||||
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
w, err := NewSize(nil, nil, wdir, pageSize, compress)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -752,8 +752,8 @@ func TestRun_StartupTime(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func generateWALRecords(w *WL, segment, seriesCount, samplesCount int, enableStStorage bool) error {
|
||||
enc := record.Encoder{EnableSTStorage: enableStStorage}
|
||||
func generateWALRecords(w *WL, segment, seriesCount, samplesCount int, enableSTStorage bool) error {
|
||||
enc := record.Encoder{EnableSTStorage: enableSTStorage}
|
||||
for j := range seriesCount {
|
||||
ref := j + (segment * 100)
|
||||
series := enc.Series([]record.RefSeries{
|
||||
@@ -793,9 +793,9 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
|
||||
const seriesCount = 10
|
||||
const samplesCount = 50
|
||||
|
||||
for _, enableStStorage := range []bool{false, true} {
|
||||
for _, enableSTStorage := range []bool{false, true} {
|
||||
for _, compress := range compression.Types() {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableStStorage), func(t *testing.T) {
|
||||
t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableSTStorage), func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
wdir := path.Join(dir, "wal")
|
||||
@@ -805,7 +805,7 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
|
||||
w, err := NewSize(nil, nil, wdir, segmentSize, compress)
|
||||
require.NoError(t, err)
|
||||
// Write to 00000000, the watcher will read series from it.
|
||||
require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount, enableStStorage))
|
||||
require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount, enableSTStorage))
|
||||
// Create 00000001, the watcher will tail it once started.
|
||||
w.NextSegment()
|
||||
|
||||
@@ -838,7 +838,7 @@ func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
|
||||
// In the meantime, add some new segments in bulk.
|
||||
// We should end up with segmentsToWrite + 1 segments now.
|
||||
for i := 1; i < segmentsToWrite; i++ {
|
||||
require.NoError(t, generateWALRecords(w, i, seriesCount, samplesCount, enableStStorage))
|
||||
require.NoError(t, generateWALRecords(w, i, seriesCount, samplesCount, enableSTStorage))
|
||||
w.NextSegment()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user