diff --git a/tsdb/db.go b/tsdb/db.go index 293ba646ea..5078f447c4 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1365,6 +1365,14 @@ func (db *DB) compactHead(head *RangeHead) error { func (db *DB) compactBlocks() (err error) { // Check for compactions of multiple blocks. for { + // If we have a lot of blocks to compact the whole process might take + // long enough that we end up with a HEAD block that needs to be written. + // Check if that's the case and stop compactions early. + if db.head.compactable() { + level.Warn(db.logger).Log("msg", "aborting block compactions to persit the head block") + return nil + } + plan, err := db.compactor.Plan(db.dir) if err != nil { return fmt.Errorf("plan compaction: %w", err) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 8b1ad106bf..ba60d83eaf 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -6986,3 +6986,63 @@ func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) { prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat)), "number of ooo appended samples mismatch") } + +type mockCompactorFn struct { + planFn func() ([]string, error) + compactFn func() (ulid.ULID, error) + writeFn func() (ulid.ULID, error) +} + +func (c *mockCompactorFn) Plan(_ string) ([]string, error) { + return c.planFn() +} + +func (c *mockCompactorFn) Compact(_ string, _ []string, _ []*Block) (ulid.ULID, error) { + return c.compactFn() +} + +func (c *mockCompactorFn) Write(_ string, _ BlockReader, _, _ int64, _ *BlockMeta) (ulid.ULID, error) { + return c.writeFn() +} + +// Regression test for https://github.com/prometheus/prometheus/pull/13754 +func TestAbortBlockCompactions(t *testing.T) { + // Create a test DB + db := openTestDB(t, nil, nil) + defer func() { + require.NoError(t, db.Close()) + }() + // It should NOT be compactible at the beginning of the test + require.False(t, db.head.compactable(), "head should NOT be compactable") + + // Track the number of compactions run inside db.compactBlocks() + var compactions int + + // Use a mock compactor with custom Plan() implementation + db.compactor = &mockCompactorFn{ + planFn: func() ([]string, error) { + // On every Plan() run increment compactions. After 4 compactions + // update HEAD to make it compactible to force an exit from db.compactBlocks() loop. + compactions++ + if compactions > 3 { + chunkRange := db.head.chunkRange.Load() + db.head.minTime.Store(0) + db.head.maxTime.Store(chunkRange * 2) + require.True(t, db.head.compactable(), "head should be compactable") + } + // Our custom Plan() will always return something to compact. + return []string{"1", "2", "3"}, nil + }, + compactFn: func() (ulid.ULID, error) { + return ulid.ULID{}, nil + }, + writeFn: func() (ulid.ULID, error) { + return ulid.ULID{}, nil + }, + } + + err := db.Compact(context.Background()) + require.NoError(t, err) + require.True(t, db.head.compactable(), "head should be compactable") + require.Equal(t, 4, compactions, "expected 4 compactions to be completed") +}