diff --git a/share/getters/getter_test.go b/share/getters/getter_test.go deleted file mode 100644 index bf00745271..0000000000 --- a/share/getters/getter_test.go +++ /dev/null @@ -1,352 +0,0 @@ -package getters - -import ( - "context" - "os" - "sync" - "testing" - "time" - - "github.com/ipfs/boxo/exchange/offline" - "github.com/ipfs/go-datastore" - ds_sync "github.com/ipfs/go-datastore/sync" - dsbadger "github.com/ipfs/go-ds-badger4" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/celestiaorg/celestia-app/pkg/wrapper" - "github.com/celestiaorg/rsmt2d" - - "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/header/headertest" - "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds" - "github.com/celestiaorg/celestia-node/share/eds/edstest" - "github.com/celestiaorg/celestia-node/share/ipld" - "github.com/celestiaorg/celestia-node/share/sharetest" -) - -func TestStoreGetter(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - tmpDir := t.TempDir() - storeCfg := eds.DefaultParameters() - ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - edsStore, err := eds.NewStore(storeCfg, tmpDir, ds) - require.NoError(t, err) - - err = edsStore.Start(ctx) - require.NoError(t, err) - - sg := NewStoreGetter(edsStore) - - t.Run("GetShare", func(t *testing.T) { - randEds, eh := randomEDS(t) - err = edsStore.Put(ctx, eh.DAH.Hash(), randEds) - require.NoError(t, err) - - squareSize := int(randEds.Width()) - for i := 0; i < squareSize; i++ { - for j := 0; j < squareSize; j++ { - share, err := sg.GetShare(ctx, eh, i, j) - require.NoError(t, err) - assert.Equal(t, randEds.GetCell(uint(i), uint(j)), share) - } - } - - // doesn't panic on indexes too high - _, err := sg.GetShare(ctx, eh, squareSize, squareSize) - require.ErrorIs(t, err, share.ErrOutOfBounds) - - // root not found - _, eh = randomEDS(t) - _, err = sg.GetShare(ctx, eh, 0, 0) - require.ErrorIs(t, err, share.ErrNotFound) - }) - - t.Run("GetEDS", func(t *testing.T) { - randEds, eh := randomEDS(t) - err = edsStore.Put(ctx, eh.DAH.Hash(), randEds) - require.NoError(t, err) - - retrievedEDS, err := sg.GetEDS(ctx, eh) - require.NoError(t, err) - assert.True(t, randEds.Equals(retrievedEDS)) - - // root not found - eh.DAH = share.EmptyEDSRoots() - _, err = sg.GetEDS(ctx, eh) - require.ErrorIs(t, err, share.ErrNotFound) - }) - - t.Run("GetSharesByNamespace", func(t *testing.T) { - randEds, namespace, eh := randomEDSWithDoubledNamespace(t, 4) - err = edsStore.Put(ctx, eh.DAH.Hash(), randEds) - require.NoError(t, err) - - shares, err := sg.GetSharesByNamespace(ctx, eh, namespace) - require.NoError(t, err) - require.NoError(t, shares.Verify(eh.DAH, namespace)) - assert.Len(t, shares.Flatten(), 2) - - // namespace not found - randNamespace := sharetest.RandV0Namespace() - emptyShares, err := sg.GetSharesByNamespace(ctx, eh, randNamespace) - require.NoError(t, err) - require.Nil(t, emptyShares.Flatten()) - - // root not found - eh.DAH = share.EmptyEDSRoots() - _, err = sg.GetSharesByNamespace(ctx, eh, namespace) - require.ErrorIs(t, err, share.ErrNotFound) - }) - - t.Run("GetSharesFromNamespace removes corrupted shard", func(t *testing.T) { - randEds, namespace, eh := randomEDSWithDoubledNamespace(t, 4) - err = edsStore.Put(ctx, eh.DAH.Hash(), randEds) - require.NoError(t, err) - - // available - shares, err := sg.GetSharesByNamespace(ctx, eh, namespace) - require.NoError(t, err) - require.NoError(t, shares.Verify(eh.DAH, namespace)) - assert.Len(t, shares.Flatten(), 2) - - // 'corrupt' existing CAR by overwriting with a random EDS - f, err := os.OpenFile(tmpDir+"/blocks/"+eh.DAH.String(), os.O_WRONLY, 0o644) - require.NoError(t, err) - edsToOverwriteWith, eh := randomEDS(t) - err = eds.WriteEDS(ctx, edsToOverwriteWith, f) - require.NoError(t, err) - - shares, err = sg.GetSharesByNamespace(ctx, eh, namespace) - require.ErrorIs(t, err, share.ErrNotFound) - require.Nil(t, shares) - - // corruption detected, shard is removed - // try every 200ms until it passes or the context ends - ticker := time.NewTicker(200 * time.Millisecond) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - t.Fatal("context ended before successful retrieval") - case <-ticker.C: - has, err := edsStore.Has(ctx, eh.DAH.Hash()) - if err != nil { - t.Fatal(err) - } - if !has { - require.NoError(t, err) - return - } - } - } - }) -} - -func TestIPLDGetter(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - storeCfg := eds.DefaultParameters() - ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - edsStore, err := eds.NewStore(storeCfg, t.TempDir(), ds) - require.NoError(t, err) - - err = edsStore.Start(ctx) - require.NoError(t, err) - - bStore := edsStore.Blockstore() - bserv := ipld.NewBlockservice(bStore, offline.Exchange(edsStore.Blockstore())) - sg := NewIPLDGetter(bserv) - - t.Run("GetShare", func(t *testing.T) { - ctx, cancel := context.WithTimeout(ctx, time.Second) - t.Cleanup(cancel) - - randEds, eh := randomEDS(t) - err = edsStore.Put(ctx, eh.DAH.Hash(), randEds) - require.NoError(t, err) - - squareSize := int(randEds.Width()) - for i := 0; i < squareSize; i++ { - for j := 0; j < squareSize; j++ { - share, err := sg.GetShare(ctx, eh, i, j) - require.NoError(t, err) - assert.Equal(t, randEds.GetCell(uint(i), uint(j)), share) - } - } - - // doesn't panic on indexes too high - _, err := sg.GetShare(ctx, eh, squareSize+1, squareSize+1) - require.ErrorIs(t, err, share.ErrOutOfBounds) - - // root not found - _, eh = randomEDS(t) - _, err = sg.GetShare(ctx, eh, 0, 0) - require.ErrorIs(t, err, share.ErrNotFound) - }) - - t.Run("GetEDS", func(t *testing.T) { - ctx, cancel := context.WithTimeout(ctx, time.Second) - t.Cleanup(cancel) - - randEds, eh := randomEDS(t) - err = edsStore.Put(ctx, eh.DAH.Hash(), randEds) - require.NoError(t, err) - - retrievedEDS, err := sg.GetEDS(ctx, eh) - require.NoError(t, err) - assert.True(t, randEds.Equals(retrievedEDS)) - - // Ensure blocks still exist after cleanup - colRoots, _ := retrievedEDS.ColRoots() - has, err := bStore.Has(ctx, ipld.MustCidFromNamespacedSha256(colRoots[0])) - assert.NoError(t, err) - assert.True(t, has) - }) - - t.Run("GetSharesByNamespace", func(t *testing.T) { - ctx, cancel := context.WithTimeout(ctx, time.Second) - t.Cleanup(cancel) - - randEds, namespace, eh := randomEDSWithDoubledNamespace(t, 4) - err = edsStore.Put(ctx, eh.DAH.Hash(), randEds) - require.NoError(t, err) - - // first check that shares are returned correctly if they exist - shares, err := sg.GetSharesByNamespace(ctx, eh, namespace) - require.NoError(t, err) - require.NoError(t, shares.Verify(eh.DAH, namespace)) - assert.Len(t, shares.Flatten(), 2) - - // namespace not found - randNamespace := sharetest.RandV0Namespace() - emptyShares, err := sg.GetSharesByNamespace(ctx, eh, randNamespace) - require.NoError(t, err) - require.Nil(t, emptyShares.Flatten()) - - // nid doesn't exist in root - eh.DAH = share.EmptyEDSRoots() - emptyShares, err = sg.GetSharesByNamespace(ctx, eh, namespace) - require.NoError(t, err) - require.Empty(t, emptyShares.Flatten()) - }) -} - -// BenchmarkIPLDGetterOverBusyCache benchmarks the performance of the IPLDGetter when the -// cache size of the underlying blockstore is less than the number of blocks being requested in -// parallel. This is to ensure performance doesn't degrade when the cache is being frequently -// evicted. -// BenchmarkIPLDGetterOverBusyCache-10/128 1 12460428417 ns/op (~12s) -func BenchmarkIPLDGetterOverBusyCache(b *testing.B) { - const ( - blocks = 10 - size = 128 - ) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - b.Cleanup(cancel) - - dir := b.TempDir() - ds, err := dsbadger.NewDatastore(dir, &dsbadger.DefaultOptions) - require.NoError(b, err) - - newStore := func(params *eds.Parameters) *eds.Store { - edsStore, err := eds.NewStore(params, dir, ds) - require.NoError(b, err) - err = edsStore.Start(ctx) - require.NoError(b, err) - return edsStore - } - edsStore := newStore(eds.DefaultParameters()) - - // generate EDSs and store them - headers := make([]*header.ExtendedHeader, blocks) - for i := range headers { - eds := edstest.RandEDS(b, size) - roots, err := share.NewAxisRoots(eds) - require.NoError(b, err) - err = edsStore.Put(ctx, roots.Hash(), eds) - require.NoError(b, err) - - eh := headertest.RandExtendedHeader(b) - eh.DAH = roots - - // store cids for read loop later - headers[i] = eh - } - - // restart store to clear cache - require.NoError(b, edsStore.Stop(ctx)) - - // set BlockstoreCacheSize to 1 to force eviction on every read - params := eds.DefaultParameters() - params.BlockstoreCacheSize = 1 - edsStore = newStore(params) - bstore := edsStore.Blockstore() - bserv := ipld.NewBlockservice(bstore, offline.Exchange(bstore)) - - // start client - getter := NewIPLDGetter(bserv) - - // request blocks in parallel - b.ResetTimer() - g := sync.WaitGroup{} - g.Add(blocks) - for _, h := range headers { - h := h - go func() { - defer g.Done() - _, err := getter.GetEDS(ctx, h) - require.NoError(b, err) - }() - } - g.Wait() -} - -func randomEDS(t *testing.T) (*rsmt2d.ExtendedDataSquare, *header.ExtendedHeader) { - eds := edstest.RandEDS(t, 4) - roots, err := share.NewAxisRoots(eds) - require.NoError(t, err) - eh := headertest.RandExtendedHeaderWithRoot(t, roots) - return eds, eh -} - -// randomEDSWithDoubledNamespace generates a random EDS and ensures that there are two shares in the -// middle that share a namespace. -// -//nolint:dupword -func randomEDSWithDoubledNamespace( - t *testing.T, - size int, -) (*rsmt2d.ExtendedDataSquare, []byte, *header.ExtendedHeader) { - n := size * size - randShares := sharetest.RandShares(t, n) - idx1 := (n - 1) / 2 - idx2 := n / 2 - - // Make it so that the two shares in two different rows have a common - // namespace. For example if size=4, the original data square looks like - // this: - // _ _ _ _ - // _ _ _ D - // D _ _ _ - // _ _ _ _ - // where the D shares have a common namespace. - copy(share.GetNamespace(randShares[idx2]), share.GetNamespace(randShares[idx1])) - - eds, err := rsmt2d.ComputeExtendedDataSquare( - randShares, - share.DefaultRSMT2DCodec(), - wrapper.NewConstructor(uint64(size)), - ) - require.NoError(t, err, "failure to recompute the extended data square") - roots, err := share.NewAxisRoots(eds) - require.NoError(t, err) - eh := headertest.RandExtendedHeaderWithRoot(t, roots) - - return eds, share.GetNamespace(randShares[idx1]), eh -} diff --git a/share/getters/ipld.go b/share/getters/ipld.go deleted file mode 100644 index e9c930248d..0000000000 --- a/share/getters/ipld.go +++ /dev/null @@ -1,165 +0,0 @@ -package getters - -import ( - "context" - "errors" - "fmt" - "sync" - "sync/atomic" - - "github.com/ipfs/boxo/blockservice" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" - - "github.com/celestiaorg/rsmt2d" - - "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/libs/utils" - "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds" - "github.com/celestiaorg/celestia-node/share/eds/byzantine" - "github.com/celestiaorg/celestia-node/share/ipld" -) - -var _ share.Getter = (*IPLDGetter)(nil) - -// IPLDGetter is a share.Getter that retrieves shares from the bitswap network. Result caching is -// handled by the provided blockservice. A blockservice session will be created for retrieval if the -// passed context is wrapped with WithSession. -type IPLDGetter struct { - rtrv *eds.Retriever - bServ blockservice.BlockService -} - -// NewIPLDGetter creates a new share.Getter that retrieves shares from the bitswap network. -func NewIPLDGetter(bServ blockservice.BlockService) *IPLDGetter { - return &IPLDGetter{ - rtrv: eds.NewRetriever(bServ), - bServ: bServ, - } -} - -// GetShare gets a single share at the given EDS coordinates from the bitswap network. -func (ig *IPLDGetter) GetShare(ctx context.Context, header *header.ExtendedHeader, row, col int) (share.Share, error) { - var err error - ctx, span := tracer.Start(ctx, "ipld/get-share", trace.WithAttributes( - attribute.Int("row", row), - attribute.Int("col", col), - )) - defer func() { - utils.SetStatusAndEnd(span, err) - }() - - dah := header.DAH - upperBound := len(dah.RowRoots) - if row >= upperBound || col >= upperBound { - err := share.ErrOutOfBounds - span.RecordError(err) - return nil, err - } - root, leaf := ipld.Translate(dah, row, col) - - // wrap the blockservice in a session if it has been signaled in the context. - blockGetter := getGetter(ctx, ig.bServ) - s, err := ipld.GetShare(ctx, blockGetter, root, leaf, len(dah.RowRoots)) - if errors.Is(err, ipld.ErrNodeNotFound) { - // convert error to satisfy getter interface contract - err = share.ErrNotFound - } - if err != nil { - return nil, fmt.Errorf("getter/ipld: failed to retrieve share: %w", err) - } - - return s, nil -} - -func (ig *IPLDGetter) GetEDS( - ctx context.Context, - header *header.ExtendedHeader, -) (eds *rsmt2d.ExtendedDataSquare, err error) { - ctx, span := tracer.Start(ctx, "ipld/get-eds") - defer func() { - utils.SetStatusAndEnd(span, err) - }() - - // rtrv.Retrieve calls shares.GetShares until enough shares are retrieved to reconstruct the EDS - eds, err = ig.rtrv.Retrieve(ctx, header.DAH) - if errors.Is(err, ipld.ErrNodeNotFound) { - // convert error to satisfy getter interface contract - err = share.ErrNotFound - } - var errByz *byzantine.ErrByzantine - if errors.As(err, &errByz) { - return nil, err - } - if err != nil { - return nil, fmt.Errorf("getter/ipld: failed to retrieve eds: %w", err) - } - return eds, nil -} - -func (ig *IPLDGetter) GetSharesByNamespace( - ctx context.Context, - header *header.ExtendedHeader, - namespace share.Namespace, -) (shares share.NamespacedShares, err error) { - ctx, span := tracer.Start(ctx, "ipld/get-shares-by-namespace", trace.WithAttributes( - attribute.String("namespace", namespace.String()), - )) - defer func() { - utils.SetStatusAndEnd(span, err) - }() - - if err = namespace.ValidateForData(); err != nil { - return nil, err - } - - // wrap the blockservice in a session if it has been signaled in the context. - blockGetter := getGetter(ctx, ig.bServ) - shares, err = eds.CollectSharesByNamespace(ctx, blockGetter, header.DAH, namespace) - if errors.Is(err, ipld.ErrNodeNotFound) { - // convert error to satisfy getter interface contract - err = share.ErrNotFound - } - if err != nil { - return nil, fmt.Errorf("getter/ipld: failed to retrieve shares by namespace: %w", err) - } - return shares, nil -} - -var sessionKey = &session{} - -// session is a struct that can optionally be passed by context to the share.Getter methods using -// WithSession to indicate that a blockservice session should be created. -type session struct { - sync.Mutex - atomic.Pointer[blockservice.Session] - ctx context.Context -} - -// WithSession stores an empty session in the context, indicating that a blockservice session should -// be created. -func WithSession(ctx context.Context) context.Context { - return context.WithValue(ctx, sessionKey, &session{ctx: ctx}) -} - -func getGetter(ctx context.Context, service blockservice.BlockService) blockservice.BlockGetter { - s, ok := ctx.Value(sessionKey).(*session) - if !ok { - return service - } - - val := s.Load() - if val != nil { - return val - } - - s.Lock() - defer s.Unlock() - val = s.Load() - if val == nil { - val = blockservice.NewSession(s.ctx, service) - s.Store(val) - } - return val -} diff --git a/share/getters/store.go b/share/getters/store.go deleted file mode 100644 index d66a057c56..0000000000 --- a/share/getters/store.go +++ /dev/null @@ -1,122 +0,0 @@ -package getters - -import ( - "context" - "errors" - "fmt" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" - - "github.com/celestiaorg/rsmt2d" - - "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/libs/utils" - "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds" - "github.com/celestiaorg/celestia-node/share/ipld" -) - -var _ share.Getter = (*StoreGetter)(nil) - -// StoreGetter is a share.Getter that retrieves shares from an eds.Store. No results are saved to -// the eds.Store after retrieval. -type StoreGetter struct { - store *eds.Store -} - -// NewStoreGetter creates a new share.Getter that retrieves shares from an eds.Store. -func NewStoreGetter(store *eds.Store) *StoreGetter { - return &StoreGetter{ - store: store, - } -} - -// GetShare gets a single share at the given EDS coordinates from the eds.Store through the -// corresponding CAR-level blockstore. -func (sg *StoreGetter) GetShare(ctx context.Context, header *header.ExtendedHeader, row, col int) (share.Share, error) { - dah := header.DAH - var err error - ctx, span := tracer.Start(ctx, "store/get-share", trace.WithAttributes( - attribute.Int("row", row), - attribute.Int("col", col), - )) - defer func() { - utils.SetStatusAndEnd(span, err) - }() - - upperBound := len(dah.RowRoots) - if row >= upperBound || col >= upperBound { - err := share.ErrOutOfBounds - span.RecordError(err) - return nil, err - } - root, leaf := ipld.Translate(dah, row, col) - bs, err := sg.store.CARBlockstore(ctx, dah.Hash()) - if errors.Is(err, eds.ErrNotFound) { - // convert error to satisfy getter interface contract - err = share.ErrNotFound - } - if err != nil { - return nil, fmt.Errorf("getter/store: failed to retrieve blockstore: %w", err) - } - defer func() { - if err := bs.Close(); err != nil { - log.Warnw("closing blockstore", "err", err) - } - }() - - // wrap the read-only CAR blockstore in a getter - blockGetter := eds.NewBlockGetter(bs) - s, err := ipld.GetShare(ctx, blockGetter, root, leaf, len(dah.RowRoots)) - if errors.Is(err, ipld.ErrNodeNotFound) { - // convert error to satisfy getter interface contract - err = share.ErrNotFound - } - if err != nil { - return nil, fmt.Errorf("getter/store: failed to retrieve share: %w", err) - } - - return s, nil -} - -// GetEDS gets the EDS identified by the given root from the EDS store. -func (sg *StoreGetter) GetEDS( - ctx context.Context, header *header.ExtendedHeader, -) (data *rsmt2d.ExtendedDataSquare, err error) { - ctx, span := tracer.Start(ctx, "store/get-eds") - defer func() { - utils.SetStatusAndEnd(span, err) - }() - - data, err = sg.store.Get(ctx, header.DAH.Hash()) - if errors.Is(err, eds.ErrNotFound) { - // convert error to satisfy getter interface contract - err = share.ErrNotFound - } - if err != nil { - return nil, fmt.Errorf("getter/store: failed to retrieve eds: %w", err) - } - return data, nil -} - -// GetSharesByNamespace gets all EDS shares in the given namespace from the EDS store through the -// corresponding CAR-level blockstore. -func (sg *StoreGetter) GetSharesByNamespace( - ctx context.Context, - header *header.ExtendedHeader, - namespace share.Namespace, -) (shares share.NamespacedShares, err error) { - ctx, span := tracer.Start(ctx, "store/get-shares-by-namespace", trace.WithAttributes( - attribute.String("namespace", namespace.String()), - )) - defer func() { - utils.SetStatusAndEnd(span, err) - }() - - ns, err := eds.RetrieveNamespaceFromStore(ctx, sg.store, header.DAH, namespace) - if err != nil { - return nil, fmt.Errorf("getter/store: %w", err) - } - return ns, nil -} diff --git a/share/shwap/getter.go b/share/shwap/getter.go index 370a2d696a..a43d99f8f2 100644 --- a/share/shwap/getter.go +++ b/share/shwap/getter.go @@ -25,7 +25,7 @@ var ( // Getter interface provides a set of accessors for shares by the Root. // Automatically verifies integrity of shares(exceptions possible depending on the implementation). // -//go:generate mockgen -destination=mocks/getter.go -package=mocks . Getter +//go:generate mockgen -destination=getters/mock/getter.go -package=mock . Getter type Getter interface { // GetShare gets a Share by coordinates in EDS. GetShare(ctx context.Context, header *header.ExtendedHeader, row, col int) (share.Share, error) diff --git a/share/getters/cascade.go b/share/shwap/getters/cascade.go similarity index 82% rename from share/getters/cascade.go rename to share/shwap/getters/cascade.go index 574b70153c..ac1733ed34 100644 --- a/share/getters/cascade.go +++ b/share/shwap/getters/cascade.go @@ -15,6 +15,7 @@ import ( "github.com/celestiaorg/celestia-node/libs/utils" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/eds/byzantine" + "github.com/celestiaorg/celestia-node/share/shwap" ) var ( @@ -22,24 +23,24 @@ var ( log = logging.Logger("share/getters") ) -var _ share.Getter = (*CascadeGetter)(nil) +var _ shwap.Getter = (*CascadeGetter)(nil) -// CascadeGetter implements custom share.Getter that composes multiple Getter implementations in +// CascadeGetter implements custom shwap.Getter that composes multiple Getter implementations in // "cascading" order. // // See cascade func for details on cascading. type CascadeGetter struct { - getters []share.Getter + getters []shwap.Getter } -// NewCascadeGetter instantiates a new CascadeGetter from given share.Getters with given interval. -func NewCascadeGetter(getters []share.Getter) *CascadeGetter { +// NewCascadeGetter instantiates a new CascadeGetter from given shwap.Getters with given interval. +func NewCascadeGetter(getters []shwap.Getter) *CascadeGetter { return &CascadeGetter{ getters: getters, } } -// GetShare gets a share from any of registered share.Getters in cascading order. +// GetShare gets a share from any of registered shwap.Getters in cascading order. func (cg *CascadeGetter) GetShare( ctx context.Context, header *header.ExtendedHeader, row, col int, ) (share.Share, error) { @@ -55,40 +56,40 @@ func (cg *CascadeGetter) GetShare( span.RecordError(err) return nil, err } - get := func(ctx context.Context, get share.Getter) (share.Share, error) { + get := func(ctx context.Context, get shwap.Getter) (share.Share, error) { return get.GetShare(ctx, header, row, col) } return cascadeGetters(ctx, cg.getters, get) } -// GetEDS gets a full EDS from any of registered share.Getters in cascading order. +// GetEDS gets a full EDS from any of registered shwap.Getters in cascading order. func (cg *CascadeGetter) GetEDS( ctx context.Context, header *header.ExtendedHeader, ) (*rsmt2d.ExtendedDataSquare, error) { ctx, span := tracer.Start(ctx, "cascade/get-eds") defer span.End() - get := func(ctx context.Context, get share.Getter) (*rsmt2d.ExtendedDataSquare, error) { + get := func(ctx context.Context, get shwap.Getter) (*rsmt2d.ExtendedDataSquare, error) { return get.GetEDS(ctx, header) } return cascadeGetters(ctx, cg.getters, get) } -// GetSharesByNamespace gets NamespacedShares from any of registered share.Getters in cascading +// GetSharesByNamespace gets NamespacedShares from any of registered shwap.Getters in cascading // order. func (cg *CascadeGetter) GetSharesByNamespace( ctx context.Context, header *header.ExtendedHeader, namespace share.Namespace, -) (share.NamespacedShares, error) { +) (shwap.NamespaceData, error) { ctx, span := tracer.Start(ctx, "cascade/get-shares-by-namespace", trace.WithAttributes( attribute.String("namespace", namespace.String()), )) defer span.End() - get := func(ctx context.Context, get share.Getter) (share.NamespacedShares, error) { + get := func(ctx context.Context, get shwap.Getter) (shwap.NamespaceData, error) { return get.GetSharesByNamespace(ctx, header, namespace) } @@ -105,8 +106,8 @@ func (cg *CascadeGetter) GetSharesByNamespace( // NOTE: New source attempts after interval do suspend running sources in progress. func cascadeGetters[V any]( ctx context.Context, - getters []share.Getter, - get func(context.Context, share.Getter) (V, error), + getters []shwap.Getter, + get func(context.Context, shwap.Getter) (V, error), ) (V, error) { var ( zero V @@ -139,7 +140,7 @@ func cascadeGetters[V any]( return val, nil } - if errors.Is(getErr, ErrOperationNotSupported) { + if errors.Is(getErr, shwap.ErrOperationNotSupported) { continue } diff --git a/share/getters/cascade_test.go b/share/shwap/getters/cascade_test.go similarity index 77% rename from share/getters/cascade_test.go rename to share/shwap/getters/cascade_test.go index d2b44883a1..75017c621e 100644 --- a/share/getters/cascade_test.go +++ b/share/shwap/getters/cascade_test.go @@ -12,8 +12,8 @@ import ( "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/mocks" + "github.com/celestiaorg/celestia-node/share/shwap" + "github.com/celestiaorg/celestia-node/share/shwap/getters/mock" ) func TestCascadeGetter(t *testing.T) { @@ -22,7 +22,7 @@ func TestCascadeGetter(t *testing.T) { const gettersN = 3 headers := make([]*header.ExtendedHeader, gettersN) - getters := make([]share.Getter, gettersN) + getters := make([]shwap.Getter, gettersN) for i := range headers { getters[i], headers[i] = TestGetter(t) } @@ -50,10 +50,10 @@ func TestCascade(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - timeoutGetter := mocks.NewMockGetter(ctrl) - immediateFailGetter := mocks.NewMockGetter(ctrl) - successGetter := mocks.NewMockGetter(ctrl) - ctxGetter := mocks.NewMockGetter(ctrl) + timeoutGetter := mock.NewMockGetter(ctrl) + immediateFailGetter := mock.NewMockGetter(ctrl) + successGetter := mock.NewMockGetter(ctrl) + ctxGetter := mock.NewMockGetter(ctrl) timeoutGetter.EXPECT().GetEDS(gomock.Any(), gomock.Any()). DoAndReturn(func(ctx context.Context, _ *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { return nil, context.DeadlineExceeded @@ -67,36 +67,36 @@ func TestCascade(t *testing.T) { return nil, ctx.Err() }).AnyTimes() - get := func(ctx context.Context, get share.Getter) (*rsmt2d.ExtendedDataSquare, error) { + get := func(ctx context.Context, get shwap.Getter) (*rsmt2d.ExtendedDataSquare, error) { return get.GetEDS(ctx, nil) } t.Run("SuccessFirst", func(t *testing.T) { - getters := []share.Getter{successGetter, timeoutGetter, immediateFailGetter} + getters := []shwap.Getter{successGetter, timeoutGetter, immediateFailGetter} _, err := cascadeGetters(ctx, getters, get) assert.NoError(t, err) }) t.Run("SuccessSecond", func(t *testing.T) { - getters := []share.Getter{immediateFailGetter, successGetter} + getters := []shwap.Getter{immediateFailGetter, successGetter} _, err := cascadeGetters(ctx, getters, get) assert.NoError(t, err) }) t.Run("SuccessSecondAfterFirst", func(t *testing.T) { - getters := []share.Getter{timeoutGetter, successGetter} + getters := []shwap.Getter{timeoutGetter, successGetter} _, err := cascadeGetters(ctx, getters, get) assert.NoError(t, err) }) t.Run("SuccessAfterMultipleTimeouts", func(t *testing.T) { - getters := []share.Getter{timeoutGetter, immediateFailGetter, timeoutGetter, timeoutGetter, successGetter} + getters := []shwap.Getter{timeoutGetter, immediateFailGetter, timeoutGetter, timeoutGetter, successGetter} _, err := cascadeGetters(ctx, getters, get) assert.NoError(t, err) }) t.Run("Error", func(t *testing.T) { - getters := []share.Getter{immediateFailGetter, timeoutGetter, immediateFailGetter} + getters := []shwap.Getter{immediateFailGetter, timeoutGetter, immediateFailGetter} _, err := cascadeGetters(ctx, getters, get) assert.Error(t, err) assert.Equal(t, strings.Count(err.Error(), "\n"), 2) @@ -105,14 +105,14 @@ func TestCascade(t *testing.T) { t.Run("Context Canceled", func(t *testing.T) { ctx, cancel := context.WithCancel(ctx) cancel() - getters := []share.Getter{ctxGetter, ctxGetter, ctxGetter} + getters := []shwap.Getter{ctxGetter, ctxGetter, ctxGetter} _, err := cascadeGetters(ctx, getters, get) assert.Error(t, err) assert.Equal(t, strings.Count(err.Error(), "\n"), 0) }) t.Run("Single", func(t *testing.T) { - getters := []share.Getter{successGetter} + getters := []shwap.Getter{successGetter} _, err := cascadeGetters(ctx, getters, get) assert.NoError(t, err) }) diff --git a/share/shwap/getters/mock/getter.go b/share/shwap/getters/mock/getter.go new file mode 100644 index 0000000000..64e04a5132 --- /dev/null +++ b/share/shwap/getters/mock/getter.go @@ -0,0 +1,84 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/celestiaorg/celestia-node/share/shwap (interfaces: Getter) + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + header "github.com/celestiaorg/celestia-node/header" + share "github.com/celestiaorg/celestia-node/share" + shwap "github.com/celestiaorg/celestia-node/share/shwap" + rsmt2d "github.com/celestiaorg/rsmt2d" + gomock "github.com/golang/mock/gomock" +) + +// MockGetter is a mock of Getter interface. +type MockGetter struct { + ctrl *gomock.Controller + recorder *MockGetterMockRecorder +} + +// MockGetterMockRecorder is the mock recorder for MockGetter. +type MockGetterMockRecorder struct { + mock *MockGetter +} + +// NewMockGetter creates a new mock instance. +func NewMockGetter(ctrl *gomock.Controller) *MockGetter { + mock := &MockGetter{ctrl: ctrl} + mock.recorder = &MockGetterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockGetter) EXPECT() *MockGetterMockRecorder { + return m.recorder +} + +// GetEDS mocks base method. +func (m *MockGetter) GetEDS(arg0 context.Context, arg1 *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetEDS", arg0, arg1) + ret0, _ := ret[0].(*rsmt2d.ExtendedDataSquare) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetEDS indicates an expected call of GetEDS. +func (mr *MockGetterMockRecorder) GetEDS(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEDS", reflect.TypeOf((*MockGetter)(nil).GetEDS), arg0, arg1) +} + +// GetShare mocks base method. +func (m *MockGetter) GetShare(arg0 context.Context, arg1 *header.ExtendedHeader, arg2, arg3 int) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetShare", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetShare indicates an expected call of GetShare. +func (mr *MockGetterMockRecorder) GetShare(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetShare", reflect.TypeOf((*MockGetter)(nil).GetShare), arg0, arg1, arg2, arg3) +} + +// GetSharesByNamespace mocks base method. +func (m *MockGetter) GetSharesByNamespace(arg0 context.Context, arg1 *header.ExtendedHeader, arg2 share.Namespace) (shwap.NamespaceData, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSharesByNamespace", arg0, arg1, arg2) + ret0, _ := ret[0].(shwap.NamespaceData) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetSharesByNamespace indicates an expected call of GetSharesByNamespace. +func (mr *MockGetterMockRecorder) GetSharesByNamespace(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSharesByNamespace", reflect.TypeOf((*MockGetter)(nil).GetSharesByNamespace), arg0, arg1, arg2) +} diff --git a/share/getters/testing.go b/share/shwap/getters/testing.go similarity index 93% rename from share/getters/testing.go rename to share/shwap/getters/testing.go index d91e978987..978a01fb2c 100644 --- a/share/getters/testing.go +++ b/share/shwap/getters/testing.go @@ -14,10 +14,11 @@ import ( "github.com/celestiaorg/celestia-node/header/headertest" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/eds/edstest" + "github.com/celestiaorg/celestia-node/share/shwap" ) // TestGetter provides a testing SingleEDSGetter and the root of the EDS it holds. -func TestGetter(t *testing.T) (share.Getter, *header.ExtendedHeader) { +func TestGetter(t *testing.T) (shwap.Getter, *header.ExtendedHeader) { eds := edstest.RandEDS(t, 8) roots, err := share.NewAxisRoots(eds) eh := headertest.RandExtendedHeaderWithRoot(t, roots) @@ -60,7 +61,7 @@ func (seg *SingleEDSGetter) GetEDS( // GetSharesByNamespace returns NamespacedShares from a kept EDS if the correct root is given. func (seg *SingleEDSGetter) GetSharesByNamespace(context.Context, *header.ExtendedHeader, share.Namespace, -) (share.NamespacedShares, error) { +) (shwap.NamespaceData, error) { panic("SingleEDSGetter: GetSharesByNamespace is not implemented") }