Merge branch 'feature/start-time' into cedwards/document-st-storage

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartlomiej Plotka
2026-03-17 06:21:05 +01:00
committed by GitHub
16 changed files with 111 additions and 69 deletions

View File

@@ -281,9 +281,13 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
logger.Info("Experimental start timestamp zero ingestion enabled. OpenMetrics 1.0 parsing will parse <metric>_created metrics as ST instead of normal sample. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols))
case "xor2-encoding":
c.tsdb.EnableXOR2Encoding = true
logger.Info("Experimental XOR2 chunk encoding enabled.")
case "st-storage":
c.scrape.ParseST = true
c.tsdb.EnableSTStorage = true
c.tsdb.EnableXOR2Encoding = true // Set chunk encoding type to XOR2 for samples with ST
c.agent.EnableSTStorage = true
// Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers. This is to widen the ST support surface.
@@ -602,7 +606,7 @@ func main() {
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, created-timestamp-zero-ingestion, st-storage, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, promql-duration-expr, use-uncached-io, promql-extended-range-selectors, promql-binop-fill-modifiers. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, created-timestamp-zero-ingestion, st-storage, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, promql-duration-expr, use-uncached-io, promql-extended-range-selectors, promql-binop-fill-modifiers, xor2-encoding. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)
a.Flag("agent", "Run Prometheus in 'Agent mode'.").BoolVar(&agentMode)
@@ -2010,6 +2014,7 @@ type tsdbOptions struct {
BlockReloadInterval model.Duration
EnableSTAsZeroSample bool
EnableSTStorage bool
EnableXOR2Encoding bool
StaleSeriesCompactionThreshold float64
}
@@ -2040,6 +2045,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
FeatureRegistry: features.DefaultRegistry,
EnableSTAsZeroSample: opts.EnableSTAsZeroSample,
EnableSTStorage: opts.EnableSTStorage,
EnableXOR2Encoding: opts.EnableXOR2Encoding,
StaleSeriesCompactionThreshold: opts.StaleSeriesCompactionThreshold,
}
}

View File

