Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

feat(prover): tie capacity to a specific block id #413

Merged
merged 10 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/flags/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ var (
Value: uint64(time.Hour.Seconds()),
Category: proverCategory,
}
TempCapacityExpiresAt = &cli.DurationFlag{
Name: "prover.tempCapacityExpiresAt",
Usage: "time in seconds temporary capacity lives for, format: 36s",
Value: 36 * time.Second,
Category: proverCategory,
}
)

// All prover flags.
Expand Down Expand Up @@ -163,4 +169,5 @@ var ProverFlags = MergeFlags(CommonFlags, []cli.Flag{
ProverCapacity,
MaxExpiry,
TaikoTokenAddress,
TempCapacityExpiresAt,
})
83 changes: 63 additions & 20 deletions prover/capacity_manager/capacity_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,62 +2,105 @@ package capacity_manager

import (
"sync"
"time"

"github.com/ethereum/go-ethereum/log"
)

// CapacityManager manages the prover capacity concurrent-safely.
type CapacityManager struct {
capacity uint64
maxCapacity uint64
mutex sync.RWMutex
capacity map[uint64]bool
tempCapacity []time.Time
tempCapacityExpiresAt time.Duration
maxCapacity uint64
mutex sync.RWMutex
}

// New creates a new CapacityManager instance.
func New(capacity uint64) *CapacityManager {
return &CapacityManager{capacity: capacity, maxCapacity: capacity}
func New(capacity uint64, tempCapacityExpiresAt time.Duration) *CapacityManager {
return &CapacityManager{
capacity: make(map[uint64]bool),
maxCapacity: capacity,
tempCapacity: make([]time.Time, 0),
tempCapacityExpiresAt: tempCapacityExpiresAt,
}
}

// ReadCapacity reads the current capacity.
func (m *CapacityManager) ReadCapacity() uint64 {
m.mutex.RLock()
defer m.mutex.RUnlock()

log.Info("Reading capacity", "capacity", m.capacity)
log.Info("Reading capacity", "capacity", len(m.capacity))

return m.capacity
return m.maxCapacity - uint64((len(m.capacity)))
}

// ReleaseOneCapacity releases one capacity.
func (m *CapacityManager) ReleaseOneCapacity() (uint64, bool) {
func (m *CapacityManager) ReleaseOneCapacity(blockID uint64) (uint64, bool) {
m.mutex.Lock()
defer m.mutex.Unlock()

if m.capacity+1 > m.maxCapacity {
log.Info("Can not release capacity", "currentCapacity", m.capacity, "maxCapacity", m.maxCapacity)
return m.capacity, false
if _, ok := m.capacity[blockID]; !ok {
log.Info("Can not release capacity",
"blockID", blockID,
"currentCapacity", m.maxCapacity-uint64(len(m.capacity)),
"maxCapacity", m.maxCapacity)
return uint64(len(m.capacity)), false
}

m.capacity += 1
delete(m.capacity, blockID)

log.Info("Released capacity", "capacityAfterRelease", m.capacity)
log.Info("Released capacity", "blockID", blockID, "capacityAfterRelease", len(m.capacity))

return m.capacity, true
return m.maxCapacity - uint64(len(m.capacity)), true
}

// TakeOneCapacity takes one capacity.
func (m *CapacityManager) TakeOneCapacity() (uint64, bool) {
func (m *CapacityManager) TakeOneCapacity(blockID uint64) (uint64, bool) {
m.mutex.Lock()
defer m.mutex.Unlock()

if m.capacity == 0 {
log.Info("Could not take one capacity", "capacity", m.capacity)
if len(m.capacity) == int(m.maxCapacity) {
log.Info("Could not take one capacity",
"blockID", blockID,
"currentCapacity", m.maxCapacity-uint64(len(m.capacity)))
return 0, false
}

m.capacity -= 1
m.capacity[blockID] = true

log.Info("Took one capacity",
"blockID", blockID,
"capacityAfterTaking", m.maxCapacity-uint64(len(m.capacity)))

return m.maxCapacity - uint64((len(m.capacity))), true
}

func (m *CapacityManager) TakeOneTempCapacity() (uint64, bool) {
m.mutex.Lock()
defer m.mutex.Unlock()

// clear expired tempCapacities

log.Info("Took one capacity", "capacityAfterTaking", m.capacity)
m.clearExpiredTempCapacities()

return m.capacity, true
if len(m.capacity)+len(m.tempCapacity) >= int(m.maxCapacity) {
log.Info("Could not take one temp capacity",
"capacity", m.maxCapacity-uint64(len(m.capacity)),
"tempCapacity", len(m.tempCapacity))
return 0, false
}

m.tempCapacity = append(m.tempCapacity, time.Now().UTC())

return m.maxCapacity - uint64(len(m.capacity)) - uint64((len(m.tempCapacity))), true
}

func (m *CapacityManager) clearExpiredTempCapacities() {
for i, c := range m.tempCapacity {
if time.Now().UTC().Sub(c) > m.tempCapacityExpiresAt {
m.tempCapacity = append(m.tempCapacity[:i], m.tempCapacity[i+1:]...)
}
}
}
67 changes: 63 additions & 4 deletions prover/capacity_manager/capacity_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package capacity_manager

import (
"time"

"github.com/stretchr/testify/suite"
)

var (
testCapacity uint64 = 1024
testCapacity uint64 = 5
tempCapacityExpiresAt time.Duration = 5 * time.Second
)

type CapacityManagerTestSuite struct {
Expand All @@ -14,24 +17,80 @@ type CapacityManagerTestSuite struct {
}

func (s *CapacityManagerTestSuite) SetupTest() {
s.m = New(testCapacity)
s.m = New(testCapacity, tempCapacityExpiresAt)
}

func (s *CapacityManagerTestSuite) TestReadCapacity() {
s.Equal(testCapacity, s.m.ReadCapacity())
}

func (s *CapacityManagerTestSuite) TestReleaseOneCapacity() {
capacity, released := s.m.ReleaseOneCapacity()
var blockID uint64 = 1
_, released := s.m.ReleaseOneCapacity(blockID)
s.Equal(false, released)

_, ok := s.m.TakeOneCapacity(blockID)

s.Equal(true, ok)

capacity, released := s.m.ReleaseOneCapacity(blockID)
s.Equal(true, released)

s.Equal(testCapacity+1, capacity)
s.Equal(testCapacity+1, s.m.ReadCapacity())
}

func (s *CapacityManagerTestSuite) TestTakeOneCapacity() {
capacity, ok := s.m.TakeOneCapacity()
var blockID uint64 = 1

capacity, ok := s.m.TakeOneCapacity(blockID)
s.True(ok)
s.Equal(testCapacity-1, capacity)
s.Equal(testCapacity-1, s.m.ReadCapacity())
}

func (s *CapacityManagerTestSuite) TestTakeOneTempCapacity() {
// take 3 actual capacity
var sl []uint64 = []uint64{1, 2, 3}

for _, c := range sl {
_, ok := s.m.TakeOneCapacity(c)
s.True(ok)
}

// should be 2 temp capacity left to take
capacity, ok := s.m.TakeOneTempCapacity()
s.True(ok)
s.Equal(int(testCapacity)-len(sl)-1, capacity)

capacity, ok = s.m.TakeOneTempCapacity()
s.True(ok)
s.Equal(int(testCapacity)-len(sl)-2, capacity)

// now it should fail, 3 capacity + 2 temp capacity
capacity, ok = s.m.TakeOneTempCapacity()
s.False(ok)
s.Equal(int(testCapacity)-len(sl)-2, capacity)

// wait until they expire
time.Sleep(s.m.tempCapacityExpiresAt)

// both should be expired, we should be able to take two more
capacity, ok = s.m.TakeOneTempCapacity()
s.True(ok)
s.Equal(int(testCapacity)-len(sl)-1, capacity)

capacity, ok = s.m.TakeOneTempCapacity()
s.True(ok)
s.Equal(int(testCapacity)-len(sl)-2, capacity)

// now remove one actual capacity, simulate "block done being proven"
capacity, ok = s.m.ReleaseOneCapacity(sl[0])
s.True(ok)
s.Equal(int(testCapacity)-len(sl)-1, capacity)

// and we should be able to take another temp capacity
capacity, ok = s.m.TakeOneTempCapacity()
s.True(ok)
s.Equal(int(testCapacity)-len(sl)-2, capacity)
}
2 changes: 2 additions & 0 deletions prover/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Config struct {
ProveBlockMaxTxGasTipCap *big.Int
HTTPServerPort uint64
Capacity uint64
TempCapacityExpiresAt time.Duration
MinProofFee *big.Int
MaxExpiry time.Duration
}
Expand Down Expand Up @@ -172,6 +173,7 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
WaitReceiptTimeout: time.Duration(c.Uint64(flags.WaitReceiptTimeout.Name)) * time.Second,
ProveBlockGasLimit: proveBlockTxGasLimit,
Capacity: c.Uint64(flags.ProverCapacity.Name),
TempCapacityExpiresAt: c.Duration(flags.TempCapacityExpiresAt.Name),
ProveBlockTxReplacementMultiplier: proveBlockTxReplacementMultiplier,
ProveBlockMaxTxGasTipCap: proveBlockMaxTxGasTipCap,
HTTPServerPort: c.Uint64(flags.ProverHTTPServerPort.Name),
Expand Down
3 changes: 3 additions & 0 deletions prover/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (s *ProverTestSuite) TestNewConfigFromCliContext_OracleProver() {
s.Equal(minProofFee, c.MinProofFee.String())
s.Equal(uint64(3), c.ProveBlockTxReplacementMultiplier)
s.Equal(uint64(256), c.ProveBlockMaxTxGasTipCap.Uint64())
s.Equal(15*time.Second, c.TempCapacityExpiresAt)
s.Nil(new(Prover).InitFromCli(context.Background(), ctx))
s.True(c.ProveUnassignedBlocks)

Expand Down Expand Up @@ -80,6 +81,7 @@ func (s *ProverTestSuite) TestNewConfigFromCliContext_OracleProver() {
"--" + flags.OracleProverPrivateKey.Name, os.Getenv("L1_PROVER_PRIVATE_KEY"),
"--" + flags.Graffiti.Name, "",
"--" + flags.CheckProofWindowExpiredInterval.Name, "30",
"--" + flags.TempCapacityExpiresAt.Name, "15s",
"--" + flags.ProveUnassignedBlocks.Name,
}))
}
Expand Down Expand Up @@ -201,6 +203,7 @@ func (s *ProverTestSuite) SetupApp() *cli.App {
&cli.Uint64Flag{Name: flags.ProverCapacity.Name},
&cli.Uint64Flag{Name: flags.MinProofFee.Name},
&cli.Uint64Flag{Name: flags.ProveBlockTxGasLimit.Name},
&cli.DurationFlag{Name: flags.TempCapacityExpiresAt.Name},
}
app.Action = func(ctx *cli.Context) error {
_, err := NewConfigFromCliContext(ctx)
Expand Down
15 changes: 9 additions & 6 deletions prover/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func InitFromConfig(ctx context.Context, p *Prover, cfg *Config) (err error) {
p.currentBlocksBeingProvenMutex = new(sync.Mutex)
p.currentBlocksWaitingForProofWindow = make(map[uint64]uint64, 0)
p.currentBlocksWaitingForProofWindowMutex = new(sync.Mutex)
p.capacityManager = capacity.New(cfg.Capacity)
p.capacityManager = capacity.New(cfg.Capacity, cfg.TempCapacityExpiresAt)

// Clients
if p.rpc, err = rpc.NewClient(p.ctx, &rpc.ClientConfig{
Expand Down Expand Up @@ -604,7 +604,7 @@ func (p *Prover) onBlockProposed(
}

if !p.cfg.OracleProver {
if _, ok := p.capacityManager.TakeOneCapacity(); !ok {
if _, ok := p.capacityManager.TakeOneCapacity(event.BlockId.Uint64()); !ok {
return errNoCapacity
}
}
Expand Down Expand Up @@ -658,7 +658,7 @@ func (p *Prover) submitProofOp(ctx context.Context, proofWithHeader *proofProduc
defer func() {
<-p.submitProofConcurrencyGuard
if !p.cfg.OracleProver {
_, released := p.capacityManager.ReleaseOneCapacity()
_, released := p.capacityManager.ReleaseOneCapacity(proofWithHeader.Meta.Id)
if !released {
log.Error("unable to release capacity")
}
Expand Down Expand Up @@ -876,9 +876,12 @@ func (p *Prover) cancelProof(ctx context.Context, blockID uint64) {
cancel()
delete(p.currentBlocksBeingProven, blockID)
if !p.cfg.OracleProver {
capacity, released := p.capacityManager.ReleaseOneCapacity()
capacity, released := p.capacityManager.ReleaseOneCapacity(blockID)
if !released {
log.Error("unable to release capacity while cancelling proof", "capacity", capacity)
log.Error("unable to release capacity while cancelling proof",
"capacity", capacity,
"blockID", blockID,
)
}
}
}
Expand Down Expand Up @@ -1011,7 +1014,7 @@ func (p *Prover) requestProofForBlockId(blockId *big.Int, l1Height *big.Int) err

// make sure to takea capacity before requesting proof
if !p.cfg.OracleProver {
if _, ok := p.capacityManager.TakeOneCapacity(); !ok {
if _, ok := p.capacityManager.TakeOneCapacity(event.BlockId.Uint64()); !ok {
return errNoCapacity
}
}
Expand Down
4 changes: 2 additions & 2 deletions prover/server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func (srv *ProverServer) CreateAssignment(c echo.Context) error {
return echo.NewHTTPError(http.StatusUnprocessableEntity, "expiry too long")
}

if srv.capacityManager.ReadCapacity() == 0 {
log.Warn("Prover does not have capacity", "proposerIP", c.RealIP())
if _, ok := srv.capacityManager.TakeOneTempCapacity(); !ok {
log.Warn("Prover unable to take a temporary capacity", "proposerIP", c.RealIP())
return echo.NewHTTPError(http.StatusUnprocessableEntity, "prover does not have capacity")
}

Expand Down
2 changes: 1 addition & 1 deletion prover/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *ProverServerTestSuite) SetupTest() {
proverPrivateKey: l1ProverPrivKey,
minProofFee: common.Big1,
maxExpiry: 24 * time.Hour,
capacityManager: capacity.New(1024),
capacityManager: capacity.New(1024, 100*time.Second),
taikoL1Address: common.HexToAddress(os.Getenv("TAIKO_L1_ADDRESS")),
rpc: rpcClient,
bond: common.Big0,
Expand Down
3 changes: 2 additions & 1 deletion testutils/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/big"
"net/url"
"os"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand Down Expand Up @@ -74,7 +75,7 @@ func (s *ClientTestSuite) SetupTest() {
s.Nil(err)

s.ProverEndpoints = []*url.URL{LocalRandomProverEndpoint()}
s.proverServer = NewTestProverServer(s, l1ProverPrivKey, capacity.New(1024), s.ProverEndpoints[0])
s.proverServer = NewTestProverServer(s, l1ProverPrivKey, capacity.New(1024, 100*time.Second), s.ProverEndpoints[0])

tokenBalance, err := rpcCli.TaikoL1.GetTaikoTokenBalance(nil, crypto.PubkeyToAddress(l1ProverPrivKey.PublicKey))
s.Nil(err)
Expand Down