mirror of
https://github.com/prometheus/prometheus
synced 2026-04-20 22:41:05 +08:00
feat(chunkenc): allow more native histograms schemas
Allow -9..52 schemas instead of just -4..8, but reduce resolution to 8 if above. The reduce code path will be slow, but we only expect it to happen if TSDB already has higher resolution samples and we are in a rollback. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com> # Conflicts: # model/histogram/generic.go
This commit is contained in:
@@ -45,12 +45,17 @@ var (
|
||||
ErrHistogramCustomBucketsNegBuckets = errors.New("custom buckets: must not have negative buckets")
|
||||
ErrHistogramExpSchemaCustomBounds = errors.New("histogram with exponential schema must not have custom bounds")
|
||||
ErrHistogramsInvalidSchema = fmt.Errorf("histogram has an invalid schema, which must be between %d and %d for exponential buckets, or %d for custom buckets", ExponentialSchemaMin, ExponentialSchemaMax, CustomBucketsSchema)
|
||||
ErrHistogramsUnknownSchema = fmt.Errorf("histogram has an unknown schema, which must be between %d and %d for exponential buckets, or %d for custom buckets", ExponentialSchemaMinReserved, ExponentialSchemaMaxReserved, CustomBucketsSchema)
|
||||
)
|
||||
|
||||
func InvalidSchemaError(s int32) error {
|
||||
return fmt.Errorf("%w, got schema %d", ErrHistogramsInvalidSchema, s)
|
||||
}
|
||||
|
||||
func UnknownSchemaError(s int32) error {
|
||||
return fmt.Errorf("%w, got schema %d", ErrHistogramsUnknownSchema, s)
|
||||
}
|
||||
|
||||
func IsCustomBucketsSchema(s int32) bool {
|
||||
return s == CustomBucketsSchema
|
||||
}
|
||||
@@ -67,6 +72,12 @@ func IsValidSchema(s int32) bool {
|
||||
return IsCustomBucketsSchema(s) || IsExponentialSchema(s)
|
||||
}
|
||||
|
||||
// IsKnownSchema returns bool if we known and accept the schema, but need to
|
||||
// reduce resolution to the nearest supported schema.
|
||||
func IsKnownSchema(s int32) bool {
|
||||
return IsCustomBucketsSchema(s) || IsExponentialSchemaReserved(s)
|
||||
}
|
||||
|
||||
// BucketCount is a type constraint for the count in a bucket, which can be
|
||||
// float64 (for type FloatHistogram) or uint64 (for type Histogram).
|
||||
type BucketCount interface {
|
||||
|
||||
@@ -866,7 +866,7 @@ func (it *floatHistogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram)
|
||||
}
|
||||
if fh == nil {
|
||||
it.atFloatHistogramCalled = true
|
||||
return it.t, &histogram.FloatHistogram{
|
||||
fh = &histogram.FloatHistogram{
|
||||
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
|
||||
Count: it.cnt.value,
|
||||
ZeroCount: it.zCnt.value,
|
||||
@@ -879,6 +879,14 @@ func (it *floatHistogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram)
|
||||
NegativeBuckets: it.nBuckets,
|
||||
CustomValues: it.customValues,
|
||||
}
|
||||
if fh.Schema > histogram.ExponentialSchemaMax && fh.Schema <= histogram.ExponentialSchemaMaxReserved {
|
||||
// This is a very slow path, but it should only happen if the
|
||||
// chunk is from a newer Prometheus version that supports higher
|
||||
// resolution.
|
||||
fh = fh.Copy()
|
||||
fh.ReduceResolution(histogram.ExponentialSchemaMax)
|
||||
}
|
||||
return it.t, fh
|
||||
}
|
||||
|
||||
fh.CounterResetHint = counterResetHint(it.counterResetHeader, it.numRead)
|
||||
@@ -903,6 +911,13 @@ func (it *floatHistogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram)
|
||||
// Custom values are interned. The single copy is in this iterator.
|
||||
fh.CustomValues = it.customValues
|
||||
|
||||
if fh.Schema > histogram.ExponentialSchemaMax && fh.Schema <= histogram.ExponentialSchemaMaxReserved {
|
||||
// This is a very slow path, but it should only happen if the
|
||||
// chunk is from a newer Prometheus version that supports higher
|
||||
// resolution.
|
||||
fh.ReduceResolution(histogram.ExponentialSchemaMax)
|
||||
}
|
||||
|
||||
return it.t, fh
|
||||
}
|
||||
|
||||
@@ -955,8 +970,8 @@ func (it *floatHistogramIterator) Next() ValueType {
|
||||
return ValNone
|
||||
}
|
||||
|
||||
if !histogram.IsValidSchema(schema) {
|
||||
it.err = histogram.InvalidSchemaError(schema)
|
||||
if !histogram.IsKnownSchema(schema) {
|
||||
it.err = histogram.UnknownSchemaError(schema)
|
||||
return ValNone
|
||||
}
|
||||
|
||||
|
||||
@@ -1492,3 +1492,33 @@ func TestFloatHistogramIteratorFailIfSchemaInValid(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFloatHistogramIteratorReduceShema(t *testing.T) {
|
||||
for _, schema := range []int32{9, 52} {
|
||||
t.Run(fmt.Sprintf("schema %d", schema), func(t *testing.T) {
|
||||
h := &histogram.FloatHistogram{
|
||||
Schema: schema,
|
||||
Count: 10,
|
||||
Sum: 15.0,
|
||||
ZeroThreshold: 1e-100,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []float64{1, 2, 3, 4},
|
||||
}
|
||||
|
||||
c := NewFloatHistogramChunk()
|
||||
app, err := c.Appender()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _, _, err = app.AppendFloatHistogram(nil, 1, h, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
it := c.Iterator(nil)
|
||||
require.Equal(t, ValFloatHistogram, it.Next())
|
||||
_, rh := it.AtFloatHistogram(nil)
|
||||
require.Equal(t, histogram.ExponentialSchemaMax, rh.Schema)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -921,7 +921,7 @@ func (it *histogramIterator) AtHistogram(h *histogram.Histogram) (int64, *histog
|
||||
}
|
||||
if h == nil {
|
||||
it.atHistogramCalled = true
|
||||
return it.t, &histogram.Histogram{
|
||||
h = &histogram.Histogram{
|
||||
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
|
||||
Count: it.cnt,
|
||||
ZeroCount: it.zCnt,
|
||||
@@ -934,6 +934,14 @@ func (it *histogramIterator) AtHistogram(h *histogram.Histogram) (int64, *histog
|
||||
NegativeBuckets: it.nBuckets,
|
||||
CustomValues: it.customValues,
|
||||
}
|
||||
if h.Schema > histogram.ExponentialSchemaMax && h.Schema <= histogram.ExponentialSchemaMaxReserved {
|
||||
// This is a very slow path, but it should only happen if the
|
||||
// chunk is from a newer Prometheus version that supports higher
|
||||
// resolution.
|
||||
h = h.Copy()
|
||||
h.ReduceResolution(histogram.ExponentialSchemaMax)
|
||||
}
|
||||
return it.t, h
|
||||
}
|
||||
|
||||
h.CounterResetHint = counterResetHint(it.counterResetHeader, it.numRead)
|
||||
@@ -958,6 +966,13 @@ func (it *histogramIterator) AtHistogram(h *histogram.Histogram) (int64, *histog
|
||||
// Custom values are interned. The single copy is here in the iterator.
|
||||
h.CustomValues = it.customValues
|
||||
|
||||
if h.Schema > histogram.ExponentialSchemaMax && h.Schema <= histogram.ExponentialSchemaMaxReserved {
|
||||
// This is a very slow path, but it should only happen if the
|
||||
// chunk is from a newer Prometheus version that supports higher
|
||||
// resolution.
|
||||
h.ReduceResolution(histogram.ExponentialSchemaMax)
|
||||
}
|
||||
|
||||
return it.t, h
|
||||
}
|
||||
|
||||
@@ -967,7 +982,7 @@ func (it *histogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int
|
||||
}
|
||||
if fh == nil {
|
||||
it.atFloatHistogramCalled = true
|
||||
return it.t, &histogram.FloatHistogram{
|
||||
fh = &histogram.FloatHistogram{
|
||||
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
|
||||
Count: float64(it.cnt),
|
||||
ZeroCount: float64(it.zCnt),
|
||||
@@ -980,6 +995,14 @@ func (it *histogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int
|
||||
NegativeBuckets: it.nFloatBuckets,
|
||||
CustomValues: it.customValues,
|
||||
}
|
||||
if fh.Schema > histogram.ExponentialSchemaMax && fh.Schema <= histogram.ExponentialSchemaMaxReserved {
|
||||
// This is a very slow path, but it should only happen if the
|
||||
// chunk is from a newer Prometheus version that supports higher
|
||||
// resolution.
|
||||
fh = fh.Copy()
|
||||
fh.ReduceResolution(histogram.ExponentialSchemaMax)
|
||||
}
|
||||
return it.t, fh
|
||||
}
|
||||
|
||||
fh.CounterResetHint = counterResetHint(it.counterResetHeader, it.numRead)
|
||||
@@ -1012,6 +1035,13 @@ func (it *histogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int
|
||||
// Custom values are interned. The single copy is here in the iterator.
|
||||
fh.CustomValues = it.customValues
|
||||
|
||||
if fh.Schema > histogram.ExponentialSchemaMax && fh.Schema <= histogram.ExponentialSchemaMaxReserved {
|
||||
// This is a very slow path, but it should only happen if the
|
||||
// chunk is from a newer Prometheus version that supports higher
|
||||
// resolution.
|
||||
fh.ReduceResolution(histogram.ExponentialSchemaMax)
|
||||
}
|
||||
|
||||
return it.t, fh
|
||||
}
|
||||
|
||||
@@ -1078,8 +1108,8 @@ func (it *histogramIterator) Next() ValueType {
|
||||
return ValNone
|
||||
}
|
||||
|
||||
if !histogram.IsValidSchema(schema) {
|
||||
it.err = histogram.InvalidSchemaError(schema)
|
||||
if !histogram.IsKnownSchema(schema) {
|
||||
it.err = histogram.UnknownSchemaError(schema)
|
||||
return ValNone
|
||||
}
|
||||
|
||||
|
||||
@@ -1848,3 +1848,36 @@ func TestHistogramIteratorFailIfSchemaInValid(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHistogramIteratorReduceShema(t *testing.T) {
|
||||
for _, schema := range []int32{9, 52} {
|
||||
t.Run(fmt.Sprintf("schema %d", schema), func(t *testing.T) {
|
||||
h := &histogram.Histogram{
|
||||
Schema: schema,
|
||||
Count: 10,
|
||||
Sum: 15.0,
|
||||
ZeroThreshold: 1e-100,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
},
|
||||
PositiveBuckets: []int64{1, 2, 3, 4},
|
||||
}
|
||||
|
||||
c := NewHistogramChunk()
|
||||
app, err := c.Appender()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _, _, err = app.AppendHistogram(nil, 1, h, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
it := c.Iterator(nil)
|
||||
require.Equal(t, ValHistogram, it.Next())
|
||||
_, rh := it.AtHistogram(nil)
|
||||
require.Equal(t, histogram.ExponentialSchemaMax, rh.Schema)
|
||||
|
||||
_, rfh := it.AtFloatHistogram(nil)
|
||||
require.Equal(t, histogram.ExponentialSchemaMax, rfh.Schema)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user