mirror of
https://github.com/prometheus/prometheus
synced 2026-04-30 23:11:34 +08:00
Remove label index and labe offset index
Signed-off-by: pipiland2612 <nguyen.t.dang.minh@gmail.com>
This commit is contained in:
@@ -15,7 +15,6 @@ package index
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
@@ -142,8 +141,7 @@ type Writer struct {
|
||||
lastSymbol string
|
||||
symbolCache map[string]uint32 // From symbol to index in table.
|
||||
|
||||
labelIndexes []labelIndexHashEntry // Label index offsets.
|
||||
labelNames map[string]uint64 // Label names, and their usage.
|
||||
labelNames map[string]uint64 // Label names, and their usage.
|
||||
|
||||
// Hold last series to validate that clients insert new series in order.
|
||||
lastSeries labels.Labels
|
||||
@@ -393,9 +391,6 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
|
||||
if err := w.writePostingsToTmpFiles(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.writeLabelIndices(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.toc.Postings = w.f.pos
|
||||
if err := w.writePostings(); err != nil {
|
||||
@@ -403,9 +398,6 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
|
||||
}
|
||||
|
||||
w.toc.LabelIndicesTable = w.f.pos
|
||||
if err := w.writeLabelIndexesOffsetTable(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.toc.PostingsTable = w.f.pos
|
||||
if err := w.writePostingsOffsetTable(); err != nil {
|
||||
@@ -592,147 +584,6 @@ func (w *Writer) finishSymbols() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) writeLabelIndices() error {
|
||||
if err := w.fPO.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Find all the label values in the tmp posting offset table.
|
||||
f, err := fileutil.OpenMmapFile(w.fPO.name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos))
|
||||
cnt := w.cntPO
|
||||
current := []byte{}
|
||||
values := []uint32{}
|
||||
for d.Err() == nil && cnt > 0 {
|
||||
cnt--
|
||||
d.Uvarint() // Keycount.
|
||||
name := d.UvarintBytes() // Label name.
|
||||
value := d.UvarintBytes() // Label value.
|
||||
d.Uvarint64() // Offset.
|
||||
if len(name) == 0 {
|
||||
continue // All index is ignored.
|
||||
}
|
||||
|
||||
if !bytes.Equal(name, current) && len(values) > 0 {
|
||||
// We've reached a new label name.
|
||||
if err := w.writeLabelIndex(string(current), values); err != nil {
|
||||
return err
|
||||
}
|
||||
values = values[:0]
|
||||
}
|
||||
current = name
|
||||
sid, ok := w.symbolCache[string(value)]
|
||||
if !ok {
|
||||
return fmt.Errorf("symbol entry for %q does not exist", string(value))
|
||||
}
|
||||
values = append(values, sid)
|
||||
}
|
||||
if d.Err() != nil {
|
||||
return d.Err()
|
||||
}
|
||||
|
||||
// Handle the last label.
|
||||
if len(values) > 0 {
|
||||
if err := w.writeLabelIndex(string(current), values); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) writeLabelIndex(name string, values []uint32) error {
|
||||
// Align beginning to 4 bytes for more efficient index list scans.
|
||||
if err := w.addPadding(4); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{
|
||||
keys: []string{name},
|
||||
offset: w.f.pos,
|
||||
})
|
||||
|
||||
startPos := w.f.pos
|
||||
// Leave 4 bytes of space for the length, which will be calculated later.
|
||||
if err := w.write([]byte("alen")); err != nil {
|
||||
return err
|
||||
}
|
||||
w.crc32.Reset()
|
||||
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutBE32int(1) // Number of names.
|
||||
w.buf1.PutBE32int(len(values))
|
||||
w.buf1.WriteToHash(w.crc32)
|
||||
if err := w.write(w.buf1.Get()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, v := range values {
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutBE32(v)
|
||||
w.buf1.WriteToHash(w.crc32)
|
||||
if err := w.write(w.buf1.Get()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Write out the length.
|
||||
w.buf1.Reset()
|
||||
l := w.f.pos - startPos - 4
|
||||
if l > math.MaxUint32 {
|
||||
return fmt.Errorf("label index size exceeds 4 bytes: %d", l)
|
||||
}
|
||||
w.buf1.PutBE32int(int(l))
|
||||
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutHashSum(w.crc32)
|
||||
return w.write(w.buf1.Get())
|
||||
}
|
||||
|
||||
// writeLabelIndexesOffsetTable writes the label indices offset table.
|
||||
func (w *Writer) writeLabelIndexesOffsetTable() error {
|
||||
startPos := w.f.pos
|
||||
// Leave 4 bytes of space for the length, which will be calculated later.
|
||||
if err := w.write([]byte("alen")); err != nil {
|
||||
return err
|
||||
}
|
||||
w.crc32.Reset()
|
||||
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutBE32int(len(w.labelIndexes))
|
||||
w.buf1.WriteToHash(w.crc32)
|
||||
if err := w.write(w.buf1.Get()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, e := range w.labelIndexes {
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutUvarint(len(e.keys))
|
||||
for _, k := range e.keys {
|
||||
w.buf1.PutUvarintStr(k)
|
||||
}
|
||||
w.buf1.PutUvarint64(e.offset)
|
||||
w.buf1.WriteToHash(w.crc32)
|
||||
if err := w.write(w.buf1.Get()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Write out the length.
|
||||
err := w.writeLengthAndHash(startPos)
|
||||
if err != nil {
|
||||
return fmt.Errorf("label indexes offset table length/crc32 write error: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// writePostingsOffsetTable writes the postings offset table.
|
||||
func (w *Writer) writePostingsOffsetTable() error {
|
||||
// Ensure everything is in the temporary file.
|
||||
@@ -1049,11 +900,6 @@ func (w *Writer) writePostings() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type labelIndexHashEntry struct {
|
||||
keys []string
|
||||
offset uint64
|
||||
}
|
||||
|
||||
func (w *Writer) Close() error {
|
||||
// Even if this fails, we need to close all the files.
|
||||
ensureErr := w.ensureStage(idxStageDone)
|
||||
|
||||
@@ -186,37 +186,6 @@ func TestIndexRW_Postings(t *testing.T) {
|
||||
}
|
||||
require.NoError(t, p.Err())
|
||||
|
||||
// The label indices are no longer used, so test them by hand here.
|
||||
labelValuesOffsets := map[string]uint64{}
|
||||
d := encoding.NewDecbufAt(ir.b, int(ir.toc.LabelIndicesTable), castagnoliTable)
|
||||
cnt := d.Be32()
|
||||
|
||||
for d.Err() == nil && d.Len() > 0 && cnt > 0 {
|
||||
require.Equal(t, 1, d.Uvarint(), "Unexpected number of keys for label indices table")
|
||||
lbl := d.UvarintStr()
|
||||
off := d.Uvarint64()
|
||||
labelValuesOffsets[lbl] = off
|
||||
cnt--
|
||||
}
|
||||
require.NoError(t, d.Err())
|
||||
|
||||
labelIndices := map[string][]string{}
|
||||
for lbl, off := range labelValuesOffsets {
|
||||
d := encoding.NewDecbufAt(ir.b, int(off), castagnoliTable)
|
||||
require.Equal(t, 1, d.Be32int(), "Unexpected number of label indices table names")
|
||||
for i := d.Be32(); i > 0 && d.Err() == nil; i-- {
|
||||
v, err := ir.lookupSymbol(ctx, d.Be32())
|
||||
require.NoError(t, err)
|
||||
labelIndices[lbl] = append(labelIndices[lbl], v)
|
||||
}
|
||||
require.NoError(t, d.Err())
|
||||
}
|
||||
|
||||
require.Equal(t, map[string][]string{
|
||||
"a": {"1"},
|
||||
"b": {"1", "2", "3", "4"},
|
||||
}, labelIndices)
|
||||
|
||||
t.Run("ShardedPostings()", func(t *testing.T) {
|
||||
ir, err := NewFileReader(fn, DecodePostingsRaw)
|
||||
require.NoError(t, err)
|
||||
|
||||
Reference in New Issue
Block a user