From 2a61400744a41a6c65db1a00334db6f876d2c9d9 Mon Sep 17 00:00:00 2001 From: Josh Dechant Date: Thu, 8 Feb 2024 22:19:35 -0500 Subject: [PATCH] add state for conductor commitments + stuff --- astria/execution/handler.go | 13 +++--- astria/execution/v1alpha2.go | 57 +++++++----------------- astria/mempool/reaper.go | 2 +- astria/sequencer/client.go | 7 ++- astria/state/commitment/kv.go | 74 +++++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 12 +---- node/full.go | 53 ++++++++++------------ node/mempool.go | 83 ----------------------------------- 9 files changed, 130 insertions(+), 173 deletions(-) create mode 100644 astria/state/commitment/kv.go delete mode 100644 node/mempool.go diff --git a/astria/execution/handler.go b/astria/execution/handler.go index 1d581b075..ef052aaaa 100644 --- a/astria/execution/handler.go +++ b/astria/execution/handler.go @@ -1,11 +1,11 @@ package execution import ( - log "log/slog" "net" "sync" astriaGrpc "buf.build/gen/go/astria/execution-apis/grpc/go/astria/execution/v1alpha2/executionv1alpha2grpc" + "github.com/cometbft/cometbft/libs/log" "google.golang.org/grpc" ) @@ -17,20 +17,22 @@ type GRPCServerHandler struct { endpoint string server *grpc.Server executionServiceServerV1a2 *astriaGrpc.ExecutionServiceServer + logger log.Logger } // NewServer creates a new gRPC server. // It registers the execution service server. // It registers the gRPC server with the node so it can be stopped on shutdown. -func NewGRPCServerHandler(execServ astriaGrpc.ExecutionServiceServer, endpoint string) *GRPCServerHandler { +func NewGRPCServerHandler(execServ astriaGrpc.ExecutionServiceServer, endpoint string, logger log.Logger) *GRPCServerHandler { server := grpc.NewServer() - log.Info("gRPC server enabled", "endpoint", endpoint) + logger.Info("gRPC server enabled", "endpoint", endpoint) handler := &GRPCServerHandler{ endpoint: endpoint, server: server, executionServiceServerV1a2: &execServ, + logger: logger, } astriaGrpc.RegisterExecutionServiceServer(server, execServ) @@ -49,10 +51,11 @@ func (handler *GRPCServerHandler) Start() error { // Start the gRPC server lis, err := net.Listen("tcp", handler.endpoint) if err != nil { + handler.logger.Error("gRPC server could not be started", "err", err) return err } go handler.server.Serve(lis) - log.Info("gRPC server started", "endpoint", handler.endpoint) + handler.logger.Info("gRPC server started", "endpoint", handler.endpoint) return nil } @@ -62,6 +65,6 @@ func (handler *GRPCServerHandler) Stop() error { defer handler.mu.Unlock() handler.server.GracefulStop() - log.Info("gRPC server stopped", "endpoint", handler.endpoint) + handler.logger.Info("gRPC server stopped", "endpoint", handler.endpoint) return nil } diff --git a/astria/execution/v1alpha2.go b/astria/execution/v1alpha2.go index 99059c1e3..307ac0d35 100644 --- a/astria/execution/v1alpha2.go +++ b/astria/execution/v1alpha2.go @@ -3,10 +3,10 @@ package execution import ( "context" "sync" - "time" astriaGrpc "buf.build/gen/go/astria/execution-apis/grpc/go/astria/execution/v1alpha2/executionv1alpha2grpc" astriaPb "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/execution/v1alpha2" + "github.com/astriaorg/rollkit/astria/state/commitment" "github.com/astriaorg/rollkit/block" "github.com/astriaorg/rollkit/store" "github.com/astriaorg/rollkit/types" @@ -23,6 +23,7 @@ type ExecutionServiceServerV1Alpha2 struct { // UnimplementedExecutionServiceServer for forward compatibility astriaGrpc.UnimplementedExecutionServiceServer + commitmentStore *commitment.CommitmentState store store.Store blockManager *block.SSManager genesis GenesisInfo @@ -37,12 +38,13 @@ type GenesisInfo struct { CelestiaBlockVariance uint64 } -func NewExecutionServiceServerV1Alpha2(blockManager *block.SSManager, genesis GenesisInfo, store store.Store, logger log.Logger) *ExecutionServiceServerV1Alpha2 { +func NewExecutionServiceServerV1Alpha2(blockManager *block.SSManager, genesis GenesisInfo, commitmentStore *commitment.CommitmentState, store store.Store, logger log.Logger) *ExecutionServiceServerV1Alpha2 { return &ExecutionServiceServerV1Alpha2{ - blockManager: blockManager, - genesis: genesis, - store: store, - logger: logger, + blockManager: blockManager, + genesis: genesis, + commitmentStore: commitmentStore, + store: store, + logger: logger, } } @@ -148,40 +150,10 @@ func (s *ExecutionServiceServerV1Alpha2) GetCommitmentState(ctx context.Context, reqJson, _ := protojson.Marshal(req) s.logger.Info("GetCommitmentState called", "request", reqJson) - var res *astriaPb.CommitmentState - - height := s.blockManager.GetStoreHeight() - - if height == 0 { - genHash := [32]byte{0x0} - pbGenBlock := &astriaPb.Block{ - Number: uint32(0), - Hash: genHash[:], - ParentBlockHash: genHash[:], - Timestamp: timestamppb.New(time.Now()), - } - res = &astriaPb.CommitmentState{ - Soft: pbGenBlock, - Firm: pbGenBlock, - } - } else { - block, err := s.store.GetBlock(ctx, height) - if err != nil { - s.logger.Error("failed finding block with height", "height", height, "error", err) - return nil, err - } - - pbBlock := &astriaPb.Block{ - Number: uint32(block.Height()), - Hash: cmbytes.HexBytes(block.Hash()), - ParentBlockHash: cmbytes.HexBytes(block.LastHeader()), - Timestamp: timestamppb.New(block.Time()), - } - - res = &astriaPb.CommitmentState{ - Soft: pbBlock, - Firm: pbBlock, - } + res, err := s.commitmentStore.GetCommitmentState() + if err != nil { + s.logger.Error("GetCommitmentState failed", "err", err) + return &astriaPb.CommitmentState{}, err } resJson, _ := protojson.Marshal(res) @@ -193,6 +165,11 @@ func (s *ExecutionServiceServerV1Alpha2) GetCommitmentState(ctx context.Context, func (s *ExecutionServiceServerV1Alpha2) UpdateCommitmentState(ctx context.Context, req *astriaPb.UpdateCommitmentStateRequest) (*astriaPb.CommitmentState, error) { reqJson, _ := protojson.Marshal(req) s.logger.Info("UpdateCommitmentState called", "request", reqJson) + + if err := s.commitmentStore.UpdateCommitmentState(req.CommitmentState); err != nil { + s.logger.Error("UpdateCommitmentState failed", "err", err) + } + return req.CommitmentState, nil } diff --git a/astria/mempool/reaper.go b/astria/mempool/reaper.go index e5b5b570d..2dba56708 100644 --- a/astria/mempool/reaper.go +++ b/astria/mempool/reaper.go @@ -57,7 +57,7 @@ func (mr *MempoolReaper) Reap() { if err != nil { panic(fmt.Sprintf("error sending message: %s\n", err)) } - println(res.Log) + mr.logger.Debug("tx response", "log", res.Log) // wait for next tx tx0 = tx0.NextWait() diff --git a/astria/sequencer/client.go b/astria/sequencer/client.go index 8e59eb9ce..f186d0453 100644 --- a/astria/sequencer/client.go +++ b/astria/sequencer/client.go @@ -7,6 +7,7 @@ import ( astriaPb "buf.build/gen/go/astria/astria/protocolbuffers/go/astria/sequencer/v1alpha1" "github.com/astriaorg/go-sequencer-client/client" + "github.com/cometbft/cometbft/libs/log" tendermintPb "github.com/cometbft/cometbft/rpc/core/types" "google.golang.org/protobuf/encoding/protojson" ) @@ -17,9 +18,10 @@ type Client struct { Signer *client.Signer rollupId [32]byte nonce uint32 + logger log.Logger } -func NewClient(sequencerAddr string, private ed25519.PrivateKey, rollupId [32]byte) *Client { +func NewClient(sequencerAddr string, private ed25519.PrivateKey, rollupId [32]byte, logger log.Logger) *Client { c, err := client.NewClient(sequencerAddr) if err != nil { panic(err) @@ -29,6 +31,7 @@ func NewClient(sequencerAddr string, private ed25519.PrivateKey, rollupId [32]by Client: c, Signer: client.NewSigner(private), rollupId: rollupId, + logger: logger, } } @@ -53,7 +56,7 @@ func (c *Client) BroadcastTx(tx []byte) (*tendermintPb.ResultBroadcastTx, error) } signedJson, _ := protojson.Marshal(signed) - fmt.Printf("submitting tx to sequencer: %s\n", signedJson) + c.logger.Info("Submitting tx to sequencer", "signedTx", signedJson) resp, err := c.Client.BroadcastTxSync(context.Background(), signed) if err != nil { diff --git a/astria/state/commitment/kv.go b/astria/state/commitment/kv.go new file mode 100644 index 000000000..1d1fa0e89 --- /dev/null +++ b/astria/state/commitment/kv.go @@ -0,0 +1,74 @@ +package commitment + +import ( + "context" + "fmt" + "time" + + astriaPb "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/execution/v1alpha2" + ds "github.com/ipfs/go-datastore" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const CommitmentKey = "commitment" + +type CommitmentState struct { + store ds.TxnDatastore + ctx context.Context +} + +func NewCommitmentState(ctx context.Context, store ds.TxnDatastore) *CommitmentState { + return &CommitmentState{ + store: store, + ctx: ctx, + } +} + +func (cs *CommitmentState) UpdateCommitmentState(commitment *astriaPb.CommitmentState) error { + txn, err := cs.store.NewTransaction(cs.ctx, false) + if err != nil { + return fmt.Errorf("failed to create a new batch for transaction: %w", err) + } + defer txn.Discard(cs.ctx) + + key := ds.NewKey(CommitmentKey) + val, err := proto.Marshal(commitment) + if err != nil { + return fmt.Errorf("failed to marshal the commitment: %w", err) + } + + if err := txn.Put(cs.ctx, key, val); err != nil { + return err + } + + return txn.Commit(cs.ctx) +} + +func (cs *CommitmentState) GetCommitmentState() (*astriaPb.CommitmentState, error) { + val, err := cs.store.Get(cs.ctx, ds.NewKey(CommitmentKey)) + if err != nil { + if err == ds.ErrNotFound { + genHash := [32]byte{0x0} + pbGenBlock := &astriaPb.Block{ + Number: uint32(0), + Hash: genHash[:], + ParentBlockHash: genHash[:], + Timestamp: timestamppb.New(time.Now()), + } + return &astriaPb.CommitmentState{ + Soft: pbGenBlock, + Firm: pbGenBlock, + }, nil + } else { + return nil, fmt.Errorf("failed to get the commitment: %w", err) + } + } + + commitment := &astriaPb.CommitmentState{} + if err := proto.Unmarshal(val, commitment); err != nil { + return nil, fmt.Errorf("failed to unmarshal the commitment: %w", err) + } + + return commitment, nil +} diff --git a/go.mod b/go.mod index 5f4d99337..e0ba4d6cb 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( ) require ( - buf.build/gen/go/astria/astria/protocolbuffers/go v1.32.0-20240208041217-cec081a8099b.1 + buf.build/gen/go/astria/astria/protocolbuffers/go v1.32.0-20240208213846-0e715ec578d3.1 buf.build/gen/go/astria/execution-apis/grpc/go v1.3.0-20240207231045-2f6384a93a8d.2 buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.32.0-20240207231045-2f6384a93a8d.1 github.com/astriaorg/go-sequencer-client v0.0.0-20231201013457-0df599de8e74 diff --git a/go.sum b/go.sum index e842f4bca..d751107f2 100644 --- a/go.sum +++ b/go.sum @@ -1,20 +1,12 @@ 4d63.com/gochecknoglobals v0.1.0/go.mod h1:wfdC5ZjKSPr7CybKEcgJhUOgeAQW1+7WcyK8OvUilfo= bitbucket.org/creachadair/shell v0.0.6/go.mod h1:8Qqi/cYk7vPnsOePHroKXDJYmb5x7ENhtiFtfZq8K+M= -buf.build/gen/go/astria/astria/protocolbuffers/go v1.32.0-20240208041217-cec081a8099b.1 h1:fnyvaSUmCcM+Ono8mu4KuR7Al0rcgsrswHRUB/osTaI= -buf.build/gen/go/astria/astria/protocolbuffers/go v1.32.0-20240208041217-cec081a8099b.1/go.mod h1:HTIae3hIWhV69v7f8BJQzr2tP6yd0/gxwLrMiG306RY= +buf.build/gen/go/astria/astria/protocolbuffers/go v1.32.0-20240208213846-0e715ec578d3.1 h1:1NLTTOQkht+eaKjttaFHfN83DKh4wduI+mkelmfuIr4= +buf.build/gen/go/astria/astria/protocolbuffers/go v1.32.0-20240208213846-0e715ec578d3.1/go.mod h1:Ma4TsfKTQby0r4gCkdb6D2ThBxTNu0mSVNZF1udF2ko= buf.build/gen/go/astria/execution-apis/grpc/go v1.3.0-20240207231045-2f6384a93a8d.2 h1:AjjfhXMKvUteizjKbLvE0wINSs7Zp/BhhdTBJTdXEJc= buf.build/gen/go/astria/execution-apis/grpc/go v1.3.0-20240207231045-2f6384a93a8d.2/go.mod h1:LK1fGXWZLwItgvIjWjPyJkEoFvpQ6evGC/nyXlzwZw0= buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.28.1-20240207231045-2f6384a93a8d.4/go.mod h1:5wxRDkWimPnuhDUA4pFBaHMtrViNJAHguLU1Wq8T6x8= buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.32.0-20240207231045-2f6384a93a8d.1 h1:480dXLg2BTRFoXGVkCxUyOoS1v3dWn+LM77kRP30ZIU= buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.32.0-20240207231045-2f6384a93a8d.1/go.mod h1:m409hJcO0kExqrFoQS8fQ7yXHuuM8JRTEYB+09WWVy0= -buf.build/gen/go/cosmos/cosmos-proto/protocolbuffers/go v1.32.0-20211202220400-1935555c206d.1/go.mod h1:GpU2rx3tDDSvCER8/rvvgu6s6LeMU73TKjfBZ89OZKg= -buf.build/gen/go/cosmos/cosmos-sdk/protocolbuffers/go v1.32.0-20230522115704-e7a85cef453e.1/go.mod h1:J8VpbpzO6pccEOWdLbYylDP8lRPifk2EA8tmJNkdZZo= -buf.build/gen/go/cosmos/cosmos-sdk/protocolbuffers/go v1.32.0-20230719110346-aa25660f4ff7.1/go.mod h1:Tl9HTTTqDT4kfcsEwfEyUeFdJmbcgSOXL2q50rG5XBw= -buf.build/gen/go/cosmos/gogo-proto/protocolbuffers/go v1.32.0-20221020125208-34d970b699f8.1/go.mod h1:5GqIYthcy/ASmnKcaT26APpxMhZirnIHXHKki69zjWI= -buf.build/gen/go/cosmos/gogo-proto/protocolbuffers/go v1.32.0-20230509103710-5e5b9fdd0180.1/go.mod h1:5GqIYthcy/ASmnKcaT26APpxMhZirnIHXHKki69zjWI= -buf.build/gen/go/cosmos/ibc/protocolbuffers/go v1.32.0-20230913112312-7ab44ae956a0.1/go.mod h1:IPgm3IicGPCXFRfFMLOgHFp3kexNEULuIzEJshSNPcY= -buf.build/gen/go/cosmos/ics23/protocolbuffers/go v1.32.0-20221207100654-55085f7c710a.1/go.mod h1:pjXPxEgmuc0apOM+/10DC/vWa3di1I0Z+CxwKqRXWYQ= -buf.build/gen/go/penumbra-zone/penumbra/protocolbuffers/go v1.32.0-20231120132728-bc443669626d.1/go.mod h1:HOI8VDE0aOk+R3EtXPICCUm0pO8D0PSnS7k2fFq2soE= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.31.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= diff --git a/node/full.go b/node/full.go index 770aacd3f..e20aebedf 100644 --- a/node/full.go +++ b/node/full.go @@ -27,6 +27,7 @@ import ( "github.com/astriaorg/rollkit/astria/execution" astriamempool "github.com/astriaorg/rollkit/astria/mempool" "github.com/astriaorg/rollkit/astria/sequencer" + "github.com/astriaorg/rollkit/astria/state/commitment" "github.com/astriaorg/rollkit/block" "github.com/astriaorg/rollkit/config" "github.com/astriaorg/rollkit/mempool" @@ -41,8 +42,9 @@ import ( // prefixes used in KV store to separate main node data from DALC data var ( - mainPrefix = "0" - indexerPrefix = "2" // indexPrefix uses "i", so using "0-2" to avoid clash + mainPrefix = "0" + indexerPrefix = "1" // indexPrefix uses "i", so using "0-2" to avoid clash + commitmentPrefix = "2" ) const ( @@ -66,14 +68,9 @@ type FullNode struct { proxyApp proxy.AppConns eventBus *cmtypes.EventBus - // dalc *da.DAClient - // p2pClient *p2p.Client - // hSyncService *block.HeaderSyncService - // bSyncService *block.BlockSyncService // TODO(tzdybal): consider extracting "mempool reactor" Mempool mempool.Mempool - mempoolIDs *mempoolIDs Store store.Store BlockManager *block.SSManager client rpcclient.Client @@ -184,39 +181,33 @@ func newFullNode( return nil, err } private := ed25519.NewKeyFromSeed(privateKeyBytes) - seqClient := sequencer.NewClient(nodeConfig.Astria.SeqAddress, private, execGenesisInfo.RollupId) - reaper := astriamempool.NewMempoolReaper(seqClient, mempool, logger) + seqClient := sequencer.NewClient(nodeConfig.Astria.SeqAddress, private, execGenesisInfo.RollupId, logger.With("module", "seqclient")) + reaper := astriamempool.NewMempoolReaper(seqClient, mempool, logger.With("module", "reaper")) // init grpc execution api - serviceV1a2 := execution.NewExecutionServiceServerV1Alpha2(blockManager, execGenesisInfo, store, logger) - grpcServerHandler := execution.NewGRPCServerHandler(serviceV1a2, nodeConfig.Astria.GrpcListen) + commitmentStore := commitment.NewCommitmentState(ctx, newPrefixKV(baseKV, commitmentPrefix)) + serviceV1a2 := execution.NewExecutionServiceServerV1Alpha2(blockManager, execGenesisInfo, commitmentStore, store, logger.With("module", "execution")) + grpcServerHandler := execution.NewGRPCServerHandler(serviceV1a2, nodeConfig.Astria.GrpcListen, logger.With("module", "execution")) node := &FullNode{ - proxyApp: proxyApp, - eventBus: eventBus, - genesis: genesis, - nodeConfig: nodeConfig, - // p2pClient: p2pClient, - BlockManager: blockManager, - // dalc: dalc, - Mempool: mempool, - mempoolIDs: newMempoolIDs(), - Store: store, - TxIndexer: txIndexer, - IndexerService: indexerService, - BlockIndexer: blockIndexer, - // hSyncService: headerSyncService, - // bSyncService: blockSyncService, - ctx: ctx, - cancel: cancel, - threadManager: types.NewThreadManager(), - + proxyApp: proxyApp, + eventBus: eventBus, + genesis: genesis, + nodeConfig: nodeConfig, + BlockManager: blockManager, + Mempool: mempool, + Store: store, + TxIndexer: txIndexer, + IndexerService: indexerService, + BlockIndexer: blockIndexer, + ctx: ctx, + cancel: cancel, + threadManager: types.NewThreadManager(), reaper: reaper, grpcServerHandler: grpcServerHandler, } node.BaseService = *service.NewBaseService(logger, "Node", node) - // node.p2pClient.SetTxValidator(node.newTxValidator(p2pMetrics)) node.client = NewFullClient(node) return node, nil diff --git a/node/mempool.go b/node/mempool.go deleted file mode 100644 index 214e8c185..000000000 --- a/node/mempool.go +++ /dev/null @@ -1,83 +0,0 @@ -package node - -import ( - "fmt" - "math" - "sync" - - "github.com/libp2p/go-libp2p/core/peer" -) - -const ( - maxActiveIDs = math.MaxUint16 -) - -type mempoolIDs struct { - mtx sync.RWMutex - peerMap map[peer.ID]uint16 - nextID uint16 // assumes that a node will never have over 65536 active peers - activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter -} - -// Reserve searches for the next unused ID and assigns it to the -// peer. -func (ids *mempoolIDs) ReserveForPeer(peer peer.ID) { - ids.mtx.Lock() - defer ids.mtx.Unlock() - - curID := ids.nextPeerID() - ids.peerMap[peer] = curID - ids.activeIDs[curID] = struct{}{} -} - -// nextPeerID returns the next unused peer ID to use. -// This assumes that ids's mutex is already locked. -func (ids *mempoolIDs) nextPeerID() uint16 { - if len(ids.activeIDs) == maxActiveIDs { - panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", maxActiveIDs)) - } - - _, idExists := ids.activeIDs[ids.nextID] - for idExists { - ids.nextID++ - _, idExists = ids.activeIDs[ids.nextID] - } - curID := ids.nextID - ids.nextID++ - return curID -} - -// Reclaim returns the ID reserved for the peer back to unused pool. -func (ids *mempoolIDs) Reclaim(peer peer.ID) { - ids.mtx.Lock() - defer ids.mtx.Unlock() - - removedID, ok := ids.peerMap[peer] - if ok { - delete(ids.activeIDs, removedID) - delete(ids.peerMap, peer) - } -} - -// GetForPeer returns an ID for the peer. ID is generated if required. -func (ids *mempoolIDs) GetForPeer(peer peer.ID) uint16 { - ids.mtx.Lock() - defer ids.mtx.Unlock() - - id, ok := ids.peerMap[peer] - if !ok { - id = ids.nextPeerID() - ids.peerMap[peer] = id - ids.activeIDs[id] = struct{}{} - } - - return id -} - -func newMempoolIDs() *mempoolIDs { - return &mempoolIDs{ - peerMap: make(map[peer.ID]uint16), - activeIDs: map[uint16]struct{}{0: {}}, - nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx - } -}