Implement the new trigger and generic task node structure #35

Merged · 23 commits · Nov 20, 2024
23 changes: 21 additions & 2 deletions .github/workflows/pr.yml
@@ -5,6 +5,24 @@ on:
  workflow_dispatch:

jobs:
  test:
    environment: Test
    name: Unit Test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          submodules: recursive

      - name: Run Go test
        env:
          RPC_URL: "${{ secrets.RPC_URL }}"
          BUNDLER_RPC: "${{ secrets.BUNDLER_RPC }}"
        run: |
          # TODO Implement test for all packages
          go test -v ./core/taskengine
          go test -v ./pkg/timekeeper

  publish-dev-build:
    name: Publish dev build docker image to dockerhub
    runs-on: 'ubuntu-latest'
@@ -31,9 +49,10 @@ jobs:
          # This is a dedicated repository to house the development/preview build
          images: |
            avaprotocol/avs-dev
          #type=raw,value=latest,enable=${{ github.ref == format('refs/heads/{0}', 'main') }}
          tags: |
            type=raw,value=latest,enable=${{ github.ref == format('refs/heads/{0}', 'main') }}
            type=raw,value={{sha}},enable=${{ github.ref == format('refs/heads/{0}', 'main') }}
            type=raw,value=latest
            type=raw,value={{sha}}
            type=ref,event=branch
            type=ref,event=pr
            type=semver,pattern={{version}}
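As context for the new test job: RPC_URL and BUNDLER_RPC are surfaced as environment variables, so tests that need live endpoints can skip gracefully when the secrets are absent. A minimal sketch of that pattern follows; the test name and its placement in core/taskengine are hypothetical, not part of this PR:

package taskengine

import (
	"os"
	"testing"
)

// TestRPCConnectivity is a hypothetical example showing how a test can
// consume the CI secrets and skip when they are not configured locally.
func TestRPCConnectivity(t *testing.T) {
	rpcURL := os.Getenv("RPC_URL")
	bundlerRPC := os.Getenv("BUNDLER_RPC")
	if rpcURL == "" || bundlerRPC == "" {
		t.Skip("RPC_URL or BUNDLER_RPC not set; skipping")
	}
	// ... dial the endpoints and exercise the task engine here
}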
34 changes: 0 additions & 34 deletions .github/workflows/test.yml

This file was deleted.

11 changes: 7 additions & 4 deletions Makefile
@@ -86,16 +86,15 @@ protoc-gen:
	--go-grpc_opt=paths=source_relative \
	protobuf/avs.proto

build-docker:
	docker compose build
## up: bring up docker compose stack
up:
	docker compose up

## unstable-build: generate an unstable build for internal testing
unstable-build:
	docker build --platform=linux/amd64 --build-arg RELEASE_TAG=unstable -t avaprotocol/ap-avs:unstable -f dockerfiles/operator.Dockerfile .
	docker push avaprotocol/ap-avs:unstable
	docker build --platform=linux/amd64 --build-arg RELEASE_TAG=unstable -t avaprotocol/avs-dev:unstable -f dockerfiles/operator.Dockerfile .
	docker push avaprotocol/avs-dev:unstable


## dev-build: build a dev version for local development
dev-build:
@@ -108,3 +107,7 @@ dev-agg:
## dev-op: run operator locally with dev build
dev-op:
	./out/ap operator --config=config/operator.yaml

## dev-clean: cleanup storage data
dev-clean:
	rm -rf /tmp/ap-avs /tmp/ap.sock
56 changes: 50 additions & 6 deletions aggregator/aggregator.go
@@ -25,6 +25,7 @@ import (
	sdkclients "github.com/Layr-Labs/eigensdk-go/chainio/clients"
	blsagg "github.com/Layr-Labs/eigensdk-go/services/bls_aggregation"
	sdktypes "github.com/Layr-Labs/eigensdk-go/types"
	"github.com/allegro/bigcache/v3"

	"github.com/AvaProtocol/ap-avs/storage"

@@ -91,6 +92,8 @@ type Aggregator struct {
	worker *apqueue.Worker

	status AggregatorStatus

	cache *bigcache.BigCache
}

// NewAggregator creates a new Aggregator with the provided config.
@@ -130,6 +133,47 @@ func NewAggregator(c *config.Config) (*Aggregator, error) {
	//avsRegistryService := avsregistry.NewAvsRegistryServiceChainCaller(avsReader, operatorPubkeysService, c.Logger)
	// blsAggregationService := blsagg.NewBlsAggregatorService(avsRegistryService, c.Logger)

	cache, err := bigcache.New(context.Background(), bigcache.Config{
		// number of shards (must be a power of 2)
		Shards: 1024,

		// time after which an entry can be evicted
		LifeWindow: 120 * time.Minute,

		// Interval between removing expired entries (clean up).
		// If set to <= 0 then no action is performed.
		// Setting to < 1 second is counterproductive; bigcache has a one-second resolution.
		CleanWindow: 5 * time.Minute,

		// rps * lifeWindow, used only in initial memory allocation
		MaxEntriesInWindow: 1000 * 10 * 60,

		// max entry size in bytes, used only in initial memory allocation
		MaxEntrySize: 500,

		// prints information about additional memory allocation
		Verbose: true,

		// cache will not allocate more memory than this limit, value in MB
		// if the limit is reached then the oldest entries can be overridden by new ones
		// 0 value means no size limit
		HardMaxCacheSize: 8192,

		// callback fired when the oldest entry is removed because of its expiration time or no space left
		// for the new entry, or because delete was called. A bitmask representing the reason will be returned.
		// Default value is nil which means no callback; this also prevents unwrapping the oldest entry.
		OnRemove: nil,

		// OnRemoveWithReason is a callback fired when the oldest entry is removed because of its expiration time or no space left
		// for the new entry, or because delete was called. A constant representing the reason will be passed through.
		// Default value is nil which means no callback; this also prevents unwrapping the oldest entry.
		// Ignored if OnRemove is specified.
		OnRemoveWithReason: nil,
	})
	if err != nil {
		panic("cannot initialize cache storage")
	}

	return &Aggregator{
		logger:    c.Logger,
		avsWriter: avsWriter,
@@ -143,15 +187,15 @@ func NewAggregator(c *config.Config) (*Aggregator, error) {

		operatorPool: &OperatorPool{},
		status:       initStatus,

		cache: cache,
	}, nil
}

// Open and set up our database
func (agg *Aggregator) initDB(ctx context.Context) error {
	var err error
	agg.db, err = storage.New(&storage.Config{
		Path: agg.config.DbPath,
	})
	agg.db, err = storage.NewWithPath(agg.config.DbPath)

	if err != nil {
		panic(err)
@@ -205,12 +249,12 @@ func (agg *Aggregator) Start(ctx context.Context) error {
	agg.logger.Info("Starting repl")
	go agg.startRepl()

	agg.logger.Infof("Starting http server")
	go agg.startHttpServer(ctx)

	agg.logger.Infof("Starting rpc server")
	go agg.startRpcServer(ctx)

	agg.logger.Infof("Starting http server")
	go agg.startHttpServer(ctx)

	// Setup wait signal
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
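The bigcache instance configured above stores raw bytes under string keys and is consumed later in auth.go. Below is a self-contained round-trip sketch of the library's Get/Set API; it is a standalone example, not code from this PR, and the key and value are made up:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/allegro/bigcache/v3"
)

func main() {
	// DefaultConfig only sets the eviction window; the aggregator above also
	// tunes shards, clean-up interval, and memory limits explicitly.
	cache, err := bigcache.New(context.Background(), bigcache.DefaultConfig(10*time.Minute))
	if err != nil {
		panic(err)
	}

	// Set stores raw bytes; for a cache the error is usually safe to ignore.
	_ = cache.Set("default-wallet-0xabc", []byte("cached value"))

	if value, err := cache.Get("default-wallet-0xabc"); err == nil {
		fmt.Println(string(value))
	} else if err == bigcache.ErrEntryNotFound {
		fmt.Println("entry missing or already evicted")
	}
}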
30 changes: 23 additions & 7 deletions aggregator/auth.go
@@ -3,20 +3,20 @@ package aggregator
import (
	"context"
	"fmt"
	"math/big"
	"strings"
	"time"

	"github.com/AvaProtocol/ap-avs/core/auth"
	"github.com/AvaProtocol/ap-avs/core/chainio/aa"
	"github.com/AvaProtocol/ap-avs/model"
	avsproto "github.com/AvaProtocol/ap-avs/protobuf"
	"github.com/ethereum/go-ethereum/accounts"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/golang-jwt/jwt/v5"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

const (
@@ -29,6 +29,11 @@ const (
func (r *RpcServer) GetKey(ctx context.Context, payload *avsproto.GetKeyReq) (*avsproto.KeyResp, error) {
	submitAddress := common.HexToAddress(payload.Owner)

	r.config.Logger.Info("process getkey",
		"owner", payload.Owner,
		"expired", payload.ExpiredAt,
	)

	if strings.Contains(payload.Signature, ".") {
		authenticated, err := auth.VerifyJwtKeyForUser(r.config.JwtSecret, payload.Signature, submitAddress)
		if err != nil {
@@ -48,7 +53,10 @@ func (r *RpcServer) GetKey(ctx context.Context, payload *avsproto.GetKeyReq) (*a

signature, err := hexutil.Decode(payload.Signature)
if err != nil {
return nil, err
return nil, status.Errorf(codes.InvalidArgument, auth.InvalidSignatureFormat)
}
if len(signature) < crypto.RecoveryIDOffset || len(signature) < crypto.RecoveryIDOffset {
return nil, status.Errorf(codes.InvalidArgument, auth.InvalidSignatureFormat)
}
// https://stackoverflow.com/questions/49085737/geth-ecrecover-invalid-signature-recovery-id
if signature[crypto.RecoveryIDOffset] == 27 || signature[crypto.RecoveryIDOffset] == 28 {
@@ -131,11 +139,19 @@ func (r *RpcServer) verifyAuth(ctx context.Context) (*model.User, error) {
		Address: common.HexToAddress(claims["sub"].(string)),
	}

	smartAccountAddress, err := aa.GetSenderAddress(r.ethrpc, user.Address, big.NewInt(0))
	if err != nil {
		return nil, fmt.Errorf("Rpc error")
	// cache the default wallet to reduce hits on the eth rpc node
	cachekey := "default-wallet" + user.Address.Hex()
	if value, err := r.cache.Get(cachekey); err == nil {
		defaultSmartWallet := common.BytesToAddress(value)
		user.SmartAccountAddress = &defaultSmartWallet
	} else {
		if err := user.LoadDefaultSmartWallet(r.smartWalletRpc); err != nil {
			return nil, fmt.Errorf("Rpc error")
		}

		// We don't care if caching errors out
		r.cache.Set(cachekey, user.SmartAccountAddress.Bytes())
	}
	user.SmartAccountAddress = smartAccountAddress

	return &user, nil
}
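The recovery-id normalization in GetKey (mapping V values 27/28 down to 0/1) follows the standard EIP-191 personal_sign flow. A self-contained sketch of that verification step with go-ethereum; the helper name and shape are ours, not the PR's:

package main

import (
	"bytes"
	"fmt"

	"github.com/ethereum/go-ethereum/accounts"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

// recoverSigner illustrates the pattern used in GetKey: hash the message
// per EIP-191, normalize the recovery id, and compare the recovered
// address against the expected signer.
func recoverSigner(message, signature []byte, expected common.Address) (bool, error) {
	if len(signature) <= crypto.RecoveryIDOffset {
		return false, fmt.Errorf("signature too short")
	}
	// Legacy signers emit V as 27/28; go-ethereum expects 0/1.
	if signature[crypto.RecoveryIDOffset] == 27 || signature[crypto.RecoveryIDOffset] == 28 {
		signature[crypto.RecoveryIDOffset] -= 27
	}
	pubKey, err := crypto.SigToPub(accounts.TextHash(message), signature)
	if err != nil {
		return false, err
	}
	recovered := crypto.PubkeyToAddress(*pubKey)
	return bytes.Equal(recovered.Bytes(), expected.Bytes()), nil
}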
10 changes: 10 additions & 0 deletions aggregator/repl.go
@@ -19,9 +19,18 @@ func (agg *Aggregator) stopRepl() {
}

}

// startRepl lets an operator inspect node storage directly through a REPL interface.
// It doesn't listen on a TCP socket; it listens directly on a unix socket on the filesystem.
func (agg *Aggregator) startRepl() {
	var err error

	if _, err := os.Stat(agg.config.SocketPath); err == nil {
		// The file exists, most likely left over from a previous crash that didn't clean up; attempt to delete it
		os.Remove(agg.config.SocketPath)
	}
	repListener, err = net.Listen("unix", agg.config.SocketPath)

	if err != nil {
		return
	}
@@ -48,6 +57,7 @@ func handleConnection(agg *Aggregator, conn net.Conn) {

	reader := bufio.NewReader(conn)
	fmt.Fprintln(conn, "AP CLI REPL")
	fmt.Fprintln(conn, "Use `list <prefix>*` to list keys, `get <key>` to inspect content")
	fmt.Fprintln(conn, "-------------------------")

	for {
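Because the REPL binds a unix socket rather than TCP, an operator can attach with `nc -U <socket>` or a few lines of Go. A hypothetical client sketch follows; the /tmp/ap.sock path mirrors the Makefile's dev-clean target and is an assumption here, since the real path comes from agg.config.SocketPath:

package main

import (
	"bufio"
	"fmt"
	"net"
	"os"
)

func main() {
	// Assumed socket path; match it to the aggregator's configured SocketPath.
	conn, err := net.Dial("unix", "/tmp/ap.sock")
	if err != nil {
		fmt.Fprintln(os.Stderr, "cannot reach REPL socket:", err)
		os.Exit(1)
	}
	defer conn.Close()

	// Issue one command and stream whatever the REPL prints back.
	fmt.Fprintln(conn, "list wallet*")
	scanner := bufio.NewScanner(conn)
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
}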