Skip to content

Commit

Permalink
hacking rollkit to work with astria shared sequencer
Browse files Browse the repository at this point in the history
  • Loading branch information
mycodecrafting committed Feb 7, 2024
1 parent d91adbc commit b618beb
Show file tree
Hide file tree
Showing 12 changed files with 1,490 additions and 170 deletions.
67 changes: 67 additions & 0 deletions astria/execution/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package execution

import (
log "log/slog"
"net"
"sync"

astriaGrpc "buf.build/gen/go/astria/astria/grpc/go/astria/execution/v1alpha2/executionv1alpha2grpc"
"google.golang.org/grpc"
)

// GRPCServerHandler is the gRPC server handler.
// It gives us a way to attach the gRPC server to the node so it can be stopped on shutdown.
type GRPCServerHandler struct {
mu sync.Mutex

endpoint string
server *grpc.Server
executionServiceServerV1a2 *astriaGrpc.ExecutionServiceServer
}

// NewServer creates a new gRPC server.
// It registers the execution service server.
// It registers the gRPC server with the node so it can be stopped on shutdown.
func NewGRPCServerHandler(execServ astriaGrpc.ExecutionServiceServer, endpoint string) *GRPCServerHandler {
server := grpc.NewServer()

log.Info("gRPC server enabled", "endpoint", endpoint)

handler := &GRPCServerHandler{
endpoint: endpoint,
server: server,
executionServiceServerV1a2: &execServ,
}

astriaGrpc.RegisterExecutionServiceServer(server, execServ)
return handler
}

// Start starts the gRPC server if it is enabled.
func (handler *GRPCServerHandler) Start() error {
handler.mu.Lock()
defer handler.mu.Unlock()

if handler.endpoint == "" {
return nil
}

// Start the gRPC server
lis, err := net.Listen("tcp", handler.endpoint)
if err != nil {
return err
}
go handler.server.Serve(lis)
log.Info("gRPC server started", "endpoint", handler.endpoint)
return nil
}

// Stop stops the gRPC server.
func (handler *GRPCServerHandler) Stop() error {
handler.mu.Lock()
defer handler.mu.Unlock()

handler.server.GracefulStop()
log.Info("gRPC server stopped", "endpoint", handler.endpoint)
return nil
}
152 changes: 152 additions & 0 deletions astria/execution/v1alpha2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package execution

