From 2e25da61535709f7474076420b1f0d6f3eb001de Mon Sep 17 00:00:00 2001 From: Eugene Formanenko Date: Wed, 2 Oct 2024 14:57:32 +0400 Subject: [PATCH] feat: add dataset info --- .github/workflows/docker.yml | 31 +++--- Cargo.lock | 21 +++- Cargo.toml | 5 +- mainnet.config.yml | 184 +---------------------------------- src/api_types.rs | 13 +++ src/cli.rs | 95 +++++++++++++++--- src/http_server.rs | 86 ++++++++++++++-- src/main.rs | 25 ++++- src/network/client.rs | 19 +++- src/network/datasets.rs | 108 ++++++++++++++++++++ src/network/mod.rs | 2 + src/network/state.rs | 10 +- src/network/storage.rs | 2 +- tethys.config.yml | 17 +++- 14 files changed, 368 insertions(+), 250 deletions(-) create mode 100644 src/api_types.rs create mode 100644 src/network/datasets.rs diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 06d16e4..49b7a7e 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -1,10 +1,6 @@ name: docker on: workflow_dispatch: # manually run - inputs: - tag: - description: image tag - required: true env: CI: true @@ -20,32 +16,29 @@ jobs: - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 + - uses: SebRollen/toml-action@v1.2.0 + id: read_toml + with: + file: 'Cargo.toml' + field: 'package.version' + - name: Docker login uses: docker/login-action@v3 with: username: ${{ secrets.DOCKER_LOGIN }} password: ${{ secrets.DOCKER_TOKEN }} - - name: Build query gateway - uses: docker/build-push-action@v5 - with: - context: . - load: true - tags: subsquid/query-gateway:test - cache-from: type=gha - cache-to: type=gha,mode=max - - - name: Get query gateway version - run: echo "GATEWAY_VERSION=$(docker run --rm subsquid/query-gateway:test subsquid-query-gateway --version | cut -d ' ' -f2)" >> $GITHUB_ENV - - name: Build & publish query gateway uses: docker/build-push-action@v5 + env: + VERSION: ${{ steps.read_toml.outputs.value }} with: context: . 
-          platforms: linux/amd64,linux/arm/v7,linux/arm64/v8,linux/386
+          # Build only for Linux AMD64 now, uncomment and re-build before public release
+          platforms: linux/amd64
+#          platforms: linux/amd64,linux/arm/v7,linux/arm64/v8,linux/386
           push: true
           tags: |
-            subsquid/query-gateway:${{ env.GATEWAY_VERSION }}
-            subsquid/query-gateway:${{ inputs.tag }}
+            subsquid/sqd-portal:${{ env.VERSION }}
           cache-from: type=gha
           cache-to: type=gha,mode=max
diff --git a/Cargo.lock b/Cargo.lock
index a315865..e7a7e67 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5399,7 +5399,7 @@ dependencies = [
 
 [[package]]
 name = "sqd-portal"
-version = "0.1.0"
+version = "0.2.9"
 dependencies = [
  "anyhow",
  "async-stream",
@@ -5428,6 +5428,7 @@ dependencies = [
  "tokio 1.38.0",
  "tokio-stream",
  "tokio-util",
+ "tower-http",
 "tracing",
  "tracing-futures",
  "tracing-subscriber",
@@ -6053,11 +6054,25 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "tower-http"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8437150ab6bbc8c5f0f519e3d5ed4aa883a83dd4cdd3d1b21f9482936046cb97"
+dependencies = [
+ "bitflags 2.6.0",
+ "bytes 1.7.1",
+ "http 1.1.0",
+ "pin-project-lite",
+ "tower-layer",
+ "tower-service",
+]
+
 [[package]]
 name = "tower-layer"
-version = "0.3.2"
+version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
+checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
 
 [[package]]
 name = "tower-service"
diff --git a/Cargo.toml b/Cargo.toml
index f031c17..6c36b0d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "sqd-portal"
-version = "0.1.0"
+version = "0.2.9"
 edition = "2021"
 
 [dependencies]
@@ -16,7 +16,7 @@ itertools = "0.12"
 lazy_static = "1.4.0"
 parking_lot = "0.12"
 prometheus-client = "0.22"
-reqwest = { version = "0.12.7", features = ["stream", "gzip"] }
+reqwest = { version = "0.12.7", features = ["stream", "gzip", "json"] }
 scopeguard = "1.2"
 semver = "1"
 serde = { version = "1", features = ["derive"] }
@@ -31,6 +31,7 @@ tokio-util = "0.7.11"
 tracing = { version = "0.1", features = ["async-await"] }
 tracing-futures = { version = "0.2.5", features = ["tokio", "futures-03"] }
 tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
+tower-http = { version = "0.6.1", features = ["cors"] }
 uuid = { version = "1", features = ["v4", "fast-rng"] }
 
 sqd-contract-client = { git = "https://github.com/subsquid/sqd-network.git", version = "1.0.7" }
diff --git a/mainnet.config.yml b/mainnet.config.yml
index 5e7a742..a2ac880 100644
--- a/mainnet.config.yml
+++ b/mainnet.config.yml
@@ -1,182 +1,4 @@
 hostname: http://0.0.0.0:8000
-available_datasets:
-  0g-testnet: s3://0g-testnet-traceless-1
-  aleph-zero-evm-mainnet: s3://aleph-zero-evm-mainnet
-  arbitrum-nova: s3://arbitrum-nova-1
-  arbitrum-one: s3://arbitrum-one
-  arbitrum-sepolia: s3://arbitrum-sepolia-traceless
-  arthera-mainnet: s3://arthera-mainnet
-  astar-mainnet: s3://astar-mainnet-traceless-1
-  astar-zkyoto: s3://astar-zkyoto-1-traceless
-  astar-zkevm-mainnet: s3://astar-zkevm-mainnet-1-traceless
-  avalanche-mainnet: s3://ava-mainnet-1
-  avalanche-testnet: s3://ava-testnet-1
-  b3-sepolia: s3://b3-sepolia
-  b3-mainnet: s3://b3-mainnet
-  base-mainnet: s3://base
-  base-sepolia: s3://base-sepolia
-  berachain-bartio: s3://berachain-bartio
-  binance-mainnet: s3://binance-mainnet
-  binance-testnet: s3://binance-testnet-traceless-1
-  bitfinity-mainnet: 
s3://bitfinity-mainnet - bitfinity-testnet: s3://bitfinity-testnet - blast-l2-mainnet: s3://blast-l2-mainnet-1 - blast-sepolia: s3://blast-sepolia-1-traceless - bitgert-mainnet: s3://bitgert-mainnet-1 - bitgert-testnet: s3://bitgert-testnet-1 - bob-mainnet: s3://bob-mainnet - canto: s3://canto-traceless - canto-testnet: s3://canto-testnet-traceless - core-mainnet: s3://core-mainnet-1 - cyberconnect-l2-testnet: s3://cyber-l2-testnet-2 - cyber-mainnet: s3://cyber-mainnet-1 - crossfi-testnet: s3://crossfi-testnet-traceless - dfk-chain: s3://dfk-chain-traceless-1 - dogechain-mainnet: s3://dogechain-mainnet-traceless-1 - dogechain-testnet: s3://dogechain-testnet-traceless-1 - ethereum-holesky: s3://ethereum-holesky-1 - ethereum-mainnet: s3://ethereum-mainnet-1 - ethereum-sepolia: s3://ethereum-sepolia-1 - etherlink-mainnet: s3://etherlink-mainnet-traceless - etherlink-testnet: s3://etherlink-testnet-3-traceless - exosama: s3://exosama-1 - fantom-mainnet: s3://fantom-mainnet-traceless - fantom-testnet: s3://fantom-testnet-traceless-1 - flare-mainnet: s3://flare-mainnet-traceless-1 - galxe-gravity: s3://galxe-gravity - gelato-arbitrum-blueberry: s3://arbitrum-blueberry-1 - gelato-opcelestia-raspberry: s3://opcelestia-raspberry-traceless - gnosis-mainnet: s3://gnosis-mainnet-1 - immutable-zkevm-mainnet: s3://immutable-zkevm-mainnet-traceless-1 - immutable-zkevm-testnet: s3://immutable-zkevm-testnet-traceless-1 - kyoto-testnet: s3://kyoto-testnet-traceless - linea-mainnet: s3://linea-mainnet-1 - manta-pacific: s3://manta-pacific-traceless - manta-pacific-sepolia: s3://manta-pacific-sepolia - mantle-mainnet: s3://mantle-mainnet-1-traceless - mantle-sepolia: s3://mantle-sepolia-1-traceless - merlin-mainnet: s3://merlin-mainnet-traceless - merlin-testnet: s3://merlin-testnet-traceless - metis-mainnet: s3://metis-mainnet-3 - mode-mainnet: s3://mode-mainnet-traceless - moonbase-testnet: s3://moonbase-testnet-1 - moonbeam-mainnet: s3://moonbeam-evm-2 - moonriver-mainnet: s3://moonriver-mainnet-1 - moonsama: s3://moonsama-traceless-1 - nakachain: s3://nakachain - neon-mainnet: s3://neon-mainnet-traceless - neon-devnet: s3://neon-devnet-traceless-1 - neox-testnet: s3://neox-testnet - opbnb-mainnet: s3://opbnb-mainnet-traceless-1 - opbnb-testnet: s3://opbnb-testnet-traceless-1 - optimism-mainnet: s3://optimism-2 - optimism-sepolia: s3://optimism-sepolia-traceless - peaq-mainnet: s3://peaq-mainnet-traceless-1 - plume-testnet: s3://plume-testnet - plume-devnet: s3://plume-devnet - polygon-mainnet: s3://polygon-mainnet-1 - polygon-amoy-testnet: s3://polygon-amoy-testnet-traceless - polygon-zkevm-mainnet: s3://polygon-zkevm-mainnet-traceless - polygon-zkevm-cardona-testnet: s3://polygon-zkevm-cardona-testnet-traceless-1 - prom-testnet: s3://prom-testnet-1-traceless - prom-testnet-v2: s3://prom-testnet-v2 - puppynet: s3://puppynet - scroll-mainnet: s3://scroll-mainnet - scroll-sepolia: s3://scroll-sepolia - shiden-mainnet: s3://shiden-mainnet - shibarium: s3://shibarium - shibuya-testnet: s3://shibuya-testnet-traceless-1 - skale-nebula: s3://skale-nebula-traceless-1 - stratovm-sepolia: s3://stratovm-sepolia - superseed-sepolia: s3://superseed-sepolia-traceless - taiko-mainnet: s3://taiko-mainnet-traceless - tanssi: s3://tanssi-traceless-1 - x1-testnet: s3://x1-testnet-traceless - xlayer-mainnet: s3://xlayer-mainnet-traceless - xlayer-testnet: s3://xlayer-testnet-traceless - zksync-mainnet: s3://zksync-mainnet-1 - zksync-sepolia: s3://zksync-sepolia-2 - zora-mainnet: s3://zora-mainnet - zora-sepolia: 
s3://zora-sepolia-traceless-1
-  acala: s3://acala-4
-  acurast-canary: s3://acurast-canary
-  agung: s3://agung-4
-  aleph-zero: s3://aleph-zero
-  aleph-zero-testnet: s3://aleph-zero-testnet
-  amplitude: s3://amplitude-4
-  asset-hub-kusama: s3://asset-hub-kusama-4
-  asset-hub-polkadot: s3://asset-hub-polkadot-4
-  asset-hub-rococo: s3://asset-hub-rococo-4
-  astar-substrate: s3://astar-substrate
-  avail: s3://avail-1
-  basilisk: s3://basilisk-4
-  bifrost-kusama: s3://bifrost-kusama-4
-  bifrost-polkadot: s3://bifrost-polkadot-4
-  bittensor-testnet: s3://bittensor-testnet
-  bittensor: s3://bittensor-4
-  bridge-hub-kusama: s3://bridge-hub-kusama-4
-  bridge-hub-polkadot: s3://bridge-hub-polkadot-4
-  bridge-hub-rococo: s3://bridge-hub-rococo-4
-  bridge-hub-westend: s3://bridge-hub-westend-4
-  centrifuge: s3://centrifuge-4
-  cere: s3://cere-4
-  chainflip: s3://chainflip-4
-  collectives-polkadot: s3://collectives-polkadot-4
-  collectives-westend: s3://collectives-westend-4
-  crust: s3://crust-4
-  clover: s3://clover-4
-  data-avail: s3://data-avail-4
-  dancebox: s3://dancebox-4
-  darwinia-crab: s3://darwinia-crab-4
-  darwinia: s3://darwinia-4
-  degen-chain: s3://degen-traceless
-  eden: s3://eden-4
-  enjin-matrix: s3://enjin-matrix-4
-  enjin-canary-matrix: s3://enjin-canary-matrix
-  equilibrium: s3://equilibrium
-  foucoco: s3://foucoco-5
-  frequency: s3://frequency-4
-  gemini-3h: s3://gemini-3h
-  hydradx: s3://hydradx-4
-  integritee: s3://integritee
-  interlay: s3://interlay-4
-  invarch-parachain: s3://invarch-parachain-4
-  invarch-tinkernet: s3://invarch-tinkernet-5
-  joystream: s3://joystream-4
-  karura: s3://karura-4
-  khala: s3://khala-4
-  kilt: s3://kilt-4
-  kintsugi: s3://kintsugi-4
-  kusama: s3://kusama-4
-  litentry: s3://litentry-4
-  moonbeam-substrate: s3://moonbeam-substrate-4
-  moonbase-substrate: s3://moonbase-substrate-4
-  moonriver-substrate: s3://moonriver-substrate-4
-  paseo: s3://paseo-5
-  peaq-mainnet-substrate: s3://peaq-mainnet-5
-  pendulum: s3://pendulum-4
-  phala-testnet: s3://phala-testnet-4
-  phala: s3://phala-4
-  picasso: s3://picasso
-  polimec: s3://polimec-4
-  polkadex: s3://polkadex
-  polkadot: s3://polkadot-4
-  polymesh: s3://polymesh-4
-  reef-testnet: s3://reef-testnet-4-1
-  reef: s3://reef-4
-  robonomics: s3://robonomics-4
-  rococo: s3://rococo-4
-  rolimec: s3://rolimec-4
-  shiden-substrate: s3://shiden-substrate-4
-  shibuya-substrate: s3://shibuya-substrate
-  sora-mainnet: s3://sora-mainnet-4
-  subsocial-parachain: s3://subsocial-parachain-4-1
-  ternoa: s3://ternoa-4
-  turing-avail: s3://avail-turing
-  turing-mainnet: s3://turing-mainnet
-  vara: s3://vara-4
-  vara-testnet: s3://vara-testnet-4
-  westend: s3://westend-4
-  zeitgeist-testnet: s3://zeitgeist-testnet-4
-  zeitgeist: s3://zeitgeist-4
-  zkverify-testnet: s3://zkverify-testnet
+sqd_network:
+  datasets: https://cdn.subsquid.io/sqd-network/datasets.yml
+  serve: "all"
\ No newline at end of file
diff --git a/src/api_types.rs b/src/api_types.rs
new file mode 100644
index 0000000..431d6ce
--- /dev/null
+++ b/src/api_types.rs
@@ -0,0 +1,13 @@
+use sqd_contract_client::PeerId;
+
+#[derive(serde::Serialize)]
+pub(crate) struct PortalConfigApiResponse {
+    pub libp2p_key: PeerId,
+}
+
+#[derive(serde::Serialize)]
+pub(crate) struct AvailableDatasetApiResponse {
+    pub slug: String,
+    pub aliases: Vec<String>,
+    pub real_time: bool,
+}
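For reference, the wire format these response types produce; a standalone sketch in which the struct is duplicated (with `PeerId` swapped for `String`) so it compiles outside the crate, and the alias value is invented:

```rust
// Standalone sketch of the JSON shape produced by AvailableDatasetApiResponse;
// the struct is mirrored here and the "eth-main" alias is a made-up example.
fn main() {
    #[derive(serde::Serialize)]
    struct AvailableDatasetApiResponse {
        slug: String,
        aliases: Vec<String>,
        real_time: bool,
    }

    let dataset = AvailableDatasetApiResponse {
        slug: "ethereum-mainnet".into(),
        aliases: vec!["eth-main".into()],
        real_time: false,
    };

    // GET /datasets responds with a JSON array of such objects:
    // {"slug":"ethereum-mainnet","aliases":["eth-main"],"real_time":false}
    println!("{}", serde_json::to_string(&dataset).unwrap());
}
```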
diff --git a/src/cli.rs b/src/cli.rs
index 307bb9e..a569c45 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -1,11 +1,11 @@
+use crate::types::DatasetId;
 use clap::Parser;
 use serde::Deserialize;
+use serde_with::serde_derive::Serialize;
 use serde_with::{serde_as, DurationSeconds};
-use std::time::Duration;
-use std::{collections::HashMap, net::SocketAddr};
 use sqd_network_transport::{PeerId, TransportArgs};
-
-use crate::types::DatasetId;
+use std::net::SocketAddr;
+use std::time::Duration;
 
 #[derive(Parser)]
 #[command(version)]
@@ -82,8 +82,62 @@ where
     Ok(s.trim_end_matches('/').to_owned())
 }
 
+fn default_served_datasets() -> Vec<DatasetConfig> {
+    vec![]
+}
+
+fn default_serve() -> String {
+    "all".into()
+}
+
+#[serde_as]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SqdNetworkConfig {
+    pub datasets: Option<String>,
+
+    #[serde(default = "default_serve")]
+    pub serve: String,
+}
+
+#[serde_as]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DatasetSourceConfig {
+    pub kind: String,
+    pub name_ref: String,
+
+    pub id: String,
+}
+
+#[serde_as]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DatasetConfig {
+    pub slug: String,
+    pub aliases: Option<Vec<String>>,
+    pub data_sources: Vec<DatasetSourceConfig>,
+    // // FIXME move to centralized service?
+    // pub bucket: String,
+    // pub name: Option<String>,
+    // pub chain_id: Option<u64>,
+    // pub url: Option<String>,
+    // pub chain_ss58_prefix: Option<u16>,
+    // pub chain_type: Option<String>,
+    // pub is_testnet: Option<bool>,
+    // pub data: Option<String>,
+    // pub tier: Option<String>,
+}
+
+impl DatasetConfig {
+    pub fn network_dataset_id(&self) -> Option<DatasetId> {
+        if let Some(source) = self.data_sources.iter().find(|s| s.kind == "sqd_network") {
+            Some(DatasetId::from_url(&source.id))
+        } else {
+            None
+        }
+    }
+}
+
 #[serde_as]
-#[derive(Debug, Clone, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Config {
     #[serde(deserialize_with = "parse_hostname")]
     pub hostname: String,
@@ -139,8 +193,10 @@ pub struct Config {
     )]
     pub chain_update_interval: Duration,
 
-    // Dataset alias -> bucket URL
-    pub available_datasets: HashMap<String, String>,
+    #[serde(rename = "serve", default = "default_served_datasets")]
+    pub available_datasets: Vec<DatasetConfig>,
+
+    pub sqd_network: Option<SqdNetworkConfig>,
 }
 
 impl Config {
@@ -149,13 +205,28 @@ impl Config {
         Ok(serde_yaml::from_slice(file_contents.as_slice())?)
     }
 
-    pub fn dataset_id(&self, dataset: &str) -> Option<DatasetId> {
-        self.available_datasets
-            .get(dataset)
-            .map(DatasetId::from_url)
+    pub fn find_dataset(&self, slug: &str) -> Option<&DatasetConfig> {
+        self.available_datasets.iter().find(|d| {
+            if d.slug == slug {
+                return true;
+            }
+
+            if let Some(ref aliases) = d.aliases {
+                return aliases.iter().any(|a| a == slug);
+            }
+
+            false
+        })
+    }
+
+    pub fn dataset_id(&self, slug: &str) -> Option<DatasetId> {
+        self.find_dataset(slug)
+            .and_then(|dataset| dataset.network_dataset_id())
     }
 
     pub fn dataset_ids(&self) -> impl Iterator<Item = DatasetId> + '_ {
-        self.available_datasets.values().map(DatasetId::from_url)
+        self.available_datasets
+            .iter()
+            .filter_map(|d| d.network_dataset_id())
     }
 }
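A hypothetical unit test for the new resolution logic above: a dataset only yields a `DatasetId` when one of its sources has `kind == "sqd_network"`. All names and bucket values below are invented; the block could sit in a `#[cfg(test)]` module at the bottom of src/cli.rs:

```rust
// Sketch test for DatasetConfig::network_dataset_id (src/cli.rs).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn only_sqd_network_sources_yield_a_dataset_id() {
        let dataset = DatasetConfig {
            slug: "ethereum-mainnet".to_string(),
            aliases: Some(vec!["eth-main".to_string()]),
            data_sources: vec![DatasetSourceConfig {
                kind: "sqd_network".to_string(),
                name_ref: "ethereum-mainnet".to_string(),
                id: "s3://ethereum-mainnet-1".to_string(),
            }],
        };
        // network_dataset_id scans data_sources for kind == "sqd_network".
        assert!(dataset.network_dataset_id().is_some());

        // Without any sqd_network source there is nothing to resolve.
        let no_network_source = DatasetConfig {
            data_sources: vec![],
            ..dataset
        };
        assert!(no_network_source.network_dataset_id().is_none());
    }
}
```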
diff --git a/src/http_server.rs b/src/http_server.rs
index 7c2e365..3f764ce 100644
--- a/src/http_server.rs
+++ b/src/http_server.rs
@@ -1,5 +1,6 @@
 use std::{collections::HashMap, net::SocketAddr, sync::Arc};
 
+use axum::http::Method;
 use axum::{
     async_trait,
     body::Body,
@@ -14,7 +15,9 @@ use itertools::Itertools;
 use prometheus_client::registry::Registry;
 use sqd_contract_client::PeerId;
 use sqd_messages::query_result;
+use tower_http::cors::{Any, CorsLayer};
 
+use crate::api_types::{AvailableDatasetApiResponse, PortalConfigApiResponse};
 use crate::{
     cli::Config,
     controller::task_manager::TaskManager,
@@ -38,12 +41,12 @@ async fn get_height(
 }
 
 async fn get_worker(
-    Path((dataset, start_block)): Path<(String, u64)>,
+    Path((slug, start_block)): Path<(String, u64)>,
     Extension(client): Extension<Arc<NetworkClient>>,
     Extension(config): Extension<Arc<Config>>,
 ) -> Response {
-    let Some(dataset_id) = config.dataset_id(&dataset) else {
-        return RequestError::NotFound(format!("Unknown dataset: {dataset}")).into_response();
+    let Some(dataset_id) = config.dataset_id(&slug) else {
+        return RequestError::NotFound(format!("Unknown dataset: {slug}")).into_response();
     };
 
     let worker_id = match client.find_worker(&dataset_id, start_block) {
@@ -51,7 +54,7 @@
         None => {
             return (
                 StatusCode::SERVICE_UNAVAILABLE,
-                format!("No available worker for dataset {dataset} block {start_block}"),
+                format!("No available worker for dataset {slug} block {start_block}"),
             )
                 .into_response()
         }
@@ -123,8 +126,59 @@ async fn execute_stream(
         .unwrap()
 }
 
-async fn get_network_state(Extension(client): Extension<Arc<NetworkClient>>) -> impl IntoResponse {
-    axum::Json(client.network_state())
+async fn get_status(
+    Extension(client): Extension<Arc<NetworkClient>>,
+    Extension(_config): Extension<Arc<Config>>,
+) -> impl IntoResponse {
+    axum::Json(PortalConfigApiResponse {
+        libp2p_key: client.peer_id(),
+    })
+}
+
+async fn get_datasets(Extension(config): Extension<Arc<Config>>) -> impl IntoResponse {
+    let datasets = config.available_datasets.clone();
+
+    let res: Vec<AvailableDatasetApiResponse> = datasets
+        .into_iter()
+        .map(|d| {
+            // FIXME empty strings for id, name?
+            AvailableDatasetApiResponse {
+                slug: d.slug,
+                aliases: d.aliases.unwrap_or_default(),
+                real_time: false,
+            }
+        })
+        .collect();
+
+    axum::Json(res)
+}
+
+async fn get_dataset_state(
+    Path(slug): Path<String>,
+    Extension(client): Extension<Arc<NetworkClient>>,
+    Extension(config): Extension<Arc<Config>>,
+) -> impl IntoResponse {
+    let Some(dataset_id) = config.dataset_id(&slug) else {
+        return RequestError::NotFound(format!("Unknown dataset: {slug}")).into_response();
+    };
+
+    axum::Json(client.dataset_state(dataset_id)).into_response()
+}
+
+async fn get_dataset_metadata(
+    Path(slug): Path<String>,
+    Extension(config): Extension<Arc<Config>>,
+) -> impl IntoResponse {
+    let Some(dataset) = config.find_dataset(&slug) else {
+        return RequestError::NotFound(format!("Unknown dataset: {slug}")).into_response();
+    };
+
+    axum::Json(AvailableDatasetApiResponse {
+        slug: dataset.slug.clone(),
+        aliases: dataset.aliases.clone().unwrap_or_default(),
+        real_time: false,
+    })
+    .into_response()
 }
 
 async fn get_metrics(Extension(registry): Extension<Arc<Registry>>) -> impl IntoResponse {
@@ -154,17 +208,29 @@ pub async fn run_server(
     addr: &SocketAddr,
     config: Arc<Config>,
 ) -> anyhow::Result<()> {
+    let cors = CorsLayer::new()
+        .allow_methods([Method::GET, Method::POST, Method::OPTIONS, Method::PUT])
+        .allow_headers(Any)
+        .allow_origin(Any);
+
     tracing::info!("Starting HTTP server listening on {addr}");
     let app = Router::new()
+        .route("/datasets", get(get_datasets))
+        .route("/datasets/:dataset/height", get(get_height))
+        .route("/datasets/:dataset/stream", post(execute_stream_restricted))
+        .route("/datasets/:dataset/stream/debug", post(execute_stream))
+        .route("/datasets/:dataset/state", get(get_dataset_state))
+        .route("/datasets/:dataset/metadata", get(get_dataset_metadata))
+        // backward compatibility routes
+        .route("/datasets/:dataset/:start_block/worker", get(get_worker))
         .route("/network/:dataset/height", get(get_height))
-        .route("/network/:dataset/stream", post(execute_stream_restricted))
-        .route("/network/:dataset/debug", post(execute_stream))
-        // for backward compatibility
        .route("/network/:dataset/:start_block/worker", get(get_worker))
         .route("/query/:dataset_id/:worker_id", post(execute_query))
-        .route("/network/state", get(get_network_state))
+        // end backward compatibility routes
         .layer(axum::middleware::from_fn(logging::middleware))
         .route("/metrics", get(get_metrics))
+        .route("/status", get(get_status))
+        .layer(cors)
         .layer(Extension(task_manager))
         .layer(Extension(network_state))
         .layer(Extension(config))
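The reworked router exposes dataset discovery under `/datasets` and the portal identity under `/status`. A client-side sketch of the two new read-only endpoints, assuming a portal listening on localhost:8000 and `serde_json` available alongside reqwest (whose `json` feature is enabled by this very change):

```rust
// Sketch client for the new endpoints; the base URL is a local assumption.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let base = "http://127.0.0.1:8000";

    // List served datasets: [{ "slug": ..., "aliases": [...], "real_time": false }, ...]
    let datasets: serde_json::Value = reqwest::get(format!("{base}/datasets"))
        .await?
        .json()
        .await?;
    println!("{datasets}");

    // The portal's libp2p identity: { "libp2p_key": "..." }
    let status: serde_json::Value = reqwest::get(format!("{base}/status"))
        .await?
        .json()
        .await?;
    println!("{status}");
    Ok(())
}
```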
diff --git a/src/main.rs b/src/main.rs
index 434846b..c580c7a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,7 @@
+use std::borrow::Cow;
 use std::sync::Arc;
 
+use crate::network::datasets_load;
 use clap::Parser;
 use cli::Cli;
 use controller::task_manager::TaskManager;
@@ -8,6 +10,7 @@ use network::NetworkClient;
 use prometheus_client::registry::Registry;
 use tokio_util::sync::CancellationToken;
 
+mod api_types;
 mod cli;
 mod controller;
 mod http_server;
@@ -43,24 +46,36 @@ fn setup_tracing(json: bool) -> anyhow::Result<()> {
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
     dotenv::dotenv().ok();
-    let args = Cli::parse();
+    let mut args = Cli::parse();
     setup_tracing(args.json_log)?;
+
+    if let Ok(datasets) = datasets_load(&args.config).await {
+        args.config.available_datasets = datasets
+    }
+
     let config = Arc::new(args.config);
-    let mut metrics_registry = Registry::default();
+    let network_client =
+        Arc::new(NetworkClient::new(args.transport, args.logs_collector_id, config.clone()).await?);
+
+    let mut metrics_registry = Registry::with_labels(
+        vec![(
+            Cow::Borrowed("portal_id"),
+            Cow::Owned(network_client.peer_id().to_string()),
+        )]
+        .into_iter(),
+    );
     metrics::register_metrics(metrics_registry.sub_registry_with_prefix("portal"));
     sqd_network_transport::metrics::register_metrics(
         metrics_registry.sub_registry_with_prefix("transport"),
     );
-    let cancellation_token = CancellationToken::new();
-    let network_client =
-        Arc::new(NetworkClient::new(args.transport, args.logs_collector_id, config.clone()).await?);
     tracing::info!("Network client initialized");
 
     let task_manager = Arc::new(TaskManager::new(
         network_client.clone(),
         config.max_parallel_streams,
     ));
+    let cancellation_token = CancellationToken::new();
 
     let (res, ()) = tokio::join!(
         run_server(
             task_manager,
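main.rs now constructs the network client first so the metrics registry can be stamped with the portal's peer ID. A minimal, self-contained demonstration of prometheus-client's registry-wide labels, the same mechanism used above (the peer ID string here is a made-up placeholder):

```rust
// Standalone sketch of registry-wide labels in prometheus-client 0.22.
use std::borrow::Cow;

use prometheus_client::{encoding::text::encode, metrics::counter::Counter, registry::Registry};

fn main() {
    // Every metric registered below inherits the portal_id label.
    let mut registry = Registry::with_labels(
        vec![(
            Cow::Borrowed("portal_id"),
            Cow::Owned("QmExamplePeerId".to_string()),
        )]
        .into_iter(),
    );

    let requests: Counter = Counter::default();
    registry.register("requests", "Number of requests", requests.clone());
    requests.inc();

    let mut out = String::new();
    encode(&mut out, &registry).unwrap();
    // Prints e.g.: requests_total{portal_id="QmExamplePeerId"} 1
    println!("{out}");
}
```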
diff --git a/src/network/client.rs b/src/network/client.rs
index 8fd815c..2f32109 100644
--- a/src/network/client.rs
+++ b/src/network/client.rs
@@ -2,7 +2,6 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
 
 use futures::{Stream, StreamExt};
 use parking_lot::Mutex;
-use serde::Serialize;
 use sqd_contract_client::{Client as ContractClient, PeerId};
 use sqd_messages::{query_result, Ping, Query, QueryResult};
 use sqd_network_transport::{
@@ -12,6 +11,8 @@ use sqd_network_transport::{
 use tokio::{sync::oneshot, time::Instant};
 use tokio_util::sync::CancellationToken;
 
+use super::{NetworkState, StorageClient};
+use crate::network::state::DatasetState;
 use crate::{
     cli::Config,
     metrics,
@@ -19,8 +20,6 @@ use crate::{
     utils::UseOnce,
 };
 
-use super::{NetworkState, StorageClient};
-
 lazy_static::lazy_static! {
     static ref SUPPORTED_VERSIONS: semver::VersionReq = "~1.2.0".parse().expect("Invalid version requirement");
 }
@@ -36,6 +35,7 @@ pub struct NetworkClient {
     dataset_storage: StorageClient,
     dataset_update_interval: Duration,
     chain_update_interval: Duration,
+    local_peer_id: PeerId,
 }
 
 struct QueryTask {
@@ -53,10 +53,13 @@ impl NetworkClient {
         let agent_into = get_agent_info!();
         let transport_builder = P2PTransportBuilder::from_cli(args, agent_into).await?;
         let contract_client = transport_builder.contract_client();
+        let local_peer_id = transport_builder.local_peer_id().clone();
+
         let mut gateway_config = GatewayConfig::new(logs_collector);
         gateway_config.query_config.request_timeout = config.transport_timeout;
         let (incoming_events, transport_handle) =
             transport_builder.build_gateway(gateway_config)?;
+
         Ok(NetworkClient {
             dataset_update_interval: config.dataset_update_interval,
             chain_update_interval: config.chain_update_interval,
@@ -66,6 +69,7 @@ impl NetworkClient {
             contract_client,
             tasks: Mutex::new(HashMap::new()),
             dataset_storage,
+            local_peer_id,
         })
     }
 
@@ -97,6 +101,7 @@ impl NetworkClient {
             .current_epoch()
             .await
             .unwrap_or_else(|e| panic!("Couldn't get current epoch: {e}"));
+
         tracing::info!("Current epoch: {current_epoch}");
         let mut interval = tokio::time::interval_at(
             Instant::now() + self.chain_update_interval,
@@ -288,7 +293,11 @@ impl NetworkClient {
         Ok(())
     }
 
-    pub fn network_state(&self) -> impl Serialize {
-        self.network_state.lock().network_state()
+    pub fn dataset_state(&self, dataset_id: DatasetId) -> Option<DatasetState> {
+        self.network_state.lock().dataset_state(dataset_id).cloned()
+    }
+
+    pub fn peer_id(&self) -> PeerId {
+        self.local_peer_id
    }
 }
diff --git a/src/network/datasets.rs b/src/network/datasets.rs
new file mode 100644
index 0000000..447acbc
--- /dev/null
+++ b/src/network/datasets.rs
@@ -0,0 +1,108 @@
+use crate::cli::{Config, DatasetConfig, DatasetSourceConfig};
+use anyhow::Context;
+use serde::{Deserialize, Serialize};
+use std::fs::File;
+use std::io::BufReader;
+
+#[derive(Serialize, Deserialize, Debug)]
+struct Dataset {
+    pub id: String,
+    pub name: String,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct DatasetList {
+    #[serde(rename = "sqd-network-datasets")]
+    sqd_network_datasets: Vec<Dataset>,
+}
+
+async fn fetch_remote_file(url: &str) -> anyhow::Result<DatasetList> {
+    tracing::debug!("Fetching remote file from {}", url);
+
+    let response = reqwest::get(url).await?;
+    let text = response.text().await?;
+
+    let parser =
+        serde_yaml::from_str(&text).with_context(|| format!("failed to parse dataset {}", url));
+
+    Ok(parser?)
+}
+
+async fn load_local_file(url: &str) -> anyhow::Result<DatasetList> {
+    let full_path = url.replace("file:/", "");
+
+    tracing::debug!("Loading local file from {}", full_path);
+
+    let file = File::open(full_path.clone())
+        .with_context(|| format!("failed to open file {}", full_path))?;
+    let reader = BufReader::new(file);
+
+    let parser = serde_yaml::from_reader(reader)
+        .with_context(|| format!("failed to parse dataset {}", full_path));
+
+    Ok(parser?)
+}
+
+pub async fn datasets_load(config: &Config) -> anyhow::Result<Vec<DatasetConfig>> {
+    let serve: Option<&str> = config.sqd_network.as_ref().map(|s| s.serve.as_ref());
+
+    if let Ok(Some(file)) = load_file(config).await {
+        tracing::debug!(
+            "File loaded, {} datasets found",
+            file.sqd_network_datasets.len()
+        );
+
+        let predefined = config.available_datasets.clone();
+        if serve.as_deref() == Some("none") {
+            return Ok(predefined);
+        }
+
+        let loaded = file
+            .sqd_network_datasets
+            .iter()
+            // .filter(|n| {
+            //     let exist = defined.iter().find(|d| {
+            //         d.data_sources
+            //             .iter()
+            //             .find(|y| y.kind == "sqd_network" && y.name_ref === )
+            //             .is_some()
+            //     });
+            //
+            //     exist.is_some()
+            // })
+            .map(|d| DatasetConfig {
+                slug: d.name.clone(),
+                aliases: None,
+                data_sources: vec![DatasetSourceConfig {
+                    kind: "sqd_network".into(),
+                    name_ref: d.name.clone(),
+                    id: d.id.clone(),
+                }],
+            })
+            .collect();
+
+        // FIXME merge with predefined
+
+        Ok(loaded)
+    } else {
+        tracing::warn!("Datasets file not loaded, falling back to configured datasets");
+
+        Ok(config.available_datasets.clone())
+    }
+}
+
+pub async fn load_file(config: &Config) -> anyhow::Result<Option<DatasetList>> {
+    if let Some(url) = config
+        .sqd_network
+        .as_ref()
+        .and_then(|n| n.datasets.as_ref())
+    {
+        if url.starts_with("file://") {
+            load_local_file(url).await.map(Some)
+        } else {
+            fetch_remote_file(url).await.map(Some)
+        }
+    } else {
+        Ok(None)
+    }
+}
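A hedged test sketch for the loader above: the published datasets.yml keys its entries under `sqd-network-datasets`, and `datasets_load` maps each entry to a `DatasetConfig` with a single `sqd_network` source. This mirrors the mapping in the patch exactly and could live in a `#[cfg(test)]` module at the bottom of src/network/datasets.rs:

```rust
// Sketch test for DatasetList parsing and the Dataset -> DatasetConfig mapping.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_dataset_list_and_maps_to_config() {
        let yaml = r#"
sqd-network-datasets:
  - id: s3://ethereum-mainnet-1
    name: ethereum-mainnet
"#;
        let list: DatasetList = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(list.sqd_network_datasets.len(), 1);

        // Same mapping as in datasets_load above.
        let configs: Vec<DatasetConfig> = list
            .sqd_network_datasets
            .iter()
            .map(|d| DatasetConfig {
                slug: d.name.clone(),
                aliases: None,
                data_sources: vec![DatasetSourceConfig {
                    kind: "sqd_network".into(),
                    name_ref: d.name.clone(),
                    id: d.id.clone(),
                }],
            })
            .collect();
        assert_eq!(configs[0].slug, "ethereum-mainnet");
        assert_eq!(configs[0].data_sources[0].id, "s3://ethereum-mainnet-1");
    }
}
```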
diff --git a/src/network/mod.rs b/src/network/mod.rs
index 8965a75..3064551 100644
--- a/src/network/mod.rs
+++ b/src/network/mod.rs
@@ -1,8 +1,10 @@
 mod client;
+mod datasets;
 mod priorities;
 mod state;
 mod storage;
 
 pub use client::NetworkClient;
+pub use datasets::datasets_load;
 pub use state::NetworkState;
 pub use storage::StorageClient;
diff --git a/src/network/state.rs b/src/network/state.rs
index 0cd8011..7019ebc 100644
--- a/src/network/state.rs
+++ b/src/network/state.rs
@@ -95,11 +95,7 @@ impl NetworkState {
             .unwrap_or_else(RangeSet::empty);
         let entry = self.dataset_states.entry(dataset_id.clone()).or_default();
         entry.update(worker_id, dataset_state);
-        metrics::report_dataset_updated(
-            &dataset_id,
-            entry.highest_seen_block,
-            entry.first_gap,
-        );
+        metrics::report_dataset_updated(&dataset_id, entry.highest_seen_block, entry.first_gap);
     }
 }
 
@@ -145,7 +141,7 @@ impl NetworkState {
         last_pings.get(worker_id).is_some_and(|t| *t > deadline)
     }
 
-    pub fn network_state(&self) -> HashMap<DatasetId, DatasetState> {
-        self.dataset_states.clone()
+    pub fn dataset_state(&self, dataset_id: DatasetId) -> Option<&DatasetState> {
+        self.dataset_states.get(&dataset_id)
     }
 }
diff --git a/src/network/storage.rs b/src/network/storage.rs
index a55f1dd..e537cd0 100644
--- a/src/network/storage.rs
+++ b/src/network/storage.rs
@@ -1,8 +1,8 @@
 use std::collections::HashMap;
 
-use sqd_contract_client::Network;
 use parking_lot::{Mutex, RwLock};
 use serde::{de::DeserializeOwned, Deserialize};
+use sqd_contract_client::Network;
 
 use crate::{
     metrics,
diff --git a/tethys.config.yml b/tethys.config.yml
index 4d4e042..ae7bfa2 100644
--- a/tethys.config.yml
+++ b/tethys.config.yml
@@ -9,8 +9,15 @@ default_request_multiplier: 1
 default_retries: 3
 default_timeout_quantile: 0.5
 dataset_update_interval_sec: 300
-available_datasets:
-  arbitrum-one: "s3://arbitrum-one"
-  base-mainnet: "s3://base"
-  ethereum-mainnet: "s3://ethereum-mainnet"
-  zksync-mainnet: "s3://zksync-mainnet-1"
+sqd_network:
+  datasets: https://cdn.subsquid.io/sqd-network/datasets.yml
+  serve: "none"
+serve:
+  - slug: arbitrum-one
+    data_sources: [{ kind: sqd_network, name_ref: arbitrum-one, id: "s3://arbitrum-one" }]
+  - slug: base-mainnet
+    data_sources: [{ kind: sqd_network, name_ref: base-mainnet, id: "s3://base" }]
+  - slug: ethereum-mainnet
+    data_sources: [{ kind: sqd_network, name_ref: ethereum-mainnet, id: "s3://ethereum-mainnet" }]
+  - slug: zksync-mainnet
+    data_sources: [{ kind: sqd_network, name_ref: zksync-mainnet, id: "s3://zksync-mainnet-1" }]
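A standalone sketch of how the `serve` section above maps onto the types in src/cli.rs (`serve` is renamed onto `Config::available_datasets`); local mirror structs stand in for the crate's `Config`/`DatasetConfig` so the snippet runs on its own:

```rust
// Sketch of the new dataset config schema; mirror structs are local stand-ins.
fn main() {
    #[derive(Debug, serde::Deserialize)]
    struct SourceMirror {
        kind: String,
        name_ref: String,
        id: String,
    }

    #[derive(Debug, serde::Deserialize)]
    struct DatasetMirror {
        slug: String,
        data_sources: Vec<SourceMirror>,
    }

    #[derive(Debug, serde::Deserialize)]
    struct ConfigMirror {
        // In src/cli.rs this key deserializes into `available_datasets`.
        serve: Vec<DatasetMirror>,
    }

    let yaml = r#"
serve:
  - slug: arbitrum-one
    data_sources: [{ kind: sqd_network, name_ref: arbitrum-one, id: "s3://arbitrum-one" }]
"#;

    let parsed: ConfigMirror = serde_yaml::from_str(yaml).unwrap();
    assert_eq!(parsed.serve[0].slug, "arbitrum-one");
    assert_eq!(parsed.serve[0].data_sources[0].id, "s3://arbitrum-one");
    println!("{parsed:?}");
}
```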