Reused chunk iterators.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartlomiej Plotka
2020-09-04 13:04:29 +01:00
parent 065432398b
commit ba7233d33a

View File

@@ -475,13 +475,14 @@ type populateWithDelGenericSeriesIterator struct {
// chks are expected to be sorted by minTime and should be related to the same, single series.
chks []chunks.Meta
i int
err error
bufIter *deletedIterator
intervals tombstones.Intervals
i int
err error
bufDelIter *deletedIterator
intervals tombstones.Intervals
currDelIter chunkenc.Iterator
currChkIter chunkenc.Iterator
currChkMeta chunks.Meta
rewrite bool
}
func newPopulateWithDelGenericSeriesIterator(
@@ -490,11 +491,11 @@ func newPopulateWithDelGenericSeriesIterator(
intervals tombstones.Intervals,
) *populateWithDelGenericSeriesIterator {
return &populateWithDelGenericSeriesIterator{
chunks: chunks,
chks: chks,
i: -1,
bufIter: &deletedIterator{},
intervals: intervals,
chunks: chunks,
chks: chks,
i: -1,
bufDelIter: &deletedIterator{},
intervals: intervals,
}
}
@@ -506,16 +507,18 @@ func (p *populateWithDelGenericSeriesIterator) next() bool {
p.i++
p.currChkMeta = p.chks[p.i]
// TODO(bwplotka): Previous implementation was fetching all of the chunks in one go. Benchmark if potentially there is
// a gain in batching it more.
p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta.Ref)
if p.err != nil {
p.err = errors.Wrapf(p.err, "cannot populate chunk %d", p.currChkMeta.Ref)
return false
}
p.bufIter.intervals = p.bufIter.intervals[:0]
p.bufDelIter.intervals = p.bufDelIter.intervals[:0]
for _, interval := range p.intervals {
if p.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) {
p.bufIter.intervals = p.bufIter.intervals.Add(interval)
p.bufDelIter.intervals = p.bufDelIter.intervals.Add(interval)
}
}
@@ -526,15 +529,17 @@ func (p *populateWithDelGenericSeriesIterator) next() bool {
//
// TODO think how to avoid the typecasting to verify when it is head block.
_, isSafeChunk := p.currChkMeta.Chunk.(*safeChunk)
if len(p.bufIter.intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) {
if len(p.bufDelIter.intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) {
// If there are no overlap with deletion intervals AND it's NOT an "open" head chunk, we can take chunk as it is.
p.currDelIter = nil
p.currChkIter = p.currChkMeta.Chunk.Iterator(p.currChkIter)
p.rewrite = false
return true
}
// We don't want full chunk or it's potentially still opened, take just part of it.
p.bufIter.it = p.currChkMeta.Chunk.Iterator(nil)
p.currDelIter = p.bufIter
p.bufDelIter.it = p.currChkMeta.Chunk.Iterator(p.bufDelIter.it)
p.currChkIter = p.bufDelIter
p.rewrite = true
return true
}
@@ -550,56 +555,62 @@ func (p *populateWithDelGenericSeriesIterator) toChunkSeriesIterator() chunks.It
// populateWithDelSeriesIterator allows to iterate over samples for the single series.
type populateWithDelSeriesIterator struct {
*populateWithDelGenericSeriesIterator
curr chunkenc.Iterator
}
func (p *populateWithDelSeriesIterator) Next() bool {
if p.curr != nil && p.curr.Next() {
return true
if p.currChkIter == nil {
if !p.next() {
return false
}
}
for p.next() {
if p.currDelIter != nil {
p.curr = p.currDelIter
} else {
p.curr = p.currChkMeta.Chunk.Iterator(nil)
}
if p.curr.Next() {
for {
if p.currChkIter.Next() {
return true
}
if p.currChkIter.Err() != nil {
return false
}
if !p.next() {
return false
}
}
return false
}
func (p *populateWithDelSeriesIterator) Seek(t int64) bool {
if p.curr != nil && p.curr.Seek(t) {
return true
}
for p.Next() {
if p.curr.Seek(t) {
return true
if p.currChkIter == nil {
if !p.next() {
return false
}
}
for {
if p.currChkIter.Seek(t) {
return true
}
if p.currChkIter.Err() != nil {
return false
}
if !p.next() {
return false
}
}
return false
}
func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.curr.At() }
func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.currChkIter.At() }
func (p *populateWithDelSeriesIterator) Err() error {
if err := p.populateWithDelGenericSeriesIterator.Err(); err != nil {
return err
}
if p.curr != nil {
return p.curr.Err()
if p.currChkIter != nil {
return p.currChkIter.Err()
}
return nil
}
type populateWithDelChunkSeriesIterator struct {
*populateWithDelGenericSeriesIterator
curr chunks.Meta
}
func (p *populateWithDelChunkSeriesIterator) Next() bool {
@@ -607,8 +618,7 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
return false
}
p.curr = p.currChkMeta
if p.currDelIter == nil {
if !p.rewrite {
return true
}
@@ -620,8 +630,8 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
return false
}
if !p.currDelIter.Next() {
if err := p.currDelIter.Err(); err != nil {
if !p.currChkIter.Next() {
if err := p.currChkIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
return false
}
@@ -631,25 +641,25 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
return false
}
t, v := p.currDelIter.At()
p.curr.MinTime = t
t, v := p.currChkIter.At()
p.currChkMeta.MinTime = t
app.Append(t, v)
for p.currDelIter.Next() {
t, v = p.currDelIter.At()
for p.currChkIter.Next() {
t, v = p.currChkIter.At()
app.Append(t, v)
}
if err := p.currDelIter.Err(); err != nil {
if err := p.currChkIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
return false
}
p.curr.Chunk = newChunk
p.curr.MaxTime = t
p.currChkMeta.Chunk = newChunk
p.currChkMeta.MaxTime = t
return true
}
func (p *populateWithDelChunkSeriesIterator) At() chunks.Meta { return p.curr }
func (p *populateWithDelChunkSeriesIterator) At() chunks.Meta { return p.currChkMeta }
// blockSeriesSet allows to iterate over sorted, populated series with applied tombstones.
// Series with all deleted chunks are still present as Series with no samples.