Skip to content

Commit

Permalink
repr agg.iis and others as array rather than map (#13458)
Browse files Browse the repository at this point in the history
  • Loading branch information
sudeepdino008 authored Jan 28, 2025
1 parent b08c69c commit c66f15e
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 97 deletions.
71 changes: 35 additions & 36 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import (
type Aggregator struct {
db kv.RoDB
d [kv.DomainLen]*Domain
iis map[kv.InvertedIdx]*InvertedIndex
iis []*InvertedIndex
dirs datadir.Dirs
tmpdir string
aggregationStep uint64
Expand Down Expand Up @@ -141,7 +141,6 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6
logger: logger,
collateAndBuildWorkers: 1,
mergeWorkers: 1,
iis: make(map[kv.InvertedIdx]*InvertedIndex),

commitmentValuesTransform: AggregatorSqueezeCommitmentValues,

Expand Down Expand Up @@ -215,17 +214,18 @@ func (a *Aggregator) registerII(idx kv.InvertedIdx, salt *uint32, dirs datadir.D
keysTable: indexKeysTable,
valuesTable: indexTable,
compression: seg.CompressNone,
name: idx,
}

if _, ok := a.iis[idx]; ok {
if ii := a.searchII(idx); ii != nil {
return fmt.Errorf("inverted index %s already registered", idx)
}
var err error

a.iis[idx], err = NewInvertedIndex(idxCfg, logger)
ii, err := NewInvertedIndex(idxCfg, logger)
if err != nil {
return err
}
a.iis = append(a.iis, ii)
return nil
}

Expand Down Expand Up @@ -493,13 +493,7 @@ func (c AggV3Collation) Close() {

type AggV3StaticFiles struct {
d [kv.DomainLen]StaticFiles
ivfs map[kv.InvertedIdx]InvertedFiles
}

func NewAggV3StaticFiles() *AggV3StaticFiles {
return &AggV3StaticFiles{
ivfs: make(map[kv.InvertedIdx]InvertedFiles),
}
ivfs []InvertedFiles
}

// CleanupOnError - call it on collation fail. It's closing all files
Expand All @@ -521,7 +515,7 @@ func (a *Aggregator) buildFiles(ctx context.Context, step uint64) error {
txTo = a.FirstTxNumOfStep(step + 1)
stepStartedAt = time.Now()

static = NewAggV3StaticFiles()
static = &AggV3StaticFiles{ivfs: make([]InvertedFiles, len(a.iis))}
closeCollations = true
collListMu = sync.Mutex{}
collations = make([]Collation, 0)
Expand Down Expand Up @@ -762,9 +756,21 @@ func (a *Aggregator) DomainTables(domains ...kv.Domain) (tables []string) {
}
func (a *Aggregator) InvertedIndexTables(indices ...kv.InvertedIdx) (tables []string) {
for _, idx := range indices {
tables = append(tables, a.iis[idx].Tables()...)
if ii := a.searchII(idx); ii != nil {
tables = append(tables, ii.Tables()...)
}
}
return tables

return
}

func (a *Aggregator) searchII(name kv.InvertedIdx) *InvertedIndex {
for _, ii := range a.iis {
if ii.name == name {
return ii
}
}
return nil
}

type flusher interface {
Expand Down Expand Up @@ -1071,15 +1077,15 @@ func (ac *AggregatorRoTx) Prune(ctx context.Context, tx kv.RwTx, limit uint64, l
}
}

stats := make(map[kv.InvertedIdx]*InvertedIndexPruneStat, len(ac.a.iis))
stats := make([]*InvertedIndexPruneStat, len(ac.a.iis))
for iikey := range ac.a.iis {
stat, err := ac.iis[iikey].Prune(ctx, tx, txFrom, txTo, limit, logEvery, false, nil)
if err != nil {
return nil, err
}
stats[iikey] = stat
}
for iikey, _ := range ac.a.iis {
for iikey := range ac.a.iis {
aggStat.Indices[ac.iis[iikey].ii.filenameBase] = stats[iikey]
}

Expand Down Expand Up @@ -1230,18 +1236,12 @@ func (a *Aggregator) recalcVisibleFilesMinimaxTxNum() {

type RangesV3 struct {
domain [kv.DomainLen]DomainRanges
invertedIndex map[kv.InvertedIdx]*MergeRange
}

func NewRangesV3() *RangesV3 {
return &RangesV3{
invertedIndex: make(map[kv.InvertedIdx]*MergeRange),
}
invertedIndex []*MergeRange
}

func (r RangesV3) String() string {
ss := []string{}
for _, d := range r.domain {
for _, d := range &r.domain {
if d.any() {
ss = append(ss, fmt.Sprintf("%s(%s)", d.name, d.String()))
}
Expand All @@ -1250,14 +1250,14 @@ func (r RangesV3) String() string {
aggStep := r.domain[kv.AccountsDomain].aggStep
for p, mr := range r.invertedIndex {
if mr != nil && mr.needMerge {
ss = append(ss, mr.String(string(p), aggStep))
ss = append(ss, mr.String(fmt.Sprintf("idx%d", p), aggStep))
}
}
return strings.Join(ss, ", ")
}

func (r RangesV3) any() bool {
for _, d := range r.domain {
for _, d := range &r.domain {
if d.any() {
return true
}
Expand All @@ -1271,7 +1271,7 @@ func (r RangesV3) any() bool {
}

func (ac *AggregatorRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) *RangesV3 {
r := NewRangesV3()
r := &RangesV3{invertedIndex: make([]*MergeRange, len(ac.a.iis))}
if ac.a.commitmentValuesTransform {
lmrAcc := ac.d[kv.AccountsDomain].files.LatestMergedRange()
lmrSto := ac.d[kv.StorageDomain].files.LatestMergedRange()
Expand All @@ -1292,7 +1292,7 @@ func (ac *AggregatorRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) *RangesV3
cr := r.domain[kv.CommitmentDomain]

restorePrevRange := false
for k, dr := range r.domain {
for k, dr := range &r.domain {
kd := kv.Domain(k)
if kd == kv.CommitmentDomain || cr.values.Equal(&dr.values) {
continue
Expand All @@ -1311,7 +1311,7 @@ func (ac *AggregatorRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) *RangesV3
}
}
if restorePrevRange {
for k, dr := range r.domain {
for k, dr := range &r.domain {
r.domain[k].values = MergeRange{}
ac.a.logger.Debug("findMergeRange: commitment range is different than accounts or storage, cancel kv merge",
ac.d[k].d.filenameBase, dr.values.String("", ac.a.StepSize()))
Expand All @@ -1333,7 +1333,7 @@ func (ac *AggregatorRoTx) RestrictSubsetFileDeletions(b bool) {
}

func (ac *AggregatorRoTx) mergeFiles(ctx context.Context, files *SelectedStaticFilesV3, r *RangesV3) (*MergedFilesV3, error) {
mf := NewMergedFilesV3()
mf := &MergedFilesV3{iis: make([]*filesItem, len(ac.a.iis))}
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(ac.a.mergeWorkers)
closeFiles := true
Expand Down Expand Up @@ -1570,10 +1570,9 @@ func (ac *AggregatorRoTx) IndexRange(name kv.InvertedIdx, k []byte, fromTs, toTs
return ac.d[kv.ReceiptDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
default:
// check the ii
if v, ok := ac.iis[name]; ok {
return v.IdxRange(k, fromTs, toTs, asc, limit, tx)
if ii := ac.searchII(name); ii != nil {
return ii.IdxRange(k, fromTs, toTs, asc, limit, tx)
}

return nil, fmt.Errorf("unexpected history name: %s", name)
}
}
Expand Down Expand Up @@ -1622,7 +1621,7 @@ func (ac *AggregatorRoTx) nastyFileRead(name kv.Domain, from, to uint64) (*seg.R
type AggregatorRoTx struct {
a *Aggregator
d [kv.DomainLen]*DomainRoTx
iis map[kv.InvertedIdx]*InvertedIndexRoTx
iis []*InvertedIndexRoTx

id uint64 // auto-increment id of ctx for logs
_leakID uint64 // set only if TRACE_AGG=true
Expand All @@ -1633,7 +1632,7 @@ func (a *Aggregator) BeginFilesRo() *AggregatorRoTx {
a: a,
id: a.aggRoTxAutoIncrement.Add(1),
_leakID: a.leakDetector.Add(),
iis: make(map[kv.InvertedIdx]*InvertedIndexRoTx, len(a.iis)),
iis: make([]*InvertedIndexRoTx, len(a.iis)),
}

a.visibleFilesLock.RLock()
Expand Down
23 changes: 5 additions & 18 deletions erigon-lib/state/aggregator_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ type SelectedStaticFilesV3 struct {
d [kv.DomainLen][]*filesItem
dHist [kv.DomainLen][]*filesItem
dIdx [kv.DomainLen][]*filesItem
ii map[kv.InvertedIdx][]*filesItem
}

func NewSelectedStaticFilesV3() *SelectedStaticFilesV3 {
return &SelectedStaticFilesV3{ii: make(map[kv.InvertedIdx][]*filesItem)}
ii [][]*filesItem
}

func (sf SelectedStaticFilesV3) Close() {
Expand All @@ -37,9 +33,7 @@ func (sf SelectedStaticFilesV3) Close() {
clist = append(clist, sf.d[id], sf.dIdx[id], sf.dHist[id])
}

for _, i := range sf.ii {
clist = append(clist, i)
}
clist = append(clist, sf.ii...)
for _, group := range clist {
for _, item := range group {
if item != nil {
Expand All @@ -55,7 +49,7 @@ func (sf SelectedStaticFilesV3) Close() {
}

func (ac *AggregatorRoTx) staticFilesInRange(r *RangesV3) (*SelectedStaticFilesV3, error) {
sf := NewSelectedStaticFilesV3()
sf := &SelectedStaticFilesV3{ii: make([][]*filesItem, len(r.invertedIndex))}
for id := range ac.d {
if !r.domain[id].any() {
continue
Expand All @@ -75,11 +69,7 @@ type MergedFilesV3 struct {
d [kv.DomainLen]*filesItem
dHist [kv.DomainLen]*filesItem
dIdx [kv.DomainLen]*filesItem
iis map[kv.InvertedIdx]*filesItem
}

func NewMergedFilesV3() *MergedFilesV3 {
return &MergedFilesV3{iis: make(map[kv.InvertedIdx]*filesItem)}
iis []*filesItem
}

func (mf MergedFilesV3) FrozenList() (frozen []string) {
Expand Down Expand Up @@ -109,10 +99,7 @@ func (mf MergedFilesV3) Close() {
for id := range mf.d {
clist = append(clist, mf.d[id], mf.dHist[id], mf.dIdx[id])
}
for _, ii := range mf.iis {
clist = append(clist, ii)
}

clist = append(clist, mf.iis...)
for _, item := range clist {
if item != nil {
if item.decompressor != nil {
Expand Down
14 changes: 8 additions & 6 deletions erigon-lib/state/domain_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type SharedDomains struct {
storage *btree2.Map[string, dataWithPrevStep]

domainWriters [kv.DomainLen]*domainBufferedWriter
iiWriters map[kv.InvertedIdx]*invertedIndexBufferedWriter
iiWriters []*invertedIndexBufferedWriter

currentChangesAccumulator *StateChangeSet
pastChangesAccumulator map[string]*StateChangeSet
Expand All @@ -115,12 +115,12 @@ type HasAgg interface {
func NewSharedDomains(tx kv.Tx, logger log.Logger) (*SharedDomains, error) {

sd := &SharedDomains{
logger: logger,
storage: btree2.NewMap[string, dataWithPrevStep](128),
iiWriters: map[kv.InvertedIdx]*invertedIndexBufferedWriter{},
logger: logger,
storage: btree2.NewMap[string, dataWithPrevStep](128),
//trace: true,
}
sd.SetTx(tx)
sd.iiWriters = make([]*invertedIndexBufferedWriter, len(sd.aggTx.iis))

sd.aggTx.a.DiscardHistory(kv.CommitmentDomain)

Expand Down Expand Up @@ -667,8 +667,10 @@ func (sd *SharedDomains) delAccountStorage(addr, loc []byte, preVal []byte, prev
}

func (sd *SharedDomains) IndexAdd(table kv.InvertedIdx, key []byte) (err error) {
if writer, ok := sd.iiWriters[table]; ok {
return writer.Add(key)
for _, writer := range sd.iiWriters {
if writer.name == table {
return writer.Add(key)
}
}
panic(fmt.Errorf("unknown index %s", table))
}
Expand Down
1 change: 1 addition & 0 deletions erigon-lib/state/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,7 @@ func emptyTestDomain(aggStep uint64) *Domain {
cfg.hist.iiCfg.salt = &salt
cfg.hist.iiCfg.dirs = datadir2.New(os.TempDir())
cfg.hist.iiCfg.aggregationStep = aggStep
cfg.hist.iiCfg.name = kv.InvertedIdx("dummy")

d, err := NewDomain(cfg, log.New())
if err != nil {
Expand Down
7 changes: 2 additions & 5 deletions erigon-lib/state/integrity.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,8 @@ func (ac *AggregatorRoTx) IntegrityInvertedIndexAllValuesAreInRange(ctx context.
}
default:
// check the ii
if v, ok := ac.iis[name]; ok {
err := v.IntegrityInvertedIndexAllValuesAreInRange(ctx, failFast, fromStep)
if err != nil {
return err
}
if v := ac.searchII(name); v != nil {
return v.IntegrityInvertedIndexAllValuesAreInRange(ctx, failFast, fromStep)
}
panic(fmt.Sprintf("unexpected: %s", name))
}
Expand Down
8 changes: 7 additions & 1 deletion erigon-lib/state/inverted_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type iiCfg struct {
aggregationStep uint64 // amount of transactions inside single aggregation step
keysTable string // bucket name for index keys; txnNum_u64 -> key (k+auto_increment)
valuesTable string // bucket name for index values; k -> txnNum_u64 , Needs to be table with DupSort
name kv.InvertedIdx

withExistence bool // defines if existence index should be built
compression seg.FileCompression // compression type for inverted index keys and values
Expand Down Expand Up @@ -374,6 +375,7 @@ type invertedIndexBufferedWriter struct {
txNum uint64
aggregationStep uint64
txNumBytes [8]byte
name kv.InvertedIdx
}

// loadFunc - is analog of etl.Identity, but it signaling to etl - use .Put instead of .AppendDup - to allow duplicates
Expand Down Expand Up @@ -448,6 +450,7 @@ func (iit *InvertedIndexRoTx) newWriter(tmpdir string, discard bool) *invertedIn
// etl collector doesn't fsync: means if have enough ram, all files produced by all collectors will be in ram
indexKeys: etl.NewCollector(iit.ii.filenameBase+".flush.ii.keys", tmpdir, etl.NewSortableBuffer(WALCollectorRAM), iit.ii.logger).LogLvl(log.LvlTrace),
index: etl.NewCollector(iit.ii.filenameBase+".flush.ii.vals", tmpdir, etl.NewSortableBuffer(WALCollectorRAM), iit.ii.logger).LogLvl(log.LvlTrace),
name: iit.name,
}
w.indexKeys.SortAndFlushInBackground(true)
w.index.SortAndFlushInBackground(true)
Expand All @@ -465,6 +468,7 @@ func (ii *InvertedIndex) BeginFilesRo() *InvertedIndexRoTx {
ii: ii,
visible: ii._visible,
files: files,
name: ii.name,
}
}
func (iit *InvertedIndexRoTx) Close() {
Expand Down Expand Up @@ -496,6 +500,7 @@ func (iit *InvertedIndexRoTx) Close() {
}

type MergeRange struct {
name string // entity name
needMerge bool
from uint64
to uint64
Expand All @@ -509,7 +514,7 @@ func (mr *MergeRange) String(prefix string, aggStep uint64) string {
if prefix != "" {
prefix += "="
}
return fmt.Sprintf("%s%d-%d", prefix, mr.from/aggStep, mr.to/aggStep)
return fmt.Sprintf("%s%s%d-%d", prefix, mr.name, mr.from/aggStep, mr.to/aggStep)
}

func (mr *MergeRange) Equal(other *MergeRange) bool {
Expand All @@ -518,6 +523,7 @@ func (mr *MergeRange) Equal(other *MergeRange) bool {

type InvertedIndexRoTx struct {
ii *InvertedIndex
name kv.InvertedIdx
files visibleFiles
visible *iiVisible
getters []*seg.Reader
Expand Down
Loading

0 comments on commit c66f15e

Please sign in to comment.