From f2271a60ddd388e69bd06a7406965cbb26bdf59a Mon Sep 17 00:00:00 2001 From: Raphael Bizos Date: Mon, 9 Feb 2026 11:47:04 +0100 Subject: [PATCH] Adding a nhcb as classic histogram Storage wrapper this allows to query nhcb with _buckets, _sum or _count suffix and get classic histograms Signed-off-by: Raphael Bizos --- cmd/prometheus/main.go | 13 +- storage/nhcb_querier.go | 306 +++++++++++++++++++++++++++++++++++ storage/nhcb_querier_test.go | 239 +++++++++++++++++++++++++++ 3 files changed, 556 insertions(+), 2 deletions(-) create mode 100644 storage/nhcb_querier.go create mode 100644 storage/nhcb_querier_test.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index ee60e58b2e..b06b6095b3 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -212,6 +212,7 @@ type flagConfig struct { // for ease of use. enablePerStepStats bool enableConcurrentRuleEval bool + enableNHCBasClassic bool prometheusURL string corsRegexString string @@ -303,6 +304,8 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error { case "use-uncached-io": c.tsdb.UseUncachedIO = true logger.Info("Experimental Uncached IO is enabled.") + case "promq-nhcb-as-classic": + c.enableNHCBasClassic = true default: logger.Warn("Unknown option for --enable-feature", "option", o) } @@ -819,12 +822,18 @@ func main() { features.Set(features.Prometheus, "auto_reload_config", cfg.enableAutoReload) features.Enable(features.Prometheus, labels.ImplementationName) template.RegisterFeatures(features.DefaultRegistry) + var ( + localStorage = &readyStorage{stats: tsdb.NewDBStats()} + wrappedStorage storage.Storage = localStorage + ) + if cfg.enableNHCBasClassic { + wrappedStorage = storage.NewNHCBAsClassicStorage(localStorage) + } var ( - localStorage = &readyStorage{stats: tsdb.NewDBStats()} scraper = &readyScrapeManager{} remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, 
time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.EnableTypeAndUnitLabels) - fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) + fanoutStorage = storage.NewFanout(logger, wrappedStorage, remoteStorage) ) var ( diff --git a/storage/nhcb_querier.go b/storage/nhcb_querier.go new file mode 100644 index 0000000000..717646e9f5 --- /dev/null +++ b/storage/nhcb_querier.go @@ -0,0 +1,306 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "strings" + + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/util/annotations" +) + +// NHCBAsClassicQuerier wraps a Querier and converts NHCB (Native Histogram Custom Buckets) +// queries to classic histogram format when classic series don't exist. +type NHCBAsClassicQuerier struct { + Querier +} + +// NewNHCBAsClassicQuerier returns a new querier that wraps the given querier +// and converts NHCB to classic histogram format for queries. +func NewNHCBAsClassicQuerier(q Querier) Querier { + return &NHCBAsClassicQuerier{Querier: q} +} + +// NHCBAsClassicStorage wraps a Storage and applies NHCB-to-classic conversion +// to queriers when enabled. 
+type NHCBAsClassicStorage struct { + Storage +} + +// NewNHCBAsClassicStorage returns a new storage that wraps the given storage +// and applies NHCB-to-classic conversion to queriers. +func NewNHCBAsClassicStorage(s Storage) Storage { + return &NHCBAsClassicStorage{Storage: s} +} + +// Querier implements the Storage interface. +func (s *NHCBAsClassicStorage) Querier(mint, maxt int64) (Querier, error) { + q, err := s.Storage.Querier(mint, maxt) + if err != nil { + return nil, err + } + return NewNHCBAsClassicQuerier(q), nil +} + +// Select implements the Querier interface. +func (q *NHCBAsClassicQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet { + metricName, suffix, baseMatchers := extractHistogramSuffix(matchers) + if suffix == "" { + // Not a classic histogram query, pass through + return q.Querier.Select(ctx, sortSeries, hints, matchers...) + } + + classicSet := q.Querier.Select(ctx, sortSeries, hints, matchers...) + if classicSet.Err() != nil { + return classicSet + } + + var classicSeries []Series + for classicSet.Next() { + classicSeries = append(classicSeries, classicSet.At()) + } + + if err := classicSet.Err(); err != nil { + return ErrSeriesSet(err) + } + + if len(classicSeries) > 0 { + return &bufferedSeriesSet{series: classicSeries} + } + + baseMatchers = append(baseMatchers, labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, metricName)) + nhcbSet := q.Querier.Select(ctx, sortSeries, hints, baseMatchers...) + if nhcbSet.Err() != nil { + return nhcbSet + } + + return &nhcbToClassicSeriesSet{ + nhcbSet: nhcbSet, + suffix: suffix, + metricName: metricName, + } +} + +// bufferedSeriesSet wraps a buffered list of series. 
+type bufferedSeriesSet struct { + series []Series + idx int +} + +func (b *bufferedSeriesSet) Next() bool { + if b.idx < len(b.series) { + b.idx++ + return true + } + return false +} + +func (b *bufferedSeriesSet) At() Series { + if b.idx == 0 || b.idx > len(b.series) { + return nil + } + return b.series[b.idx-1] +} + +func (*bufferedSeriesSet) Err() error { + return nil +} + +func (*bufferedSeriesSet) Warnings() annotations.Annotations { + return nil +} + +// extractHistogramSuffix extracts the metric name, suffix (_bucket, _count, _sum), and base matchers. +// Returns empty suffix if not a classic histogram query. +func extractHistogramSuffix(matchers []*labels.Matcher) (string, string, []*labels.Matcher) { + var metricName string + baseMatchers := make([]*labels.Matcher, 0, len(matchers)) + + for _, m := range matchers { + if m.Name == model.MetricNameLabel { + metricName = m.Value + } else { + baseMatchers = append(baseMatchers, m) + } + } + + if metricName == "" { + return "", "", matchers + } + + var suffix string + + switch { + case strings.HasSuffix(metricName, "_bucket"): + suffix = "_bucket" + case strings.HasSuffix(metricName, "_count"): + suffix = "_count" + case strings.HasSuffix(metricName, "_sum"): + suffix = "_sum" + default: + return "", "", matchers + } + + baseName := metricName[:len(metricName)-len(suffix)] + return baseName, suffix, baseMatchers +} + +// nhcbToClassicSeriesSet converts NHCB series to classic histogram series format. +type nhcbToClassicSeriesSet struct { + nhcbSet SeriesSet + suffix string + metricName string + series []Series + idx int + err error +} + +func (s *nhcbToClassicSeriesSet) Next() bool { + if s.err != nil { + return false + } + + // convert all NHCB series on first Next() call + // It is easier to implement like this as a single NHCB represents multiple series when converted + // to a classic Histogram. 
+ if s.series == nil { + s.series = make([]Series, 0) + lsetBuilder := labels.NewBuilder(labels.EmptyLabels()) + + convertedSeriesMap := make(map[string]*convertedSeriesData) + + for s.nhcbSet.Next() { + nhcbSeries := s.nhcbSet.At() + if nhcbSeries == nil { + continue + } + + // Check if this is an NHCB histogram series + nhcbLabels := nhcbSeries.Labels() + it := nhcbSeries.Iterator(nil) + if it == nil { + continue + } + + for { + valType := it.Next() + if valType == chunkenc.ValNone { + break + } + + var h *histogram.Histogram + var fh *histogram.FloatHistogram + var t int64 + + switch valType { + case chunkenc.ValHistogram: + t, h = it.AtHistogram(nil) + if h == nil || !histogram.IsCustomBucketsSchema(h.Schema) { + continue + } + case chunkenc.ValFloatHistogram: + t, fh = it.AtFloatHistogram(nil) + if fh == nil || !histogram.IsCustomBucketsSchema(fh.Schema) { + continue + } + default: + // Not a histogram, skip + continue + } + + var nhcb any + if h != nil { + nhcb = h + } else { + nhcb = fh + } + + // We could try to find a way to cache the names and do only once the string concatenation + // also this convert to all parts of the Histogram (buckets sum, count) while we only need one + err := histogram.ConvertNHCBToClassic(nhcb, nhcbLabels, lsetBuilder, func(l labels.Labels, value float64) error { + // keep only series matching the requested suffix + name := l.Get(model.MetricNameLabel) + if !strings.HasSuffix(name, s.suffix) { + return nil + } + + key := l.String() + if _, exists := convertedSeriesMap[key]; !exists { + convertedSeriesMap[key] = &convertedSeriesData{ + labels: l, + samples: make([]chunks.Sample, 0), + } + } + + convertedSeriesMap[key].samples = append(convertedSeriesMap[key].samples, fSample{ + t: t, + f: value, + }) + return nil + }) + if err != nil { + s.err = err + return false + } + } + + if err := it.Err(); err != nil { + s.err = err + return false + } + } + + if err := s.nhcbSet.Err(); err != nil { + s.err = err + return false + } + + for 
_, data := range convertedSeriesMap { + s.series = append(s.series, NewListSeries(data.labels, data.samples)) + } + } + + if s.idx < len(s.series) { + s.idx++ + return true + } + + return false +} + +func (s *nhcbToClassicSeriesSet) At() Series { + if s.idx == 0 || s.idx > len(s.series) { + return nil + } + return s.series[s.idx-1] +} + +func (s *nhcbToClassicSeriesSet) Err() error { + return s.err +} + +func (s *nhcbToClassicSeriesSet) Warnings() annotations.Annotations { + return s.nhcbSet.Warnings() +} + +type convertedSeriesData struct { + labels labels.Labels + samples []chunks.Sample +} diff --git a/storage/nhcb_querier_test.go b/storage/nhcb_querier_test.go new file mode 100644 index 0000000000..2ce39af7bd --- /dev/null +++ b/storage/nhcb_querier_test.go @@ -0,0 +1,239 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
package storage

import (
	"context"
	"strings"
	"testing"

	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"

	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/util/annotations"
)

// TestExtractHistogramSuffix checks that only equality matchers on __name__
// with a recognized classic suffix are classified as classic histogram
// queries, and that the base name / suffix split is correct.
func TestExtractHistogramSuffix(t *testing.T) {
	tests := []struct {
		name           string
		matchers       []*labels.Matcher
		expectedBase   string
		expectedSuffix string
	}{
		{
			name:           "bucket suffix",
			matchers:       []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "http_requests_bucket")},
			expectedBase:   "http_requests",
			expectedSuffix: "_bucket",
		},
		{
			name:           "count suffix",
			matchers:       []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "http_requests_count")},
			expectedBase:   "http_requests",
			expectedSuffix: "_count",
		},
		{
			name:           "sum suffix",
			matchers:       []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "http_requests_sum")},
			expectedBase:   "http_requests",
			expectedSuffix: "_sum",
		},
		{
			// A plain metric without a classic suffix must not be rewritten.
			name:           "no suffix - regular metric",
			matchers:       []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "my_gauge")},
			expectedBase:   "",
			expectedSuffix: "",
		},
		{
			// Without a __name__ matcher there is nothing to classify.
			name:           "no metric name matcher",
			matchers:       []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "prometheus")},
			expectedBase:   "",
			expectedSuffix: "",
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			baseName, suffix, _ := extractHistogramSuffix(tc.matchers)
			require.Equal(t, tc.expectedBase, baseName)
			require.Equal(t, tc.expectedSuffix, suffix)
		})
	}
}

