From 84b6eaad30b2823b61a141f21247e438ef101ac1 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 19 Sep 2023 12:05:18 +0300 Subject: [PATCH] fix: set overlay in statestore, more puller logs, remove chainsyncer (#4325) --- pkg/chainsync/chainsync.go | 152 ------- pkg/chainsync/chainsync_test.go | 100 ----- pkg/chainsync/main_test.go | 15 - pkg/chainsync/pb/chainsync.pb.go | 493 ----------------------- pkg/chainsync/pb/chainsync.proto | 17 - pkg/chainsync/pb/doc.go | 8 - pkg/node/node.go | 18 +- pkg/node/statestore.go | 5 +- pkg/puller/puller.go | 2 +- pkg/statestore/storeadapter/migration.go | 1 + 10 files changed, 7 insertions(+), 804 deletions(-) delete mode 100644 pkg/chainsync/chainsync.go delete mode 100644 pkg/chainsync/chainsync_test.go delete mode 100644 pkg/chainsync/main_test.go delete mode 100644 pkg/chainsync/pb/chainsync.pb.go delete mode 100644 pkg/chainsync/pb/chainsync.proto delete mode 100644 pkg/chainsync/pb/doc.go diff --git a/pkg/chainsync/chainsync.go b/pkg/chainsync/chainsync.go deleted file mode 100644 index c22abe9a52c..00000000000 --- a/pkg/chainsync/chainsync.go +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright 2021 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package chainsync provides the implementation -// of the chainsync protocol that verifies peer chain synchronization. -package chainsync - -import ( - "context" - "encoding/binary" - "errors" - "fmt" - "math/big" - "time" - - "github.com/ethersphere/bee/pkg/chainsync/pb" - "github.com/ethersphere/bee/pkg/p2p" - "github.com/ethersphere/bee/pkg/p2p/protobuf" - "github.com/ethersphere/bee/pkg/ratelimit" - "github.com/ethersphere/bee/pkg/swarm" - "github.com/ethersphere/bee/pkg/transaction" - lru "github.com/hashicorp/golang-lru" -) - -const ( - protocolName = "chainsync" - protocolVersion = "1.0.0" - syncStreamName = "prove" - messageTimeout = 1 * time.Minute - limitBurst = 100 - limitRate = time.Minute - blocksToRemember = 1000 -) - -var ( - errRateLimitExceeded = errors.New("rate limit exceeded") -) - -func (s *ChainSync) Protocol() p2p.ProtocolSpec { - return p2p.ProtocolSpec{ - Name: protocolName, - Version: protocolVersion, - StreamSpecs: []p2p.StreamSpec{ - { - Name: syncStreamName, - Handler: s.syncHandler, - }, - }, - } -} - -type ChainSync struct { - streamer p2p.Streamer - ethClient transaction.Backend - inLimiter *ratelimit.Limiter - outLimiter *ratelimit.Limiter - lru *lru.Cache - - quit chan struct{} -} - -func New(s p2p.Streamer, backend transaction.Backend) (*ChainSync, error) { - lruCache, err := lru.New(blocksToRemember) - if err != nil { - return nil, err - } - c := &ChainSync{ - streamer: s, - ethClient: backend, - inLimiter: ratelimit.New(limitRate, limitBurst), - outLimiter: ratelimit.New(limitRate, limitBurst), - lru: lruCache, - - quit: make(chan struct{}), - } - return c, nil -} - -// Prove asks a peer to prove they know a certain block height on the -// current used eth backend. -func (s *ChainSync) Prove(ctx context.Context, peer swarm.Address, blockheight uint64) ([]byte, error) { - if !s.outLimiter.Allow(peer.ByteString(), 1) { - return nil, errRateLimitExceeded - } - stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, syncStreamName) - if err != nil { - return nil, fmt.Errorf("new stream: %w", err) - } - defer stream.Close() - - ctx, cancel := context.WithTimeout(ctx, messageTimeout) - defer cancel() - - w, r := protobuf.NewWriterAndReader(stream) - - intBuffer := make([]byte, 8) - n := binary.PutUvarint(intBuffer, blockheight) - - var desc = pb.Describe{BlockHeight: intBuffer[:n]} - if err := w.WriteMsgWithContext(ctx, &desc); err != nil { - return nil, fmt.Errorf("write describe message: %w", err) - } - - var proof pb.Proof - if err := r.ReadMsgWithContext(ctx, &proof); err != nil { - return nil, fmt.Errorf("read proof message: %w", err) - } - return proof.BlockHash, nil -} - -func (s *ChainSync) syncHandler(ctx context.Context, peer p2p.Peer, stream p2p.Stream) (err error) { - defer func() { - if err != nil { - _ = stream.Reset() - } else { - _ = stream.Close() - } - }() - if !s.inLimiter.Allow(peer.Address.ByteString(), 1) { - return errRateLimitExceeded - } - - w, r := protobuf.NewWriterAndReader(stream) - ctx, cancel := context.WithTimeout(ctx, messageTimeout) - defer cancel() - var describe pb.Describe - if err = r.ReadMsgWithContext(ctx, &describe); err != nil { - return fmt.Errorf("read describe: %w", err) - } - - height, _ := binary.Uvarint(describe.BlockHeight) - var blockHash []byte - - if val, ok := s.lru.Get(height); ok { - blockHash = val.([]byte) - } else { - header, err := s.ethClient.HeaderByNumber(ctx, new(big.Int).SetUint64(height)) - if err != nil { - return fmt.Errorf("header by number: %w", err) - } - blockHash = header.Hash().Bytes() - _ = s.lru.Add(height, blockHash) - } - - var proof = pb.Proof{BlockHash: blockHash} - if err = w.WriteMsgWithContext(ctx, &proof); err != nil { - return fmt.Errorf("write proof: %w", err) - } - - return nil -} diff --git a/pkg/chainsync/chainsync_test.go b/pkg/chainsync/chainsync_test.go deleted file mode 100644 index ac4c30e671a..00000000000 --- a/pkg/chainsync/chainsync_test.go +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright 2021 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package chainsync_test - -import ( - "context" - "encoding/hex" - "errors" - "math/big" - "sync" - "testing" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethersphere/bee/pkg/chainsync" - "github.com/ethersphere/bee/pkg/p2p/streamtest" - "github.com/ethersphere/bee/pkg/swarm" - "github.com/ethersphere/bee/pkg/transaction/backendmock" -) - -func TestProve(t *testing.T) { - t.Parallel() - - var ( - expHash = "9de2787d1d80a6164f4bc6359d9017131cbc14402ee0704bff0c6d691701c1dc" - mtx sync.Mutex - calledBlock string - trxBlock = common.HexToHash("0x2") - - nextBlockHeader = &types.Header{ - ParentHash: trxBlock, - } - ) - headerByNum := backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { - mtx.Lock() - calledBlock = number.String() - mtx.Unlock() - return nextBlockHeader, nil - }) - - mock := backendmock.New(headerByNum) - server, err := chainsync.New(nil, mock) - if err != nil { - t.Fatal(err) - } - - recorder := streamtest.New( - streamtest.WithProtocols(server.Protocol()), - ) - - client, err := chainsync.New(recorder, mock) - if err != nil { - t.Fatal(err) - } - - addr := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c") - hash, err := client.Prove(context.Background(), addr, 1) - if err != nil { - t.Fatal(err) - } - - if hex.EncodeToString(hash) != expHash { - t.Fatalf("want '%s' got '%s'", expHash, hash) - } - - mtx.Lock() - if calledBlock != "1" { - t.Fatalf("expected call block 1 got %s", calledBlock) - } - mtx.Unlock() -} - -func TestProveErr(t *testing.T) { - t.Parallel() - - headerByNum := backendmock.WithHeaderbyNumberFunc(func(ctx context.Context, number *big.Int) (*types.Header, error) { - return nil, errors.New("some error") - }) - - mock := backendmock.New(headerByNum) - server, err := chainsync.New(nil, mock) - if err != nil { - t.Fatal(err) - } - - recorder := streamtest.New(streamtest.WithProtocols(server.Protocol())) - - client, err := chainsync.New(recorder, mock) - if err != nil { - t.Fatal(err) - } - - addr := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c") - _, err = client.Prove(context.Background(), addr, 1) - if err == nil { - t.Fatal("expected error but got none") - } -} diff --git a/pkg/chainsync/main_test.go b/pkg/chainsync/main_test.go deleted file mode 100644 index 1d756758445..00000000000 --- a/pkg/chainsync/main_test.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2022 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package chainsync_test - -import ( - "testing" - - "go.uber.org/goleak" -) - -func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) -} diff --git a/pkg/chainsync/pb/chainsync.pb.go b/pkg/chainsync/pb/chainsync.pb.go deleted file mode 100644 index fff0f68e54c..00000000000 --- a/pkg/chainsync/pb/chainsync.pb.go +++ /dev/null @@ -1,493 +0,0 @@ -// Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: chainsync.proto - -package pb - -import ( - fmt "fmt" - proto "github.com/gogo/protobuf/proto" - io "io" - math "math" - math_bits "math/bits" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package - -type Describe struct { - BlockHeight []byte `protobuf:"bytes,1,opt,name=BlockHeight,proto3" json:"BlockHeight,omitempty"` -} - -func (m *Describe) Reset() { *m = Describe{} } -func (m *Describe) String() string { return proto.CompactTextString(m) } -func (*Describe) ProtoMessage() {} -func (*Describe) Descriptor() ([]byte, []int) { - return fileDescriptor_3cb3bbfca2221464, []int{0} -} -func (m *Describe) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *Describe) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_Describe.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *Describe) XXX_Merge(src proto.Message) { - xxx_messageInfo_Describe.Merge(m, src) -} -func (m *Describe) XXX_Size() int { - return m.Size() -} -func (m *Describe) XXX_DiscardUnknown() { - xxx_messageInfo_Describe.DiscardUnknown(m) -} - -var xxx_messageInfo_Describe proto.InternalMessageInfo - -func (m *Describe) GetBlockHeight() []byte { - if m != nil { - return m.BlockHeight - } - return nil -} - -type Proof struct { - BlockHash []byte `protobuf:"bytes,1,opt,name=BlockHash,proto3" json:"BlockHash,omitempty"` -} - -func (m *Proof) Reset() { *m = Proof{} } -func (m *Proof) String() string { return proto.CompactTextString(m) } -func (*Proof) ProtoMessage() {} -func (*Proof) Descriptor() ([]byte, []int) { - return fileDescriptor_3cb3bbfca2221464, []int{1} -} -func (m *Proof) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *Proof) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_Proof.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *Proof) XXX_Merge(src proto.Message) { - xxx_messageInfo_Proof.Merge(m, src) -} -func (m *Proof) XXX_Size() int { - return m.Size() -} -func (m *Proof) XXX_DiscardUnknown() { - xxx_messageInfo_Proof.DiscardUnknown(m) -} - -var xxx_messageInfo_Proof proto.InternalMessageInfo - -func (m *Proof) GetBlockHash() []byte { - if m != nil { - return m.BlockHash - } - return nil -} - -func init() { - proto.RegisterType((*Describe)(nil), "chainsync.Describe") - proto.RegisterType((*Proof)(nil), "chainsync.Proof") -} - -func init() { proto.RegisterFile("chainsync.proto", fileDescriptor_3cb3bbfca2221464) } - -var fileDescriptor_3cb3bbfca2221464 = []byte{ - // 139 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4f, 0xce, 0x48, 0xcc, - 0xcc, 0x2b, 0xae, 0xcc, 0x4b, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0x0b, 0x28, - 0xe9, 0x70, 0x71, 0xb8, 0xa4, 0x16, 0x27, 0x17, 0x65, 0x26, 0xa5, 0x0a, 0x29, 0x70, 0x71, 0x3b, - 0xe5, 0xe4, 0x27, 0x67, 0x7b, 0xa4, 0x66, 0xa6, 0x67, 0x94, 0x48, 0x30, 0x2a, 0x30, 0x6a, 0xf0, - 0x04, 0x21, 0x0b, 0x29, 0xa9, 0x72, 0xb1, 0x06, 0x14, 0xe5, 0xe7, 0xa7, 0x09, 0xc9, 0x70, 0x71, - 0x42, 0xc4, 0x13, 0x8b, 0x33, 0xa0, 0x0a, 0x11, 0x02, 0x4e, 0x32, 0x27, 0x1e, 0xc9, 0x31, 0x5e, - 0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x17, 0x1e, 0xcb, 0x31, - 0xdc, 0x78, 0x2c, 0xc7, 0x10, 0xc5, 0x54, 0x90, 0x94, 0xc4, 0x06, 0x76, 0x84, 0x31, 0x20, 0x00, - 0x00, 0xff, 0xff, 0xee, 0x19, 0x2e, 0xd3, 0x97, 0x00, 0x00, 0x00, -} - -func (m *Describe) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *Describe) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *Describe) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.BlockHeight) > 0 { - i -= len(m.BlockHeight) - copy(dAtA[i:], m.BlockHeight) - i = encodeVarintChainsync(dAtA, i, uint64(len(m.BlockHeight))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - -func (m *Proof) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *Proof) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *Proof) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.BlockHash) > 0 { - i -= len(m.BlockHash) - copy(dAtA[i:], m.BlockHash) - i = encodeVarintChainsync(dAtA, i, uint64(len(m.BlockHash))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - -func encodeVarintChainsync(dAtA []byte, offset int, v uint64) int { - offset -= sovChainsync(v) - base := offset - for v >= 1<<7 { - dAtA[offset] = uint8(v&0x7f | 0x80) - v >>= 7 - offset++ - } - dAtA[offset] = uint8(v) - return base -} -func (m *Describe) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.BlockHeight) - if l > 0 { - n += 1 + l + sovChainsync(uint64(l)) - } - return n -} - -func (m *Proof) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.BlockHash) - if l > 0 { - n += 1 + l + sovChainsync(uint64(l)) - } - return n -} - -func sovChainsync(x uint64) (n int) { - return (math_bits.Len64(x|1) + 6) / 7 -} -func sozChainsync(x uint64) (n int) { - return sovChainsync(uint64((x << 1) ^ uint64((int64(x) >> 63)))) -} -func (m *Describe) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowChainsync - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Describe: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Describe: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field BlockHeight", wireType) - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowChainsync - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - byteLen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if byteLen < 0 { - return ErrInvalidLengthChainsync - } - postIndex := iNdEx + byteLen - if postIndex < 0 { - return ErrInvalidLengthChainsync - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.BlockHeight = append(m.BlockHeight[:0], dAtA[iNdEx:postIndex]...) - if m.BlockHeight == nil { - m.BlockHeight = []byte{} - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipChainsync(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthChainsync - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthChainsync - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *Proof) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowChainsync - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Proof: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Proof: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field BlockHash", wireType) - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowChainsync - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - byteLen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if byteLen < 0 { - return ErrInvalidLengthChainsync - } - postIndex := iNdEx + byteLen - if postIndex < 0 { - return ErrInvalidLengthChainsync - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.BlockHash = append(m.BlockHash[:0], dAtA[iNdEx:postIndex]...) - if m.BlockHash == nil { - m.BlockHash = []byte{} - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipChainsync(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthChainsync - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthChainsync - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func skipChainsync(dAtA []byte) (n int, err error) { - l := len(dAtA) - iNdEx := 0 - depth := 0 - for iNdEx < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowChainsync - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - wireType := int(wire & 0x7) - switch wireType { - case 0: - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowChainsync - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - iNdEx++ - if dAtA[iNdEx-1] < 0x80 { - break - } - } - case 1: - iNdEx += 8 - case 2: - var length int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowChainsync - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - length |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if length < 0 { - return 0, ErrInvalidLengthChainsync - } - iNdEx += length - case 3: - depth++ - case 4: - if depth == 0 { - return 0, ErrUnexpectedEndOfGroupChainsync - } - depth-- - case 5: - iNdEx += 4 - default: - return 0, fmt.Errorf("proto: illegal wireType %d", wireType) - } - if iNdEx < 0 { - return 0, ErrInvalidLengthChainsync - } - if depth == 0 { - return iNdEx, nil - } - } - return 0, io.ErrUnexpectedEOF -} - -var ( - ErrInvalidLengthChainsync = fmt.Errorf("proto: negative length found during unmarshaling") - ErrIntOverflowChainsync = fmt.Errorf("proto: integer overflow") - ErrUnexpectedEndOfGroupChainsync = fmt.Errorf("proto: unexpected end of group") -) diff --git a/pkg/chainsync/pb/chainsync.proto b/pkg/chainsync/pb/chainsync.proto deleted file mode 100644 index 74068066953..00000000000 --- a/pkg/chainsync/pb/chainsync.proto +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2021 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -syntax = "proto3"; - -package chainsync; - -option go_package = "pb"; - -message Describe { - bytes BlockHeight = 1; -} - -message Proof { - bytes BlockHash = 1; -} diff --git a/pkg/chainsync/pb/doc.go b/pkg/chainsync/pb/doc.go deleted file mode 100644 index 63e0f143ac3..00000000000 --- a/pkg/chainsync/pb/doc.go +++ /dev/null @@ -1,8 +0,0 @@ -// Copyright 2021 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -//go:generate sh -c "protoc -I . -I \"$(go list -f '{{ .Dir }}' -m github.com/gogo/protobuf)/protobuf\" --gogofaster_out=. chainsync.proto" - -// Package pb holds only Protocol Buffer definitions and generated code. -package pb diff --git a/pkg/node/node.go b/pkg/node/node.go index b4862026a3a..467e41aef50 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -30,7 +30,6 @@ import ( "github.com/ethersphere/bee/pkg/addressbook" "github.com/ethersphere/bee/pkg/api" "github.com/ethersphere/bee/pkg/auth" - "github.com/ethersphere/bee/pkg/chainsync" "github.com/ethersphere/bee/pkg/config" "github.com/ethersphere/bee/pkg/crypto" "github.com/ethersphere/bee/pkg/feeds/factory" @@ -111,7 +110,6 @@ type Bee struct { postageServiceCloser io.Closer priceOracleCloser io.Closer hiveCloser io.Closer - chainSyncerCloser io.Closer saludCloser io.Closer storageIncetivesCloser io.Closer pushSyncCloser io.Closer @@ -1100,16 +1098,6 @@ func NewBee( ) b.resolverCloser = multiResolver - if o.FullNodeMode { - cs, err := chainsync.New(p2ps, chainBackend) - if err != nil { - return nil, fmt.Errorf("new chainsync: %w", err) - } - if err = p2ps.AddProtocol(cs.Protocol()); err != nil { - return nil, fmt.Errorf("chainsync protocol: %w", err) - } - } - feedFactory := factory.New(localStore.Download(true)) steward := steward.New(localStore, retrieve) @@ -1315,11 +1303,7 @@ func (b *Bee) Shutdown() error { } var wg sync.WaitGroup - wg.Add(8) - go func() { - defer wg.Done() - tryClose(b.chainSyncerCloser, "chain syncer") - }() + wg.Add(7) go func() { defer wg.Done() tryClose(b.pssCloser, "pss") diff --git a/pkg/node/statestore.go b/pkg/node/statestore.go index e3d8fe4054e..c2fe6598273 100644 --- a/pkg/node/statestore.go +++ b/pkg/node/statestore.go @@ -74,7 +74,10 @@ func checkOverlay(storer storage.StateStorer, overlay swarm.Address) error { var storedOverlay swarm.Address err := storer.Get(noncedOverlayKey, &storedOverlay) if err != nil { - return err + if !errors.Is(err, storage.ErrNotFound) { + return err + } + return storer.Put(noncedOverlayKey, overlay) } if !storedOverlay.Equal(overlay) { diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 5ea73581ffc..c4f6933fa97 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -222,7 +222,7 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin // cancel all bins peer.gone() - p.logger.Debug("peer epoch change detected, resetting past synced intervals", "peer_address", peer.address) + p.logger.Debug("peer epoch change detected, resetting past synced intervals", "stored_epoch", storedEpoch, "new_epoch", epoch, "peer_address", peer.address) err = p.resetPeerIntervals(peer.address) if err != nil { diff --git a/pkg/statestore/storeadapter/migration.go b/pkg/statestore/storeadapter/migration.go index 09dc421e3e4..03e3ddb64e4 100644 --- a/pkg/statestore/storeadapter/migration.go +++ b/pkg/statestore/storeadapter/migration.go @@ -16,6 +16,7 @@ func allSteps() migration.Steps { 1: epochMigration, 2: deletePrefix("sync_interval"), 3: deletePrefix("sync_interval"), + 4: deletePrefix("blocklist"), } }