Skip to content

Commit

Permalink
big change
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 committed Oct 23, 2024
1 parent dd25217 commit 2f8f850
Show file tree
Hide file tree
Showing 14 changed files with 2,273 additions and 1,405 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ aptos-transaction-filter = { workspace = true }
async-trait = { workspace = true }
clap = { workspace = true }
futures = { workspace = true }
lazy_static = { workspace = true }
once_cell = { workspace = true }
prost = { workspace = true }
redis = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tokio-scoped = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
tonic-reflection = { workspace = true }
Expand Down
64 changes: 23 additions & 41 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::service::RawDataServerWrapper;
use crate::{data_service::DataService, service::RawDataServerWrapper};
use anyhow::{bail, Result};
use aptos_indexer_grpc_server_framework::RunnableConfig;
use aptos_indexer_grpc_utils::{
compression_util::StorageFormat, config::IndexerGrpcFileStoreConfig,
in_memory_cache::InMemoryCacheConfig, types::RedisUrl,
cache_operator::CacheOperator, compression_util::StorageFormat,
config::IndexerGrpcFileStoreConfig, in_memory_cache::InMemoryCacheConfig, types::RedisUrl,
};
use aptos_protos::{
indexer::v1::FILE_DESCRIPTOR_SET as INDEXER_V1_FILE_DESCRIPTOR_SET,
transaction::v1::FILE_DESCRIPTOR_SET as TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET,
util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET,
};
use aptos_transaction_filter::BooleanTransactionFilter;
use once_cell::sync::{Lazy, OnceCell};
use serde::{Deserialize, Serialize};
use std::{net::SocketAddr, sync::Arc};
use std::net::SocketAddr;
use tonic::{codec::CompressionEncoding, transport::Server};

pub const SERVER_NAME: &str = "idxdatasvc";
Expand All @@ -29,6 +30,8 @@ const DEFAULT_MAX_RESPONSE_CHANNEL_SIZE: usize = 3;
const HTTP2_PING_INTERVAL_DURATION: std::time::Duration = std::time::Duration::from_secs(60);
const HTTP2_PING_TIMEOUT_DURATION: std::time::Duration = std::time::Duration::from_secs(10);

static DATA_SERVICE: OnceCell<DataService<'static>> = OnceCell::new();

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct TlsConfig {
Expand Down Expand Up @@ -65,9 +68,6 @@ pub struct IndexerGrpcDataServiceConfig {
pub file_store_config: IndexerGrpcFileStoreConfig,
/// Redis read replica address.
pub redis_read_replica_address: RedisUrl,
/// Support compressed cache data.
#[serde(default = "IndexerGrpcDataServiceConfig::default_enable_cache_compression")]
pub enable_cache_compression: bool,
#[serde(default)]
pub in_memory_cache_config: InMemoryCacheConfig,
/// Any transaction that matches this filter will be stripped. This means we remove
Expand Down Expand Up @@ -105,7 +105,6 @@ impl IndexerGrpcDataServiceConfig {
disable_auth_check,
file_store_config,
redis_read_replica_address,
enable_cache_compression,
in_memory_cache_config,
txns_to_strip_filter,
}
Expand Down Expand Up @@ -153,43 +152,14 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip);

let cache_storage_format: StorageFormat = if self.enable_cache_compression {
StorageFormat::Lz4CompressedProto
} else {
StorageFormat::Base64UncompressedProto
};

println!(
">>>> Starting Redis connection: {:?}",
&self.redis_read_replica_address.0
);
let redis_conn = redis::Client::open(self.redis_read_replica_address.0.clone())?
.get_tokio_connection_manager()
.await?;
println!(">>>> Redis connection established");
// InMemoryCache.
let in_memory_cache =
aptos_indexer_grpc_utils::in_memory_cache::InMemoryCache::new_with_redis_connection(
self.in_memory_cache_config.clone(),
redis_conn,
cache_storage_format,
)
.await?;
println!(">>>> InMemoryCache established");
let (handler_tx, handler_rx) = tokio::sync::mpsc::channel(100);
// Add authentication interceptor.
let server = RawDataServerWrapper::new(
self.redis_read_replica_address.clone(),
self.file_store_config.clone(),
self.data_service_response_channel_size,
self.txns_to_strip_filter.clone(),
cache_storage_format,
Arc::new(in_memory_cache),
)?;
let svc = aptos_protos::indexer::v1::raw_data_server::RawDataServer::new(server)
let raw_data_server =
RawDataServerWrapper::new(handler_tx, self.data_service_response_channel_size)?;
let svc = aptos_protos::indexer::v1::raw_data_server::RawDataServer::new(raw_data_server)
.send_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip);
println!(">>>> Starting gRPC server: {:?}", &svc);

let svc_clone = svc.clone();
let reflection_service_clone = reflection_service.clone();
Expand Down Expand Up @@ -234,6 +204,18 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
}));
}

let redis_client = redis::Client::open(self.redis_read_replica_address.0.clone()).unwrap();
let conn =
futures::executor::block_on(redis_client.get_tokio_connection_manager()).unwrap();
let cache_operator = CacheOperator::new(conn, StorageFormat::Lz4CompressedProto);

tasks.push(tokio::task::spawn_blocking(move || {
DATA_SERVICE
.get_or_init(|| DataService::new(cache_operator))
.run(handler_rx);
Ok(())
}));

futures::future::try_join_all(tasks).await?;
Ok(())
}
Expand Down
Loading

0 comments on commit 2f8f850

Please sign in to comment.