diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go index 28f35572c2..91fcac1cfb 100644 --- a/model/histogram/float_histogram.go +++ b/model/histogram/float_histogram.go @@ -164,8 +164,8 @@ func (h *FloatHistogram) CopyToSchema(targetSchema int32) *FloatHistogram { Sum: h.Sum, } - c.PositiveSpans, c.PositiveBuckets = reduceResolution(h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, false, false) - c.NegativeSpans, c.NegativeBuckets = reduceResolution(h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, false, false) + c.PositiveSpans, c.PositiveBuckets = mustReduceResolution(h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, false, false) + c.NegativeSpans, c.NegativeBuckets = mustReduceResolution(h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, false, false) return &c } @@ -393,13 +393,13 @@ func (h *FloatHistogram) Add(other *FloatHistogram) (res *FloatHistogram, counte switch { case other.Schema < h.Schema: - hPositiveSpans, hPositiveBuckets = reduceResolution(hPositiveSpans, hPositiveBuckets, h.Schema, other.Schema, false, true) - hNegativeSpans, hNegativeBuckets = reduceResolution(hNegativeSpans, hNegativeBuckets, h.Schema, other.Schema, false, true) + hPositiveSpans, hPositiveBuckets = mustReduceResolution(hPositiveSpans, hPositiveBuckets, h.Schema, other.Schema, false, true) + hNegativeSpans, hNegativeBuckets = mustReduceResolution(hNegativeSpans, hNegativeBuckets, h.Schema, other.Schema, false, true) h.Schema = other.Schema case other.Schema > h.Schema: - otherPositiveSpans, otherPositiveBuckets = reduceResolution(otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false) - otherNegativeSpans, otherNegativeBuckets = reduceResolution(otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false) + otherPositiveSpans, otherPositiveBuckets = mustReduceResolution(otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false) + otherNegativeSpans, otherNegativeBuckets = mustReduceResolution(otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false) } h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, false, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets) @@ -459,12 +459,12 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) (res *FloatHistogram, counte switch { case other.Schema < h.Schema: - hPositiveSpans, hPositiveBuckets = reduceResolution(hPositiveSpans, hPositiveBuckets, h.Schema, other.Schema, false, true) - hNegativeSpans, hNegativeBuckets = reduceResolution(hNegativeSpans, hNegativeBuckets, h.Schema, other.Schema, false, true) + hPositiveSpans, hPositiveBuckets = mustReduceResolution(hPositiveSpans, hPositiveBuckets, h.Schema, other.Schema, false, true) + hNegativeSpans, hNegativeBuckets = mustReduceResolution(hNegativeSpans, hNegativeBuckets, h.Schema, other.Schema, false, true) h.Schema = other.Schema case other.Schema > h.Schema: - otherPositiveSpans, otherPositiveBuckets = reduceResolution(otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false) - otherNegativeSpans, otherNegativeBuckets = reduceResolution(otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false) + otherPositiveSpans, otherPositiveBuckets = mustReduceResolution(otherPositiveSpans, otherPositiveBuckets, other.Schema, h.Schema, false, false) + otherNegativeSpans, otherNegativeBuckets = mustReduceResolution(otherNegativeSpans, otherNegativeBuckets, other.Schema, h.Schema, false, false) } h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, true, hPositiveSpans, hPositiveBuckets, otherPositiveSpans, otherPositiveBuckets) @@ -1582,25 +1582,40 @@ func addCustomBucketsWithMismatches( } // ReduceResolution reduces the float histogram's spans, buckets into target schema. -// The target schema must be smaller than the current float histogram's schema. -// This will panic if the histogram has custom buckets or if the target schema is -// a custom buckets schema. -func (h *FloatHistogram) ReduceResolution(targetSchema int32) *FloatHistogram { +// An error is returned in the following cases: +// - The target schema is not smaller than the current histogram's schema. +// - The histogram has custom buckets. +// - The target schema is a custom buckets schema. +// - Any spans have an invalid offset. +// - The spans are inconsistent with the number of buckets. +func (h *FloatHistogram) ReduceResolution(targetSchema int32) error { + // Note that the follow three returns are not returning a + // histogram.Error because they are programming errors. if h.UsesCustomBuckets() { - panic("cannot reduce resolution when there are custom buckets") + return errors.New("cannot reduce resolution when there are custom buckets") } if IsCustomBucketsSchema(targetSchema) { - panic("cannot reduce resolution to custom buckets schema") + return errors.New("cannot reduce resolution to custom buckets schema") } if targetSchema >= h.Schema { - panic(fmt.Errorf("cannot reduce resolution from schema %d to %d", h.Schema, targetSchema)) + return fmt.Errorf("cannot reduce resolution from schema %d to %d", h.Schema, targetSchema) } - h.PositiveSpans, h.PositiveBuckets = reduceResolution(h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, false, true) - h.NegativeSpans, h.NegativeBuckets = reduceResolution(h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, false, true) + var err error + + if h.PositiveSpans, h.PositiveBuckets, err = reduceResolution( + h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, false, true, + ); err != nil { + return err + } + if h.NegativeSpans, h.NegativeBuckets, err = reduceResolution( + h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, false, true, + ); err != nil { + return err + } h.Schema = targetSchema - return h + return nil } // checkSchemaAndBounds checks if two histograms are compatible because they diff --git a/model/histogram/float_histogram_test.go b/model/histogram/float_histogram_test.go index ac339f152e..e79f5a0f49 100644 --- a/model/histogram/float_histogram_test.go +++ b/model/histogram/float_histogram_test.go @@ -4141,14 +4141,16 @@ func createRandomSpans(rng *rand.Rand, spanNum int32) ([]Span, []float64) { func TestFloatHistogramReduceResolution(t *testing.T) { tcs := map[string]struct { - origin *FloatHistogram - target *FloatHistogram + origin *FloatHistogram + targetSchema int32 + target *FloatHistogram + errorMsg string }{ "valid float histogram": { origin: &FloatHistogram{ Schema: 0, PositiveSpans: []Span{ - {Offset: 0, Length: 4}, + {Offset: -2, Length: 4}, {Offset: 0, Length: 0}, {Offset: 3, Length: 2}, }, @@ -4160,10 +4162,11 @@ func TestFloatHistogramReduceResolution(t *testing.T) { }, NegativeBuckets: []float64{1, 3, 1, 2, 1, 1}, }, + targetSchema: -1, target: &FloatHistogram{ Schema: -1, PositiveSpans: []Span{ - {Offset: 0, Length: 3}, + {Offset: -1, Length: 3}, {Offset: 1, Length: 1}, }, PositiveBuckets: []float64{1, 4, 2, 2}, @@ -4174,12 +4177,58 @@ func TestFloatHistogramReduceResolution(t *testing.T) { NegativeBuckets: []float64{1, 4, 2, 2}, }, }, + "not enough buckets": { + origin: &FloatHistogram{ + Schema: 0, + PositiveSpans: []Span{ + {Offset: -2, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 3, Length: 2}, + }, + PositiveBuckets: []float64{1, 3, 1, 2, 1}, + }, + targetSchema: -1, + errorMsg: "have 5 buckets but spans need more: histogram spans specify different number of buckets than provided", + }, + "too many buckets": { + origin: &FloatHistogram{ + Schema: 0, + PositiveSpans: []Span{ + {Offset: -2, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 3, Length: 2}, + }, + PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 5}, + }, + targetSchema: -1, + errorMsg: "spans need 6 buckets, have 7 buckets: histogram spans specify different number of buckets than provided", + }, + "negative offset": { + origin: &FloatHistogram{ + Schema: 0, + PositiveSpans: []Span{ + {Offset: -2, Length: 4}, + {Offset: -1, Length: 0}, + {Offset: 3, Length: 2}, + }, + PositiveBuckets: []float64{1, 3, 1, 2, 1, 1}, + }, + targetSchema: -1, + errorMsg: "span number 2 with offset -1: histogram has a span whose offset is negative", + }, } - for _, tc := range tcs { - target := tc.origin.ReduceResolution(tc.target.Schema) - require.Equal(t, tc.target, target) - // Check that receiver histogram was mutated: - require.Equal(t, tc.target, tc.origin) + for tn, tc := range tcs { + t.Run(tn, func(t *testing.T) { + err := tc.origin.ReduceResolution(tc.targetSchema) + if tc.errorMsg != "" { + require.Equal(t, tc.errorMsg, err.Error()) + // The returned error should be a histogram.Error. + require.ErrorAs(t, err, &Error{}) + return + } + require.NoError(t, err) + require.Equal(t, tc.target, tc.origin) + }) } } diff --git a/model/histogram/generic.go b/model/histogram/generic.go index cd385407d5..649db769c7 100644 --- a/model/histogram/generic.go +++ b/model/histogram/generic.go @@ -738,6 +738,8 @@ var exponentialBounds = [][]float64{ // deltas. Set it to false if the buckets contain absolute counts. // Set inplace to true to reuse input slices and avoid allocations (otherwise // new slices will be allocated for result). +// The functions returns an error if there are too many or too few buckets for the spans +// or if any span except the first has a negative offset. func reduceResolution[IBC InternalBucketCount]( originSpans []Span, originBuckets []IBC, @@ -745,7 +747,7 @@ func reduceResolution[IBC InternalBucketCount]( targetSchema int32, deltaBuckets bool, inplace bool, -) ([]Span, []IBC) { +) ([]Span, []IBC, error) { var ( targetSpans []Span // The spans in the target schema. targetBuckets []IBC // The bucket counts in the target schema. @@ -764,10 +766,18 @@ func reduceResolution[IBC InternalBucketCount]( targetBuckets = originBuckets[:0] } - for _, span := range originSpans { + for n, span := range originSpans { + if n > 0 && span.Offset < 0 { + return nil, nil, fmt.Errorf("span number %d with offset %d: %w", n+1, span.Offset, ErrHistogramSpanNegativeOffset) + } // Determine the index of the first bucket in this span. bucketIdx += span.Offset for j := 0; j < int(span.Length); j++ { + // Protect against too few buckets in the origin. + if bucketCountIdx >= len(originBuckets) { + return nil, nil, fmt.Errorf("have %d buckets but spans need more: %w", len(originBuckets), ErrHistogramSpansBucketsMismatch) + } + // Determine the index of the bucket in the target schema from the index in the original schema. targetBucketIdx = targetIdx(bucketIdx, originSchema, targetSchema) @@ -826,12 +836,33 @@ func reduceResolution[IBC InternalBucketCount]( targetBuckets = append(targetBuckets, originBuckets[bucketCountIdx]) } } - bucketIdx++ bucketCountIdx++ } } + if bucketCountIdx != len(originBuckets) { + return nil, nil, fmt.Errorf("spans need %d buckets, have %d buckets: %w", bucketCountIdx, len(originBuckets), ErrHistogramSpansBucketsMismatch) + } + return targetSpans, targetBuckets, nil +} +// mustReduceResolution works like reduceResolution, but panics instead of +// returning an error. Use mustReduceResolution if you are sure that the spans +// and buckets are valid. +func mustReduceResolution[IBC InternalBucketCount]( + originSpans []Span, + originBuckets []IBC, + originSchema, + targetSchema int32, + deltaBuckets bool, + inplace bool, +) ([]Span, []IBC) { + targetSpans, targetBuckets, err := reduceResolution( + originSpans, originBuckets, originSchema, targetSchema, deltaBuckets, inplace, + ) + if err != nil { + panic(err) + } return targetSpans, targetBuckets } diff --git a/model/histogram/generic_test.go b/model/histogram/generic_test.go index 1651830e9d..54324beaff 100644 --- a/model/histogram/generic_test.go +++ b/model/histogram/generic_test.go @@ -142,7 +142,7 @@ func TestReduceResolutionHistogram(t *testing.T) { for _, tc := range cases { spansCopy, bucketsCopy := slices.Clone(tc.spans), slices.Clone(tc.buckets) - spans, buckets := reduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, true, false) + spans, buckets := mustReduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, true, false) require.Equal(t, tc.expectedSpans, spans) require.Equal(t, tc.expectedBuckets, buckets) // Verify inputs were not mutated: @@ -151,7 +151,7 @@ func TestReduceResolutionHistogram(t *testing.T) { // Output slices reuse input slices: const inplace = true - spans, buckets = reduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, true, inplace) + spans, buckets = mustReduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, true, inplace) require.Equal(t, tc.expectedSpans, spans) require.Equal(t, tc.expectedBuckets, buckets) // Verify inputs were mutated which is now expected: @@ -190,7 +190,7 @@ func TestReduceResolutionFloatHistogram(t *testing.T) { for _, tc := range cases { spansCopy, bucketsCopy := slices.Clone(tc.spans), slices.Clone(tc.buckets) - spans, buckets := reduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, false, false) + spans, buckets := mustReduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, false, false) require.Equal(t, tc.expectedSpans, spans) require.Equal(t, tc.expectedBuckets, buckets) // Verify inputs were not mutated: @@ -199,7 +199,7 @@ func TestReduceResolutionFloatHistogram(t *testing.T) { // Output slices reuse input slices: const inplace = true - spans, buckets = reduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, false, inplace) + spans, buckets = mustReduceResolution(tc.spans, tc.buckets, tc.schema, tc.targetSchema, false, inplace) require.Equal(t, tc.expectedSpans, spans) require.Equal(t, tc.expectedBuckets, buckets) // Verify inputs were mutated which is now expected: diff --git a/model/histogram/histogram.go b/model/histogram/histogram.go index 959df4c87a..5fc68ef9d0 100644 --- a/model/histogram/histogram.go +++ b/model/histogram/histogram.go @@ -14,6 +14,7 @@ package histogram import ( + "errors" "fmt" "math" "slices" @@ -617,26 +618,37 @@ func (c *cumulativeBucketIterator) At() Bucket[uint64] { } // ReduceResolution reduces the histogram's spans, buckets into target schema. -// The target schema must be smaller than the current histogram's schema. -// This will panic if the histogram has custom buckets or if the target schema is -// a custom buckets schema. -func (h *Histogram) ReduceResolution(targetSchema int32) *Histogram { +// An error is returned in the following cases: +// - The target schema is not smaller than the current histogram's schema. +// - The histogram has custom buckets. +// - The target schema is a custom buckets schema. +// - Any spans have an invalid offset. +// - The spans are inconsistent with the number of buckets. +func (h *Histogram) ReduceResolution(targetSchema int32) error { + // Note that the follow three returns are not returning a + // histogram.Error because they are programming errors. if h.UsesCustomBuckets() { - panic("cannot reduce resolution when there are custom buckets") + return errors.New("cannot reduce resolution when there are custom buckets") } if IsCustomBucketsSchema(targetSchema) { - panic("cannot reduce resolution to custom buckets schema") + return errors.New("cannot reduce resolution to custom buckets schema") } if targetSchema >= h.Schema { - panic(fmt.Errorf("cannot reduce resolution from schema %d to %d", h.Schema, targetSchema)) + return fmt.Errorf("cannot reduce resolution from schema %d to %d", h.Schema, targetSchema) } - h.PositiveSpans, h.PositiveBuckets = reduceResolution( + var err error + + if h.PositiveSpans, h.PositiveBuckets, err = reduceResolution( h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema, true, true, - ) - h.NegativeSpans, h.NegativeBuckets = reduceResolution( + ); err != nil { + return err + } + if h.NegativeSpans, h.NegativeBuckets, err = reduceResolution( h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema, true, true, - ) + ); err != nil { + return err + } h.Schema = targetSchema - return h + return nil } diff --git a/model/histogram/histogram_test.go b/model/histogram/histogram_test.go index e4c6ce683b..ae17f9be37 100644 --- a/model/histogram/histogram_test.go +++ b/model/histogram/histogram_test.go @@ -1719,14 +1719,16 @@ func BenchmarkHistogramValidation(b *testing.B) { func TestHistogramReduceResolution(t *testing.T) { tcs := map[string]struct { - origin *Histogram - target *Histogram + origin *Histogram + targetSchema int32 + target *Histogram + errorMsg string }{ "valid histogram": { origin: &Histogram{ Schema: 0, PositiveSpans: []Span{ - {Offset: 0, Length: 4}, + {Offset: -2, Length: 4}, {Offset: 0, Length: 0}, {Offset: 3, Length: 2}, }, @@ -1738,10 +1740,11 @@ func TestHistogramReduceResolution(t *testing.T) { }, NegativeBuckets: []int64{1, 2, -2, 1, -1, 0}, }, + targetSchema: -1, target: &Histogram{ Schema: -1, PositiveSpans: []Span{ - {Offset: 0, Length: 3}, + {Offset: -1, Length: 3}, {Offset: 1, Length: 1}, }, PositiveBuckets: []int64{1, 3, -2, 0}, @@ -1752,12 +1755,58 @@ func TestHistogramReduceResolution(t *testing.T) { NegativeBuckets: []int64{1, 3, -2, 0}, }, }, + "not enough buckets": { + origin: &Histogram{ + Schema: 0, + PositiveSpans: []Span{ + {Offset: -2, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 3, Length: 2}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1}, + }, + targetSchema: -1, + errorMsg: "have 5 buckets but spans need more: histogram spans specify different number of buckets than provided", + }, + "too many buckets": { + origin: &Histogram{ + Schema: 0, + PositiveSpans: []Span{ + {Offset: -2, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 3, Length: 2}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 3}, + }, + targetSchema: -1, + errorMsg: "spans need 6 buckets, have 7 buckets: histogram spans specify different number of buckets than provided", + }, + "negative offset": { + origin: &Histogram{ + Schema: 0, + PositiveSpans: []Span{ + {Offset: -2, Length: 4}, + {Offset: -1, Length: 0}, + {Offset: 3, Length: 2}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1, 0}, + }, + targetSchema: -1, + errorMsg: "span number 2 with offset -1: histogram has a span whose offset is negative", + }, } - for _, tc := range tcs { - target := tc.origin.ReduceResolution(tc.target.Schema) - require.Equal(t, tc.target, target) - // Check that receiver histogram was mutated: - require.Equal(t, tc.target, tc.origin) + for tn, tc := range tcs { + t.Run(tn, func(t *testing.T) { + err := tc.origin.ReduceResolution(tc.targetSchema) + if tc.errorMsg != "" { + require.Equal(t, tc.errorMsg, err.Error()) + // The returned error should be a histogram.Error. + require.ErrorAs(t, err, &Error{}) + return + } + require.NoError(t, err) + require.Equal(t, tc.target, tc.origin) + }) } } diff --git a/model/textparse/nhcbparse.go b/model/textparse/nhcbparse.go index ab821f0e63..79441e1f75 100644 --- a/model/textparse/nhcbparse.go +++ b/model/textparse/nhcbparse.go @@ -352,7 +352,7 @@ func (p *NHCBParser) swapExemplars() { } // processNHCB converts the collated classic histogram series to NHCB and caches the info -// to be returned to callers. Retruns true if the conversion was successful. +// to be returned to callers. Returns true if the conversion was successful. func (p *NHCBParser) processNHCB() bool { if p.state != stateCollecting { return false diff --git a/scrape/target.go b/scrape/target.go index 563fe33f82..2aabff20e2 100644 --- a/scrape/target.go +++ b/scrape/target.go @@ -389,6 +389,7 @@ type bucketLimitAppender struct { } func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + var err error if h != nil { // Return with an early error if the histogram has too many buckets and the // schema is not exponential, in which case we can't reduce the resolution. @@ -399,7 +400,9 @@ func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labe if h.Schema <= histogram.ExponentialSchemaMin { return 0, errBucketLimit } - h = h.ReduceResolution(h.Schema - 1) + if err = h.ReduceResolution(h.Schema - 1); err != nil { + return 0, err + } } } if fh != nil { @@ -412,11 +415,12 @@ func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labe if fh.Schema <= histogram.ExponentialSchemaMin { return 0, errBucketLimit } - fh = fh.ReduceResolution(fh.Schema - 1) + if err = fh.ReduceResolution(fh.Schema - 1); err != nil { + return 0, err + } } } - ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh) - if err != nil { + if ref, err = app.Appender.AppendHistogram(ref, lset, t, h, fh); err != nil { return 0, err } return ref, nil @@ -429,18 +433,22 @@ type maxSchemaAppender struct { } func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + var err error if h != nil { if histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > app.maxSchema { - h = h.ReduceResolution(app.maxSchema) + if err = h.ReduceResolution(app.maxSchema); err != nil { + return 0, err + } } } if fh != nil { if histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > app.maxSchema { - fh = fh.ReduceResolution(app.maxSchema) + if err = fh.ReduceResolution(app.maxSchema); err != nil { + return 0, err + } } } - ref, err := app.Appender.AppendHistogram(ref, lset, t, h, fh) - if err != nil { + if ref, err = app.Appender.AppendHistogram(ref, lset, t, h, fh); err != nil { return 0, err } return ref, nil diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 7e21909354..059d5e66ce 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -389,6 +389,11 @@ type concreteSeriesIterator struct { curValType chunkenc.ValueType series *concreteSeries err error + + // These are pre-filled with the current model histogram if curValType + // is ValHistogram or ValFloatHistogram, respectively. + curH *histogram.Histogram + curFH *histogram.FloatHistogram } func newConcreteSeriesIterator(series *concreteSeries) chunkenc.Iterator { @@ -461,9 +466,7 @@ func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType { c.curValType = chunkenc.ValHistogram } if c.curValType == chunkenc.ValHistogram { - h := &c.series.histograms[c.histogramsCur] - c.curValType = getHistogramValType(h) - c.err = validateHistogramSchema(h) + c.setCurrentHistogram() } if c.err != nil { c.curValType = chunkenc.ValNone @@ -471,18 +474,57 @@ func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType { return c.curValType } -func validateHistogramSchema(h *prompb.Histogram) error { - if histogram.IsKnownSchema(h.Schema) { - return nil - } - return histogram.UnknownSchemaError(h.Schema) -} +// setCurrentHistogram pre-fills either the curH or the curFH field with a +// converted model histogram and sets c.curValType accordingly. It validates the +// histogram and sets c.err accordingly. This all has to be done in Seek() and +// Next() already so that we know if the histogram we got from the remote-read +// source is valid or not before we allow the AtHistogram()/AtFloatHistogram() +// call. +func (c *concreteSeriesIterator) setCurrentHistogram() { + pbH := c.series.histograms[c.histogramsCur] -func getHistogramValType(h *prompb.Histogram) chunkenc.ValueType { - if h.IsFloatHistogram() { - return chunkenc.ValFloatHistogram + // Basic schema check first. + schema := pbH.Schema + if !histogram.IsKnownSchema(schema) { + c.err = histogram.UnknownSchemaError(schema) + return } - return chunkenc.ValHistogram + + if pbH.IsFloatHistogram() { + c.curValType = chunkenc.ValFloatHistogram + mFH := pbH.ToFloatHistogram() + if mFH.Schema > histogram.ExponentialSchemaMax && mFH.Schema <= histogram.ExponentialSchemaMaxReserved { + // This is a very slow path, but it should only happen if the + // sample is from a newer Prometheus version that supports higher + // resolution. + if err := mFH.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + c.err = err + return + } + } + if err := mFH.Validate(); err != nil { + c.err = err + return + } + c.curFH = mFH + return + } + c.curValType = chunkenc.ValHistogram + mH := pbH.ToIntHistogram() + if mH.Schema > histogram.ExponentialSchemaMax && mH.Schema <= histogram.ExponentialSchemaMaxReserved { + // This is a very slow path, but it should only happen if the + // sample is from a newer Prometheus version that supports higher + // resolution. + if err := mH.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + c.err = err + return + } + } + if err := mH.Validate(); err != nil { + c.err = err + return + } + c.curH = mH } // At implements chunkenc.Iterator. @@ -499,31 +541,19 @@ func (c *concreteSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *hist if c.curValType != chunkenc.ValHistogram { panic("iterator is not on an integer histogram sample") } - h := c.series.histograms[c.histogramsCur] - mh := h.ToIntHistogram() - if mh.Schema > histogram.ExponentialSchemaMax && mh.Schema <= histogram.ExponentialSchemaMaxReserved { - // This is a very slow path, but it should only happen if the - // sample is from a newer Prometheus version that supports higher - // resolution. - mh.ReduceResolution(histogram.ExponentialSchemaMax) - } - return h.Timestamp, mh + return c.series.histograms[c.histogramsCur].Timestamp, c.curH } // AtFloatHistogram implements chunkenc.Iterator. func (c *concreteSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { - if c.curValType == chunkenc.ValHistogram || c.curValType == chunkenc.ValFloatHistogram { - fh := c.series.histograms[c.histogramsCur] - mfh := fh.ToFloatHistogram() // integer will be auto-converted. - if mfh.Schema > histogram.ExponentialSchemaMax && mfh.Schema <= histogram.ExponentialSchemaMaxReserved { - // This is a very slow path, but it should only happen if the - // sample is from a newer Prometheus version that supports higher - // resolution. - mfh.ReduceResolution(histogram.ExponentialSchemaMax) - } - return fh.Timestamp, mfh + switch c.curValType { + case chunkenc.ValFloatHistogram: + return c.series.histograms[c.histogramsCur].Timestamp, c.curFH + case chunkenc.ValHistogram: + return c.series.histograms[c.histogramsCur].Timestamp, c.curH.ToFloat(nil) + default: + panic("iterator is not on a histogram sample") } - panic("iterator is not on a histogram sample") } // AtT implements chunkenc.Iterator. @@ -571,9 +601,7 @@ func (c *concreteSeriesIterator) Next() chunkenc.ValueType { } if c.curValType == chunkenc.ValHistogram { - h := &c.series.histograms[c.histogramsCur] - c.curValType = getHistogramValType(h) - c.err = validateHistogramSchema(h) + c.setCurrentHistogram() } if c.err != nil { c.curValType = chunkenc.ValNone diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index ce3a09b878..ba67ff33d9 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -546,7 +546,7 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) { require.Equal(t, chunkenc.ValNone, it.Seek(1)) } -func TestConcreteSeriesIterator_InvalidHistogramSamples(t *testing.T) { +func TestConcreteSeriesIterator_HistogramSamplesWithInvalidSchema(t *testing.T) { for _, schema := range []int32{-100, 100} { t.Run(fmt.Sprintf("schema=%d", schema), func(t *testing.T) { h := prompb.FromIntHistogram(2, &testHistogram) @@ -591,6 +591,47 @@ func TestConcreteSeriesIterator_InvalidHistogramSamples(t *testing.T) { } } +func TestConcreteSeriesIterator_HistogramSamplesWithMissingBucket(t *testing.T) { + mh := testHistogram.Copy() + mh.PositiveSpans = []histogram.Span{{Offset: 0, Length: 2}} + h := prompb.FromIntHistogram(2, mh) + fh := prompb.FromFloatHistogram(4, mh.ToFloat(nil)) + series := &concreteSeries{ + labels: labels.FromStrings("foo", "bar"), + floats: []prompb.Sample{ + {Value: 1, Timestamp: 0}, + {Value: 2, Timestamp: 3}, + }, + histograms: []prompb.Histogram{ + h, + fh, + }, + } + it := series.Iterator(nil) + require.Equal(t, chunkenc.ValFloat, it.Next()) + require.Equal(t, chunkenc.ValNone, it.Next()) + require.Error(t, it.Err()) + require.ErrorIs(t, it.Err(), histogram.ErrHistogramSpansBucketsMismatch) + + it = series.Iterator(it) + require.Equal(t, chunkenc.ValFloat, it.Next()) + require.Equal(t, chunkenc.ValNone, it.Next()) + require.ErrorIs(t, it.Err(), histogram.ErrHistogramSpansBucketsMismatch) + + it = series.Iterator(it) + require.Equal(t, chunkenc.ValNone, it.Seek(1)) + require.ErrorIs(t, it.Err(), histogram.ErrHistogramSpansBucketsMismatch) + + it = series.Iterator(it) + require.Equal(t, chunkenc.ValFloat, it.Seek(3)) + require.Equal(t, chunkenc.ValNone, it.Next()) + require.ErrorIs(t, it.Err(), histogram.ErrHistogramSpansBucketsMismatch) + + it = series.Iterator(it) + require.Equal(t, chunkenc.ValNone, it.Seek(4)) + require.ErrorIs(t, it.Err(), histogram.ErrHistogramSpansBucketsMismatch) +} + func TestConcreteSeriesIterator_ReducesHighResolutionHistograms(t *testing.T) { for _, schema := range []int32{9, 52} { t.Run(fmt.Sprintf("schema=%d", schema), func(t *testing.T) { diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 67c244167b..b95c85b6c4 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -697,19 +697,23 @@ func (app *remoteWriteAppender) Append(ref storage.SeriesRef, lset labels.Labels } func (app *remoteWriteAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + var err error if t > app.maxTime { return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds) } if h != nil && histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > histogram.ExponentialSchemaMax { - h = h.ReduceResolution(histogram.ExponentialSchemaMax) + if err = h.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + return 0, err + } } if fh != nil && histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > histogram.ExponentialSchemaMax { - fh = fh.ReduceResolution(histogram.ExponentialSchemaMax) + if err = fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + return 0, err + } } - ref, err := app.Appender.AppendHistogram(ref, l, t, h, fh) - if err != nil { + if ref, err = app.Appender.AppendHistogram(ref, l, t, h, fh); err != nil { return 0, err } return ref, nil diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 8002dd0d4e..d960e835f2 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -884,7 +884,14 @@ func (it *floatHistogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) // chunk is from a newer Prometheus version that supports higher // resolution. fh = fh.Copy() - fh.ReduceResolution(histogram.ExponentialSchemaMax) + if err := fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + // With the checks above, this can only happen + // with invalid data in a chunk. As this is a + // rare edge case of a rare edge case, we'd + // rather not create all the plumbing to handle + // this error gracefully. + panic(err) + } } return it.t, fh } @@ -915,7 +922,13 @@ func (it *floatHistogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) // This is a very slow path, but it should only happen if the // chunk is from a newer Prometheus version that supports higher // resolution. - fh.ReduceResolution(histogram.ExponentialSchemaMax) + if err := fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + // With the checks above, this can only happen with + // invalid data in a chunk. As this is a rare edge case + // of a rare edge case, we'd rather not create all the + // plumbing to handle this error gracefully. + panic(err) + } } return it.t, fh diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index cc1d771235..be1c31ae76 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -939,7 +939,14 @@ func (it *histogramIterator) AtHistogram(h *histogram.Histogram) (int64, *histog // chunk is from a newer Prometheus version that supports higher // resolution. h = h.Copy() - h.ReduceResolution(histogram.ExponentialSchemaMax) + if err := h.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + // With the checks above, this can only happen + // with invalid data in a chunk. As this is a + // rare edge case of a rare edge case, we'd + // rather not create all the plumbing to handle + // this error gracefully. + panic(err) + } } return it.t, h } @@ -970,7 +977,13 @@ func (it *histogramIterator) AtHistogram(h *histogram.Histogram) (int64, *histog // This is a very slow path, but it should only happen if the // chunk is from a newer Prometheus version that supports higher // resolution. - h.ReduceResolution(histogram.ExponentialSchemaMax) + if err := h.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + // With the checks above, this can only happen with + // invalid data in a chunk. As this is a rare edge case + // of a rare edge case, we'd rather not create all the + // plumbing to handle this error gracefully. + panic(err) + } } return it.t, h @@ -1000,7 +1013,14 @@ func (it *histogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int // chunk is from a newer Prometheus version that supports higher // resolution. fh = fh.Copy() - fh.ReduceResolution(histogram.ExponentialSchemaMax) + if err := fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + // With the checks above, this can only happen + // with invalid data in a chunk. As this is a + // rare edge case of a rare edge case, we'd + // rather not create all the plumbing to handle + // this error gracefully. + panic(err) + } } return it.t, fh } @@ -1039,7 +1059,13 @@ func (it *histogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int // This is a very slow path, but it should only happen if the // chunk is from a newer Prometheus version that supports higher // resolution. - fh.ReduceResolution(histogram.ExponentialSchemaMax) + if err := fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + // With the checks above, this can only happen with + // invalid data in a chunk. As this is a rare edge case + // of a rare edge case, we'd rather not create all the + // plumbing to handle this error gracefully. + panic(err) + } } return it.t, fh diff --git a/tsdb/record/record.go b/tsdb/record/record.go index 561810a3a5..5791f60df4 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -475,7 +475,9 @@ func (d *Decoder) HistogramSamples(rec []byte, histograms []RefHistogramSample) // This is a very slow path, but it should only happen if the // record is from a newer Prometheus version that supports higher // resolution. - rh.H.ReduceResolution(histogram.ExponentialSchemaMax) + if err := rh.H.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + return nil, fmt.Errorf("error reducing resolution of histogram #%d: %w", len(histograms)+1, err) + } } histograms = append(histograms, rh) @@ -579,7 +581,9 @@ func (d *Decoder) FloatHistogramSamples(rec []byte, histograms []RefFloatHistogr // This is a very slow path, but it should only happen if the // record is from a newer Prometheus version that supports higher // resolution. - rh.FH.ReduceResolution(histogram.ExponentialSchemaMax) + if err := rh.FH.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + return nil, fmt.Errorf("error reducing resolution of histogram #%d: %w", len(histograms)+1, err) + } } histograms = append(histograms, rh)