Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pinned reference integrity check API #4573

Merged
merged 3 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ type Storer interface {
storer.Debugger
}

type PinIntegrity interface {
Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat)
}

type Service struct {
auth auth.Authenticator
storer Storer
Expand Down Expand Up @@ -174,7 +178,9 @@ type Service struct {

batchStore postage.Storer
stamperStore storage.Store
syncStatus func() (bool, error)
pinIntegrity PinIntegrity

syncStatus func() (bool, error)

swap swap.Interface
transaction transaction.Service
Expand Down Expand Up @@ -242,6 +248,7 @@ type ExtraOptions struct {
Steward steward.Interface
SyncStatus func() (bool, error)
NodeStatus *status.Service
PinIntegrity PinIntegrity
}

func New(
Expand Down Expand Up @@ -348,6 +355,8 @@ func (s *Service) Configure(signer crypto.Signer, auth auth.Authenticator, trace
return "", err
}
}

s.pinIntegrity = e.PinIntegrity
}

func (s *Service) SetProbe(probe *Probe) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type testServerOptions struct {
BeeMode api.BeeNodeMode
RedistributionAgent *storageincentives.Agent
NodeStatus *status.Service
PinIntegrity api.PinIntegrity
}

func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string, *chanStorer) {
Expand Down Expand Up @@ -201,6 +202,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
SyncStatus: o.SyncStatus,
Staking: o.StakingContract,
NodeStatus: o.NodeStatus,
PinIntegrity: o.PinIntegrity,
}

// By default bee mode is set to full mode.
Expand Down
62 changes: 62 additions & 0 deletions pkg/api/integritycheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2023 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api

import (
"encoding/json"
"net/http"

storer "github.com/ethersphere/bee/pkg/storer"
"github.com/ethersphere/bee/pkg/swarm"
)

type PinIntegrityResponse struct {
Ref swarm.Address `json:"ref"`
Total int `json:"total"`
Missing int `json:"missing"`
Invalid int `json:"invalid"`
}

func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be possible to unit test this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

logger := s.logger.WithName("get_pin_integrity").Build()

querie := struct {
Ref swarm.Address `map:"ref"`
}{}

if response := s.mapStructure(r.URL.Query(), &querie); response != nil {
response("invalid query params", logger, w)
return
}

out := make(chan storer.PinStat)

go s.pinIntegrity.Check(r.Context(), logger, querie.Ref.String(), out)

flusher, ok := w.(http.Flusher)
if !ok {
http.NotFound(w, r)
return
}

w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
flusher.Flush()

enc := json.NewEncoder(w)

for v := range out {
resp := PinIntegrityResponse{
Ref: v.Ref,
Total: v.Total,
Missing: v.Missing,
Invalid: v.Invalid,
}
if err := enc.Encode(resp); err != nil {
break
}
flusher.Flush()
}
}
67 changes: 67 additions & 0 deletions pkg/api/integritycheck_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2022 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api_test

import (
"context"
"net/http"
"testing"

"github.com/ethersphere/bee/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/pkg/log"
storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/inmemstore"
storer "github.com/ethersphere/bee/pkg/storer"
)

const pinRef = "620fcd78c7ce54da2d1b7cc2274a02e190cbe8fecbc3bd244690ab6517ce8f39"

func TestIntegrityHandler(t *testing.T) {
t.Parallel()

t.Run("ok", func(t *testing.T) {
t.Parallel()
testServer, _, _, _ := newTestServer(t, testServerOptions{
DebugAPI: true,
PinIntegrity: &mockPinIntegrity{
Store: inmemstore.New(),
tester: t,
},
})

endp := "/check/pin?ref=" + pinRef

// When probe is not set health endpoint should indicate that node is not healthy
jsonhttptest.Request(t, testServer, http.MethodGet, endp, http.StatusOK, jsonhttptest.WithExpectedResponse(nil))
})

t.Run("wrong hash format", func(t *testing.T) {
t.Parallel()
testServer, _, _, _ := newTestServer(t, testServerOptions{
DebugAPI: true,
PinIntegrity: &mockPinIntegrity{
Store: inmemstore.New(),
tester: t,
},
})

endp := "/check/pin?ref=0xbadhash"

// When probe is not set health endpoint should indicate that node is not healthy
jsonhttptest.Request(t, testServer, http.MethodGet, endp, http.StatusBadRequest, jsonhttptest.WithExpectedResponse(nil))
})
}

type mockPinIntegrity struct {
tester *testing.T
Store storage.Store
}

func (p *mockPinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat) {
if pin != pinRef {
p.tester.Fatal("bad pin", pin)
}
close(out)
}
6 changes: 6 additions & 0 deletions pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,4 +599,10 @@ func (s *Service) mountBusinessDebug(restricted bool) {
"GET": http.HandlerFunc(s.rchash),
}),
))