import (
"context"
"sync"

astriaGrpc "buf.build/gen/go/astria/astria/grpc/go/astria/execution/v1alpha2/executionv1alpha2grpc"
astriaPb "buf.build/gen/go/astria/astria/protocolbuffers/go/astria/execution/v1alpha2"
cmbytes "github.com/cometbft/cometbft/libs/bytes"
"github.com/cometbft/cometbft/libs/log"
"github.com/rollkit/rollkit/block"
"github.com/rollkit/rollkit/store"
"github.com/rollkit/rollkit/types"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

type ExecutionServiceServerV1Alpha2 struct {
// NOTE - from the generated code: All implementations must embed
// UnimplementedExecutionServiceServer for forward compatibility
astriaGrpc.UnimplementedExecutionServiceServer

store store.Store
blockManager *block.SSManager
logger log.Logger
blockExecutionLock sync.Mutex
}

func NewExecutionServiceServerV1Alpha2(blockManager *block.SSManager, store store.Store, logger log.Logger) *ExecutionServiceServerV1Alpha2 {
return &ExecutionServiceServerV1Alpha2{
blockManager: blockManager,
store: store,
logger: logger,
}
}

// GetBlock retrieves a block by its identifier.
func (s *ExecutionServiceServerV1Alpha2) GetBlock(ctx context.Context, req *astriaPb.GetBlockRequest) (*astriaPb.Block, error) {
s.logger.Info("GetBlock called", "request", req)

res, err := s.getBlockFromIdentifier(ctx, req.GetIdentifier())
if err != nil {
s.logger.Error("failed finding block", err)
return nil, err
}

s.logger.Info("GetBlock completed", "request", req, "response", res)
return res, nil
}

// BatchGetBlocks will return an array of Blocks given an array of block identifiers.
func (s *ExecutionServiceServerV1Alpha2) BatchGetBlocks(ctx context.Context, req *astriaPb.BatchGetBlocksRequest) (*astriaPb.BatchGetBlocksResponse, error) {
s.logger.Info("BatchGetBlocks called", "request", req)
var blocks []*astriaPb.Block

ids := req.GetIdentifiers()
for _, id := range ids {
block, err := s.getBlockFromIdentifier(ctx, id)
if err != nil {
s.logger.Error("failed finding block with id", id, "error", err)
return nil, err
}

blocks = append(blocks, block)
}

res := &astriaPb.BatchGetBlocksResponse{
Blocks: blocks,
}

s.logger.Info("BatchGetBlocks completed", "request", req, "response", res)
return res, nil
}

// ExecuteBlock drives deterministic derivation of a rollup block from sequencer block data
func (s *ExecutionServiceServerV1Alpha2) ExecuteBlock(ctx context.Context, req *astriaPb.ExecuteBlockRequest) (*astriaPb.Block, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

s.logger.Info("ExecuteBlock called", "request", req)

s.blockExecutionLock.Lock()
defer s.blockExecutionLock.Unlock()

txs := make(types.Txs, len(req.Transactions))
for i := range txs {
txs[i] = types.Tx(req.Transactions[i])
}

block, err := s.blockManager.PublishBlock(ctx, types.Hash(req.PrevBlockHash), req.Timestamp.AsTime(), txs)
if err != nil {
s.logger.Error("failed to publish block to chain", "hash", block.Hash(), "prevHash", req.PrevBlockHash, "err", err)
return nil, status.Error(codes.Internal, "failed to insert block to chain")
}

res := &astriaPb.Block{
Number: uint32(block.Height()),
Hash: cmbytes.HexBytes(block.Hash()),
ParentBlockHash: cmbytes.HexBytes(block.LastHeader()),
Timestamp: timestamppb.New(block.Time()),
}

s.logger.Info("ExecuteBlock completed", "request", req, "response", res)
return res, nil
}

// GetCommitmentState fetches the current CommitmentState of the chain.
func (s *ExecutionServiceServerV1Alpha2) GetCommitmentState(ctx context.Context, req *astriaPb.GetCommitmentStateRequest) (*astriaPb.CommitmentState, error) {
s.logger.Info("GetCommitmentState called", "request", req)
return nil, nil
}

// UpdateCommitmentState replaces the whole CommitmentState with a new CommitmentState.
func (s *ExecutionServiceServerV1Alpha2) UpdateCommitmentState(ctx context.Context, req *astriaPb.UpdateCommitmentStateRequest) (*astriaPb.CommitmentState, error) {
s.logger.Info("UpdateCommitmentState called", "request", req)
return nil, nil
}

func (s *ExecutionServiceServerV1Alpha2) getBlockFromIdentifier(ctx context.Context, identifier *astriaPb.BlockIdentifier) (*astriaPb.Block, error) {
var block *types.Block
var err error

switch idType := identifier.Identifier.(type) {
case *astriaPb.BlockIdentifier_BlockNumber:
block, err = s.store.GetBlock(ctx, uint64(identifier.GetBlockNumber()))
if err != nil {
return nil, err
}
case *astriaPb.BlockIdentifier_BlockHash:
block, err = s.store.GetBlockByHash(ctx, types.Hash(identifier.GetBlockHash()))
if err != nil {
return nil, err
}
default:
return nil, status.Errorf(codes.InvalidArgument, "identifier has unexpected type %T", idType)
}

if block == nil {
return nil, status.Errorf(codes.NotFound, "Couldn't locate block with identifier %s", identifier.Identifier)
}

return &astriaPb.Block{
Number: uint32(block.Height()),
Hash: cmbytes.HexBytes(block.Hash()),
ParentBlockHash: cmbytes.HexBytes(block.LastHeader()),
Timestamp: timestamppb.New(block.Time()),
}, nil
}
111 changes: 111 additions & 0 deletions astria/mempool/reaper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package mempool

import (
"fmt"
"sync"

"github.com/rollkit/rollkit/mempool"
"github.com/rollkit/rollkit/types"

"github.com/rollkit/rollkit/astria/sequencer"
)

type MempoolReaper struct {
c *sequencer.Client
mempool *mempool.CListMempool

mu sync.Mutex
started bool
stopCh chan struct{}
}

func NewMempoolReaper(client *sequencer.Client, mempool *mempool.CListMempool) *MempoolReaper {
return &MempoolReaper{
c: client,
mempool: mempool,
started: false,
stopCh: make(chan struct{}),
}
}

// reap tx from the mempool as they occur
func (mr *MempoolReaper) Reap() {
for {
select {
case <-mr.stopCh:
return
default:
// wait for tx to be in mempool
ch := mr.mempool.TxsWaitChan()
<-ch

// get first tx in pool
tx0 := mr.mempool.TxsFront()
TxNext:
for {
select {
case <-mr.stopCh:
return
default:
mempoolTx := tx0.Value.(*mempoolTx)

// submit to shared sequencer
res, err := mr.c.BroadcastTx(mempoolTx.tx)
if err != nil {
panic(fmt.Sprintf("error sending message: %s\n", err))
}
println(res.Log)

// wait for next tx
tx0 = tx0.NextWait()

// tx was last element and was removed (pool is empty?)
if tx0 == nil {
break TxNext
}
}
}
}
}
}

func (mr *MempoolReaper) Start() error {
mr.mu.Lock()
defer mr.mu.Unlock()

// Ensure Reap is only run once
if mr.started {
return nil
}

go mr.Reap()
mr.started = true
return nil
}

func (mr *MempoolReaper) Stop() error {
mr.mu.Lock()
defer mr.mu.Unlock()

if !mr.started {
return nil
}

close(mr.stopCh)
mr.started = false
return nil
}

// copied from rollkit clist_mempool.go
//--------------------------------------------------------------------------------

// mempoolTx is a transaction that successfully ran
type mempoolTx struct {
height uint64 // height that this tx had been validated in
gasWanted int64 // amount of gas this tx states it will require
tx types.Tx //

// ids of peers who've sent us this tx (as a map for quick lookups).
// senders: PeerID -> bool
senders sync.Map
}
Loading

0 comments on commit b618beb

Please sign in to comment.