@@ -252,6 +252,7 @@
"isolation": true,
"native_histograms": true,
"st_storage": false,
"xor2_encoding": false,
"use_uncached_io": false
},
"ui": {

View File

@@ -59,7 +59,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
| <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
| <code class="text-nowrap">--query.max-samples</code> | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
| <code class="text-nowrap">--enable-feature</code> <code class="text-nowrap">...<code class="text-nowrap"> | Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, created-timestamp-zero-ingestion, st-storage, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, promql-duration-expr, use-uncached-io, promql-extended-range-selectors, promql-binop-fill-modifiers. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--enable-feature</code> <code class="text-nowrap">...<code class="text-nowrap"> | Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, created-timestamp-zero-ingestion, st-storage, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, promql-duration-expr, use-uncached-io, promql-extended-range-selectors, promql-binop-fill-modifiers, xor2-encoding. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--agent</code> | Run Prometheus in 'Agent mode'. | |
| <code class="text-nowrap">--log.level</code> | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
| <code class="text-nowrap">--log.format</code> | Output format of log messages. One of: [logfmt, json] | `logfmt` |

View File

@@ -335,6 +335,17 @@ This is currently implemented using direct I/O.
For more details, see the [proposal](https://github.com/prometheus/proposals/pull/45).
## XOR2 chunk encoding
`--enable-feature=xor2-encoding`
> WARNING: This is a highly experimental and risky setting:
> * Chunks encoded with XOR2 **cannot be read by older Prometheus versions** that do not support the encoding. Once enabled and data is written, you need to **manually delete blocks from the disk**, otherwise Prometheus will return errors on all queries.
> * We are still experimenting on the final encoding. As of now this encoding can change in any Prometheus version. All your persistent block data will be lost between versions.
> * This encoding is new, meaning downstream tools and LTS systems might not support it yet (e.g. Thanos sidecar uploaded blocks).
This setting enables the new XOR2 chunk encoding for float samples, which provides better disk compression than the default XOR encoding for typical Prometheus workloads. This format also allows storing Start Timestamp (ST).
## Extended Range Selectors
`--enable-feature=promql-extended-range-selectors`

View File

@@ -191,10 +191,10 @@ func (v ValueType) String() string {
}
}
func (v ValueType) ChunkEncoding(storeST bool) Encoding {
func (v ValueType) ChunkEncoding(useXOR2 bool) Encoding {
switch v {
case ValFloat:
if storeST {
if useXOR2 {
return EncXOR2
}
return EncXOR
@@ -207,8 +207,9 @@ func (v ValueType) ChunkEncoding(storeST bool) Encoding {
}
}
func (v ValueType) NewChunk(storeST bool) (Chunk, error) {
return NewEmptyChunk(v.ChunkEncoding(storeST))
// NewChunk returns a new empty chunk for the given value type.
func (v ValueType) NewChunk(useXOR2 bool) (Chunk, error) {
return NewEmptyChunk(v.ChunkEncoding(useXOR2))
}
// MockSeriesIterator returns an iterator for a mock series with custom

View File

@@ -240,6 +240,11 @@ type Options struct {
// is implemented.
EnableSTAsZeroSample bool
// EnableXOR2Encoding enables the XOR2 chunk encoding for float samples.
// XOR2 provides better compression than XOR, especially for stale markers.
// Automatically set to true when EnableSTStorage is true.
EnableXOR2Encoding bool
// EnableSTStorage determines whether TSDB should write a Start Timestamp (ST)
// per sample to WAL.
// TODO(bwplotka): Implement this option as per PROM-60, currently it's noop.
@@ -869,6 +874,7 @@ func Open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, st
opts.FeatureRegistry.Set(features.TSDB, "use_uncached_io", opts.UseUncachedIO)
opts.FeatureRegistry.Enable(features.TSDB, "native_histograms")
opts.FeatureRegistry.Set(features.TSDB, "st_storage", opts.EnableSTStorage)
opts.FeatureRegistry.Set(features.TSDB, "xor2_encoding", opts.EnableXOR2Encoding)
}
return open(dir, l, r, opts, rngs, stats)
@@ -1076,6 +1082,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
headOpts.EnableSharding = opts.EnableSharding
headOpts.EnableSTAsZeroSample = opts.EnableSTAsZeroSample
headOpts.EnableSTStorage.Store(opts.EnableSTStorage)
headOpts.EnableXOR2Encoding.Store(opts.EnableXOR2Encoding)
headOpts.EnableMetadataWALRecords = opts.EnableMetadataWALRecords
if opts.WALReplayConcurrency > 0 {
headOpts.WALReplayConcurrency = opts.WALReplayConcurrency

View File

@@ -7510,12 +7510,13 @@ func TestCompactHeadWithSTStorage_AppendV2(t *testing.T) {
t.Parallel()
opts := &Options{
RetentionDuration: int64(time.Hour * 24 * 15 / time.Millisecond),
NoLockfile: true,
MinBlockDuration: int64(time.Hour * 2 / time.Millisecond),
MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond),
WALCompression: compression.Snappy,
EnableSTStorage: true,
RetentionDuration: int64(time.Hour * 24 * 15 / time.Millisecond),
NoLockfile: true,
MinBlockDuration: int64(time.Hour * 2 / time.Millisecond),
MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond),
WALCompression: compression.Snappy,
EnableSTStorage: true,
EnableXOR2Encoding: true,
}
db := newTestDB(t, withOpts(opts))
ctx := context.Background()
@@ -7654,6 +7655,7 @@ func TestDBAppenderV2_STStorage_OutOfOrder(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds()
opts.EnableSTStorage = true
opts.EnableXOR2Encoding = true
db := newTestDB(t, withOpts(opts))
db.DisableCompactions()

View File

@@ -166,6 +166,10 @@ type HeadOptions struct {
// Represents 'st-storage' feature flag.
EnableSTStorage atomic.Bool
// EnableXOR2Encoding enables XOR2 chunk encoding for float samples.
// Represents 'xor2-encoding' feature flag.
EnableXOR2Encoding atomic.Bool
ChunkRange int64
// ChunkDirRoot is the parent directory of the chunks directory.
ChunkDirRoot string

View File

@@ -186,6 +186,7 @@ func (h *Head) appender() *headAppender {
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
storeST: h.opts.EnableSTStorage.Load(),
useXOR2: h.opts.EnableXOR2Encoding.Load(),
},
}
}
@@ -414,6 +415,7 @@ type headAppenderBase struct {
appendID, cleanupAppendIDsBelow uint64
closed bool
storeST bool
useXOR2 bool
}
type headAppender struct {
headAppenderBase
@@ -1747,7 +1749,7 @@ func (a *headAppenderBase) Commit() (err error) {
chunkDiskMapper: h.chunkDiskMapper,
chunkRange: h.chunkRange.Load(),
samplesPerChunk: h.opts.SamplesPerChunk,
storeST: a.storeST,
useXOR2: a.useXOR2,
},
oooEnc: record.Encoder{
EnableSTStorage: a.storeST,
@@ -1834,7 +1836,7 @@ type chunkOpts struct {
chunkDiskMapper *chunks.ChunkDiskMapper
chunkRange int64
samplesPerChunk int
storeST bool
useXOR2 bool // Selects XOR2 encoding for float chunks.
}
// append adds the sample (t, v) to the series. The caller also has to provide
@@ -1842,7 +1844,7 @@ type chunkOpts struct {
// isolation for this append.)
// Series lock must be held when calling.
func (s *memSeries) append(st, t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.ValFloat.ChunkEncoding(o.storeST), o)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.ValFloat.ChunkEncoding(o.useXOR2), o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@@ -1873,7 +1875,7 @@ func (s *memSeries) appendHistogram(st, t int64, h *histogram.Histogram, appendI
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevApp, _ := s.app.(*chunkenc.HistogramAppender)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValHistogram.ChunkEncoding(o.storeST), o)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValHistogram.ChunkEncoding(o.useXOR2), o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@@ -1930,7 +1932,7 @@ func (s *memSeries) appendFloatHistogram(st, t int64, fh *histogram.FloatHistogr
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValFloatHistogram.ChunkEncoding(o.storeST), o)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValFloatHistogram.ChunkEncoding(o.useXOR2), o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@@ -2187,7 +2189,7 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(o chunkOpts, logger *slog.Logger) []
// OOO is not enabled or there is no head chunk, so nothing to m-map here.
return nil
}
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64, o.storeST)
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64, o.useXOR2)
if err != nil {
handleChunkWriteError(err)
return nil

View File

@@ -96,6 +96,7 @@ func (h *Head) appenderV2() *headAppenderV2 {
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
storeST: h.opts.EnableSTStorage.Load(),
useXOR2: h.opts.EnableXOR2Encoding.Load(),
},
}
}

View File

@@ -2943,6 +2943,7 @@ func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario, enableST
opts.ChunkDirRoot = dir
opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds())
opts.EnableSTStorage.Store(enableSTstorage)
opts.EnableXOR2Encoding.Store(enableSTstorage)
h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err)
@@ -2994,7 +2995,7 @@ func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario, enableST
require.False(t, ok)
require.NotNil(t, ms)
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64, h.opts.EnableSTStorage.Load())
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64, h.opts.EnableXOR2Encoding.Load())
require.NoError(t, err)
require.Len(t, chks, 1)
@@ -4813,6 +4814,7 @@ func TestHeadAppenderV2_STStorage(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
opts.EnableSTStorage.Store(true)
opts.EnableXOR2Encoding.Store(true)
h, _ := newTestHeadWithOptions(t, compression.None, opts)
lbls := labels.FromStrings("foo", "bar")

View File

@@ -2183,47 +2183,47 @@ func TestComputeChunkEndTime(t *testing.T) {
}
}
// TestMemSeries_append tests float appending with various storeST/st combinations.
// TestMemSeries_append tests float appending with various useXOR2/st combinations.
func TestMemSeries_append(t *testing.T) {
scenarios := []struct {
name string
storeST bool
useXOR2 bool
stFunc func(ts int64) int64 // Function to compute st from ts
}{
{
name: "storeST=false st=0",
storeST: false,
name: "useXOR2=false st=0",
useXOR2: false,
stFunc: func(_ int64) int64 { return 0 },
},
{
name: "storeST=true st=0",
storeST: true,
name: "useXOR2=true st=0",
useXOR2: true,
stFunc: func(_ int64) int64 { return 0 },
},
{
name: "storeST=true st=ts",
storeST: true,
name: "useXOR2=true st=ts",
useXOR2: true,
stFunc: func(ts int64) int64 { return ts },
},
{
name: "storeST=true st=ts-100",
storeST: true,
name: "useXOR2=true st=ts-100",
useXOR2: true,
stFunc: func(ts int64) int64 { return ts - 100 },
},
{
name: "storeST=false st=ts (st ignored)",
storeST: false,
name: "useXOR2=false st=ts (st ignored)",
useXOR2: false,
stFunc: func(ts int64) int64 { return ts },
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
testMemSeriesAppend(t, scenario.storeST, scenario.stFunc)
testMemSeriesAppend(t, scenario.useXOR2, scenario.stFunc)
})
}
}
func testMemSeriesAppend(t *testing.T, storeST bool, stFunc func(ts int64) int64) {
func testMemSeriesAppend(t *testing.T, useXOR2 bool, stFunc func(ts int64) int64) {
dir := t.TempDir()
// This is usually taken from the Head, but passing manually here.
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
@@ -2235,7 +2235,7 @@ func testMemSeriesAppend(t *testing.T, storeST bool, stFunc func(ts int64) int64
chunkDiskMapper: chunkDiskMapper,
chunkRange: 500,
samplesPerChunk: DefaultSamplesPerChunk,
storeST: storeST,
useXOR2: useXOR2,
}
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
@@ -2286,47 +2286,47 @@ func testMemSeriesAppend(t *testing.T, storeST bool, stFunc func(ts int64) int64
}
}
// TestMemSeries_appendHistogram tests histogram appending with various storeST/st combinations.
// TestMemSeries_appendHistogram tests histogram appending with various useXOR2/st combinations.
func TestMemSeries_appendHistogram(t *testing.T) {
scenarios := []struct {
name string
storeST bool
useXOR2 bool
stFunc func(ts int64) int64 // Function to compute st from ts
}{
{
name: "storeST=false st=0",
storeST: false,
name: "useXOR2=false st=0",
useXOR2: false,
stFunc: func(_ int64) int64 { return 0 },
},
{
name: "storeST=true st=0",
storeST: true,
name: "useXOR2=true st=0",
useXOR2: true,
stFunc: func(_ int64) int64 { return 0 },
},
{
name: "storeST=true st=ts",
storeST: true,
name: "useXOR2=true st=ts",
useXOR2: true,
stFunc: func(ts int64) int64 { return ts },
},
{
name: "storeST=true st=ts-100",
storeST: true,
name: "useXOR2=true st=ts-100",
useXOR2: true,
stFunc: func(ts int64) int64 { return ts - 100 },
},
{
name: "storeST=false st=ts (st ignored)",
storeST: false,
name: "useXOR2=false st=ts (st ignored)",
useXOR2: false,
stFunc: func(ts int64) int64 { return ts },
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
testMemSeriesAppendHistogram(t, scenario.storeST, scenario.stFunc)
testMemSeriesAppendHistogram(t, scenario.useXOR2, scenario.stFunc)
})
}
}
func testMemSeriesAppendHistogram(t *testing.T, storeST bool, stFunc func(ts int64) int64) {
func testMemSeriesAppendHistogram(t *testing.T, useXOR2 bool, stFunc func(ts int64) int64) {
dir := t.TempDir()
// This is usually taken from the Head, but passing manually here.
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
@@ -2338,7 +2338,7 @@ func testMemSeriesAppendHistogram(t *testing.T, storeST bool, stFunc func(ts int
chunkDiskMapper: chunkDiskMapper,
chunkRange: int64(1000),
samplesPerChunk: DefaultSamplesPerChunk,
storeST: storeST,
useXOR2: useXOR2,
}
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false)
@@ -7354,6 +7354,7 @@ func TestHeadAppender_WALEncoder_EnableSTStorage(t *testing.T) {
t.Run(fmt.Sprintf("enableSTStorage=%v", enableST), func(t *testing.T) {
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
opts.EnableSTStorage.Store(enableST)
opts.EnableXOR2Encoding.Store(enableST)
h, w := newTestHeadWithOptions(t, compression.None, opts)
lbls := labels.FromStrings("foo", "bar")
@@ -7409,6 +7410,7 @@ func TestHeadAppender_WBLEncoder_EnableSTStorage(t *testing.T) {
opts.ChunkDirRoot = dir
opts.OutOfOrderTimeWindow.Store(60 * time.Minute.Milliseconds())
opts.EnableSTStorage.Store(enableST)
opts.EnableXOR2Encoding.Store(enableST)
h, err := NewHead(nil, nil, wal, wbl, opts, nil)
require.NoError(t, err)
@@ -7527,6 +7529,7 @@ func TestHeadAppender_STStorage_Disabled(t *testing.T) {
func TestHeadAppender_STStorage_WALReplay(t *testing.T) {
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
opts.EnableSTStorage.Store(true)
opts.EnableXOR2Encoding.Store(true)
h, w := newTestHeadWithOptions(t, compression.None, opts)
lbls := labels.FromStrings("foo", "bar")
@@ -7578,6 +7581,7 @@ func TestHeadAppender_STStorage_WBLReplay(t *testing.T) {
opts.ChunkDirRoot = dir
opts.OutOfOrderTimeWindow.Store(60 * time.Minute.Milliseconds())
opts.EnableSTStorage.Store(true)
opts.EnableXOR2Encoding.Store(true)
h, err := NewHead(nil, nil, wal, wbl, opts, nil)
require.NoError(t, err)
@@ -7656,6 +7660,7 @@ func TestHeadAppender_STStorage_ChunkEncoding(t *testing.T) {
t.Run(fmt.Sprintf("EnableSTStorage=%t", enableST), func(t *testing.T) {
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
opts.EnableSTStorage.Store(enableST)
opts.EnableXOR2Encoding.Store(enableST) // ST storage implies XOR2 encoding.
h, _ := newTestHeadWithOptions(t, compression.None, opts)
lbls := labels.FromStrings("foo", "bar")

View File

@@ -636,7 +636,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
chunkDiskMapper: h.chunkDiskMapper,
chunkRange: h.chunkRange.Load(),
samplesPerChunk: h.opts.SamplesPerChunk,
storeST: h.opts.EnableSTStorage.Load(),
useXOR2: h.opts.EnableXOR2Encoding.Load(),
}
for in := range wp.input {
@@ -1084,7 +1084,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR
chunkDiskMapper: h.chunkDiskMapper,
chunkRange: h.chunkRange.Load(),
samplesPerChunk: h.opts.SamplesPerChunk,
storeST: h.opts.EnableSTStorage.Load(),
useXOR2: h.opts.EnableXOR2Encoding.Load(),
}
// We don't check for minValidTime for ooo samples.
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
@@ -1251,7 +1251,7 @@ func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapsh
csr.mc.chunk = chk
switch enc {
case chunkenc.EncXOR:
case chunkenc.EncXOR, chunkenc.EncXOR2:
// Backwards-compatibility for old sampleBuf which had last 4 samples.
for range 3 {
_ = dec.Be64int64()

View File

@@ -73,7 +73,7 @@ func (o *OOOChunk) NumSamples() int {
// ToEncodedChunks returns chunks with the samples in the OOOChunk.
//
//nolint:revive
func (o *OOOChunk) ToEncodedChunks(mint, maxt int64, storeST bool) (chks []memChunk, err error) {
func (o *OOOChunk) ToEncodedChunks(mint, maxt int64, useXOR2 bool) (chks []memChunk, err error) {
if len(o.samples) == 0 {
return nil, nil
}
@@ -93,7 +93,7 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64, storeST bool) (chks []memCh
if s.t > maxt {
break
}
encoding := chunkenc.ValFloat.ChunkEncoding(storeST)
encoding := chunkenc.ValFloat.ChunkEncoding(useXOR2)
switch {
case s.h != nil:
// TODO(krajorama): use ST capable histogram chunk.

View File

@@ -77,7 +77,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
*chks = (*chks)[:0]
if s.ooo != nil {
return getOOOSeriesChunks(s, oh.head.opts.EnableSTStorage.Load(), oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks)
return getOOOSeriesChunks(s, oh.head.opts.EnableXOR2Encoding.Load(), oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks)
}
*chks = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks)
return nil
@@ -88,7 +88,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
//
// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then
// the oooHeadChunk will not be considered.
func getOOOSeriesChunks(s *memSeries, storeST bool, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error {
func getOOOSeriesChunks(s *memSeries, useXOR2 bool, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error {
tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))
addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) {
@@ -106,7 +106,7 @@ func getOOOSeriesChunks(s *memSeries, storeST bool, mint, maxt int64, lastGarbag
if c.OverlapsClosedInterval(mint, maxt) && maxMmapRef == 0 {
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least.
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime, storeST)
chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime, useXOR2)
if err != nil {
handleChunkWriteError(err)
return nil
@@ -347,7 +347,7 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead,
}
var lastMmapRef chunks.ChunkDiskMapperRef
mmapRefs := ms.mmapCurrentOOOHeadChunk(chunkOpts{chunkDiskMapper: head.chunkDiskMapper, storeST: head.opts.EnableSTStorage.Load()}, head.logger)
mmapRefs := ms.mmapCurrentOOOHeadChunk(chunkOpts{chunkDiskMapper: head.chunkDiskMapper, useXOR2: head.opts.EnableXOR2Encoding.Load()}, head.logger)
if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 {
// Nothing was m-mapped. So take the mmapRef from the existing slice if it exists.
mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref}
@@ -481,7 +481,7 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l
return nil
}
return getOOOSeriesChunks(s, ir.ch.head.opts.EnableSTStorage.Load(), ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks)
return getOOOSeriesChunks(s, ir.ch.head.opts.EnableXOR2Encoding.Load(), ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks)
}
func (*OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, error) {

View File

@@ -367,8 +367,8 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) {
}
}
// TestOOOChunks_ToEncodedChunks_WithST tests ToEncodedChunks with storeST=true and storeST=false for float samples.
// When storeST=true, st values are preserved; when storeST=false, AtST() returns 0.
// TestOOOChunks_ToEncodedChunks_WithST tests ToEncodedChunks with useXOR2=true and useXOR2=false for float samples.
// When useXOR2=true, st values are preserved; when useXOR2=false, AtST() returns 0.
// TODO(@krajorama): Add histogram test cases once ST storage is implemented for histograms.
func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) {
testCases := map[string]struct {
@@ -403,11 +403,11 @@ func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) {
storageScenarios := []struct {
name string
storeST bool
useXOR2 bool
expectedEncoding chunkenc.Encoding
}{
{"storeST=true", true, chunkenc.EncXOR2},
{"storeST=false", false, chunkenc.EncXOR},
{"useXOR2=true", true, chunkenc.EncXOR2},
{"useXOR2=false", false, chunkenc.EncXOR},
}
for name, tc := range testCases {
@@ -418,7 +418,7 @@ func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) {
oooChunk.Insert(s.st, s.t, s.f, nil, nil)
}
chunks, err := oooChunk.ToEncodedChunks(math.MinInt64, math.MaxInt64, ss.storeST)
chunks, err := oooChunk.ToEncodedChunks(math.MinInt64, math.MaxInt64, ss.useXOR2)
require.NoError(t, err)
require.Len(t, chunks, 1, "number of chunks")
@@ -434,12 +434,12 @@ func TestOOOChunks_ToEncodedChunks_WithST(t *testing.T) {
gotT, gotF := it.At()
gotST := it.AtST()
if ss.storeST {
// When storeST=true, st values should be preserved.
if ss.useXOR2 {
// When useXOR2=true, st values should be preserved.
require.Equal(t, tc.samples[sampleIndex].st, gotST, "sample %d st", sampleIndex)
} else {
// When storeST=false, AtST() should return 0.
require.Equal(t, int64(0), gotST, "sample %d st should be 0 when storeST=false", sampleIndex)
// When useXOR2=false, AtST() should return 0.
require.Equal(t, int64(0), gotST, "sample %d st should be 0 when useXOR2=false", sampleIndex)
}
require.Equal(t, tc.samples[sampleIndex].t, gotT, "sample %d t", sampleIndex)
require.Equal(t, tc.samples[sampleIndex].f, gotF, "sample %d f", sampleIndex)