From ba7233d33a42f6aa5f5434ebc8567eebdcac7d0a Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Fri, 4 Sep 2020 13:04:29 +0100 Subject: [PATCH] Reused chunk iterators. Signed-off-by: Bartlomiej Plotka --- tsdb/querier.go | 114 ++++++++++++++++++++++++++---------------------- 1 file changed, 62 insertions(+), 52 deletions(-) diff --git a/tsdb/querier.go b/tsdb/querier.go index 8bde49a4be..f33591a807 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -475,13 +475,14 @@ type populateWithDelGenericSeriesIterator struct { // chks are expected to be sorted by minTime and should be related to the same, single series. chks []chunks.Meta - i int - err error - bufIter *deletedIterator - intervals tombstones.Intervals + i int + err error + bufDelIter *deletedIterator + intervals tombstones.Intervals - currDelIter chunkenc.Iterator + currChkIter chunkenc.Iterator currChkMeta chunks.Meta + rewrite bool } func newPopulateWithDelGenericSeriesIterator( @@ -490,11 +491,11 @@ func newPopulateWithDelGenericSeriesIterator( intervals tombstones.Intervals, ) *populateWithDelGenericSeriesIterator { return &populateWithDelGenericSeriesIterator{ - chunks: chunks, - chks: chks, - i: -1, - bufIter: &deletedIterator{}, - intervals: intervals, + chunks: chunks, + chks: chks, + i: -1, + bufDelIter: &deletedIterator{}, + intervals: intervals, } } @@ -506,16 +507,18 @@ func (p *populateWithDelGenericSeriesIterator) next() bool { p.i++ p.currChkMeta = p.chks[p.i] + // TODO(bwplotka): Previous implementation was fetching all of the chunks in one go. Benchmark if potentially there is + // a gain in batching it more. p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta.Ref) if p.err != nil { p.err = errors.Wrapf(p.err, "cannot populate chunk %d", p.currChkMeta.Ref) return false } - p.bufIter.intervals = p.bufIter.intervals[:0] + p.bufDelIter.intervals = p.bufDelIter.intervals[:0] for _, interval := range p.intervals { if p.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) { - p.bufIter.intervals = p.bufIter.intervals.Add(interval) + p.bufDelIter.intervals = p.bufDelIter.intervals.Add(interval) } } @@ -526,15 +529,17 @@ func (p *populateWithDelGenericSeriesIterator) next() bool { // // TODO think how to avoid the typecasting to verify when it is head block. _, isSafeChunk := p.currChkMeta.Chunk.(*safeChunk) - if len(p.bufIter.intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) { + if len(p.bufDelIter.intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) { // If there are no overlap with deletion intervals AND it's NOT an "open" head chunk, we can take chunk as it is. - p.currDelIter = nil + p.currChkIter = p.currChkMeta.Chunk.Iterator(p.currChkIter) + p.rewrite = false return true } // We don't want full chunk or it's potentially still opened, take just part of it. - p.bufIter.it = p.currChkMeta.Chunk.Iterator(nil) - p.currDelIter = p.bufIter + p.bufDelIter.it = p.currChkMeta.Chunk.Iterator(p.bufDelIter.it) + p.currChkIter = p.bufDelIter + p.rewrite = true return true } @@ -550,56 +555,62 @@ func (p *populateWithDelGenericSeriesIterator) toChunkSeriesIterator() chunks.It // populateWithDelSeriesIterator allows to iterate over samples for the single series. type populateWithDelSeriesIterator struct { *populateWithDelGenericSeriesIterator - - curr chunkenc.Iterator } func (p *populateWithDelSeriesIterator) Next() bool { - if p.curr != nil && p.curr.Next() { - return true + if p.currChkIter == nil { + if !p.next() { + return false + } } - for p.next() { - if p.currDelIter != nil { - p.curr = p.currDelIter - } else { - p.curr = p.currChkMeta.Chunk.Iterator(nil) - } - if p.curr.Next() { + for { + if p.currChkIter.Next() { return true } + if p.currChkIter.Err() != nil { + return false + } + if !p.next() { + return false + } } - return false } func (p *populateWithDelSeriesIterator) Seek(t int64) bool { - if p.curr != nil && p.curr.Seek(t) { - return true - } - for p.Next() { - if p.curr.Seek(t) { - return true + if p.currChkIter == nil { + if !p.next() { + return false + } + } + + for { + if p.currChkIter.Seek(t) { + return true + } + if p.currChkIter.Err() != nil { + return false + } + if !p.next() { + return false } } - return false } -func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.curr.At() } +func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.currChkIter.At() } func (p *populateWithDelSeriesIterator) Err() error { if err := p.populateWithDelGenericSeriesIterator.Err(); err != nil { return err } - if p.curr != nil { - return p.curr.Err() + if p.currChkIter != nil { + return p.currChkIter.Err() } return nil } type populateWithDelChunkSeriesIterator struct { *populateWithDelGenericSeriesIterator - - curr chunks.Meta } func (p *populateWithDelChunkSeriesIterator) Next() bool { @@ -607,8 +618,7 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { return false } - p.curr = p.currChkMeta - if p.currDelIter == nil { + if !p.rewrite { return true } @@ -620,8 +630,8 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { return false } - if !p.currDelIter.Next() { - if err := p.currDelIter.Err(); err != nil { + if !p.currChkIter.Next() { + if err := p.currChkIter.Err(); err != nil { p.err = errors.Wrap(err, "iterate chunk while re-encoding") return false } @@ -631,25 +641,25 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { return false } - t, v := p.currDelIter.At() - p.curr.MinTime = t + t, v := p.currChkIter.At() + p.currChkMeta.MinTime = t app.Append(t, v) - for p.currDelIter.Next() { - t, v = p.currDelIter.At() + for p.currChkIter.Next() { + t, v = p.currChkIter.At() app.Append(t, v) } - if err := p.currDelIter.Err(); err != nil { + if err := p.currChkIter.Err(); err != nil { p.err = errors.Wrap(err, "iterate chunk while re-encoding") return false } - p.curr.Chunk = newChunk - p.curr.MaxTime = t + p.currChkMeta.Chunk = newChunk + p.currChkMeta.MaxTime = t return true } -func (p *populateWithDelChunkSeriesIterator) At() chunks.Meta { return p.curr } +func (p *populateWithDelChunkSeriesIterator) At() chunks.Meta { return p.currChkMeta } // blockSeriesSet allows to iterate over sorted, populated series with applied tombstones. // Series with all deleted chunks are still present as Series with no samples.