From e262bbd89df39dcff79bce952c9720bbb299eb55 Mon Sep 17 00:00:00 2001 From: Georgi Date: Wed, 18 Dec 2024 11:22:16 +0200 Subject: [PATCH] feat(native): enhance traceability with additional logs for better debugging --- fhevm-engine/fhevm-go-native/fhevm/api.go | 183 +++++++++++++++++-- fhevm-engine/fhevm-go-native/fhevm/fhelib.go | 7 + 2 files changed, 175 insertions(+), 15 deletions(-) diff --git a/fhevm-engine/fhevm-go-native/fhevm/api.go b/fhevm-engine/fhevm-go-native/fhevm/api.go index 11579534..20b70165 100644 --- a/fhevm-engine/fhevm-go-native/fhevm/api.go +++ b/fhevm-engine/fhevm-go-native/fhevm/api.go @@ -8,11 +8,14 @@ import ( "math/big" "os" "sort" + "strings" "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + _ "github.com/mattn/go-sqlite3" grpc "google.golang.org/grpc" ) @@ -134,6 +137,11 @@ type ExtraData struct { FheRandSeed [32]byte } +// Implement String method for ExtraData +func (ed ExtraData) String() string { + return fmt.Sprintf("ExtraData {FheRandSeed: %s}", common.BytesToHash(ed.FheRandSeed[:]).TerminalString()) +} + type ExecutorSession interface { Execute(input []byte, ed ExtraData, output []byte) error ContractAddress() common.Address @@ -157,6 +165,20 @@ type CacheBlockData struct { materializedCiphertexts map[string][]byte } +// Implement the fmt.Stringer interface +func (c CacheBlockData) String() string { + if len(c.materializedCiphertexts) == 0 { + return "MaterializedCiphertexts: none" + } + var sb strings.Builder + sb.WriteString("MaterializedCiphertexts: ") + for key, value := range c.materializedCiphertexts { + sb.WriteString(fmt.Sprintf(" %s: %s, ", key, common.BytesToHash(value).TerminalString())) + } + + return sb.String() +} + type BlockCiphertextQueue struct { queue []*ComputationToInsert // filter duplicates @@ -194,6 +216,17 @@ type ComputationOperand struct { FheUintType FheUintType } +// Implement the fmt.Stringer interface +func (c ComputationOperand) String() string { + return fmt.Sprintf( + "ComputationOperand {IsScalar: %t, Handle: %s, CompressedCiphertext len: %d, FheUintType: %s}", + c.IsScalar, + common.BytesToHash(c.Handle).TerminalString(), + len(c.CompressedCiphertext), + c.FheUintType, + ) +} + type ComputationToInsert struct { segmentId SegmentId Operation FheOp @@ -202,6 +235,23 @@ type ComputationToInsert struct { CommitBlockId int64 } +// Return Handle as TerminalString +func (c ComputationToInsert) Handle() string { + return common.BytesToHash(c.OutputHandle).TerminalString() +} + +// Implement the fmt.Stringer interface +func (c ComputationToInsert) String() string { + return fmt.Sprintf( + "ComputationToInsert { SegmentId: %d, Operation: %s, OutputHandle: %s, Operands: %v, CommitBlockId: %d}", + c.segmentId, + c.Operation, + c.Handle(), + c.Operands, + c.CommitBlockId, + ) +} + type SessionComputationStore struct { insertedHandles map[string]int invalidatedSegments map[SegmentId]bool @@ -234,7 +284,10 @@ func (executorApi *ApiImpl) CreateSession(blockNumber int64) ExecutorSession { } func (executorApi *ApiImpl) PreloadCiphertexts(blockNumber int64, api ChainStorageApi) error { + log := logger("preload") + computations := executorApi.loadComputationsFromStateToCache(blockNumber, api) + log.Info("Preload ciphertexts", "block", blockNumber, "length", computations) if computations > 0 { return executorProcessPendingComputations(executorApi) } @@ -246,8 +299,9 @@ func (executorApi *ApiImpl) loadComputationsFromStateToCache(startBlockNumber in loadStartTime := time.Now() computations := 0 defer func() { + log := logger("preload") duration := time.Since(loadStartTime) - fmt.Printf("ciphertext cache preloaded with %d ciphertexts in %dms\n", computations, duration.Milliseconds()) + log.Info("Preload done", "computations", computations, "duration", duration) }() // TODO: figure out the limit how long in future blocks we should preload @@ -342,8 +396,12 @@ func (executorApi *ApiImpl) loadComputationsFromStateToCache(startBlockNumber in } func (sessionApi *SessionImpl) Commit(blockNumber int64, storage ChainStorageApi) error { + log := logger("commit") + + log.Debug("Session store ciphertexts", "block", blockNumber) err := sessionApi.sessionStore.Commit(storage) if err != nil { + log.Error("Commit failed", "block", blockNumber, "error", err) return err } @@ -351,13 +409,16 @@ func (sessionApi *SessionImpl) Commit(blockNumber int64, storage ChainStorageApi if err != nil { return err } - return nil } func (sessionApi *SessionImpl) Execute(dataOrig []byte, ed ExtraData, outputOrig []byte) error { + log := logger("session::execute") + if len(dataOrig) < 4 { - return fmt.Errorf("input data must be at least 4 bytes for signature, got %d", len(dataOrig)) + err := fmt.Errorf("input data must be at least 4 bytes for signature, got %d", len(dataOrig)) + log.Error("Execute failed", "error", err) + return err } // make copies so we could assume array is immutable later @@ -374,12 +435,26 @@ func (sessionApi *SessionImpl) Execute(dataOrig []byte, ed ExtraData, outputOrig if len(output) >= 32 { // where to get output handle from? outputHandle := output[0:32] - return method.runFunction(sessionApi, callData, ed, outputHandle) + handle := common.BytesToHash(outputHandle).TerminalString() + + log.Debug("Call", "method", *method, "calldata len", len(callData), + "extra data", ed, "handle", handle) + + err := method.runFunction(sessionApi, callData, ed, outputHandle) + if err != nil { + log.Error("Computation not inserted", method, "handle", handle, "error", err) + } + + return err } else { - return errors.New("no output data provided") + err := errors.New("no output data provided") + log.Error("Execute failed", "error", err) + return err } } else { - return fmt.Errorf("signature %d not recognized", signature) + err := fmt.Errorf("signature %d not recognized", signature) + log.Error("Execute failed", "error", err) + return err } } @@ -417,6 +492,8 @@ func (dbApi *SessionComputationStore) InsertComputationBatch(computations []Comp } func (dbApi *SessionComputationStore) InsertComputation(computation ComputationToInsert) error { + log := logger("store") + _, found := dbApi.insertedHandles[string(computation.OutputHandle)] if !found { // preserve insertion order @@ -427,6 +504,8 @@ func (dbApi *SessionComputationStore) InsertComputation(computation ComputationT // he can have faster commit computation.CommitBlockId = dbApi.blockNumber + 5 dbApi.inserts = append(dbApi.inserts, computation) + log.Info("Insert computation", + "inserts count", len(dbApi.inserts), "computation", computation) } return nil @@ -445,8 +524,6 @@ func (dbApi *SessionComputationStore) Commit(storage ChainStorageApi) error { dbApi.invalidatedSegments = make(map[SegmentId]bool) dbApi.segmentCount = 0 - fmt.Printf("Inserting %d computations into the cache\n", len(finalInserts)) - evmInserter := EvmStorageComputationStore{ currentBlockNumber: dbApi.blockNumber, contractStorageAddress: dbApi.contractStorageAddress, @@ -574,6 +651,10 @@ func (dbApi *EvmStorageComputationStore) InsertComputationBatch(evmStorage Chain // in 5 or 10 blocks from current block, depending on how much they pay. // We create buckets, how many blocks in the future user wants // his ciphertexts to be evaluated + + // Get the current process ID + log := logger("evm_store") + buckets := make(map[int64][]*ComputationToInsert) // index the buckets for _, comp := range computations { @@ -582,6 +663,11 @@ func (dbApi *EvmStorageComputationStore) InsertComputationBatch(evmStorage Chain } buckets[comp.CommitBlockId] = append(buckets[comp.CommitBlockId], &comp) } + + if len(buckets) != 0 { + log.Info("New buckets added", "buckets", len(buckets)) + } + // collect all their keys and sort because golang doesn't traverse map // in deterministic order allKeys := make([]int, 0) @@ -602,6 +688,13 @@ func (dbApi *EvmStorageComputationStore) InsertComputationBatch(evmStorage Chain for idx, comp := range bucket { layout := blockQueueStorageLayout(queueBlockNumber, int64(idx)) ciphertextsInBlock = ciphertextsInBlock.Add(ciphertextsInBlock, one) + + log.Info("Persist computation to LateCommit queue", + "handle", comp.Handle(), + "commit block", queueBlockNumber, + "count addr", countAddress.TerminalString(), + "ciphertextsInBlock", ciphertextsInBlock.Int64()) + metadata := computationMetadata(*comp) evmStorage.SetState(dbApi.contractStorageAddress, layout.metadata, metadata) evmStorage.SetState(dbApi.contractStorageAddress, layout.outputHandle, common.BytesToHash(comp.OutputHandle)) @@ -631,6 +724,7 @@ func (dbApi *EvmStorageComputationStore) InsertComputationBatch(evmStorage Chain queueBlockNumber := int64(key) bucket := buckets[queueBlockNumber] ctsStorage := dbApi.cache.ciphertextsToCompute[queueBlockNumber] + if ctsStorage == nil { ctsStorage = &BlockCiphertextQueue{ queue: make([]*ComputationToInsert, 0), @@ -648,8 +742,14 @@ func (dbApi *EvmStorageComputationStore) InsertComputationBatch(evmStorage Chain dbApi.hydrateComputationFromEvmState(evmStorage, comp) ctsStorage.queue = append(ctsStorage.queue, comp) ctsStorage.enqueuedCiphertext[string(comp.OutputHandle)] = true + + log.Info("Add bucket to Cache", + "commit block", queueBlockNumber, + "handle", comp.Handle(), + "cache length", len(ctsStorage.queue)) } } + } // notify about work available @@ -662,7 +762,9 @@ func (dbApi *EvmStorageComputationStore) InsertComputationBatch(evmStorage Chain } func (dbApi *EvmStorageComputationStore) hydrateComputationFromEvmState(evmStorage ChainStorageApi, comp *ComputationToInsert) error { + log := logger("evm_store") + // hydrate operands from storage for idx := range comp.Operands { if !comp.Operands[idx].IsScalar { if len(comp.Operands[idx].Handle) != 32 { @@ -671,6 +773,9 @@ func (dbApi *EvmStorageComputationStore) hydrateComputationFromEvmState(evmStora hash := common.BytesToHash(comp.Operands[idx].Handle) resultCt := ReadBytesToAddress(evmStorage, dbApi.contractStorageAddress, hash) comp.Operands[idx].CompressedCiphertext = resultCt + + log.Info("Hydrate computation", "handle", comp.Handle(), + "operand_handle", hash.TerminalString(), "ciphertext len", len(resultCt)) } } @@ -744,11 +849,15 @@ func ReadBytesToAddress(api ChainStorageApi, contractAddress common.Address, add } func (executorApi *ApiImpl) flushFheResultsToState(blockNumber int64, api ChainStorageApi) error { + log := logger("flush") + // cleanup the queue for the block number countAddress := blockNumberToQueueItemCountAddress(blockNumber) ciphertextsInBlock := api.GetState(executorApi.contractStorageAddress, countAddress).Big() ctCount := ciphertextsInBlock.Int64() + log.Debug("Flush ciphertexts", "block number", blockNumber, "count addr", countAddress.TerminalString(), "count", ctCount) + zero := common.BigToHash(big.NewInt(0)) one := big.NewInt(1) @@ -761,6 +870,10 @@ func (executorApi *ApiImpl) flushFheResultsToState(blockNumber int64, api ChainS ctAddr := blockQueueStorageLayout(blockNumber, int64(i)) metadata := bytesToMetadata(api.GetState(executorApi.contractStorageAddress, ctAddr.metadata)) outputHandle := api.GetState(executorApi.contractStorageAddress, ctAddr.outputHandle) + + log.Info("Reset computation LateCommit queue", "block number", blockNumber, + "handle", outputHandle.TerminalString()) + handlesToMaterialize = append(handlesToMaterialize, outputHandle) api.SetState(executorApi.contractStorageAddress, ctAddr.metadata, zero) api.SetState(executorApi.contractStorageAddress, ctAddr.outputHandle, zero) @@ -780,6 +893,10 @@ func (executorApi *ApiImpl) flushFheResultsToState(blockNumber int64, api ChainS // set 0 as count if ctCount > 0 { api.SetState(executorApi.contractStorageAddress, countAddress, zero) + + log.Debug("Reset count addr", + "block number", blockNumber, + "count addr", countAddress.TerminalString(), "count", ctCount) } // materialize handles in storage assuming they exist in the cache @@ -807,12 +924,16 @@ func (executorApi *ApiImpl) materializeHandlesInStorage(blockNumber int64, handl return nil } + log := logger("materialize") for _, handle := range handles { ciphertext, ok := blockData.materializedCiphertexts[string(handle[:])] if !ok { return errors.New("ciphertext not found in cache") } + log.Info("Persist ciphertext to state", "block number", blockNumber, "handle", + handle.TerminalString(), "ciphertext length", len(ciphertext)) + putBytesToAddress(api, contractAddr, handle, ciphertext) } @@ -864,9 +985,8 @@ func InitExecutor() (ExecutorApi, error) { if !hasAddr { return nil, errors.New("FHEVM_EXECUTOR_URL is set but FHEVM_CONTRACT_ADDRESS is not set") } - fhevmContractAddress := common.HexToAddress(contractAddr) - fmt.Printf("Coprocessor contract address: %s\n", fhevmContractAddress) + fhevmContractAddress := common.HexToAddress(contractAddr) aclContractAddressHex := os.Getenv("ACL_CONTRACT_ADDRESS") if !common.IsHexAddress(aclContractAddressHex) { return nil, fmt.Errorf("bad or missing ACL_CONTRACT_ADDRESS: %s", aclContractAddressHex) @@ -876,6 +996,12 @@ func InitExecutor() (ExecutorApi, error) { // pick hardcoded value in the beginning, we can change later storageAddress := common.HexToAddress("0x0000000000000000000000000000000000000070") + logger("init").Info("FHEVM initialized", + "Executor addr", executorUrl, + "FHEVM contract", contractAddr, + "ACL contract", aclContractAddressHex, + "Storage contract", storageAddress.Hex()) + workAvailableChan := make(chan bool, 10) cache := &CiphertextCache{ @@ -901,6 +1027,7 @@ func InitExecutor() (ExecutorApi, error) { } func executorWorkerThread(impl *ApiImpl) { + log := logger("worker") for { // try reading notification from channel <-impl.cache.workAvailableChan @@ -911,12 +1038,14 @@ func executorWorkerThread(impl *ApiImpl) { err := executorProcessPendingComputations(impl) if err != nil { - fmt.Printf("executor error while processing pending computations: %s\n", err) + log.Error("Failed to execute", "executor_err", fmt.Sprintf("%v", err)) } } } func executorProcessPendingComputations(impl *ApiImpl) error { + log := logger("sync compute") + startTime := time.Now() impl.cache.lock.Lock() defer func() { @@ -958,6 +1087,9 @@ func executorProcessPendingComputations(impl *ApiImpl) error { ctToBlockIndex := make(map[string]int64) for block, compute := range impl.cache.ciphertextsToCompute { + log.Info("Processing block", + "commit block", block, "computations", len(compute.queue)) + for _, ct := range compute.queue { syncInputs := make([]*SyncInput, 0, len(ct.Operands)) resultHandles := make([][]byte, 0, 1) @@ -983,29 +1115,44 @@ func executorProcessPendingComputations(impl *ApiImpl) error { } } - request.Computations = append(request.Computations, &SyncComputation{ + comp := &SyncComputation{ Operation: FheOperation(ct.Operation), Inputs: syncInputs, ResultHandles: resultHandles, - }) + } + + request.Computations = append(request.Computations, comp) + log.Info("Add operation", "op", comp.Operation, "handle", ct.Handle()) + ctToBlockIndex[string(ct.OutputHandle)] = block } } - fmt.Printf("sending grpc request with %d computations and %d ciphertexts\n", len(request.Computations), len(request.CompressedCiphertexts)) + log.Info("Sending request", + "computations", len(request.Computations), + "compressed ciphertexts", len(request.CompressedCiphertexts)) + + if len(request.Computations) != 0 { + for _, compCt := range request.CompressedCiphertexts { + log.Info("Request with compressed ciphertext", "handle", common.BytesToHash(compCt.Handle).TerminalString(), + "compCt len", len(compCt.Serialization)) + } + } client := NewFhevmExecutorClient(conn) response, err := client.SyncCompute(context.Background(), &request) if err != nil { return err } + ciphertexts := response.GetResultCiphertexts() if ciphertexts == nil { return errors.New(response.GetError().String()) } + log.Info("Response", "ciphertexts count", len(ciphertexts.Ciphertexts)) + outCts := ciphertexts.Ciphertexts - fmt.Printf("got %d ciphertext responses from the executor\n", len(outCts)) for _, ct := range outCts { theBlock, exists := ctToBlockIndex[string(ct.Handle)] if !exists { @@ -1021,6 +1168,8 @@ func executorProcessPendingComputations(impl *ApiImpl) error { } blockData.materializedCiphertexts[string(ct.Handle)] = ct.Serialization + + log.Info("Result block data", "block", common.BytesToHash(ct.Handle).TerminalString()) } // reset map of the queue @@ -1028,3 +1177,7 @@ func executorProcessPendingComputations(impl *ApiImpl) error { return nil } + +func logger(ctx string) log.Logger { + return log.Root().With("module", fmt.Sprintf("fhevm:%s", ctx)) +} diff --git a/fhevm-engine/fhevm-go-native/fhevm/fhelib.go b/fhevm-engine/fhevm-go-native/fhevm/fhelib.go index fd523742..5846f746 100644 --- a/fhevm-engine/fhevm-go-native/fhevm/fhelib.go +++ b/fhevm-engine/fhevm-go-native/fhevm/fhelib.go @@ -17,6 +17,13 @@ type FheLibMethod struct { NonScalarDisabled bool } +func (m FheLibMethod) String() string { + return fmt.Sprintf( + "FheLibMethod(Name: %s, ArgTypes: %s, ScalarSupport: %t, NonScalarDisabled: %t)", + m.Name, m.ArgTypes, m.ScalarSupport, m.NonScalarDisabled, + ) +} + var signatureToFheLibMethod = map[uint32]*FheLibMethod{} func FheLibMethods() []*FheLibMethod {