From 2106597cfa5e0c1c6643624b225eedfd8042761c Mon Sep 17 00:00:00 2001 From: allnil Date: Thu, 31 Oct 2024 13:41:18 +0000 Subject: [PATCH 1/2] feat: initial work, align wvm client interface, add cli flags and envs --- flags/flags.go | 1 + server/config.go | 2 + server/load_store.go | 27 +++++- store/precomputed_key/wvm/cli.go | 61 +++++++++++++ store/{ => precomputed_key/wvm}/wvm.go | 121 ++++++++++++++++--------- store/store.go | 12 +-- 6 files changed, 174 insertions(+), 50 deletions(-) create mode 100644 store/precomputed_key/wvm/cli.go rename store/{ => precomputed_key/wvm}/wvm.go (77%) diff --git a/flags/flags.go b/flags/flags.go index 9ee798bb..879ad185 100644 --- a/flags/flags.go +++ b/flags/flags.go @@ -21,6 +21,7 @@ const ( S3Category = "S3 Cache/Fallback" VerifierCategory = "KZG and Cert Verifier" VerifierDeprecatedCategory = "DEPRECATED VERIFIER FLAGS -- THESE WILL BE REMOVED IN V2.0.0" + WVMCategory = "WVM Storage option" ) const ( diff --git a/server/config.go b/server/config.go index 3dc9d55c..22eb4120 100644 --- a/server/config.go +++ b/server/config.go @@ -11,6 +11,7 @@ import ( "github.com/Layr-Labs/eigenda-proxy/store/generated_key/memstore" "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis" "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3" + "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/wvm" "github.com/Layr-Labs/eigenda-proxy/utils" "github.com/Layr-Labs/eigenda-proxy/verify" "github.com/Layr-Labs/eigenda/api/clients" @@ -33,6 +34,7 @@ type Config struct { // secondary storage RedisConfig redis.Config S3Config s3.Config + WVMConfig wvm.Config } // ReadConfig ... parses the Config from the provided flags or environment variables. diff --git a/server/load_store.go b/server/load_store.go index be298c87..2c8f2573 100644 --- a/server/load_store.go +++ b/server/load_store.go @@ -10,6 +10,7 @@ import ( "github.com/Layr-Labs/eigenda-proxy/store/generated_key/memstore" "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis" "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3" + "github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/wvm" "github.com/Layr-Labs/eigenda-proxy/verify" "github.com/Layr-Labs/eigenda/api/clients" "github.com/ethereum/go-ethereum/log" @@ -18,7 +19,9 @@ import ( // TODO - create structured abstraction for dependency injection vs. overloading stateless functions // populateTargets ... creates a list of storage backends based on the provided target strings -func populateTargets(targets []string, s3 store.PrecomputedKeyStore, redis *redis.Store) []store.PrecomputedKeyStore { +func populateTargets(targets []string, s3 store.PrecomputedKeyStore, + redis *redis.Store, wvm store.PrecomputedKeyStore, +) []store.PrecomputedKeyStore { stores := make([]store.PrecomputedKeyStore, len(targets)) for i, f := range targets { @@ -37,6 +40,12 @@ func populateTargets(targets []string, s3 store.PrecomputedKeyStore, redis *redi } stores[i] = s3 + case store.WVMBackendType: + if wvm == nil { + panic(fmt.Sprintf("S3 backend is not configured but specified in targets: %s", f)) + } + stores[i] = wvm + case store.EigenDABackendType, store.MemoryBackendType: panic(fmt.Sprintf("Invalid target for fallback: %s", f)) @@ -56,6 +65,8 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr // create S3 backend store (if enabled) var err error var s3Store store.PrecomputedKeyStore + // create WVM backend store (if enabled) + var wvmStore store.PrecomputedKeyStore var redisStore *redis.Store if cfg.EigenDAConfig.S3Config.Bucket != "" && cfg.EigenDAConfig.S3Config.Endpoint != "" { @@ -66,6 +77,16 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr } } + if cfg.EigenDAConfig.WVMConfig.Enabled { + if cfg.EigenDAConfig.WVMConfig.Endpoint != "" { + log.Info("Using WVM backend") + wvmStore, err = wvm.NewStore(&cfg.EigenDAConfig.WVMConfig, log) + if err != nil { + return nil, fmt.Errorf("failed to create WVM store: %w", err) + } + } + } + if cfg.EigenDAConfig.RedisConfig.Endpoint != "" { log.Info("Using Redis backend") // create Redis backend store @@ -120,8 +141,8 @@ func LoadStoreManager(ctx context.Context, cfg CLIConfig, log log.Logger, m metr } // create secondary storage router - fallbacks := populateTargets(cfg.EigenDAConfig.FallbackTargets, s3Store, redisStore) - caches := populateTargets(cfg.EigenDAConfig.CacheTargets, s3Store, redisStore) + fallbacks := populateTargets(cfg.EigenDAConfig.FallbackTargets, s3Store, redisStore, wvmStore) + caches := populateTargets(cfg.EigenDAConfig.CacheTargets, s3Store, redisStore, wvmStore) secondary := store.NewSecondaryManager(log, m, caches, fallbacks) if secondary.Enabled() { // only spin-up go routines if secondary storage is enabled diff --git a/store/precomputed_key/wvm/cli.go b/store/precomputed_key/wvm/cli.go new file mode 100644 index 00000000..38e0b19e --- /dev/null +++ b/store/precomputed_key/wvm/cli.go @@ -0,0 +1,61 @@ +package wvm + +import ( + "github.com/urfave/cli/v2" +) + +var ( + EndpointFlagName = withFlagPrefix("endpoint") + ChainIDFlagName = withFlagPrefix("chain_id") + ArchiverAddressFlagName = withFlagPrefix("archiver_address") + TimeoutFlagName = withFlagPrefix("timeout") +) + +func withFlagPrefix(s string) string { + return "wvm." + s +} + +func withEnvPrefix(envPrefix, s string) []string { + return []string{envPrefix + "_WVM_" + s} +} + +// CLIFlags ... used for WVM backend configuration +// category is used to group the flags in the help output (see https://cli.urfave.org/v2/examples/flags/#grouping) +func CLIFlags(envPrefix, category string) []cli.Flag { + return []cli.Flag{ + &cli.StringFlag{ + Name: EndpointFlagName, + Usage: "endpoint for WVM chain rpc", + EnvVars: withEnvPrefix(envPrefix, "ENDPOINT"), + Category: category, + }, + &cli.StringFlag{ + Name: ChainIDFlagName, + Usage: "chain ID of WVM chain", + EnvVars: withEnvPrefix(envPrefix, "CHAIN_ID"), + Category: category, + }, + &cli.StringFlag{ + Name: ArchiverAddressFlagName, + Usage: "user's wvm chain address used for archiving data", + EnvVars: withEnvPrefix(envPrefix, "ARCHIVER_ADDRESS"), + Category: category, + }, + // &cli.DurationFlag{ + // Name: TimeoutFlagName, + // Usage: "timeout for S3 storage operations (e.g. get, put)", + // Value: 5 * time.Second, + // EnvVars: withEnvPrefix(envPrefix, "TIMEOUT"), + // Category: category, + // }, + } +} + +func ReadConfig(ctx *cli.Context) Config { + return Config{ + Endpoint: ctx.String(EndpointFlagName), + ChainID: ctx.Uint64(ChainIDFlagName), + ArchiverAddress: ctx.String(ArchiverAddressFlagName), + // Timeout: ctx.Duration(TimeoutFlagName), + } +} diff --git a/store/wvm.go b/store/precomputed_key/wvm/wvm.go similarity index 77% rename from store/wvm.go rename to store/precomputed_key/wvm/wvm.go index 9408f956..c40ea6cc 100644 --- a/store/wvm.go +++ b/store/precomputed_key/wvm/wvm.go @@ -1,4 +1,4 @@ -package store +package wvm import ( "bytes" @@ -16,7 +16,7 @@ import ( "strings" "time" - "github.com/Layr-Labs/eigenda-proxy/verify" + "github.com/Layr-Labs/eigenda-proxy/store" "github.com/andybalholm/brotli" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" @@ -29,37 +29,63 @@ import ( cache "github.com/patrickmn/go-cache" ) -type WVMClient struct { - client *ethclient.Client - log log.Logger - wvmCache *cache.Cache +// Config...WVM client configuration +type Config struct { + Enabled bool + // RPC endpoint of WVM chain + Endpoint string + // WVM chain id + ChainID uint64 + // User's archiver address + // Should be populated with tWVM tokens used in WVM testnet + // Please check out WVM Faucet to claim $tWVM. + // Please read README to gen an additional information + ArchiverAddress string +} + +// Store...wraps wvm client, ethclient and concurrent internal cache +type Store struct { + client *ethclient.Client + log log.Logger + txCache *cache.Cache + cfg *Config } const ( wvmRPCURL = "https://testnet-rpc.wvm.dev" // wvm alphanet rpc url wvmChainID = 9496 // wvm alphanet chain id - wvmArchiverAddress = "0xF8a5a479f04d1565b925A67D088b8fC3f8f0b7eF" // we use it as a "from" address - wvmArchivePoolAddress = "0x606dc1BE30A5966FcF3C10D907d1B76A7B1Bbbd9" // we use it as a "to" address + ArchivePoolAddress = "0x0000000000000000000000000000000000000000" // the data settling address, a unified standard across WeaveVM archiving services ) -func NewWVMClient(log log.Logger) *WVMClient { +func NewStore(cfg *Config, log log.Logger) (*Store, error) { client, err := ethclient.Dial(wvmRPCURL) if err != nil { - panic(fmt.Sprintf("failed to connect to the WVM client: %v", err)) + return nil, fmt.Errorf("failed to connect to the WVM client: %w", err) } privKey := os.Getenv("WVM_PRIV_KEY") if privKey == "" { - panic("wvm archiver signer key is empty") + return nil, fmt.Errorf("wvm archiver private key is empty") } - return &WVMClient{client: client, log: log, wvmCache: cache.New(24*15*time.Hour, 24*time.Hour)} + return &Store{client: client, cfg: cfg, log: log, txCache: cache.New(24*15*time.Hour, 24*time.Hour)}, nil +} + +func (wvm *Store) BackendType() store.BackendType { + return store.S3BackendType } -func (wvm *WVMClient) Store(ctx context.Context, cert *verify.Certificate, eigenBlobData []byte) error { - wvm.log.Info("WVM: save BLOB in wvm chain", "batch id", cert.BlobVerificationProof.BatchId, "blob index", cert.BlobVerificationProof.BlobIndex) +func (wvm *Store) Verify(_ context.Context, key []byte, value []byte) error { + h := crypto.Keccak256Hash(value) + if !bytes.Equal(h[:], key) { + return fmt.Errorf("key does not match value, expected: %s got: %s", hex.EncodeToString(key), h.Hex()) + } + + return nil +} - wvmData, err := wvm.WvmEncode(eigenBlobData) +func (wvm *Store) Put(ctx context.Context, key []byte, value []byte) error { + wvmData, err := wvm.WvmEncode(value) if err != nil { return fmt.Errorf("failed to store data in wvm: %w", err) } @@ -69,12 +95,12 @@ func (wvm *WVMClient) Store(ctx context.Context, cert *verify.Certificate, eigen return fmt.Errorf("failed to store data in wvm: %w", err) } - gas, err := wvm.estimateGas(ctx, wvmArchiverAddress, wvmArchivePoolAddress, wvmData) + gas, err := wvm.estimateGas(ctx, wvm.cfg.ArchiverAddress, ArchivePoolAddress, wvmData) if err != nil { return fmt.Errorf("failed to store data in wvm: %w", err) } - wvmRawTx, err := wvm.createRawTransaction(ctx, wvmArchivePoolAddress, string(wvmData), gas) + wvmRawTx, err := wvm.createRawTransaction(ctx, ArchivePoolAddress, string(wvmData), gas) if err != nil { return fmt.Errorf("failed to store data in wvm: %w", err) } @@ -86,15 +112,16 @@ func (wvm *WVMClient) Store(ctx context.Context, cert *verify.Certificate, eigen wvm.log.Info("WVM:TX Hash:", "tx hash", wvmTxHash) - wvm.wvmCache.Set(commitmentKey(cert.BlobVerificationProof.BatchId, cert.BlobVerificationProof.BlobIndex), wvmTxHash, cache.DefaultExpiration) + wvm.txCache.Set(string(key), wvmTxHash, cache.DefaultExpiration) + wvm.log.Info("WVM:saved wvm Tx Hash by batch_id:blob_index", "tx hash", wvmTxHash, - "key", commitmentKey(cert.BlobVerificationProof.BatchId, cert.BlobVerificationProof.BlobIndex)) + "key", key) return nil } -func (wvm *WVMClient) WvmEncode(eigenBlob []byte) ([]byte, error) { +func (wvm *Store) WvmEncode(eigenBlob []byte) ([]byte, error) { eigenBlobLen := len(eigenBlob) wvm.log.Info("WVM:eigen blob received", "eigen blob size", eigenBlobLen) @@ -121,7 +148,7 @@ func (wvm *WVMClient) WvmEncode(eigenBlob []byte) ([]byte, error) { return brotliOut.Bytes(), nil } -func (wvm *WVMClient) WvmDecode(calldataBlob string) ([]byte, error) { +func (wvm *Store) WvmDecode(calldataBlob string) ([]byte, error) { // trim calldata compressedBlob, err := hex.DecodeString(calldataBlob[2:]) if err != nil { @@ -142,21 +169,23 @@ func (wvm *WVMClient) WvmDecode(calldataBlob string) ([]byte, error) { return decompressedEncoded, nil } -// GetWvmTxHashByCommitment uses commitment to get wvm tx hash from the internal map(temprorary hack) -// and returns it to the caller -func (wvm *WVMClient) GetWvmTxHashByCommitment(cert *verify.Certificate) (string, error) { - wvmTxHash, found := wvm.wvmCache.Get(commitmentKey(cert.BlobVerificationProof.BatchId, cert.BlobVerificationProof.BlobIndex)) - if !found { - wvm.log.Info("wvm tx hash using provided commitment NOT FOUND", "provided key", commitmentKey(cert.BlobVerificationProof.BatchId, cert.BlobVerificationProof.BlobIndex)) - return "", fmt.Errorf("wvm tx hash for provided commitment not found") +func (wvm *Store) Get(ctx context.Context, key []byte) ([]byte, error) { + wvmTxHash, err := wvm.GetWvmTxHashByCommitment(key) + if err != nil { + return nil, err } - wvm.log.Info("wvm tx hash using provided commitment FOUND", "provided key", commitmentKey(cert.BlobVerificationProof.BatchId, cert.BlobVerificationProof.BlobIndex)) + wvm.log.Info("found wvm tx hash using provided commitment", "provided key", string(key)) - return wvmTxHash.(string), nil + data, err := wvm.get(ctx, wvmTxHash) + if err != nil { + return nil, fmt.Errorf("failed to get eigenda blob from wvm: %w", err) + } + + return data, nil } -func (wvm *WVMClient) GetBlobFromWvm(ctx context.Context, wvmTxHash string) ([]byte, error) { +func (wvm *Store) get(ctx context.Context, wvmTxHash string) ([]byte, error) { r, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://wvm-data-retriever.shuttleapp.rs/calldata/%s", wvmTxHash), nil) @@ -164,6 +193,8 @@ func (wvm *WVMClient) GetBlobFromWvm(ctx context.Context, wvmTxHash string) ([]b return nil, fmt.Errorf("failed to call wvm-data-retriever") } + defer r.Body.Close() + type WvmResponse struct { ArweaveBlockHash string `json:"arweave_block_hash"` Calldata string `json:"calldata"` @@ -171,8 +202,6 @@ func (wvm *WVMClient) GetBlobFromWvm(ctx context.Context, wvmTxHash string) ([]b WvmBlockHash string `json:"wvm_block_hash"` } - defer r.Body.Close() - body, err := io.ReadAll(r.Body) if err != nil { return nil, fmt.Errorf("failed to read response body: %w", err) @@ -198,10 +227,24 @@ func (wvm *WVMClient) GetBlobFromWvm(ctx context.Context, wvmTxHash string) ([]b return wvmDecodedBlob, nil } +// GetWvmTxHashByCommitment uses commitment to get wvm tx hash from the internal map(temprorary hack) +// and returns it to the caller +func (wvm *Store) GetWvmTxHashByCommitment(key []byte) (string, error) { + wvmTxHash, found := wvm.txCache.Get(string(key)) + if !found { + wvm.log.Info("wvm tx hash using provided commitment NOT FOUND", "provided key", string(key)) + return "", fmt.Errorf("wvm tx hash for provided commitment not found") + } + + wvm.log.Info("wvm tx hash using provided commitment FOUND", "provided key", string(key)) + + return wvmTxHash.(string), nil +} + // HELPERS // getSuggestedGasPrice connects to an Ethereum node via RPC and retrieves the current suggested gas price. -func (wvm *WVMClient) getSuggestedGasPrice(ctx context.Context) error { +func (wvm *Store) getSuggestedGasPrice(ctx context.Context) error { gasPrice, err := wvm.client.SuggestGasPrice(ctx) if err != nil { return fmt.Errorf("failed to suggest gas price: %w", err) @@ -213,7 +256,7 @@ func (wvm *WVMClient) getSuggestedGasPrice(ctx context.Context) error { } // estimateGas tries estimates the suggested amount of gas that required to execute a given transaction. -func (wvm *WVMClient) estimateGas(ctx context.Context, from, to string, data []byte) (uint64, error) { +func (wvm *Store) estimateGas(ctx context.Context, from, to string, data []byte) (uint64, error) { var ( fromAddr = common.HexToAddress(from) toAddr = common.HexToAddress(to) @@ -251,7 +294,7 @@ func (wvm *WVMClient) estimateGas(ctx context.Context, from, to string, data []b } // createRawTransaction creates a raw EIP-1559 transaction and returns it as a hex string. -func (wvm *WVMClient) createRawTransaction(ctx context.Context, to string, data string, gasLimit uint64) (string, error) { +func (wvm *Store) createRawTransaction(ctx context.Context, to string, data string, gasLimit uint64) (string, error) { baseFee, err := wvm.client.SuggestGasPrice(ctx) if err != nil { return "", err @@ -357,7 +400,7 @@ type Transaction struct { TransactionCost string `json:"transactionCost,omitempty"` } -func (wvm *WVMClient) sendRawTransaction(ctx context.Context, rawTx string) (string, error) { +func (wvm *Store) sendRawTransaction(ctx context.Context, rawTx string) (string, error) { rawTxBytes, err := hex.DecodeString(rawTx) if err != nil { return "", err @@ -433,7 +476,3 @@ func convertHexField(tx *Transaction, field string) error { return nil } - -func commitmentKey(batchID, blobIndex uint32) string { - return fmt.Sprintf("%d:%d", batchID, blobIndex) -} diff --git a/store/store.go b/store/store.go index 6026837e..0e5ccd70 100644 --- a/store/store.go +++ b/store/store.go @@ -15,6 +15,8 @@ const ( RedisBackendType Unknown + + WVMBackendType ) var ( @@ -32,6 +34,8 @@ func (b BackendType) String() string { return "S3" case RedisBackendType: return "Redis" + case WVMBackendType: + return "WVM" case Unknown: fallthrough default: @@ -49,6 +53,8 @@ func StringToBackendType(s string) BackendType { return MemoryBackendType case "s3": return S3BackendType + case "wvm": + return WVMBackendType case "redis": return RedisBackendType case "unknown": @@ -79,12 +85,6 @@ type GeneratedKeyStore interface { Put(ctx context.Context, value []byte) (key []byte, err error) } -type WVMedKeyGeneratedStore interface { - GeneratedKeyStore - GetWvmTxHashByCommitment(ctx context.Context, key []byte) (string, error) - GetBlobFromWvm(ctx context.Context, key []byte) ([]byte, error) -} - type PrecomputedKeyStore interface { Store // Get retrieves the given key if it's present in the key-value data store. From eeb798591650e96f110f3453045601968ba3e571 Mon Sep 17 00:00:00 2001 From: allnil Date: Thu, 31 Oct 2024 14:18:52 +0000 Subject: [PATCH 2/2] chore: separate rpc client from service logic in wvm store type --- store/precomputed_key/wvm/cli.go | 2 +- store/precomputed_key/wvm/eth_rpc_client.go | 277 ++++++++++++++++ store/precomputed_key/wvm/wvm.go | 336 +++----------------- 3 files changed, 320 insertions(+), 295 deletions(-) create mode 100644 store/precomputed_key/wvm/eth_rpc_client.go diff --git a/store/precomputed_key/wvm/cli.go b/store/precomputed_key/wvm/cli.go index 38e0b19e..3b05bf28 100644 --- a/store/precomputed_key/wvm/cli.go +++ b/store/precomputed_key/wvm/cli.go @@ -54,7 +54,7 @@ func CLIFlags(envPrefix, category string) []cli.Flag { func ReadConfig(ctx *cli.Context) Config { return Config{ Endpoint: ctx.String(EndpointFlagName), - ChainID: ctx.Uint64(ChainIDFlagName), + ChainID: ctx.Int64(ChainIDFlagName), ArchiverAddress: ctx.String(ArchiverAddressFlagName), // Timeout: ctx.Duration(TimeoutFlagName), } diff --git a/store/precomputed_key/wvm/eth_rpc_client.go b/store/precomputed_key/wvm/eth_rpc_client.go new file mode 100644 index 00000000..5c33bb0c --- /dev/null +++ b/store/precomputed_key/wvm/eth_rpc_client.go @@ -0,0 +1,277 @@ +package wvm + +import ( + "bytes" + "context" + "crypto/ecdsa" + "encoding/hex" + "encoding/json" + "fmt" + "math/big" + "os" + "reflect" + "strconv" + "strings" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +type EthRPCClient struct { + log log.Logger + client *ethclient.Client + chainID int64 +} + +func NewEthRPCClient(log log.Logger, cfg *Config) (*EthRPCClient, error) { + client, err := ethclient.Dial(cfg.Endpoint) + if err != nil { + return nil, fmt.Errorf("failed to connect to the WVM client: %w", err) + } + privKey := os.Getenv("WVM_PRIV_KEY") + if privKey == "" { + return nil, fmt.Errorf("wvm archiver private key is empty") + } + return &EthRPCClient{log, client, cfg.ChainID}, nil +} + +// getSuggestedGasPrice connects to an Ethereum node via RPC and retrieves the current suggested gas price. +func (rpc *EthRPCClient) getSuggestedGasPrice(ctx context.Context) error { + gasPrice, err := rpc.client.SuggestGasPrice(ctx) + if err != nil { + return fmt.Errorf("failed to suggest gas price: %w", err) + } + + rpc.log.Info("wvm chain suggested gas price", "gas", gasPrice.String()) + + return nil +} + +// estimateGas tries estimates the suggested amount of gas that required to execute a given transaction. +func (rpc *EthRPCClient) estimateGas(ctx context.Context, from, to string, data []byte) (uint64, error) { + var ( + fromAddr = common.HexToAddress(from) + toAddr = common.HexToAddress(to) + bytesData []byte + err error + ) + + var encoded string + if string(data) != "" { + if ok := strings.HasPrefix(string(data), "0x"); !ok { + encoded = hexutil.Encode(data) + } + + bytesData, err = hexutil.Decode(encoded) + if err != nil { + return 0, err + } + } + + msg := ethereum.CallMsg{ + From: fromAddr, + To: &toAddr, + Gas: 0x00, + Data: bytesData, + } + + gas, err := rpc.client.EstimateGas(ctx, msg) + if err != nil { + return 0, err + } + + rpc.log.Info("WVM estimated Gas Price", "price", gas) + + return gas, nil +} + +// createRawTransaction creates a raw EIP-1559 transaction and returns it as a hex string. +func (rpc *EthRPCClient) createRawTransaction(ctx context.Context, to string, data string, gasLimit uint64) (string, error) { + baseFee, err := rpc.client.SuggestGasPrice(ctx) + if err != nil { + return "", err + } + + // WVM: maybe we don't need this, but e.g. + // increment := new(big.Int).Mul(big.NewInt(2), big.NewInt(params.GWei)) + // gasFeeCap := new(big.Int).Add(baseFee, increment) + // gasFeeCap.Add(gasFeeCap, priorityFee) + + gasFeeCap := baseFee + + // address shenanigans + // Decode the provided private key. + privKey := os.Getenv("WVM_PRIV_KEY") + if privKey == "" { + panic("wvm archiver signer key is empty") + } + pKeyBytes, err := hexutil.Decode("0x" + privKey) + if err != nil { + return "", err + } + // Convert the private key bytes to an ECDSA private key. + ecdsaPrivateKey, err := crypto.ToECDSA(pKeyBytes) + if err != nil { + return "", err + } + // Extract the public key from the ECDSA private key. + publicKey := ecdsaPrivateKey.Public() + publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey) + if !ok { + return "", fmt.Errorf("error casting public key to ECDSA") + } + // Compute the Ethereum address of the signer from the public key. + fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA) + // Retrieve the nonce for the signer's account, representing the transaction count. + nonce, err := rpc.client.PendingNonceAt(ctx, fromAddress) + if err != nil { + return "", err + } + + // Prepare data payload. + var hexData string + if strings.HasPrefix(data, "0x") { + hexData = data + } else { + hexData = hexutil.Encode([]byte(data)) + } + bytesData, err := hexutil.Decode(hexData) + if err != nil { + return "", err + } + + toAddr := common.HexToAddress(to) + txData := types.DynamicFeeTx{ + ChainID: big.NewInt(rpc.chainID), + Nonce: nonce, + GasTipCap: big.NewInt(0), + GasFeeCap: gasFeeCap, + Gas: gasLimit, + To: &toAddr, + Data: bytesData, + } + + tx := types.NewTx(&txData) + signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(big.NewInt(rpc.chainID)), ecdsaPrivateKey) + if err != nil { + return "", err + } + + // Encode the signed transaction into RLP (Recursive Length Prefix) format for transmission. + var buf bytes.Buffer + err = signedTx.EncodeRLP(&buf) + if err != nil { + return "", err + } + + // Return the RLP-encoded transaction as a hexadecimal string. + rawTxRLPHex := hex.EncodeToString(buf.Bytes()) + + return rawTxRLPHex, nil +} + +// Transaction represents the structure of the transaction JSON. +type Transaction struct { + Type string `json:"type"` + ChainID string `json:"chainId"` + Nonce string `json:"nonce"` + To string `json:"to"` + Gas string `json:"gas"` + GasPrice string `json:"gasPrice,omitempty"` + MaxPriorityFeePerGas string `json:"maxPriorityFeePerGas"` + MaxFeePerGas string `json:"maxFeePerGas"` + Value string `json:"value"` + Input string `json:"input"` + AccessList []string `json:"accessList"` + V string `json:"v"` + R string `json:"r"` + S string `json:"s"` + YParity string `json:"yParity"` + Hash string `json:"hash"` + TransactionTime string `json:"transactionTime,omitempty"` + TransactionCost string `json:"transactionCost,omitempty"` +} + +// helper function +func convertHexField(tx *Transaction, field string) error { + typeOfTx := reflect.TypeOf(*tx) + + // Get the value of the Transaction struct + txValue := reflect.ValueOf(tx).Elem() + + // Parse the hexadecimal string as an integer + hexStr := txValue.FieldByName(field).String() + + intValue, err := strconv.ParseUint(hexStr[2:], 16, 64) + if err != nil { + return err + } + + // Convert the integer to a decimal string + decimalStr := strconv.FormatUint(intValue, 10) + + // Check if the field exists + _, ok := typeOfTx.FieldByName(field) + if !ok { + return fmt.Errorf("field %s does not exist in Transaction struct", field) + } + + // Set the field value to the decimal string + txValue.FieldByName(field).SetString(decimalStr) + + return nil +} + +func (rpc *EthRPCClient) sendRawTransaction(ctx context.Context, rawTx string) (string, error) { + rawTxBytes, err := hex.DecodeString(rawTx) + if err != nil { + return "", err + } + + tx := new(types.Transaction) + + err = rlp.DecodeBytes(rawTxBytes, &tx) + if err != nil { + return "", err + } + + err = rpc.client.SendTransaction(ctx, tx) + if err != nil { + return "", err + } + + var txDetails Transaction + txBytes, err := tx.MarshalJSON() + if err != nil { + return "", err + } + if err := json.Unmarshal(txBytes, &txDetails); err != nil { + return "", err + } + + txDetails.TransactionTime = tx.Time().Format(time.RFC822) + txDetails.TransactionCost = tx.Cost().String() + + convertFields := []string{"Nonce", "MaxPriorityFeePerGas", "MaxFeePerGas", "Value", "Type", "Gas"} + for _, field := range convertFields { + if err := convertHexField(&txDetails, field); err != nil { + return "", err + } + } + + txJSON, err := json.MarshalIndent(txDetails, "", "\t") + if err != nil { + return "", err + } + + rpc.log.Info("WVM:raw TX Receipt:", "tx receipt", string(txJSON)) + + return txDetails.Hash, nil +} diff --git a/store/precomputed_key/wvm/wvm.go b/store/precomputed_key/wvm/wvm.go index c40ea6cc..557a6f0b 100644 --- a/store/precomputed_key/wvm/wvm.go +++ b/store/precomputed_key/wvm/wvm.go @@ -3,29 +3,17 @@ package wvm import ( "bytes" "context" - "crypto/ecdsa" "encoding/hex" "encoding/json" "fmt" "io" - "math/big" "net/http" - "os" - "reflect" - "strconv" - "strings" "time" "github.com/Layr-Labs/eigenda-proxy/store" "github.com/andybalholm/brotli" - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/rlp" cache "github.com/patrickmn/go-cache" ) @@ -35,7 +23,7 @@ type Config struct { // RPC endpoint of WVM chain Endpoint string // WVM chain id - ChainID uint64 + ChainID int64 // User's archiver address // Should be populated with tWVM tokens used in WVM testnet // Please check out WVM Faucet to claim $tWVM. @@ -45,30 +33,27 @@ type Config struct { // Store...wraps wvm client, ethclient and concurrent internal cache type Store struct { - client *ethclient.Client - log log.Logger - txCache *cache.Cache - cfg *Config + rpcClient *EthRPCClient + log log.Logger + txCache *cache.Cache + cfg *Config } const ( - wvmRPCURL = "https://testnet-rpc.wvm.dev" // wvm alphanet rpc url - wvmChainID = 9496 // wvm alphanet chain id - + // wvmRPCURL = "https://testnet-rpc.wvm.dev" // wvm alphanet rpc url ArchivePoolAddress = "0x0000000000000000000000000000000000000000" // the data settling address, a unified standard across WeaveVM archiving services ) func NewStore(cfg *Config, log log.Logger) (*Store, error) { - client, err := ethclient.Dial(wvmRPCURL) + rpcClient, err := NewEthRPCClient(log, cfg) if err != nil { - return nil, fmt.Errorf("failed to connect to the WVM client: %w", err) - } - privKey := os.Getenv("WVM_PRIV_KEY") - if privKey == "" { - return nil, fmt.Errorf("wvm archiver private key is empty") + return nil, fmt.Errorf("failed to initialize rpc client for wvm chain: %w", err) } - return &Store{client: client, cfg: cfg, log: log, txCache: cache.New(24*15*time.Hour, 24*time.Hour)}, nil + return &Store{ + rpcClient: rpcClient, + cfg: cfg, log: log, txCache: cache.New(24*15*time.Hour, 24*time.Hour), + }, nil } func (wvm *Store) BackendType() store.BackendType { @@ -85,27 +70,27 @@ func (wvm *Store) Verify(_ context.Context, key []byte, value []byte) error { } func (wvm *Store) Put(ctx context.Context, key []byte, value []byte) error { - wvmData, err := wvm.WvmEncode(value) + wvmData, err := wvm.wvmEncode(value) if err != nil { return fmt.Errorf("failed to store data in wvm: %w", err) } - err = wvm.getSuggestedGasPrice(ctx) + err = wvm.rpcClient.getSuggestedGasPrice(ctx) if err != nil { return fmt.Errorf("failed to store data in wvm: %w", err) } - gas, err := wvm.estimateGas(ctx, wvm.cfg.ArchiverAddress, ArchivePoolAddress, wvmData) + gas, err := wvm.rpcClient.estimateGas(ctx, wvm.cfg.ArchiverAddress, ArchivePoolAddress, wvmData) if err != nil { return fmt.Errorf("failed to store data in wvm: %w", err) } - wvmRawTx, err := wvm.createRawTransaction(ctx, ArchivePoolAddress, string(wvmData), gas) + wvmRawTx, err := wvm.rpcClient.createRawTransaction(ctx, ArchivePoolAddress, string(wvmData), gas) if err != nil { return fmt.Errorf("failed to store data in wvm: %w", err) } - wvmTxHash, err := wvm.sendRawTransaction(ctx, wvmRawTx) + wvmTxHash, err := wvm.rpcClient.sendRawTransaction(ctx, wvmRawTx) if err != nil { return fmt.Errorf("failed to store data in wvm: %w", err) } @@ -121,54 +106,6 @@ func (wvm *Store) Put(ctx context.Context, key []byte, value []byte) error { return nil } -func (wvm *Store) WvmEncode(eigenBlob []byte) ([]byte, error) { - eigenBlobLen := len(eigenBlob) - - wvm.log.Info("WVM:eigen blob received", "eigen blob size", eigenBlobLen) - - brotliOut := bytes.Buffer{} - writer := brotli.NewWriterOptions(&brotliOut, brotli.WriterOptions{Quality: 6}) - - in := bytes.NewReader(eigenBlob) - n, err := io.Copy(writer, in) - if err != nil { - panic(err) - } - - if int(n) != len(eigenBlob) { - panic("WVM:size mismatch during brotli compression") - } - - if err := writer.Close(); err != nil { - panic(fmt.Errorf("WVM: brotli writer close fail: %w", err)) - } - - wvm.log.Info("WVM:compressed by brotli", "eigen blob size", eigenBlobLen, "eigen blob size compressed with brotli", brotliOut.Len()) - - return brotliOut.Bytes(), nil -} - -func (wvm *Store) WvmDecode(calldataBlob string) ([]byte, error) { - // trim calldata - compressedBlob, err := hex.DecodeString(calldataBlob[2:]) - if err != nil { - return nil, err - } - - wvm.log.Info("WVM:compressed eigen blob received for decompression", "compressed blob size", len(compressedBlob)) - - brotliReader := brotli.NewReader(bytes.NewReader(compressedBlob)) - - decompressedEncoded, err := io.ReadAll(brotliReader) - if err != nil { - return nil, fmt.Errorf("WVM: failed to decompress brotli data: %w", err) - } - - wvm.log.Info("WVM:blob successfully decompressed", "decompressed blob size", len(compressedBlob)) - - return decompressedEncoded, nil -} - func (wvm *Store) Get(ctx context.Context, key []byte) ([]byte, error) { wvmTxHash, err := wvm.GetWvmTxHashByCommitment(key) if err != nil { @@ -176,7 +113,6 @@ func (wvm *Store) Get(ctx context.Context, key []byte) ([]byte, error) { } wvm.log.Info("found wvm tx hash using provided commitment", "provided key", string(key)) - data, err := wvm.get(ctx, wvmTxHash) if err != nil { return nil, fmt.Errorf("failed to get eigenda blob from wvm: %w", err) @@ -215,7 +151,7 @@ func (wvm *Store) get(ctx context.Context, wvmTxHash string) ([]byte, error) { wvm.log.Info("Received data from WVM", "arweave_block_hash", wvmData.ArweaveBlockHash, "wvm_block_hash", wvmData.WvmBlockHash) - wvmDecodedBlob, err := wvm.WvmDecode(wvmData.Calldata) + wvmDecodedBlob, err := wvm.wvmDecode(wvmData.Calldata) if err != nil { return nil, fmt.Errorf("failed to decode calldata to eigen decoded blob: %w", err) } @@ -241,238 +177,50 @@ func (wvm *Store) GetWvmTxHashByCommitment(key []byte) (string, error) { return wvmTxHash.(string), nil } -// HELPERS - -// getSuggestedGasPrice connects to an Ethereum node via RPC and retrieves the current suggested gas price. -func (wvm *Store) getSuggestedGasPrice(ctx context.Context) error { - gasPrice, err := wvm.client.SuggestGasPrice(ctx) - if err != nil { - return fmt.Errorf("failed to suggest gas price: %w", err) - } - - wvm.log.Info("WVM suggested Gas Price", "gas", gasPrice.String()) - - return nil -} - -// estimateGas tries estimates the suggested amount of gas that required to execute a given transaction. -func (wvm *Store) estimateGas(ctx context.Context, from, to string, data []byte) (uint64, error) { - var ( - fromAddr = common.HexToAddress(from) - toAddr = common.HexToAddress(to) - bytesData []byte - err error - ) - - var encoded string - if string(data) != "" { - if ok := strings.HasPrefix(string(data), "0x"); !ok { - encoded = hexutil.Encode(data) - } - - bytesData, err = hexutil.Decode(encoded) - if err != nil { - return 0, err - } - } - - msg := ethereum.CallMsg{ - From: fromAddr, - To: &toAddr, - Gas: 0x00, - Data: bytesData, - } - - gas, err := wvm.client.EstimateGas(ctx, msg) - if err != nil { - return 0, err - } - - wvm.log.Info("WVM estimated Gas Price", "price", gas) - - return gas, nil -} - -// createRawTransaction creates a raw EIP-1559 transaction and returns it as a hex string. -func (wvm *Store) createRawTransaction(ctx context.Context, to string, data string, gasLimit uint64) (string, error) { - baseFee, err := wvm.client.SuggestGasPrice(ctx) - if err != nil { - return "", err - } - - // WVM: maybe we don't need this, but e.g. - // increment := new(big.Int).Mul(big.NewInt(2), big.NewInt(params.GWei)) - // gasFeeCap := new(big.Int).Add(baseFee, increment) - // gasFeeCap.Add(gasFeeCap, priorityFee) +func (wvm *Store) wvmEncode(eigenBlob []byte) ([]byte, error) { + eigenBlobLen := len(eigenBlob) - gasFeeCap := baseFee + wvm.log.Info("WVM:eigen blob received", "eigen blob size", eigenBlobLen) - // address shenanigans - // Decode the provided private key. - privKey := os.Getenv("WVM_PRIV_KEY") - if privKey == "" { - panic("wvm archiver signer key is empty") - } - pKeyBytes, err := hexutil.Decode("0x" + privKey) - if err != nil { - return "", err - } - // Convert the private key bytes to an ECDSA private key. - ecdsaPrivateKey, err := crypto.ToECDSA(pKeyBytes) - if err != nil { - return "", err - } - // Extract the public key from the ECDSA private key. - publicKey := ecdsaPrivateKey.Public() - publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey) - if !ok { - return "", fmt.Errorf("error casting public key to ECDSA") - } - // Compute the Ethereum address of the signer from the public key. - fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA) - // Retrieve the nonce for the signer's account, representing the transaction count. - nonce, err := wvm.client.PendingNonceAt(ctx, fromAddress) - if err != nil { - return "", err - } + brotliOut := bytes.Buffer{} + writer := brotli.NewWriterOptions(&brotliOut, brotli.WriterOptions{Quality: 6}) - // Prepare data payload. - var hexData string - if strings.HasPrefix(data, "0x") { - hexData = data - } else { - hexData = hexutil.Encode([]byte(data)) - } - bytesData, err := hexutil.Decode(hexData) + in := bytes.NewReader(eigenBlob) + n, err := io.Copy(writer, in) if err != nil { - return "", err - } - - toAddr := common.HexToAddress(to) - txData := types.DynamicFeeTx{ - ChainID: big.NewInt(wvmChainID), - Nonce: nonce, - GasTipCap: big.NewInt(0), - GasFeeCap: gasFeeCap, - Gas: gasLimit, - To: &toAddr, - Data: bytesData, + return nil, fmt.Errorf("failed to read buffer to encode eigen blob: %w", err) } - tx := types.NewTx(&txData) - signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(big.NewInt(wvmChainID)), ecdsaPrivateKey) - if err != nil { - return "", err + if int(n) != len(eigenBlob) { + return nil, fmt.Errorf("WVM:size mismatch during brotli compression") } - // Encode the signed transaction into RLP (Recursive Length Prefix) format for transmission. - var buf bytes.Buffer - err = signedTx.EncodeRLP(&buf) - if err != nil { - return "", err + if err := writer.Close(); err != nil { + return nil, fmt.Errorf("WVM: brotli writer close fail: %w", err) } - // Return the RLP-encoded transaction as a hexadecimal string. - rawTxRLPHex := hex.EncodeToString(buf.Bytes()) - - return rawTxRLPHex, nil -} + wvm.log.Info("WVM:compressed by brotli", "eigen blob size", eigenBlobLen, "eigen blob size compressed with brotli", brotliOut.Len()) -// Transaction represents the structure of the transaction JSON. -type Transaction struct { - Type string `json:"type"` - ChainID string `json:"chainId"` - Nonce string `json:"nonce"` - To string `json:"to"` - Gas string `json:"gas"` - GasPrice string `json:"gasPrice,omitempty"` - MaxPriorityFeePerGas string `json:"maxPriorityFeePerGas"` - MaxFeePerGas string `json:"maxFeePerGas"` - Value string `json:"value"` - Input string `json:"input"` - AccessList []string `json:"accessList"` - V string `json:"v"` - R string `json:"r"` - S string `json:"s"` - YParity string `json:"yParity"` - Hash string `json:"hash"` - TransactionTime string `json:"transactionTime,omitempty"` - TransactionCost string `json:"transactionCost,omitempty"` + return brotliOut.Bytes(), nil } -func (wvm *Store) sendRawTransaction(ctx context.Context, rawTx string) (string, error) { - rawTxBytes, err := hex.DecodeString(rawTx) - if err != nil { - return "", err - } - - tx := new(types.Transaction) - - err = rlp.DecodeBytes(rawTxBytes, &tx) - if err != nil { - return "", err - } - - err = wvm.client.SendTransaction(ctx, tx) - if err != nil { - return "", err - } - - var txDetails Transaction - txBytes, err := tx.MarshalJSON() - if err != nil { - return "", err - } - if err := json.Unmarshal(txBytes, &txDetails); err != nil { - return "", err - } - - txDetails.TransactionTime = tx.Time().Format(time.RFC822) - txDetails.TransactionCost = tx.Cost().String() - - convertFields := []string{"Nonce", "MaxPriorityFeePerGas", "MaxFeePerGas", "Value", "Type", "Gas"} - for _, field := range convertFields { - if err := convertHexField(&txDetails, field); err != nil { - return "", err - } - } - - txJSON, err := json.MarshalIndent(txDetails, "", "\t") +func (wvm *Store) wvmDecode(calldataBlob string) ([]byte, error) { + // trim calldata + compressedBlob, err := hex.DecodeString(calldataBlob[2:]) if err != nil { - return "", err + return nil, err } - wvm.log.Info("WVM:raw TX Receipt:", "tx receipt", string(txJSON)) - - return txDetails.Hash, nil -} - -// helper function -func convertHexField(tx *Transaction, field string) error { - typeOfTx := reflect.TypeOf(*tx) - - // Get the value of the Transaction struct - txValue := reflect.ValueOf(tx).Elem() + wvm.log.Info("WVM:compressed eigen blob received for decompression", "compressed blob size", len(compressedBlob)) - // Parse the hexadecimal string as an integer - hexStr := txValue.FieldByName(field).String() + brotliReader := brotli.NewReader(bytes.NewReader(compressedBlob)) - intValue, err := strconv.ParseUint(hexStr[2:], 16, 64) + decompressedEncoded, err := io.ReadAll(brotliReader) if err != nil { - return err - } - - // Convert the integer to a decimal string - decimalStr := strconv.FormatUint(intValue, 10) - - // Check if the field exists - _, ok := typeOfTx.FieldByName(field) - if !ok { - return fmt.Errorf("field %s does not exist in Transaction struct", field) + return nil, fmt.Errorf("WVM: failed to decompress brotli data: %w", err) } - // Set the field value to the decimal string - txValue.FieldByName(field).SetString(decimalStr) + wvm.log.Info("WVM:blob successfully decompressed", "decompressed blob size", len(compressedBlob)) - return nil + return decompressedEncoded, nil }