Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement draft idea how reorg should be restructured #1820

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions clients/feeder/feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
"github.com/stretchr/testify/require"
)

var ErrDeprecatedCompiledClass = errors.New("deprecated compiled class")
var (
ErrDeprecatedCompiledClass = errors.New("deprecated compiled class")
ErrBlockNotFound = errors.New("block not found")
)

type Backoff func(wait time.Duration) time.Duration

Expand Down Expand Up @@ -254,7 +257,12 @@
if res.StatusCode == http.StatusOK {
return res.Body, nil
} else {
err = errors.New(res.Status)
if c.isBlockNotFoundError(res) {
err = ErrBlockNotFound
break

Check warning on line 262 in clients/feeder/feeder.go

View check run for this annotation

Codecov / codecov/patch

clients/feeder/feeder.go#L261-L262

Added lines #L261 - L262 were not covered by tests
} else {
err = errors.New(res.Status)
}
}

res.Body.Close()
Expand All @@ -270,9 +278,25 @@
c.log.Debugw("Failed query to feeder, retrying...", "req", req.URL.String(), "retryAfter", wait.String(), "err", err)
}
}

return nil, err
}

func (c *Client) isBlockNotFoundError(res *http.Response) bool {
var starknetError struct {
Code string `json:"code"`
Message string `json:"message"` // not used for now
}

err := json.NewDecoder(res.Body).Decode(&starknetError)
if err != nil {
c.log.Errorw("Failed to read response body", "err", err)
return false
}

return starknetError.Code == "StarknetErrorCode.BLOCK_NOT_FOUND"

Check warning on line 297 in clients/feeder/feeder.go

View check run for this annotation

Codecov / codecov/patch

clients/feeder/feeder.go#L297

Added line #L297 was not covered by tests
}

func (c *Client) StateUpdate(ctx context.Context, blockID string) (*starknet.StateUpdate, error) {
queryURL := c.buildQueryString("get_state_update", map[string]string{
"blockNumber": blockID,
Expand Down
28 changes: 23 additions & 5 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
"sync/atomic"
"time"

"github.com/NethermindEth/juno/clients/feeder"

Check failure on line 10 in sync/sync.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default (gci)

"github.com/NethermindEth/juno/blockchain"

Check failure on line 12 in sync/sync.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/db"
Expand Down Expand Up @@ -101,16 +103,23 @@
return nil
}

func (s *Synchronizer) fetcherTask(ctx context.Context, height uint64, verifiers *stream.Stream,
resetStreams context.CancelFunc,
) stream.Callback {
func (s *Synchronizer) fetcherTask(ctx context.Context, height uint64, verifiers *stream.Stream, resetStreams context.CancelFunc, deepReorg chan struct{}) stream.Callback {

Check failure on line 106 in sync/sync.go

View workflow job for this annotation

GitHub Actions / lint

line is 172 characters (lll)
emptyCallback := func() {}

for {
select {
case <-ctx.Done():
return func() {}
return emptyCallback
default:
stateUpdate, block, err := s.starknetData.StateUpdateWithBlock(ctx, height)
if err != nil {
// in case block not found we should initiate deep reorg
if errors.Is(err, feeder.ErrBlockNotFound) {
resetStreams()
deepReorg <- struct{}{}
return emptyCallback

Check warning on line 120 in sync/sync.go

View check run for this annotation

Codecov / codecov/patch

sync/sync.go#L118-L120

Added lines #L118 - L120 were not covered by tests
}

continue
}

Expand Down Expand Up @@ -266,6 +275,8 @@
pendingSem := make(chan struct{}, 1)
go s.pollPending(syncCtx, pendingSem)

// rough marker that deep reorg in progress, todo replace with smth. else
deepReorg := make(chan struct{}, 1)
for {
select {
case <-streamCtx.Done():
Expand All @@ -278,6 +289,13 @@
pendingSem <- struct{}{}
latestSem <- struct{}{}
return
case <-deepReorg:

Check warning on line 292 in sync/sync.go

View check run for this annotation

Codecov / codecov/patch

sync/sync.go#L292

Added line #L292 was not covered by tests
// 1. fetch current height from feeder by using block=latest
// 2. revert all blocks till the block where it diverged
// 3. mark deep reorg as done
// 4. restart syncing process
// to discuss: do we need to remove "small" reorg in verifyier logic ?
// potential speedup: set catchUpMode before restarting sync
default:
streamCtx, streamCancel = context.WithCancel(syncCtx)
nextHeight = s.nextHeight()
Expand All @@ -288,7 +306,7 @@
curHeight, curStreamCtx, curCancel := nextHeight, streamCtx, streamCancel
fetchers.Go(func() stream.Callback {
fetchTimer := time.Now()
cb := s.fetcherTask(curStreamCtx, curHeight, verifiers, curCancel)
cb := s.fetcherTask(curStreamCtx, curHeight, verifiers, curCancel, deepReorg)
s.listener.OnSyncStepDone(OpFetch, curHeight, time.Since(fetchTimer))
return cb
})
Expand Down
Loading