Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy all chunk #532

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/indexer-proxy/proxy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "subql-indexer-proxy"
version = "2.8.0"
version = "2.8.2-beta.1"
edition = "2021"

[dependencies]
Expand Down
8 changes: 7 additions & 1 deletion apps/indexer-proxy/proxy/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,13 @@ async fn handle_group(
)
.await
{
Ok((res_query, res_signature, res_state, _limit)) => {
Ok((
res_query,
res_signature,
res_state,
_limit,
_inactive,
)) => {
json!({
"result": general_purpose::STANDARD.encode(&res_query),
"signature": res_signature,
Expand Down
10 changes: 5 additions & 5 deletions apps/indexer-proxy/proxy/src/payg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ pub async fn query_single_state(
state: QueryState,
network_type: MetricsNetwork,
no_sig: bool,
) -> Result<(Vec<u8>, String, String, Option<(i64, i64)>)> {
) -> Result<(Vec<u8>, String, String, Option<(i64, i64)>, bool)> {
let project: Project = get_project(project_id).await?;

// compute unit count times
Expand Down Expand Up @@ -529,7 +529,7 @@ pub async fn query_single_state(
})?;

debug!("Handle query channel success");
Ok((data, signature, post_state.to_bs64_old2(), limit))
Ok((data, signature, post_state.to_bs64_old2(), limit, true))
}

// query with multiple state mode
Expand Down Expand Up @@ -658,7 +658,7 @@ pub async fn query_multiple_state(
state: MultipleQueryState,
network_type: MetricsNetwork,
no_sig: bool,
) -> Result<(Vec<u8>, String, String, Option<(i64, i64)>)> {
) -> Result<(Vec<u8>, String, String, Option<(i64, i64)>, bool)> {
let project = get_project(project_id).await?;

// compute unit count times
Expand All @@ -675,7 +675,7 @@ pub async fn query_multiple_state(
}
})?;
if inactive {
return Ok((vec![], "".to_owned(), state.to_bs64(), None));
return Ok((vec![], "".to_owned(), state.to_bs64(), None, inactive));
}

// query the data.
Expand All @@ -701,7 +701,7 @@ pub async fn query_multiple_state(
post_query_multiple_state(keyname, state_cache).await;

debug!("Handle query channel success");
Ok((data, signature, state.to_bs64(), limit))
Ok((data, signature, state.to_bs64(), limit, inactive))
}

pub async fn extend_channel(
Expand Down
81 changes: 46 additions & 35 deletions apps/indexer-proxy/proxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,23 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

#![deny(warnings)]
use crate::account::ACCOUNT;
use crate::ai::api_stream;
use crate::auth::{create_jwt, AuthQuery, AuthQueryLimit, Payload};
use crate::cli::COMMAND;
use crate::contracts::check_agreement_and_consumer;
use crate::metrics::{get_owner_metrics, MetricsNetwork, MetricsQuery};
use crate::payg::{
extend_channel, fetch_channel_cache, merket_price, open_state, pay_channel,
query_multiple_state, query_single_state, AuthPayg,
};
use crate::project::get_project;
use crate::sentry_log::make_sentry_message;
use crate::websocket::{connect_to_project_ws, handle_websocket, validate_project, QueryType};
use crate::{
account::{get_indexer, indexer_healthy},
auth::AuthWhitelistQuery,
};
use axum::extract::ws::WebSocket;
use axum::{
extract::{ConnectInfo, Path, WebSocketUpgrade},
Expand All @@ -43,23 +60,6 @@ use subql_indexer_utils::{
};
use tower_http::cors::{Any, CorsLayer};

use crate::ai::api_stream;
use crate::auth::{create_jwt, AuthQuery, AuthQueryLimit, Payload};
use crate::cli::COMMAND;
use crate::contracts::check_agreement_and_consumer;
use crate::metrics::{get_owner_metrics, MetricsNetwork, MetricsQuery};
use crate::payg::{
extend_channel, fetch_channel_cache, merket_price, open_state, pay_channel,
query_multiple_state, query_single_state, AuthPayg,
};
use crate::project::get_project;
use crate::sentry_log::make_sentry_message;
use crate::websocket::{connect_to_project_ws, handle_websocket, validate_project, QueryType};
use crate::{
account::{get_indexer, indexer_healthy},
auth::AuthWhitelistQuery,
};

#[derive(Serialize)]
pub struct QueryUri {
/// the url refer to specific project
Expand Down Expand Up @@ -484,7 +484,7 @@ async fn ep_payg_handler(
return payg_stream(endpoint.endpoint.clone(), v, state, false).await;
}

let (data, signature, state_data, limit) = match block.to_str() {
let (data, signature, state_data, limit, inactive) = match block.to_str() {
Ok("multiple") => {
let state = match MultipleQueryState::from_bs64(auth) {
Ok(p) => p,
Expand Down Expand Up @@ -527,30 +527,41 @@ async fn ep_payg_handler(

let (body, mut headers) = match res_fmt.to_str() {
Ok("inline") => {
let return_body = if let Ok(return_data) = String::from_utf8(data.clone()) {
if return_data.is_empty() {
let return_body = match String::from_utf8(data.clone()) {
Ok(return_data) => {
if data.is_empty() {
let account = ACCOUNT.read().await;
let indexer = account.indexer;
drop(account);
let indexer_string = format!("{:?}", indexer);
let unique_title = format!(
"payg ep_query_handler, proxy get empty and lead to inline returns empty, deployment_id: {}, ep_name: {}",
deployment, ep_name
);
let msg = format!(
"res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:#?}, data length is {}, base64 data is {:#?}, account address is {:#?}, inactive is {}",
res_fmt, headers, body, data, data.len(), general_purpose::STANDARD.encode(&data), indexer_string, inactive
);
make_sentry_message(&unique_title, &msg);
}
return_data
}
Err(err) => {
let account = ACCOUNT.read().await;
let indexer = account.indexer;
drop(account);
let indexer_string = format!("{:?}", indexer);
let unique_title = format!(
"payg ep_query_handler, inline returns empty, because endpoint returns empty, deployment_id: {}, ep_name: {}",
deployment, ep_name
);
);
let msg = format!(
"res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:?}",
res_fmt, headers, body, data
"res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:#?}, data length is {}, err is {:#?}, base64 data is {:#?}, account address is {:#?}, inactive is {}",
res_fmt, headers, body, data, data.len(), err, general_purpose::STANDARD.encode(&data), indexer_string,inactive
);
make_sentry_message(&unique_title, &msg);
"".to_owned()
}
return_data
} else {
let unique_title = format!(
"payg ep_query_handler, inline returns empty, deployment_id: {}, ep_name: {}",
deployment, ep_name
);
let msg = format!(
"res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:?}",
res_fmt, headers, body, data
);
make_sentry_message(&unique_title, &msg);
"".to_owned()
};
(
return_body,
Expand Down
3 changes: 2 additions & 1 deletion apps/indexer-proxy/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ http = "1.1.0"
native-tls = "0.2.12"
once_cell = "1.12"
rand_chacha = "0.3"
reqwest = { version = "0.12", features = ["json", "native-tls"] }
reqwest = { version = "0.12", features = ["json", "native-tls", "stream"] }
rustc-hex = "2.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_with ={ version = "3.0", features = ["json"] }
subql-contracts = { git = "https://github.com/subquery/network-contracts", tag = "v1.5.0" }
tokio-stream = "0.1.16"
uint = "0.10"
42 changes: 19 additions & 23 deletions apps/indexer-proxy/utils/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use serde_json::{json, Value};
use serde_with::skip_serializing_none;
use std::error::Error as StdError;
use std::time::Duration;
use tokio_stream::StreamExt;

pub static REQUEST_CLIENT: Lazy<Client> = Lazy::new(reqwest::Client::new);

Expand Down Expand Up @@ -134,38 +135,33 @@ pub async fn post_request_raw(uri: &str, query: String) -> Result<Vec<u8>, Error
// handle request
#[inline]
async fn handle_request_raw(request: RequestBuilder, query: String) -> Result<Vec<u8>, Error> {
let response_result = request
let res = request
.timeout(Duration::from_secs(REQUEST_TIMEOUT))
.header(CONTENT_TYPE, APPLICATION_JSON)
.header(CONNECTION, KEEP_ALIVE)
.body(query.to_owned())
.header(reqwest::header::CONTENT_TYPE, "application/json")
.header(reqwest::header::CONNECTION, "keep-alive")
.body(query)
.send()
.await;

let res = match response_result {
Ok(res) => res,
Err(_e) => {
return Err(Error::GraphQLInternal(
.await
.or_else(|_e| {
Err(Error::GraphQLInternal(
1010,
"Service exception or timeout".to_owned(),
))
}
};
})?;

let status = res.status();
let body = res
.bytes()
.await
.map(|bytes| bytes.to_vec())
.map_err(|e| Error::GraphQLQuery(1011, e.to_string()))?;
let mut body_stream = res.bytes_stream();
let mut body = Vec::new();

// 200~299
if status.is_success() {
Ok(body)
} else {
let err = String::from_utf8(body).unwrap_or("Internal request error".to_owned());
Err(Error::GraphQLInternal(1011, err))
while let Some(chunk) = body_stream.next().await {
let chunk = chunk.map_err(|e| Error::GraphQLQuery(1011, e.to_string()))?;
body.extend_from_slice(&chunk);
}

status.is_success().then_some(body.clone()).ok_or_else(|| {
let err = String::from_utf8_lossy(&body).to_string();
Error::GraphQLInternal(1011, err)
})
}

// Request to indexer/consumer proxy
Expand Down
Loading