// TestNHCBAsClassicQuerier_Select covers the three paths of Select:
// pass-through for non-histogram queries, preference for existing classic
// series, and on-the-fly NHCB-to-classic conversion otherwise.
func TestNHCBAsClassicQuerier_Select(t *testing.T) {
	// NHCB fixture: CustomValues {1, 5, 10} yield 4 classic buckets
	// (le=1, 5, 10, +Inf) — hence expectedCount 4 for the bucket case.
	// PositiveBuckets are delta-encoded per native histogram convention
	// (absolute counts 2, 3, 5, 6, totalling Count=16).
	nhcb := &histogram.Histogram{
		Schema:          histogram.CustomBucketsSchema,
		Count:           16,
		Sum:             100.0,
		CustomValues:    []float64{1.0, 5.0, 10.0},
		PositiveSpans:   []histogram.Span{{Offset: 0, Length: 4}},
		PositiveBuckets: []int64{2, 1, 2, 1},
	}

	tests := []struct {
		name              string
		queryMatchers     []*labels.Matcher
		classicSeries     []Series // what the mock returns for suffixed name queries
		nhcbSeries        []Series // what the mock returns for the base name query
		passthroughSeries []Series // what the mock returns for non-histogram queries
		expectedCount     int
		expectedSuffix    string // when set, every returned series name must contain it
	}{
		{
			name:          "non-histogram query passes through",
			queryMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "my_gauge")},
			passthroughSeries: []Series{
				NewListSeries(labels.FromStrings("__name__", "my_gauge"), []chunks.Sample{fSample{t: 1, f: 42}}),
			},
			expectedCount: 1,
		},
		{
			name:          "classic histogram exists - return classic",
			queryMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "http_requests_bucket")},
			classicSeries: []Series{
				NewListSeries(labels.FromStrings("__name__", "http_requests_bucket", "le", "1"), []chunks.Sample{fSample{t: 1, f: 5}}),
			},
			expectedCount: 1,
		},
		{
			name:          "no classic - convert NHCB to bucket series",
			queryMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "http_requests_bucket")},
			classicSeries: []Series{},
			nhcbSeries: []Series{
				NewListSeries(labels.FromStrings("__name__", "http_requests"), []chunks.Sample{hSample{t: 1, h: nhcb}}),
			},
			expectedCount:  4,
			expectedSuffix: "_bucket",
		},
		{
			name:          "no classic - convert NHCB to count series",
			queryMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "http_requests_count")},
			classicSeries: []Series{},
			nhcbSeries: []Series{
				NewListSeries(labels.FromStrings("__name__", "http_requests"), []chunks.Sample{hSample{t: 1, h: nhcb}}),
			},
			expectedCount:  1,
			expectedSuffix: "_count",
		},
		{
			name:          "no classic - convert NHCB to sum series",
			queryMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "http_requests_sum")},
			classicSeries: []Series{},
			nhcbSeries: []Series{
				NewListSeries(labels.FromStrings("__name__", "http_requests"), []chunks.Sample{hSample{t: 1, h: nhcb}}),
			},
			expectedCount:  1,
			expectedSuffix: "_sum",
		},
		{
			name:          "no classic and no NHCB - return empty",
			queryMatchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "http_requests_bucket")},
			classicSeries: []Series{},
			nhcbSeries:    []Series{},
			expectedCount: 0,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			mock := &nhcbMockQuerier{
				classicSeries:     tc.classicSeries,
				nhcbSeries:        tc.nhcbSeries,
				passthroughSeries: tc.passthroughSeries,
			}
			q := NewNHCBAsClassicQuerier(mock)

			ss := q.Select(context.Background(), false, nil, tc.queryMatchers...)
			var count int
			for ss.Next() {
				count++
				s := ss.At()
				if tc.expectedSuffix != "" {
					require.Contains(t, s.Labels().Get(model.MetricNameLabel), tc.expectedSuffix)
				}
			}
			require.NoError(t, ss.Err())
			require.Equal(t, tc.expectedCount, count)
		})
	}
}

