Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the polling interval configurable. #170

Merged
merged 6 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/getting_started/fhevm/coprocessor/contracts.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Deploy initial contracts
# Deploy initial contractsL

manoranjith marked this conversation as resolved.
Show resolved Hide resolved
Following is an example of how to deploy initial contracts on the Ethereum Sepolia testnet. Deploying on Ethereum mainnet should be almost identical and should be possible by just changing the SEPOLIA_RPC_URL to ETHEREUM_MAINNET_RPC_URL and poiting to a correct RPC node.

Expand Down Expand Up @@ -36,7 +36,7 @@ For the `SEPOLIA_RPC_URL` env variable, you can either get one from a service pr
- `node_modules/fhevm-core-contracts/addresses/.env.fhepayment` for FHEPayment address
- `gateway/.env.gateway` for GatewayContract address.

This script is found exactly inside the [`./precompute-addresses.sh` file](https://github.com/zama-ai/fhevm/blob/main/precompute-addresses.sh):
This script is found exactly inside the [`./precompute-addresses.sh` file](https://github.com/zama-ai/fhevm/blob/v0.6.0-0/precompute-addresses.sh):

```
#!/bin/bash
Expand Down
4 changes: 2 additions & 2 deletions fhevm-engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion fhevm-engine/coprocessor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "coprocessor"
version = "0.1.1"
version = "0.1.2"
manoranjith marked this conversation as resolved.
Show resolved Hide resolved
default-run = "coprocessor"
authors.workspace = true
edition.workspace = true
Expand Down
5 changes: 3 additions & 2 deletions fhevm-engine/coprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Reload database and apply schemas from scratch
make recreate_db
```
Run the server and background fhe worker

```
cargo run -- --run-server --run-bg-worker --worker-polling-interval-ms 1000
```
cargo run -- --run-server --run-bg-worker
```
4 changes: 4 additions & 0 deletions fhevm-engine/coprocessor/src/daemon_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ pub struct Args {
#[arg(long)]
pub run_bg_worker: bool,

/// Polling interval for the background worker to fetch jobs
#[arg(long, default_value_t = 1000)]
pub worker_polling_interval_ms: u64,

/// Generate fhe keys and exit
#[arg(long)]
pub generate_fhe_keys: bool,
Expand Down
4 changes: 2 additions & 2 deletions fhevm-engine/coprocessor/src/tfhe_worker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{db_queries::populate_cache_with_tenant_keys, types::TfheTenantKeys};
use fhevm_engine_common::types::{FhevmError, Handle, SupportedFheCiphertexts};
use fhevm_engine_common::types::{Handle, SupportedFheCiphertexts};
use fhevm_engine_common::{tfhe_ops::current_ciphertext_version, types::SupportedFheOperations};
use itertools::Itertools;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -85,7 +85,7 @@ async fn tfhe_worker_cycle(
WORK_ITEMS_NOTIFICATIONS_COUNTER.inc();
info!(target: "tfhe_worker", "Received work_available notification from postgres");
},
_ = tokio::time::sleep(tokio::time::Duration::from_millis(5000)) => {
_ = tokio::time::sleep(tokio::time::Duration::from_millis(args.worker_polling_interval_ms)) => {
dartdart26 marked this conversation as resolved.
Show resolved Hide resolved
WORK_ITEMS_POLL_COUNTER.inc();
info!(target: "tfhe_worker", "Polling the database for more work on timer");
},
Expand Down
2 changes: 1 addition & 1 deletion fhevm-engine/fhevm-engine-common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fhevm-engine-common"
version = "0.1.1"
version = "0.6.0-6"
authors.workspace = true
edition.workspace = true
license.workspace = true
Expand Down
16 changes: 15 additions & 1 deletion fhevm-engine/fhevm-go-coproc/fhevm/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"google.golang.org/protobuf/proto"
)

const DEFAULT_FHEVM_COPROCESSOR_SYNC_INTERVAL = "1s"

type FheUintType uint8

const (
Expand Down Expand Up @@ -144,6 +146,7 @@ type ApiImpl struct {
store *SqliteComputationStore
address common.Address
aclContractAddress common.Address
flushInterval time.Duration
}

type SessionImpl struct {
Expand Down Expand Up @@ -429,10 +432,21 @@ func InitCoprocessor() (CoprocessorApi, error) {
}
aclContractAddress := common.HexToAddress(aclContractAddressHex)

coprocessorSyncIntervalString, isConfigured := os.LookupEnv("FHEVM_COPROCESSOR_SYNC_INTERVAL")
if !isConfigured {
coprocessorSyncIntervalString = DEFAULT_FHEVM_COPROCESSOR_SYNC_INTERVAL
}
coprocessorSyncInterval, err := time.ParseDuration(coprocessorSyncIntervalString)
if err != nil {
return nil, fmt.Errorf("parsing FHEVM_COPROCESSOR_SYNC_INTERVAL (%s): %w", coprocessorSyncIntervalString, err)
}
fmt.Printf("Using coprocessor sync interval %s\n", coprocessorSyncInterval.String())

apiImpl := ApiImpl{
store: ciphertextDb,
address: fhevmContractAddress,
aclContractAddress: aclContractAddress,
flushInterval: coprocessorSyncInterval,
}

// background job to submit computations to coprocessor
Expand All @@ -448,7 +462,7 @@ func scheduleCoprocessorFlushes(impl *ApiImpl) {
// timer to send polling for messages every 10 seconds
go func() {
for {
time.Sleep(time.Millisecond * 10000)
time.Sleep(impl.flushInterval)
select {
case impl.store.jobChannel <- true:
default:
Expand Down
Loading