diff --git a/astria/mempool/reaper.go b/astria/mempool/reaper.go index 2dba56708..307bcf04b 100644 --- a/astria/mempool/reaper.go +++ b/astria/mempool/reaper.go @@ -1,7 +1,6 @@ package mempool import ( - "fmt" "sync" "github.com/astriaorg/rollkit/astria/sequencer" @@ -13,7 +12,6 @@ type MempoolReaper struct { c *sequencer.Client mempool *mempool.CListMempool logger log.Logger - mu sync.Mutex started bool stopCh chan struct{} @@ -53,11 +51,12 @@ func (mr *MempoolReaper) Reap() { mr.logger.Info("reaped tx from mempool", "tx", mempoolTx.Tx()) // submit to shared sequencer - res, err := mr.c.BroadcastTx(mempoolTx.Tx()) + err := mr.c.SendMessageViaComposer(mempoolTx.Tx()) if err != nil { - panic(fmt.Sprintf("error sending message: %s\n", err)) + mr.logger.Error("error sending message: %s\n", err) + return } - mr.logger.Debug("tx response", "log", res.Log) + mr.logger.Debug("succesfully sent transaction to composer") // wait for next tx tx0 = tx0.NextWait() diff --git a/astria/sequencer/client.go b/astria/sequencer/client.go index 91b7c2ba3..2fa2a338e 100644 --- a/astria/sequencer/client.go +++ b/astria/sequencer/client.go @@ -6,32 +6,43 @@ import ( "fmt" astriaPb "buf.build/gen/go/astria/astria/protocolbuffers/go/astria/sequencer/v1" + "buf.build/gen/go/astria/composer-apis/grpc/go/astria/composer/v1alpha1/composerv1alpha1grpc" + astriaComposerPb "buf.build/gen/go/astria/composer-apis/protocolbuffers/go/astria/composer/v1alpha1" "github.com/astriaorg/go-sequencer-client/client" "github.com/cometbft/cometbft/libs/log" tendermintPb "github.com/cometbft/cometbft/rpc/core/types" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/encoding/protojson" ) // SequencerClient is a client for interacting with the sequencer. type Client struct { - Client *client.Client - Signer *client.Signer - rollupId [32]byte - nonce uint32 - logger log.Logger + Client *client.Client + composerClient *grpc.ClientConn + Signer *client.Signer + rollupId []byte + nonce uint32 + logger log.Logger } -func NewClient(sequencerAddr string, private ed25519.PrivateKey, rollupId [32]byte, logger log.Logger) *Client { +func NewClient(sequencerAddr string, composerAddr string, private ed25519.PrivateKey, rollupId []byte, logger log.Logger) *Client { c, err := client.NewClient(sequencerAddr) if err != nil { panic(err) } + conn, err := grpc.Dial(composerAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + panic(err) + } + return &Client{ - Client: c, - Signer: client.NewSigner(private), - rollupId: rollupId, - logger: logger, + Client: c, + composerClient: conn, + Signer: client.NewSigner(private), + rollupId: rollupId, + logger: logger, } } @@ -97,3 +108,18 @@ func (c *Client) BroadcastTx(tx []byte) (*tendermintPb.ResultBroadcastTx, error) return resp, nil } + +func (sc *Client) SendMessageViaComposer(tx []byte) error { + grpcCollectorServiceClient := composerv1alpha1grpc.NewGrpcCollectorServiceClient(sc.composerClient) + // if the request succeeds, then an empty response will be returned which can be ignored for now + _, err := grpcCollectorServiceClient.SubmitRollupTransaction(context.Background(), &astriaComposerPb.SubmitRollupTransactionRequest{ + RollupId: sc.rollupId, + Data: tx, + }) + if err != nil { + return err + } + + return nil + +} diff --git a/config/config.go b/config/config.go index abd646756..7268d7f0d 100644 --- a/config/config.go +++ b/config/config.go @@ -12,6 +12,7 @@ const ( flagAstriaGrpcListen = "rollkit.astria_grpc_listen" flagAstriaSeqAddress = "rollkit.astria_seq_addr" + flagAstriaComposerRpc = "rollkit.astria_composer_rpc_addr" flagAstriaSeqPrivate = "rollkit.astria_seq_private" flagAstriaSeqInitialHeight = "rollkit.astria_seq_initial_height" flagDAVariance = "rollkit.da_variance" @@ -42,6 +43,7 @@ type BlockManagerConfig struct { type AstriaSeqConfig struct { GrpcListen string `mapstructure:"astria_grpc_listen"` SeqAddress string `mapstructure:"astria_seq_addr"` + ComposerRpc string `mapstructure:"astria_composer_rpc_addr"` SeqPrivate string `mapstructure:"astria_seq_private"` SeqInitialHeight uint64 `mapstructure:"astria_seq_initial_height"` } @@ -82,6 +84,7 @@ func (nc *NodeConfig) GetViperConfig(v *viper.Viper) error { nc.Astria.GrpcListen = v.GetString(flagAstriaGrpcListen) nc.Astria.SeqAddress = v.GetString(flagAstriaSeqAddress) + nc.Astria.ComposerRpc = v.GetString(flagAstriaComposerRpc) nc.Astria.SeqPrivate = v.GetString(flagAstriaSeqPrivate) nc.Astria.SeqInitialHeight = v.GetUint64(flagAstriaSeqInitialHeight) @@ -95,6 +98,7 @@ func AddFlags(cmd *cobra.Command) { def := DefaultNodeConfig cmd.Flags().String(flagAstriaGrpcListen, def.Astria.GrpcListen, "Astria gRPC listen address for execution api") cmd.Flags().String(flagAstriaSeqAddress, def.Astria.SeqAddress, "Astria sequencer address") + cmd.Flags().String(flagAstriaComposerRpc, def.Astria.ComposerRpc, "Astria composer RPC address") cmd.Flags().String(flagAstriaSeqPrivate, def.Astria.SeqPrivate, "Astria sequencer private key") cmd.Flags().Uint64(flagDAStartHeight, def.DAStartHeight, "The first block height of celestia chain to use for rollup transactions") cmd.Flags().Uint64(flagAstriaSeqInitialHeight, def.Astria.SeqInitialHeight, "The first block height of sequencer chain to use for rollup transactions") diff --git a/config/defaults.go b/config/defaults.go index 1d9c23afe..3d4c9164e 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -26,6 +26,7 @@ var DefaultNodeConfig = NodeConfig{ Astria: AstriaSeqConfig{ GrpcListen: ":50051", SeqAddress: "http://localhost:26657", + ComposerRpc: "127.0.0.1:5053", SeqPrivate: "2bd806c97f0e00af1a1fc3328fa763a9269723c8db8fac4f93af71db186d6e90", SeqInitialHeight: 1, }, diff --git a/go.mod b/go.mod index 260ab310b..12217cece 100644 --- a/go.mod +++ b/go.mod @@ -42,6 +42,8 @@ require ( ) require ( + buf.build/gen/go/astria/composer-apis/grpc/go v1.3.0-20240329163554-64ef75007d48.2 // indirect + buf.build/gen/go/astria/composer-apis/protocolbuffers/go v1.33.0-20240329163554-64ef75007d48.1 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect diff --git a/go.sum b/go.sum index c55062372..f67bd3926 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,11 @@ bitbucket.org/creachadair/shell v0.0.6/go.mod h1:8Qqi/cYk7vPnsOePHroKXDJYmb5x7ENhtiFtfZq8K+M= buf.build/gen/go/astria/astria/protocolbuffers/go v1.33.0-20240403190008-c770a4039013.1 h1:/JMLS3FOO3uvJaV1dO9AowylUiA2axxyQ+MVWxgGnZQ= buf.build/gen/go/astria/astria/protocolbuffers/go v1.33.0-20240403190008-c770a4039013.1/go.mod h1:ajk84nq5amXeWBNpY1SIIWNjaqlylLm8W5SJd8AqXBM= +buf.build/gen/go/astria/composer-apis/grpc/go v1.3.0-20240329163554-64ef75007d48.2 h1:3ruOAo7ec4ydfI0S8e9PpP216zaVFKOva0ybj0WfwrY= +buf.build/gen/go/astria/composer-apis/grpc/go v1.3.0-20240329163554-64ef75007d48.2/go.mod h1:wvrUH/9cbYsOP+H1CqwfAYpdn4sRQ0DsP2TBYeWLtpk= +buf.build/gen/go/astria/composer-apis/protocolbuffers/go v1.28.1-20240329163554-64ef75007d48.4/go.mod h1:KHkk6Dwc7uRTiQ0kSuN/PbePwD9RP9/VdLwJ9eWiZYo= +buf.build/gen/go/astria/composer-apis/protocolbuffers/go v1.33.0-20240329163554-64ef75007d48.1 h1:OmL6g54SD3F1MkNx2UCMqcQxSwjCjbs/jxdfhHCHxhk= +buf.build/gen/go/astria/composer-apis/protocolbuffers/go v1.33.0-20240329163554-64ef75007d48.1/go.mod h1:LClp/p6nNkJZhhXypS4e/qIpPhT+7T/MdLBiGi9WsKk= buf.build/gen/go/astria/execution-apis/grpc/go v1.3.0-20240209225522-97e3bc68f856.2 h1:mK0jVG2+QlJrPKsgL46KKh2ZqHc8lyexQP3TUcOvvNU= buf.build/gen/go/astria/execution-apis/grpc/go v1.3.0-20240209225522-97e3bc68f856.2/go.mod h1:soA8k5qokjmp9DmV6jdcWndlVdSYjfa8KJZgsJrvixc= buf.build/gen/go/astria/execution-apis/protocolbuffers/go v1.28.1-20240209225522-97e3bc68f856.4/go.mod h1:5wxRDkWimPnuhDUA4pFBaHMtrViNJAHguLU1Wq8T6x8= diff --git a/node/full.go b/node/full.go index 7a63172a0..eaa87f3f0 100644 --- a/node/full.go +++ b/node/full.go @@ -162,7 +162,7 @@ func newFullNode( return nil, err } private := ed25519.NewKeyFromSeed(privateKeyBytes) - seqClient := sequencer.NewClient(nodeConfig.Astria.SeqAddress, private, execGenesisInfo.RollupId, logger.With("module", "seqclient")) + seqClient := sequencer.NewClient(nodeConfig.Astria.SeqAddress, nodeConfig.Astria.ComposerRpc, private, execGenesisInfo.RollupId[:], logger.With("module", "seqclient")) reaper := astriamempool.NewMempoolReaper(seqClient, mempool, logger.With("module", "reaper")) // init grpc execution api