Skip to content

Commit

Permalink
feat(compact): log retrieval item data of invalid chunks (#4354)
Browse files · Browse the repository at this point in the history
  • Loading branch information
istae authored Sep 29, 2023
1 parent 81e6e04 commit 8f17d2a
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 25 deletions.
2 changes: 1 addition & 1 deletion cmd/bee/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func dbCompactCmd(cmd *cobra.Command) {
}

logger.Warning("Compaction is a destructive process. If the process is stopped for any reason, the localstore may become corrupted.")
logger.Warning("It is highly advised to perform the compaction on a copy of the localtore.")
logger.Warning("It is highly advised to perform the compaction on a copy of the localstore.")
logger.Warning("After compaction finishes, the data directory may be replaced with the compacted version.")
logger.Warning("you have another 10 seconds to change your mind and kill this process with CTRL-C...")
time.Sleep(10 * time.Second)
Expand Down
6 changes: 3 additions & 3 deletions pkg/cac/cac.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func validateDataLength(dataLength int) error {

// newWithSpan creates a new chunk prepending the given span to the data.
func newWithSpan(data, span []byte) (swarm.Chunk, error) {
hash, err := doHash(data, span)
hash, err := DoHash(data, span)
if err != nil {
return nil, err
}
Expand All @@ -77,12 +77,12 @@ func Valid(c swarm.Chunk) bool {
return false
}

hash, _ := doHash(data[swarm.SpanSize:], data[:swarm.SpanSize])
hash, _ := DoHash(data[swarm.SpanSize:], data[:swarm.SpanSize])

return bytes.Equal(hash, c.Address().Bytes())
}

func doHash(data, span []byte) ([]byte, error) {
func DoHash(data, span []byte) ([]byte, error) {
hasher := bmtpool.Get()
defer bmtpool.Put(hasher)

Expand Down
9 changes: 1 addition & 8 deletions pkg/sharky/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,7 @@ func (r *Recovery) Add(loc Location) error {
func (r *Recovery) Read(ctx context.Context, loc Location, buf []byte) error {
r.mtx.Lock()
defer r.mtx.Unlock()

shFile := r.shardFiles[loc.Shard]
if stat, err := shFile.Stat(); err != nil {
return err
} else if stat.Size() < int64(loc.Slot)*int64(r.datasize) {
return errors.New("slot not found")
}
_, err := shFile.ReadAt(buf, int64(loc.Slot)*int64(r.datasize))
_, err := r.shardFiles[loc.Shard].ReadAt(buf, int64(loc.Slot)*int64(r.datasize))
return err
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/sharky/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package sharky
import (
"context"
"encoding/binary"
"fmt"
"io"
)

Expand All @@ -20,6 +21,10 @@ type Location struct {
Length uint16
}

// String implements fmt.Stringer, rendering the location's shard,
// slot, and length in a human-readable form for log output.
func (l Location) String() string {
	const layout = "shard: %d, slot: %d, length: %d"
	return fmt.Sprintf(layout, l.Shard, l.Slot, l.Length)
}

// MarshalBinary returns byte representation of location
func (l *Location) MarshalBinary() ([]byte, error) {
b := make([]byte, LocationSize)
Expand Down
55 changes: 42 additions & 13 deletions pkg/storer/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,20 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)
start := uint32(0)
end := lastUsedSlot

batch, err := store.Batch(ctx)
if err != nil {
return err
}

for start < end {

if slots[end] == nil {
end-- // walk to the left until a used slot found
if slots[start] != nil {
start++ // walk to the right until a free slot is found
continue
}

if slots[start] != nil {
start++ // walk to the right until a free slot is found
if slots[end] == nil {
end-- // walk to the left until a used slot found
continue
}

Expand All @@ -113,14 +118,18 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)
}

from.Location = to
if err := store.Put(from); err != nil {
if err := batch.Put(from); err != nil {
return fmt.Errorf("store put: %w", err)
}

start++
end--
}

if err := batch.Commit(); err != nil {
return err
}

logger.Info("shard truncated", "shard", fmt.Sprintf("%d/%d", shard, sharkyNoOfShards-1), "slot", end)

if err := sharkyRecover.TruncateAt(context.Background(), uint8(shard), end+1); err != nil {
Expand Down Expand Up @@ -151,18 +160,40 @@ func validationWork(logger log.Logger, store storage.Store, sharky *sharky.Recov

iteratateItemsC := make(chan *chunkstore.RetrievalIndexItem)

validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) error {
validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) {
err := sharky.Read(context.Background(), item.Location, buf)
if err != nil {
return err
logger.Warning("invalid chunk", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0), "location", item.Location, "error", err)
return
}

ch := swarm.NewChunk(item.Address, buf)
if !cac.Valid(ch) && !soc.Valid(ch) {
return errors.New("invalid chunk")
}

return nil
logger.Info("invalid cac/soc chunk ", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0))

h, err := cac.DoHash(buf[swarm.SpanSize:], buf[:swarm.SpanSize])
if err != nil {
logger.Error(err, "cac hash")
return
}

computedAddr := swarm.NewAddress(h)

if !cac.Valid(swarm.NewChunk(computedAddr, buf)) {
logger.Info("computed chunk is also an invalid cac")
return
}

shardedEntry := chunkstore.RetrievalIndexItem{Address: computedAddr}
err = store.Get(&shardedEntry)
if err != nil {
logger.Info("no shared entry found")
return
}

logger.Info("retrieved chunk with shared slot", "shared_address", shardedEntry.Address, "shared_timestamp", time.Unix(int64(shardedEntry.Timestamp), 0))
}
}

var wg sync.WaitGroup
Expand All @@ -173,9 +204,7 @@ func validationWork(logger log.Logger, store storage.Store, sharky *sharky.Recov
defer wg.Done()
buf := make([]byte, swarm.SocMaxChunkSize)
for item := range iteratateItemsC {
if err := validChunk(item, buf[:item.Location.Length]); err != nil {
logger.Info("invalid chunk", "address", item.Address, "error", err)
}
validChunk(item, buf[:item.Location.Length])
}
}()
}
Expand Down

0 comments on commit 8f17d2a

Please sign in to comment.