// TestNHCBAsClassicQuerier_FloatHistogram verifies the conversion path for
// float NHCB samples: CustomValues {1, 5} yield 3 bucket series
// (le=1, 5, +Inf).
func TestNHCBAsClassicQuerier_FloatHistogram(t *testing.T) {
	fhNHCB := &histogram.FloatHistogram{
		Schema:          histogram.CustomBucketsSchema,
		Count:           15,
		Sum:             150.0,
		CustomValues:    []float64{1.0, 5.0},
		PositiveSpans:   []histogram.Span{{Offset: 0, Length: 3}},
		PositiveBuckets: []float64{3, 5, 7},
	}

	mock := &nhcbMockQuerier{
		classicSeries: []Series{},
		nhcbSeries: []Series{
			NewListSeries(labels.FromStrings("__name__", "latency"), []chunks.Sample{fhSample{t: 1, fh: fhNHCB}}),
		},
	}
	q := NewNHCBAsClassicQuerier(mock)

	ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "latency_bucket"))
	var count int
	for ss.Next() {
		count++
	}
	require.NoError(t, ss.Err())
	require.Equal(t, 3, count)
}

// nhcbMockQuerier is a test double that answers suffixed-name queries with
// classicSeries and base-name queries with nhcbSeries (or passthroughSeries
// when configured), mimicking the two-step lookup performed by Select.
type nhcbMockQuerier struct {
	classicSeries     []Series
	nhcbSeries        []Series
	passthroughSeries []Series // For non-histogram queries
}

