Skip to content

Commit

Permalink
Changed how we export/import the account ledger
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsporn committed May 6, 2024
1 parent 588db18 commit 9f7e9d2
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 82 deletions.
2 changes: 1 addition & 1 deletion pkg/protocol/engine/accounts/accountsledger/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func (m *Manager) rollbackAccountTo(accountData *accounts.AccountData, targetSlo
return false, false, ierrors.Wrapf(err, "can't retrieve account, could not load diff for account %s in slot %d", accountData.ID, diffSlot)
}

m.LogDebug("Rolling back account", "accountID", accountData.ID, "slot", diffSlot, "accountData", accountData, "diffChange", diffChange, "destroyed", destroyed)
m.LogDebug("Rolling back account", "accountID", accountData.ID, "targetSlot", targetSlot, "diffSlot", diffSlot, "accountData", accountData, "diffChange", diffChange, "destroyed", destroyed)

// update the account data with the diff
if diffChange.BICChange != 0 {
Expand Down
112 changes: 31 additions & 81 deletions pkg/protocol/engine/accounts/accountsledger/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"io"

"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/serializer/v2"
"github.com/iotaledger/hive.go/serializer/v2/stream"
"github.com/iotaledger/iota-core/pkg/model"
Expand All @@ -16,6 +15,10 @@ func (m *Manager) Import(reader io.ReadSeeker) error {
m.mutex.Lock()
defer m.mutex.Unlock()

latestCommittedSlot, err := stream.Read[iotago.SlotIndex](reader)
if err != nil {
return ierrors.Wrap(err, "unable to read latest committed slot")
}
// populate the account tree, account tree should be empty at this point
if err := stream.ReadCollection(reader, serializer.SeriLengthPrefixTypeAsUint64, func(i int) error {
accountData, err := stream.ReadObjectFromReader(reader, accounts.AccountDataFromReader)
Expand All @@ -34,13 +37,16 @@ func (m *Manager) Import(reader io.ReadSeeker) error {
return ierrors.Wrap(err, "failed to read account data")
}

if err := m.readSlotDiffs(reader); err != nil {
oldestSlot, err := m.readSlotDiffs(reader)
if err != nil {
return ierrors.Wrap(err, "unable to import slot diffs")
}

if err := m.accountsTree.Commit(); err != nil {
return ierrors.Wrap(err, "unable to commit account tree")
m.latestCommittedSlot = latestCommittedSlot
if err := m.Rollback(oldestSlot); err != nil {
return ierrors.Wrapf(err, "unable to rollback to slot %d", oldestSlot)
}
m.latestCommittedSlot = oldestSlot

return nil
}
Expand All @@ -49,8 +55,14 @@ func (m *Manager) Export(writer io.WriteSeeker, targetIndex iotago.SlotIndex) er
m.mutex.Lock()
defer m.mutex.Unlock()

m.LogDebug("Exporting AccountsLedger", "latestCommittedSlot", m.latestCommittedSlot, "targetIndex", targetIndex)

if err := stream.Write[iotago.SlotIndex](writer, m.latestCommittedSlot); err != nil {
return ierrors.Wrap(err, "unable to write latest committed slot")
}

if err := stream.WriteCollection(writer, serializer.SeriLengthPrefixTypeAsUint64, func() (int, error) {
elements, err := m.exportAccountTree(writer, targetIndex)
elements, err := m.exportAccountTree(writer)
if err != nil {
return 0, ierrors.Wrap(err, "can't write account tree")
}
Expand All @@ -74,94 +86,39 @@ func (m *Manager) Export(writer io.WriteSeeker, targetIndex iotago.SlotIndex) er
return nil
}

// exportAccountTree exports the AccountTree at a certain target slot, returning the total amount of exported accounts.
func (m *Manager) exportAccountTree(writer io.WriteSeeker, targetIndex iotago.SlotIndex) (int, error) {
// exportAccountTree exports the current AccountTree

Check failure on line 89 in pkg/protocol/engine/accounts/accountsledger/snapshot.go

View workflow job for this annotation

GitHub Actions / GolangCI-Lint

Comment should end in a period (godot)
func (m *Manager) exportAccountTree(writer io.WriteSeeker) (int, error) {
var accountCount int

if err := m.accountsTree.Stream(func(id iotago.AccountID, account *accounts.AccountData) error {
wasCreatedAfterTargetSlot, _, err := m.rollbackAccountTo(account, targetIndex)
if err != nil {
return ierrors.Wrapf(err, "unable to rollback account %s", id)
}

// Account was created after the target slot, so we don't need to export it.
if wasCreatedAfterTargetSlot {
m.LogTrace("account was created after target slot", "id", id, "targetSlot", targetIndex)

return nil
}
m.LogTrace("exportAccountTree", "accountID", id, "account", account)

if err = stream.WriteObject(writer, account, (*accounts.AccountData).Bytes); err != nil {
if err := stream.WriteObject(writer, account, (*accounts.AccountData).Bytes); err != nil {
return ierrors.Wrapf(err, "unable to write account %s", id)
}
accountCount++

m.LogTrace("exported account", "id", id, "account", account)

return nil
}); err != nil {
return 0, ierrors.Wrap(err, "error in streaming account tree")
}

// we might have entries that were destroyed, that are present in diffs but not in the tree from the latestCommittedIndex we streamed above
recreatedAccountsCount, err := m.recreateDestroyedAccounts(writer, targetIndex)

return accountCount + recreatedAccountsCount, err
}

func (m *Manager) recreateDestroyedAccounts(writer io.WriteSeeker, targetSlot iotago.SlotIndex) (int, error) {
var recreatedAccountsCount int
destroyedAccounts := make(map[iotago.AccountID]*accounts.AccountData)

for slot := m.latestCommittedSlot; slot > targetSlot; slot-- {
// it should be impossible that `m.slotDiff(slot)` returns an error, because it is impossible to export a pruned slot
err := lo.PanicOnErr(m.slotDiff(slot)).StreamDestroyed(func(accountID iotago.AccountID) bool {
// actual data will be filled in by rollbackAccountTo
accountData := accounts.NewAccountData(accountID)

destroyedAccounts[accountID] = accountData

return true
})
if err != nil {
return 0, err
}
}

for accountID, accountData := range destroyedAccounts {
m.LogDebug("Exporting recreated destroyed account", "accountID", accountID, "outputID", accountData.OutputID, "credits.value", accountData.Credits.Value, "credits.updateSlot", accountData.Credits.UpdateSlot)

if wasCreatedAfterTargetSlot, wasDestroyed, err := m.rollbackAccountTo(accountData, targetSlot); err != nil {
return 0, ierrors.Wrapf(err, "unable to rollback account %s to target slot %d", accountID, targetSlot)
} else if wasCreatedAfterTargetSlot {
// Account was created after the target slot, so we don't need to export it.
m.LogDebug("Exporting recreated destroyed account was created after target slot", "accountID", accountID, "targetSlot", targetSlot)

continue
} else if !wasDestroyed {
return 0, ierrors.Errorf("account %s was not destroyed", accountID)
}

m.LogDebug("Exporting recreated destroyed account after rollback", "accountID", accountID, "outputID", accountData.OutputID, "credits.value", accountData.Credits.Value, "credits.updateSlot", accountData.Credits.UpdateSlot)

if err := stream.WriteObject(writer, accountData, (*accounts.AccountData).Bytes); err != nil {
return 0, ierrors.Wrapf(err, "unable to write account %s", accountID)
}

recreatedAccountsCount++
}

return recreatedAccountsCount, nil
return accountCount, nil
}

func (m *Manager) readSlotDiffs(reader io.ReadSeeker) error {
func (m *Manager) readSlotDiffs(reader io.ReadSeeker) (iotago.SlotIndex, error) {
oldestSlot := iotago.MaxSlotIndex
// Read all the slots.
if err := stream.ReadCollection(reader, serializer.SeriLengthPrefixTypeAsUint64, func(i int) error {
slot, err := stream.Read[iotago.SlotIndex](reader)
if err != nil {
return ierrors.Wrapf(err, "unable to read slot index at index %d", i)
}

if slot < oldestSlot {
oldestSlot = slot
}

// Read all the slot diffs within each slot.
if err := stream.ReadCollection(reader, serializer.SeriLengthPrefixTypeAsUint64, func(j int) error {
diffStore, err := m.slotDiff(slot)
Expand Down Expand Up @@ -201,23 +158,16 @@ func (m *Manager) readSlotDiffs(reader io.ReadSeeker) error {

return nil
}); err != nil {
return ierrors.Wrap(err, "failed to read slot diffs")
return oldestSlot, ierrors.Wrap(err, "failed to read slot diffs")
}

return nil
return oldestSlot, nil
}

func (m *Manager) writeSlotDiffs(writer io.WriteSeeker, targetSlot iotago.SlotIndex) (int, error) {
var slotDiffsCount int

// write slot diffs until being able to reach targetSlot, where the exported tree is at
slot := iotago.SlotIndex(1)
maxCommittableAge := m.apiProvider.APIForSlot(targetSlot).ProtocolParameters().MaxCommittableAge()
if targetSlot > maxCommittableAge {
slot = targetSlot - maxCommittableAge
}

for ; slot <= targetSlot; slot++ {
for slot := m.latestCommittedSlot; slot > targetSlot; slot-- {
var accountsInDiffCount int

if err := stream.Write(writer, slot); err != nil {
Expand Down

0 comments on commit 9f7e9d2

Please sign in to comment.