Skip to content

Commit

Permalink
fix: unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill committed Mar 4, 2024
1 parent 6dbb947 commit 5003854
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 31 deletions.
53 changes: 22 additions & 31 deletions cmd/bee/cmd/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"strings"
"sync/atomic"

"github.com/ethersphere/bee/pkg/file/pipeline/builder"
"github.com/ethersphere/bee/pkg/file/redundancy"
Expand All @@ -21,16 +22,15 @@ import (

// putter is a putter that stores all the split chunk addresses of a file
type putter struct {
c chan swarm.Chunk
cb func(chunk swarm.Chunk) error
}

func (s *putter) Put(ctx context.Context, chunk swarm.Chunk) error {
s.c <- chunk
return nil
func (s *putter) Put(_ context.Context, chunk swarm.Chunk) error {
return s.cb(chunk)
}
func newPutter() *putter {
func newPutter(cb func(ch swarm.Chunk) error) *putter {
return &putter{
c: make(chan swarm.Chunk),
cb: cb,
}
}

Expand Down Expand Up @@ -98,23 +98,19 @@ func splitRefs(cmd *cobra.Command) {
logger.Info("splitting", "file", inputFileName, "rLevel", rLevel)
logger.Info("writing output", "file", outputFileName)

store := newPutter()
var refs []string
store := newPutter(func(ch swarm.Chunk) error {
refs = append(refs, ch.Address().String())
return nil
})
writer, err := os.OpenFile(outputFileName, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("open output file: %w", err)
}
defer writer.Close()

var refs []string
go func() {
for chunk := range store.c {
refs = append(refs, chunk.Address().String())
}
}()

p := requestPipelineFn(store, false, redundancy.Level(rLevel))
rootRef, err := p(context.Background(), reader)
close(store.c)
if err != nil {
return fmt.Errorf("pipeline: %w", err)
}
Expand Down Expand Up @@ -192,28 +188,23 @@ func splitChunks(cmd *cobra.Command) {
logger.Info("splitting", "file", inputFileName, "rLevel", rLevel)
logger.Info("writing output", "dir", outputDir)

store := newPutter()
ctx, cancel := context.WithCancel(context.Background())
var chunksCount int64
go func() {
for chunk := range store.c {
filePath := filepath.Join(outputDir, chunk.Address().String())
err := os.WriteFile(filePath, chunk.Data(), 0644)
if err != nil {
logger.Error(err, "write chunk")
cancel()
}
chunksCount++
var chunksCount atomic.Int64
store := newPutter(func(chunk swarm.Chunk) error {
filePath := filepath.Join(outputDir, chunk.Address().String())
err := os.WriteFile(filePath, chunk.Data(), 0644)
if err != nil {
return err
}
}()
chunksCount.Add(1)
return nil
})

p := requestPipelineFn(store, false, redundancy.Level(rLevel))
rootRef, err := p(ctx, reader)
close(store.c)
rootRef, err := p(context.Background(), reader)
if err != nil {
return fmt.Errorf("pipeline: %w", err)
}
logger.Info("done", "root", rootRef.String(), "chunks", chunksCount)
logger.Info("done", "root", rootRef.String(), "chunks", chunksCount.Load())
return nil
},
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/bee/cmd/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"os"
"path"
"path/filepath"
"sync"
"testing"

"github.com/ethersphere/bee/cmd/bee/cmd"
Expand Down Expand Up @@ -108,6 +109,10 @@ func TestDBSplitChunks(t *testing.T) {
if err != nil {
t.Fatal(err)
}

if len(entries) != len(putter.chunks) {
t.Fatal("number of chunks does not match")
}
for _, entry := range entries {
ref := entry.Name()
if _, ok := putter.chunks[ref]; !ok {
Expand Down Expand Up @@ -149,9 +154,12 @@ func compare(path string, chunk swarm.Chunk) (error, bool) {

type putter struct {
chunks map[string]swarm.Chunk
mu sync.Mutex
}

func (s *putter) Put(_ context.Context, chunk swarm.Chunk) error {
s.mu.Lock()
defer s.mu.Unlock()
s.chunks[chunk.Address().String()] = chunk
return nil
}
Expand Down

0 comments on commit 5003854

Please sign in to comment.