func (m *nhcbMockQuerier) Select(_ context.Context, _ bool, _ *SelectHints, matchers ...*labels.Matcher) SeriesSet {
	for _, matcher := range matchers {
		if matcher.Name == model.MetricNameLabel {
			// Check if this is a histogram suffix query (classic histogram query)
			if strings.HasSuffix(matcher.Value, "_bucket") ||
				strings.HasSuffix(matcher.Value, "_count") ||
				strings.HasSuffix(matcher.Value, "_sum") {
				return NewMockSeriesSet(m.classicSeries...)
			}
			// If passthroughSeries is set, use it for non-histogram metric queries
			if len(m.passthroughSeries) > 0 {
				return NewMockSeriesSet(m.passthroughSeries...)
			}
			// Base metric name query - return NHCB series
			return NewMockSeriesSet(m.nhcbSeries...)
		}
	}
	return NewMockSeriesSet()
}

// LabelValues is a stub; the querier under test never calls it.
func (*nhcbMockQuerier) LabelValues(context.Context, string, *LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	return nil, nil, nil
}

// LabelNames is a stub; the querier under test never calls it.
func (*nhcbMockQuerier) LabelNames(context.Context, *LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	return nil, nil, nil
}

func (*nhcbMockQuerier) Close() error {
	return nil
}