From e1cb29bf8a7e435ed1fc7d88f46900017b4ee543 Mon Sep 17 00:00:00 2001
From: pipiland2612
Date: Fri, 31 Oct 2025 21:55:14 +0200
Subject: [PATCH] create common struct and function to DRY

Signed-off-by: pipiland2612
---
 storage/remote/queue_manager.go | 131 ++++++++++++++++++++------------
 1 file changed, 83 insertions(+), 48 deletions(-)

diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 25d3a94b6a..a5c215ec2e 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -30,6 +30,7 @@ import (
 	"github.com/prometheus/common/promslog"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
 	semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/atomic"
 
@@ -1737,5 +1738,14 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 	}
 	reqSize := len(req)
 
+	sc := sendBatchContext{
+		sampleCount:    sampleCount,
+		exemplarCount:  exemplarCount,
+		histogramCount: histogramCount,
+		metadataCount:  metadataCount,
+		reqSize:        reqSize,
+	}
+	metricsUpdater := batchMetricsUpdater{metrics: s.qm.metrics}
+
 	// Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need
 	// to track the total amount of accepted data across the various attempts.
@@ -1772,33 +1782,15 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 			req = req2
 		}
 
-		ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
+		ctx, span := createBatchSpan(ctx, sc, s.qm.storeClient.Name(), s.qm.storeClient.Endpoint(), try)
 		defer span.End()
-		span.SetAttributes(
-			attribute.Int("request_size", reqSize),
-			attribute.Int("samples", sampleCount),
-			attribute.Int("try", try),
-			attribute.String("remote_name", s.qm.storeClient.Name()),
-			attribute.String("remote_url", s.qm.storeClient.Endpoint()),
-		)
-
-		if exemplarCount > 0 {
-			span.SetAttributes(attribute.Int("exemplars", exemplarCount))
-		}
-		if histogramCount > 0 {
-			span.SetAttributes(attribute.Int("histograms", histogramCount))
-		}
-
 		begin := time.Now()
-		s.qm.metrics.samplesTotal.Add(float64(sampleCount))
-		s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
-		s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
-		s.qm.metrics.metadataTotal.Add(float64(metadataCount))
+		metricsUpdater.recordBatchAttempt(sc)
 
 		// Technically for v1, we will likely have empty response stats, but for
 		// newer Receivers this might be not, so used it in a best effort.
 		rs, err := s.qm.client().Store(ctx, req, try)
-		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
+		metricsUpdater.observeSentDuration(begin)
 		// TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error
 		// so far we don't have those, so it's ok to potentially skew statistics.
 		addStats(rs)
@@ -1811,9 +1803,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 	}
 
 	onRetry := func() {
-		s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
-		s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
-		s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
+		metricsUpdater.recordRetry(sc)
 	}
 
 	err = s.qm.sendWriteRequestWithBackoff(ctx, attemptStore, onRetry)
@@ -1850,5 +1840,14 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
 	}
 	reqSize := len(req)
 
+	sc := sendBatchContext{
+		sampleCount:    sampleCount,
+		exemplarCount:  exemplarCount,
+		histogramCount: histogramCount,
+		metadataCount:  metadataCount,
+		reqSize:        reqSize,
+	}
+	metricsUpdater := batchMetricsUpdater{metrics: s.qm.metrics}
+
 	// Since we retry writes via attemptStore and sendWriteRequestWithBackoff we need
 	// to track the total amount of accepted data across the various attempts.
@@ -1885,31 +1884,13 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
 			req = req2
 		}
 
-		ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
+		ctx, span := createBatchSpan(ctx, sc, s.qm.storeClient.Name(), s.qm.storeClient.Endpoint(), try)
 		defer span.End()
-		span.SetAttributes(
-			attribute.Int("request_size", reqSize),
-			attribute.Int("samples", sampleCount),
-			attribute.Int("try", try),
-			attribute.String("remote_name", s.qm.storeClient.Name()),
-			attribute.String("remote_url", s.qm.storeClient.Endpoint()),
-		)
-
-		if exemplarCount > 0 {
-			span.SetAttributes(attribute.Int("exemplars", exemplarCount))
-		}
-		if histogramCount > 0 {
-			span.SetAttributes(attribute.Int("histograms", histogramCount))
-		}
-
 		begin := time.Now()
-		s.qm.metrics.samplesTotal.Add(float64(sampleCount))
-		s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
-		s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
-		s.qm.metrics.metadataTotal.Add(float64(metadataCount))
+		metricsUpdater.recordBatchAttempt(sc)
 
 		rs, err := s.qm.client().Store(ctx, req, try)
-		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
+		metricsUpdater.observeSentDuration(begin)
 		// TODO(bwplotka): Revisit this once we have Receivers doing retriable partial error
 		// so far we don't have those, so it's ok to potentially skew statistics.
 		addStats(rs)
@@ -1933,9 +1914,7 @@ func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2
 	}
 
 	onRetry := func() {
-		s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
-		s.qm.metrics.retriedExemplarsTotal.Add(float64(exemplarCount))
-		s.qm.metrics.retriedHistogramsTotal.Add(float64(histogramCount))
+		metricsUpdater.recordRetry(sc)
 	}
 
 	err = s.qm.sendWriteRequestWithBackoff(ctx, attemptStore, onRetry)
@@ -2266,3 +2245,59 @@ func buildV2TimeSeries(timeSeries []writev2.TimeSeries, filter func(writev2.Time
 	timeSeries = timeSeries[:keepIdx]
 	return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
 }
+
+// sendBatchContext bundles the per-batch parameters shared by the v1 and v2
+// send paths, keeping the helper signatures below short.
+type sendBatchContext struct {
+	sampleCount    int
+	exemplarCount  int
+	histogramCount int
+	metadataCount  int
+	reqSize        int
+}
+
+// batchMetricsUpdater groups the metric updates performed while sending a batch.
+type batchMetricsUpdater struct {
+	metrics *queueManagerMetrics
+}
+
+// recordBatchAttempt counts the samples, exemplars, histograms and metadata in
+// a batch send attempt.
+func (b *batchMetricsUpdater) recordBatchAttempt(sc sendBatchContext) {
+	b.metrics.samplesTotal.Add(float64(sc.sampleCount))
+	b.metrics.exemplarsTotal.Add(float64(sc.exemplarCount))
+	b.metrics.histogramsTotal.Add(float64(sc.histogramCount))
+	b.metrics.metadataTotal.Add(float64(sc.metadataCount))
+}
+
+// observeSentDuration records how long a batch send attempt took. It must be
+// called after the attempt completes.
+func (b *batchMetricsUpdater) observeSentDuration(begin time.Time) {
+	b.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
+}
+
+// recordRetry records retry metrics for a batch.
+func (b *batchMetricsUpdater) recordRetry(sc sendBatchContext) {
+	b.metrics.retriedSamplesTotal.Add(float64(sc.sampleCount))
+	b.metrics.retriedExemplarsTotal.Add(float64(sc.exemplarCount))
+	b.metrics.retriedHistogramsTotal.Add(float64(sc.histogramCount))
+}
+
+// createBatchSpan creates and configures an OpenTelemetry span for batch sending.
+func createBatchSpan(ctx context.Context, sc sendBatchContext, remoteName, remoteURL string, try int) (context.Context, trace.Span) {
+	ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
+	span.SetAttributes(
+		attribute.Int("request_size", sc.reqSize),
+		attribute.Int("samples", sc.sampleCount),
+		attribute.Int("try", try),
+		attribute.String("remote_name", remoteName),
+		attribute.String("remote_url", remoteURL),
+	)
+	if sc.exemplarCount > 0 {
+		span.SetAttributes(attribute.Int("exemplars", sc.exemplarCount))
+	}
+	if sc.histogramCount > 0 {
+		span.SetAttributes(attribute.Int("histograms", sc.histogramCount))
+	}
+	return ctx, span
+}
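
A note for reviewers on the ordering contract the split helpers rely on: recordBatchAttempt counts a batch before the request is issued (so failed attempts are still counted), while observeSentDuration must run only after client().Store returns, otherwise sent_batch_duration records a near-zero value. The standalone sketch below is illustrative only and not part of the patch; counter, fakeMetrics, and the Sleep standing in for Store are hypothetical simplifications of the Prometheus types.

package main

import (
	"fmt"
	"time"
)

// counter is a hypothetical stand-in for a prometheus.Counter.
type counter struct{ v float64 }

func (c *counter) Add(d float64) { c.v += d }

// fakeMetrics mirrors only the subset of queueManagerMetrics the helpers touch.
type fakeMetrics struct {
	samplesTotal      counter
	sentBatchDuration []float64 // stand-in for a prometheus.Histogram
}

type sendBatchContext struct{ sampleCount, reqSize int }

type batchMetricsUpdater struct{ metrics *fakeMetrics }

// recordBatchAttempt counts the data in the attempt; called before the request.
func (b *batchMetricsUpdater) recordBatchAttempt(sc sendBatchContext) {
	b.metrics.samplesTotal.Add(float64(sc.sampleCount))
}

// observeSentDuration records the attempt latency; called after the request returns.
func (b *batchMetricsUpdater) observeSentDuration(begin time.Time) {
	b.metrics.sentBatchDuration = append(b.metrics.sentBatchDuration, time.Since(begin).Seconds())
}

func main() {
	u := batchMetricsUpdater{metrics: &fakeMetrics{}}
	sc := sendBatchContext{sampleCount: 3, reqSize: 128}

	begin := time.Now()
	u.recordBatchAttempt(sc)         // before Store: the attempt is counted even if it fails
	time.Sleep(5 * time.Millisecond) // stand-in for s.qm.client().Store(ctx, req, try)
	u.observeSentDuration(begin)     // after Store: the duration covers the request itself

	fmt.Printf("samples=%.0f durations=%v\n", u.metrics.samplesTotal.v, u.metrics.sentBatchDuration)
}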
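
Separately, createBatchSpan calls otel.Tracer(""), which resolves against the global tracer provider; until an SDK registers a real provider that is a no-op, so the helper is safe to call even when tracing is disabled. A trimmed, standalone sketch of that behavior, keeping only the always-set attributes; the argument values here are made up for illustration:

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// createBatchSpan is a trimmed copy of the patch's helper, runnable against
// OTel's default (no-op) global tracer provider.
func createBatchSpan(ctx context.Context, reqSize, samples, try int, remoteName, remoteURL string) (context.Context, trace.Span) {
	ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
	span.SetAttributes(
		attribute.Int("request_size", reqSize),
		attribute.Int("samples", samples),
		attribute.Int("try", try),
		attribute.String("remote_name", remoteName),
		attribute.String("remote_url", remoteURL),
	)
	return ctx, span
}

func main() {
	ctx, span := createBatchSpan(context.Background(), 128, 3, 0, "example", "http://localhost:9090/api/v1/write")
	defer span.End()
	// With no SDK installed the span is a no-op: IsRecording reports false.
	fmt.Println("span recording:", span.IsRecording(), "ctx ok:", ctx != nil)
}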