handle("/check/pin", web.ChainHandlers(
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.pinIntegrityHandler),
}),
))
}
1 change: 1 addition & 0 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,7 @@ func NewBee(
Steward: steward,
SyncStatus: syncStatusFn,
NodeStatus: nodeStatus,
PinIntegrity: localStore.PinIntegrity(),
}

if o.APIAddr != "" {
Expand Down
39 changes: 26 additions & 13 deletions pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,15 @@ func initDiskRepository(
basePath string,
locker storage.ChunkLocker,
opts *Options,
) (storage.Repository, io.Closer, error) {
) (storage.Repository, *PinIntegrity, io.Closer, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of returning so many things from the constructor, the PinIntegrity can returned with a get function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so I tried adding a getter to the storage.Repository but the compiler says that now there are 183 places that need changing (to comply with the new interface) so it seems it's no feasible. I'll look into other ways but for now I don't have a better approach.

store, err := initStore(basePath, opts)
if err != nil {
return nil, nil, fmt.Errorf("failed creating levelDB index store: %w", err)
return nil, nil, nil, fmt.Errorf("failed creating levelDB index store: %w", err)
}

err = migration.Migrate(store, "core-migration", localmigration.BeforeIinitSteps())
if err != nil {
return nil, nil, fmt.Errorf("failed core migration: %w", err)
return nil, nil, nil, fmt.Errorf("failed core migration: %w", err)
}

if opts.LdbStats.Load() != nil {
Expand Down Expand Up @@ -338,13 +338,13 @@ func initDiskRepository(
if _, err := os.Stat(sharkyBasePath); os.IsNotExist(err) {
err := os.Mkdir(sharkyBasePath, 0777)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
}

recoveryCloser, err := sharkyRecovery(ctx, sharkyBasePath, store, opts)
if err != nil {
return nil, nil, fmt.Errorf("failed to recover sharky: %w", err)
return nil, nil, nil, fmt.Errorf("failed to recover sharky: %w", err)
}

sharky, err := sharky.New(
Expand All @@ -353,20 +353,25 @@ func initDiskRepository(
swarm.SocMaxChunkSize,
)
if err != nil {
return nil, nil, fmt.Errorf("failed creating sharky instance: %w", err)
return nil, nil, nil, fmt.Errorf("failed creating sharky instance: %w", err)
}

txStore := leveldbstore.NewTxStore(store)
if err := txStore.Recover(); err != nil {
return nil, nil, fmt.Errorf("failed to recover index store: %w", err)
return nil, nil, nil, fmt.Errorf("failed to recover index store: %w", err)
}

txChunkStore := chunkstore.NewTxChunkStore(txStore, sharky)
if err := txChunkStore.Recover(); err != nil {
return nil, nil, fmt.Errorf("failed to recover chunk store: %w", err)
return nil, nil, nil, fmt.Errorf("failed to recover chunk store: %w", err)
}

return storage.NewRepository(txStore, txChunkStore, locker), closer(store, sharky, recoveryCloser), nil
pinIntegrity := &PinIntegrity{
Store: store,
Sharky: sharky,
}

return storage.NewRepository(txStore, txChunkStore, locker), pinIntegrity, closer(store, sharky, recoveryCloser), nil
}

func initCache(ctx context.Context, capacity uint64, repo storage.Repository) (*cache.Cache, error) {
Expand Down Expand Up @@ -457,6 +462,8 @@ type DB struct {
setSyncerOnce sync.Once
syncer Syncer
opts workerOpts

pinIntegrity *PinIntegrity
}

type workerOpts struct {
Expand All @@ -468,9 +475,10 @@ type workerOpts struct {
// component stores.
func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
var (
repo storage.Repository
err error
dbCloser io.Closer
repo storage.Repository
err error
dbCloser io.Closer
pinIntegrity *PinIntegrity
)
if opts == nil {
opts = defaultOptions()
Expand All @@ -497,7 +505,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
return nil, err
}
} else {
repo, dbCloser, err = initDiskRepository(ctx, dirPath, locker, opts)
repo, pinIntegrity, dbCloser, err = initDiskRepository(ctx, dirPath, locker, opts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -550,6 +558,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
wakeupDuration: opts.ReserveWakeUpDuration,
},
directUploadLimiter: make(chan struct{}, pusher.ConcurrentPushes),
pinIntegrity: pinIntegrity,
}

if db.validStamp == nil {
Expand Down Expand Up @@ -665,6 +674,10 @@ func (db *DB) ChunkStore() storage.ReadOnlyChunkStore {
return db.repo.ChunkStore()
}

func (db *DB) PinIntegrity() *PinIntegrity {
return db.pinIntegrity
}

// Execute implements the internal.TxExecutor interface.
func (db *DB) Execute(ctx context.Context, do func(internal.Storage) error) error {
tx, commit, rollback := db.repo.NewTx(ctx)
Expand Down
Loading
Loading