mirror of
https://github.com/prometheus/prometheus
synced 2026-04-20 22:41:05 +08:00
Add xor2-encoding feature flag
Signed-off-by: Carrie Edwards <edwrdscarrie@gmail.com>
This commit is contained in:
@@ -191,10 +191,10 @@ func (v ValueType) String() string {
|
||||
}
|
||||
}
|
||||
|
||||
func (v ValueType) ChunkEncoding(storeST bool) Encoding {
|
||||
func (v ValueType) ChunkEncoding(useXOR2 bool) Encoding {
|
||||
switch v {
|
||||
case ValFloat:
|
||||
if storeST {
|
||||
if useXOR2 {
|
||||
return EncXOR2
|
||||
}
|
||||
return EncXOR
|
||||
@@ -207,10 +207,12 @@ func (v ValueType) ChunkEncoding(storeST bool) Encoding {
|
||||
}
|
||||
}
|
||||
|
||||
func (v ValueType) NewChunk(storeST bool) (Chunk, error) {
|
||||
return NewEmptyChunk(v.ChunkEncoding(storeST))
|
||||
// NewChunk returns a new empty chunk for the given value type.
|
||||
func (v ValueType) NewChunk(useXOR2 bool) (Chunk, error) {
|
||||
return NewEmptyChunk(v.ChunkEncoding(useXOR2))
|
||||
}
|
||||
|
||||
|
||||
// MockSeriesIterator returns an iterator for a mock series with custom
|
||||
// start timestamp, timestamps, and values.
|
||||
// Start timestamps is optional, pass nil or empty slice to indicate no start
|
||||
|
||||
10
tsdb/db.go
10
tsdb/db.go
@@ -240,6 +240,11 @@ type Options struct {
|
||||
// is implemented.
|
||||
EnableSTAsZeroSample bool
|
||||
|
||||
// EnableXOR2Encoding enables the XOR2 chunk encoding for float samples.
|
||||
// XOR2 provides better compression than XOR, especially for stale markers.
|
||||
// Automatically set to true when EnableSTStorage is true.
|
||||
EnableXOR2Encoding bool
|
||||
|
||||
// EnableSTStorage determines whether TSDB should write a Start Timestamp (ST)
|
||||
// per sample to WAL.
|
||||
// TODO(bwplotka): Implement this option as per PROM-60, currently it's noop.
|
||||
@@ -869,6 +874,7 @@ func Open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, st
|
||||
opts.FeatureRegistry.Set(features.TSDB, "use_uncached_io", opts.UseUncachedIO)
|
||||
opts.FeatureRegistry.Enable(features.TSDB, "native_histograms")
|
||||
opts.FeatureRegistry.Set(features.TSDB, "st_storage", opts.EnableSTStorage)
|
||||
opts.FeatureRegistry.Set(features.TSDB, "xor2_encoding", opts.EnableXOR2Encoding)
|
||||
}
|
||||
|
||||
return open(dir, l, r, opts, rngs, stats)
|
||||
@@ -1075,7 +1081,11 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
|
||||
headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
|
||||
headOpts.EnableSharding = opts.EnableSharding
|
||||
headOpts.EnableSTAsZeroSample = opts.EnableSTAsZeroSample
|
||||
if opts.EnableSTStorage {
|
||||
opts.EnableXOR2Encoding = true
|
||||
}
|
||||
headOpts.EnableSTStorage.Store(opts.EnableSTStorage)
|
||||
headOpts.EnableXOR2Encoding.Store(opts.EnableXOR2Encoding)
|
||||
headOpts.EnableMetadataWALRecords = opts.EnableMetadataWALRecords
|
||||
if opts.WALReplayConcurrency > 0 {
|
||||
headOpts.WALReplayConcurrency = opts.WALReplayConcurrency
|
||||
|
||||
@@ -166,6 +166,11 @@ type HeadOptions struct {
|
||||
// Represents 'st-storage' feature flag.
|
||||
EnableSTStorage atomic.Bool
|
||||
|
||||
// EnableXOR2Encoding enables XOR2 chunk encoding for float samples.
|
||||
// Represents 'xor2-encoding' feature flag. Automatically true when
|
||||
// EnableSTStorage is true.
|
||||
EnableXOR2Encoding atomic.Bool
|
||||
|
||||
ChunkRange int64
|
||||
// ChunkDirRoot is the parent directory of the chunks directory.
|
||||
ChunkDirRoot string
|
||||
|
||||
@@ -186,6 +186,7 @@ func (h *Head) appender() *headAppender {
|
||||
appendID: appendID,
|
||||
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
|
||||
storeST: h.opts.EnableSTStorage.Load(),
|
||||
useXOR2: h.opts.EnableXOR2Encoding.Load(),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -414,6 +415,7 @@ type headAppenderBase struct {
|
||||
appendID, cleanupAppendIDsBelow uint64
|
||||
closed bool
|
||||
storeST bool
|
||||
useXOR2 bool
|
||||
}
|
||||
type headAppender struct {
|
||||
headAppenderBase
|
||||
@@ -1747,7 +1749,7 @@ func (a *headAppenderBase) Commit() (err error) {
|
||||
chunkDiskMapper: h.chunkDiskMapper,
|
||||
chunkRange: h.chunkRange.Load(),
|
||||
samplesPerChunk: h.opts.SamplesPerChunk,
|
||||
storeST: a.storeST,
|
||||
useXOR2: a.useXOR2,
|
||||
},
|
||||
oooEnc: record.Encoder{
|
||||
EnableSTStorage: a.storeST,
|
||||
@@ -1834,7 +1836,7 @@ type chunkOpts struct {
|
||||
chunkDiskMapper *chunks.ChunkDiskMapper
|
||||
chunkRange int64
|
||||
samplesPerChunk int
|
||||
storeST bool
|
||||
useXOR2 bool // Selects XOR2 encoding for float chunks.
|
||||
}
|
||||
|
||||
// append adds the sample (t, v) to the series. The caller also has to provide
|
||||
@@ -1842,7 +1844,7 @@ type chunkOpts struct {
|
||||
// isolation for this append.)
|
||||
// Series lock must be held when calling.
|
||||
func (s *memSeries) append(st, t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
|
||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.ValFloat.ChunkEncoding(o.storeST), o)
|
||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.ValFloat.ChunkEncoding(o.useXOR2), o)
|
||||
if !sampleInOrder {
|
||||
return sampleInOrder, chunkCreated
|
||||
}
|
||||
@@ -1873,7 +1875,7 @@ func (s *memSeries) appendHistogram(st, t int64, h *histogram.Histogram, appendI
|
||||
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
|
||||
prevApp, _ := s.app.(*chunkenc.HistogramAppender)
|
||||
|
||||
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValHistogram.ChunkEncoding(o.storeST), o)
|
||||
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValHistogram.ChunkEncoding(o.useXOR2), o)
|
||||
if !sampleInOrder {
|
||||
return sampleInOrder, chunkCreated
|
||||
}
|
||||
@@ -1930,7 +1932,7 @@ func (s *memSeries) appendFloatHistogram(st, t int64, fh *histogram.FloatHistogr
|
||||
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
|
||||
prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender)
|
||||
|
||||
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValFloatHistogram.ChunkEncoding(o.storeST), o)
|
||||
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValFloatHistogram.ChunkEncoding(o.useXOR2), o)
|
||||
if !sampleInOrder {
|
||||
return sampleInOrder, chunkCreated
|
||||
}
|
||||
@@ -2187,7 +2189,7 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(o chunkOpts, logger *slog.Logger) []
|
||||
// OOO is not enabled or there is no head chunk, so nothing to m-map here.
|
||||
return nil
|
||||
}
|
||||
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64, o.storeST)
|
||||
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64, o.useXOR2)
|
||||
if err != nil {
|
||||
handleChunkWriteError(err)
|
||||
return nil
|
||||
|
||||
@@ -96,6 +96,7 @@ func (h *Head) appenderV2() *headAppenderV2 {
|
||||
appendID: appendID,
|
||||
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
|
||||
storeST: h.opts.EnableSTStorage.Load(),
|
||||
useXOR2: h.opts.EnableXOR2Encoding.Load(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2943,6 +2943,7 @@ func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario, enableST
|
||||
opts.ChunkDirRoot = dir
|
||||
opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds())
|
||||
opts.EnableSTStorage.Store(enableSTstorage)
|
||||
opts.EnableXOR2Encoding.Store(enableSTstorage)
|
||||
|
||||
h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
|
||||
require.NoError(t, err)
|
||||
@@ -2994,7 +2995,7 @@ func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario, enableST
|
||||
require.False(t, ok)
|
||||
require.NotNil(t, ms)
|
||||
|
||||
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64, h.opts.EnableSTStorage.Load())
|
||||
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64, h.opts.EnableXOR2Encoding.Load())
|
||||
require.NoError(t, err)
|
||||
require.Len(t, chks, 1)
|
||||
|
||||
@@ -4813,6 +4814,7 @@ func TestHeadAppenderV2_STStorage(t *testing.T) {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
|
||||
opts.EnableSTStorage.Store(true)
|
||||
opts.EnableXOR2Encoding.Store(true)
|
||||
h, _ := newTestHeadWithOptions(t, compression.None, opts)
|
||||
|
||||
lbls := labels.FromStrings("foo", "bar")
|
||||
|
||||
@@ -2183,47 +2183,47 @@ func TestComputeChunkEndTime(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestMemSeries_append tests float appending with various storeST/st combinations.
|
||||
// TestMemSeries_append tests float appending with various useXOR2/st combinations.
|
||||
func TestMemSeries_append(t *testing.T) {
|
||||
scenarios := []struct {
|
||||
name string
|
||||
storeST bool
|
||||
useXOR2 bool
|
||||
stFunc func(ts int64) int64 // Function to compute st from ts
|
||||
}{
|
||||
{
|
||||
name: "storeST=false st=0",
|
||||
storeST: false,
|
||||
name: "useXOR2=false st=0",
|
||||
useXOR2: false,
|
||||
stFunc: func(_ int64) int64 { return 0 },
|
||||
},
|
||||
{
|
||||
name: "storeST=true st=0",
|
||||
storeST: true,
|
||||
name: "useXOR2=true st=0",
|
||||
useXOR2: true,
|
||||
stFunc: func(_ int64) int64 { return 0 },
|
||||
},
|
||||
{
|
||||
name: "storeST=true st=ts",
|
||||
storeST: true,
|
||||
name: "useXOR2=true st=ts",
|
||||
useXOR2: true,
|
||||
stFunc: func(ts int64) int64 { return ts },
|
||||
},
|
||||
{
|
||||
name: "storeST=true st=ts-100",
|
||||
storeST: true,
|
||||
name: "useXOR2=true st=ts-100",
|
||||
useXOR2: true,
|
||||
stFunc: func(ts int64) int64 { return ts - 100 },
|
||||
},
|
||||
{
|
||||
name: "storeST=false st=ts (st ignored)",
|
||||
storeST: false,
|
||||
name: "useXOR2=false st=ts (st ignored)",
|
||||
useXOR2: false,
|
||||
stFunc: func(ts int64) int64 { return ts },
|
||||
},
|
||||
}
|
||||
for _, scenario := range scenarios {
|
||||
t.Run(scenario.name, func(t *testing.T) {
|
||||
testMemSeriesAppend(t, scenario.storeST, scenario.stFunc)
|
||||
testMemSeriesAppend(t, scenario.useXOR2, scenario.stFunc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testMemSeriesAppend(t *testing.T, storeST bool, stFunc func(ts int64) int64) {
|
||||
func testMemSeriesAppend(t *testing.T, useXOR2 bool, stFunc func(ts int64) int64) {
|
||||
dir := t.TempDir()
|
||||
// This is usually taken from the Head, but passing manually here.
|
||||
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
|
||||
@@ -2235,7 +2235,7 @@ func testMemSeriesAppend(t *testing.T, storeST bool, stFunc func(ts int64) int64
|
||||
chunkDiskMapper: chunkDiskMapper,
|
||||
chunkRange: 500,
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
storeST: storeST,
|
||||
useXOR2: useXOR2,
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
|
||||
@@ -2286,47 +2286,47 @@ func testMemSeriesAppend(t *testing.T, storeST bool, stFunc func(ts int64) int64
|
||||
}
|
||||
}
|
||||
|
||||
// TestMemSeries_appendHistogram tests histogram appending with various storeST/st combinations.
|
||||
// TestMemSeries_appendHistogram tests histogram appending with various useXOR2/st combinations.
|
||||
func TestMemSeries_appendHistogram(t *testing.T) {
|
||||
scenarios := []struct {
|
||||
name string
|
||||
storeST bool
|
||||
useXOR2 bool
|
||||
stFunc func(ts int64) int64 // Function to compute st from ts
|
||||
}{
|
||||
{
|
||||
name: "storeST=false st=0",
|
||||
storeST: false,
|
||||
name: "useXOR2=false st=0",
|
||||
useXOR2: false,
|
||||
stFunc: func(_ int64) int64 { return 0 },
|
||||
},
|
||||
{
|
||||
name: "storeST=true st=0",
|
||||
storeST: true,
|
||||
name: "useXOR2=true st=0",
|
||||
useXOR2: true,
|
||||
stFunc: func(_ int64) int64 { return 0 },
|
||||
},
|
||||
{
|
||||
name: "storeST=true st=ts",
|
||||
storeST: true,
|
||||
name: "useXOR2=true st=ts",
|
||||
useXOR2: true,
|
||||
stFunc: func(ts int64) int64 { return ts },
|
||||
},
|
||||
{
|
||||
name: "storeST=true st=ts-100",
|
||||
storeST: true,
|
||||
name: "useXOR2=true st=ts-100",
|
||||
useXOR2: true,
|
||||
stFunc: func(ts int64) int64 { return ts - 100 },
|
||||
},
|
||||
{
|
||||
name: "storeST=false st=ts (st ignored)",
|
||||
storeST: false,
|
||||
name: "useXOR2=false st=ts (st ignored)",
|
||||
useXOR2: false,
|
||||
stFunc: func(ts int64) int64 { return ts },
|
||||
},
|
||||
}
|
||||
for _, scenario := range scenarios {
|
||||
t.Run(scenario.name, func(t *testing.T) {
|
||||
testMemSeriesAppendHistogram(t, scenario.storeST, scenario.stFunc)
|
||||
testMemSeriesAppendHistogram(t, scenario.useXOR2, scenario.stFunc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testMemSeriesAppendHistogram(t *testing.T, storeST bool, stFunc func(ts int64) int64) {
|
||||
func testMemSeriesAppendHistogram(t *testing.T, useXOR2 bool, stFunc func(ts int64) int64) {
|
||||
dir := t.TempDir()
|
||||
// This is usually taken from the Head, but passing manually here.
|
||||
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
|
||||
@@ -2338,7 +2338,7 @@ func testMemSeriesAppendHistogram(t *testing.T, storeST bool, stFunc func(ts int
|
||||
chunkDiskMapper: chunkDiskMapper,
|
||||
chunkRange: int64(1000),
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
storeST: storeST,
|
||||
useXOR2: useXOR2,
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
|
||||
@@ -7354,6 +7354,7 @@ func TestHeadAppender_WALEncoder_EnableSTStorage(t *testing.T) {
|
||||
t.Run(fmt.Sprintf("enableSTStorage=%v", enableST), func(t *testing.T) {
|
||||
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
|
||||
opts.EnableSTStorage.Store(enableST)
|
||||
opts.EnableXOR2Encoding.Store(enableST)
|
||||
h, w := newTestHeadWithOptions(t, compression.None, opts)
|
||||
|
||||
lbls := labels.FromStrings("foo", "bar")
|
||||
@@ -7409,6 +7410,7 @@ func TestHeadAppender_WBLEncoder_EnableSTStorage(t *testing.T) {
|
||||
opts.ChunkDirRoot = dir
|
||||
opts.OutOfOrderTimeWindow.Store(60 * time.Minute.Milliseconds())
|
||||
opts.EnableSTStorage.Store(enableST)
|
||||
opts.EnableXOR2Encoding.Store(enableST)
|
||||
|
||||
h, err := NewHead(nil, nil, wal, wbl, opts, nil)
|
||||
require.NoError(t, err)
|
||||
@@ -7527,6 +7529,7 @@ func TestHeadAppender_STStorage_Disabled(t *testing.T) {
|
||||
func TestHeadAppender_STStorage_WALReplay(t *testing.T) {
|
||||
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
|
||||
opts.EnableSTStorage.Store(true)
|
||||
opts.EnableXOR2Encoding.Store(true)
|
||||
h, w := newTestHeadWithOptions(t, compression.None, opts)
|
||||
|
||||
lbls := labels.FromStrings("foo", "bar")
|
||||
@@ -7578,6 +7581,7 @@ func TestHeadAppender_STStorage_WBLReplay(t *testing.T) {
|
||||
opts.ChunkDirRoot = dir
|
||||
opts.OutOfOrderTimeWindow.Store(60 * time.Minute.Milliseconds())
|
||||
opts.EnableSTStorage.Store(true)
|
||||
opts.EnableXOR2Encoding.Store(true)
|
||||
|
||||
h, err := NewHead(nil, nil, wal, wbl, opts, nil)
|
||||
require.NoError(t, err)
|
||||
@@ -7656,6 +7660,7 @@ func TestHeadAppender_STStorage_ChunkEncoding(t *testing.T) {
|
||||
t.Run(fmt.Sprintf("EnableSTStorage=%t", enableST), func(t *testing.T) {
|
||||
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
|
||||
opts.EnableSTStorage.Store(enableST)
|
||||
opts.EnableXOR2Encoding.Store(enableST) // ST storage implies XOR2 encoding.
|
||||
h, _ := newTestHeadWithOptions(t, compression.None, opts)
|
||||
|
||||
lbls := labels.FromStrings("foo", "bar")
|
||||
|
||||
@@ -636,7 +636,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
||||
chunkDiskMapper: h.chunkDiskMapper,
|
||||
chunkRange: h.chunkRange.Load(),
|
||||
samplesPerChunk: h.opts.SamplesPerChunk,
|
||||
storeST: h.opts.EnableSTStorage.Load(),
|
||||
useXOR2: h.opts.EnableXOR2Encoding.Load(),
|
||||
}
|
||||
|
||||
for in := range wp.input {
|
||||
@@ -1084,7 +1084,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR
|
||||
chunkDiskMapper: h.chunkDiskMapper,
|
||||
chunkRange: h.chunkRange.Load(),
|
||||
samplesPerChunk: h.opts.SamplesPerChunk,
|
||||
storeST: h.opts.EnableSTStorage.Load(),
|
||||
useXOR2: h.opts.EnableXOR2Encoding.Load(),
|
||||
}
|
||||
// We don't check for minValidTime for ooo samples.
|
||||
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
|
||||
@@ -1251,7 +1251,7 @@ func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapsh
|
||||
csr.mc.chunk = chk
|
||||
|
||||
switch enc {
|
||||
case chunkenc.EncXOR:
|
||||
case chunkenc.EncXOR, chunkenc.EncXOR2:
|
||||
// Backwards-compatibility for old sampleBuf which had last 4 samples.
|
||||
for range 3 {
|
||||
_ = dec.Be64int64()
|
||||
|
||||
@@ -73,7 +73,7 @@ func (o *OOOChunk) NumSamples() int {
|
||||
// ToEncodedChunks returns chunks with the samples in the OOOChunk.
|
||||
//
|
||||
//nolint:revive
|
||||
func (o *OOOChunk) ToEncodedChunks(mint, maxt int64, storeST bool) (chks []memChunk, err error) {
|
||||
func (o *OOOChunk) ToEncodedChunks(mint, maxt int64, useXOR2 bool) (chks []memChunk, err error) {
|
||||
if len(o.samples) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -93,7 +93,7 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64, storeST bool) (chks []memCh
|
||||
if s.t > maxt {
|
||||
break
|
||||
}
|
||||
encoding := chunkenc.ValFloat.ChunkEncoding(storeST)
|
||||
encoding := chunkenc.ValFloat.ChunkEncoding(useXOR2)
|
||||
switch {
|
||||
case s.h != nil:
|
||||
// TODO(krajorama): use ST capable histogram chunk.
|
||||
|
||||
@@ -77,7 +77,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
|
||||
*chks = (*chks)[:0]
|
||||
|
||||
if s.ooo != nil {
|
||||
return getOOOSeriesChunks(s, oh.head.opts.EnableSTStorage.Load(), oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks)
|
||||
return getOOOSeriesChunks(s, oh.head.opts.EnableXOR2Encoding.Load(), oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks)
|
||||
}
|
||||
*chks = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks)
|
||||
return nil
|
||||
@@ -88,7 +88,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
|
||||
//
|
||||
// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then
|
||||
// the oooHeadChunk will not be considered.
|
||||
func getOOOSeriesChunks(s *memSeries, storeST bool, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error {
|
||||
func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error {
|
||||
tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))
|
||||
|
||||
addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
|
||||
@@ -106,7 +106,7 @@ func getOOOSeriesChunks(s *memSeries, storeST bool, mint, maxt int64, lastGarbag
|
||||
if c.OverlapsClosedInterval(mint, maxt) && maxMmapRef == 0 {
|
||||
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
|
||||
if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least.
|
||||
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime, storeST)
|
||||
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime, useXOR2)
|
||||
if err != nil {
|
||||
handleChunkWriteError(err)
|
||||
return nil
|
||||
@@ -347,7 +347,7 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead,
|
||||
}
|
||||
|
||||
var lastMmapRef chunks.ChunkDiskMapperRef
|
||||
mmapRefs := ms.mmapCurrentOOOHeadChunk(chunkOpts{chunkDiskMapper: head.chunkDiskMapper, storeST: head.opts.EnableSTStorage.Load()}, head.logger)
|
||||
mmapRefs := ms.mmapCurrentOOOHeadChunk(chunkOpts{chunkDiskMapper: head.chunkDiskMapper, useXOR2: head.opts.EnableXOR2Encoding.Load()}, head.logger)
|
||||
if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 {
|
||||
// Nothing was m-mapped. So take the mmapRef from the existing slice if it exists.
|
||||
mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref}
|
||||
@@ -481,7 +481,7 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l
|
||||
return nil
|
||||
}
|
||||
|
||||
return getOOOSeriesChunks(s, ir.ch.head.opts.EnableSTStorage.Load(), ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks)
|
||||
return getOOOSeriesChunks(s, ir.ch.head.opts.EnableXOR2Encoding.Load(), ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks)
|
||||
}
|
||||
|
||||
func (*OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, error) {
|
||||
|
||||
@@ -367,8 +367,8 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestOOOChunks_ToEncodedChunks_WithST tests ToEncodedChunks with storeST=true and storeST=false for float samples.
|
||||
// When storeST=true, st values are preserved; when storeST=false, AtST() returns 0.
|
||||
// TestOOOChunks_ToEncodedChunks_WithST tests ToEncodedChunks with useXOR2=true and useXOR2=false for float samples.
|
||||
// When useXOR2=true, st values are preserved; when useXOR2=false, AtST() returns 0.
|
||||
// TODO(@krajorama): Add histogram test cases once ST storage is implemented for histograms.
|
||||
func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) {
|
||||
testCases := map[string]struct {
|
||||
@@ -403,11 +403,11 @@ func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) {
|
||||
|
||||
storageScenarios := []struct {
|
||||
name string
|
||||
storeST bool
|
||||
useXOR2 bool
|
||||
expectedEncoding chunkenc.Encoding
|
||||
}{
|
||||
{"storeST=true", true, chunkenc.EncXOR2},
|
||||
{"storeST=false", false, chunkenc.EncXOR},
|
||||
{"useXOR2=true", true, chunkenc.EncXOR2},
|
||||
{"useXOR2=false", false, chunkenc.EncXOR},
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
@@ -418,7 +418,7 @@ func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) {
|
||||
oooChunk.Insert(s.st, s.t, s.f, nil, nil)
|
||||
}
|
||||
|
||||
chunks, err := oooChunk.ToEncodedChunks(math.MinInt64, math.MaxInt64, ss.storeST)
|
||||
chunks, err := oooChunk.ToEncodedChunks(math.MinInt64, math.MaxInt64, ss.useXOR2)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, chunks, 1, "number of chunks")
|
||||
|
||||
@@ -434,12 +434,12 @@ func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) {
|
||||
gotT, gotF := it.At()
|
||||
gotST := it.AtST()
|
||||
|
||||
if ss.storeST {
|
||||
// When storeST=true, st values should be preserved.
|
||||
if ss.useXOR2 {
|
||||
// When useXOR2=true, st values should be preserved.
|
||||
require.Equal(t, tc.samples[sampleIndex].st, gotST, "sample %d st", sampleIndex)
|
||||
} else {
|
||||
// When storeST=false, AtST() should return 0.
|
||||
require.Equal(t, int64(0), gotST, "sample %d st should be 0 when storeST=false", sampleIndex)
|
||||
// When useXOR2=false, AtST() should return 0.
|
||||
require.Equal(t, int64(0), gotST, "sample %d st should be 0 when useXOR2=false", sampleIndex)
|
||||
}
|
||||
require.Equal(t, tc.samples[sampleIndex].t, gotT, "sample %d t", sampleIndex)
|
||||
require.Equal(t, tc.samples[sampleIndex].f, gotF, "sample %d f", sampleIndex)
|
||||
|
||||
Reference in New Issue
Block a user