Skip to content

Commit

Permalink
add more log of utf8 failure
Browse files Browse the repository at this point in the history
  • Loading branch information
getong committed Nov 20, 2024
1 parent 471c398 commit 5b663ee
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 66 deletions.
2 changes: 1 addition & 1 deletion apps/indexer-proxy/proxy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "subql-indexer-proxy"
version = "2.7.1"
version = "2.8.0-beta.1"
edition = "2021"

[dependencies]
Expand Down
8 changes: 7 additions & 1 deletion apps/indexer-proxy/proxy/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,13 @@ async fn handle_group(
)
.await
{
Ok((res_query, res_signature, res_state, _limit)) => {
Ok((
res_query,
res_signature,
res_state,
_limit,
_inactive,
)) => {
json!({
"result": general_purpose::STANDARD.encode(&res_query),
"signature": res_signature,
Expand Down
10 changes: 5 additions & 5 deletions apps/indexer-proxy/proxy/src/payg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ pub async fn query_single_state(
state: QueryState,
network_type: MetricsNetwork,
no_sig: bool,
) -> Result<(Vec<u8>, String, String, Option<(i64, i64)>)> {
) -> Result<(Vec<u8>, String, String, Option<(i64, i64)>, bool)> {
let project: Project = get_project(project_id).await?;

// compute unit count times
Expand Down Expand Up @@ -529,7 +529,7 @@ pub async fn query_single_state(
})?;

debug!("Handle query channel success");
Ok((data, signature, post_state.to_bs64_old2(), limit))
Ok((data, signature, post_state.to_bs64_old2(), limit, true))
}

// query with multiple state mode
Expand Down Expand Up @@ -658,7 +658,7 @@ pub async fn query_multiple_state(
state: MultipleQueryState,
network_type: MetricsNetwork,
no_sig: bool,
) -> Result<(Vec<u8>, String, String, Option<(i64, i64)>)> {
) -> Result<(Vec<u8>, String, String, Option<(i64, i64)>, bool)> {
let project = get_project(project_id).await?;

// compute unit count times
Expand All @@ -675,7 +675,7 @@ pub async fn query_multiple_state(
}
})?;
if inactive {
return Ok((vec![], "".to_owned(), state.to_bs64(), None));
return Ok((vec![], "".to_owned(), state.to_bs64(), None, inactive));
}

// query the data.
Expand All @@ -701,7 +701,7 @@ pub async fn query_multiple_state(
post_query_multiple_state(keyname, state_cache).await;

debug!("Handle query channel success");
Ok((data, signature, state.to_bs64(), limit))
Ok((data, signature, state.to_bs64(), limit, inactive))
}

pub async fn extend_channel(
Expand Down
81 changes: 46 additions & 35 deletions apps/indexer-proxy/proxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,23 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

#![deny(warnings)]
use crate::account::ACCOUNT;
use crate::ai::api_stream;
use crate::auth::{create_jwt, AuthQuery, AuthQueryLimit, Payload};
use crate::cli::COMMAND;
use crate::contracts::check_agreement_and_consumer;
use crate::metrics::{get_owner_metrics, MetricsNetwork, MetricsQuery};
use crate::payg::{
extend_channel, fetch_channel_cache, merket_price, open_state, pay_channel,
query_multiple_state, query_single_state, AuthPayg,
};
use crate::project::get_project;
use crate::sentry_log::make_sentry_message;
use crate::websocket::{connect_to_project_ws, handle_websocket, validate_project, QueryType};
use crate::{
account::{get_indexer, indexer_healthy},
auth::AuthWhitelistQuery,
};
use axum::extract::ws::WebSocket;
use axum::{
extract::{ConnectInfo, Path, WebSocketUpgrade},
Expand All @@ -43,23 +60,6 @@ use subql_indexer_utils::{
};
use tower_http::cors::{Any, CorsLayer};

use crate::ai::api_stream;
use crate::auth::{create_jwt, AuthQuery, AuthQueryLimit, Payload};
use crate::cli::COMMAND;
use crate::contracts::check_agreement_and_consumer;
use crate::metrics::{get_owner_metrics, MetricsNetwork, MetricsQuery};
use crate::payg::{
extend_channel, fetch_channel_cache, merket_price, open_state, pay_channel,
query_multiple_state, query_single_state, AuthPayg,
};
use crate::project::get_project;
use crate::sentry_log::make_sentry_message;
use crate::websocket::{connect_to_project_ws, handle_websocket, validate_project, QueryType};
use crate::{
account::{get_indexer, indexer_healthy},
auth::AuthWhitelistQuery,
};

#[derive(Serialize)]
pub struct QueryUri {
/// the url refer to specific project
Expand Down Expand Up @@ -484,7 +484,7 @@ async fn ep_payg_handler(
return payg_stream(endpoint.endpoint.clone(), v, state, false).await;
}

let (data, signature, state_data, limit) = match block.to_str() {
let (data, signature, state_data, limit, inactive) = match block.to_str() {
Ok("multiple") => {
let state = match MultipleQueryState::from_bs64(auth) {
Ok(p) => p,
Expand Down Expand Up @@ -527,30 +527,41 @@ async fn ep_payg_handler(

let (body, mut headers) = match res_fmt.to_str() {
Ok("inline") => {
let return_body = if let Ok(return_data) = String::from_utf8(data.clone()) {
if return_data.is_empty() {
let return_body = match String::from_utf8(data.clone()) {
Ok(return_data) => {
if data.is_empty() {
let account = ACCOUNT.read().await;
let indexer = account.indexer;
drop(account);
let indexer_string = format!("{:?}", indexer);
let unique_title = format!(
"payg ep_query_handler, proxy get empty and lead to inline returns empty, deployment_id: {}, ep_name: {}",
deployment, ep_name
);
let msg = format!(
"res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:#?}, data length is {}, base64 data is {:#?}, account address is {:#?}, inactive is {}",
res_fmt, headers, body, data, data.len(), general_purpose::STANDARD.encode(&data), indexer_string, inactive
);
make_sentry_message(&unique_title, &msg);
}
return_data
}
Err(err) => {
let account = ACCOUNT.read().await;
let indexer = account.indexer;
drop(account);
let indexer_string = format!("{:?}", indexer);
let unique_title = format!(
"payg ep_query_handler, inline returns empty, because endpoint returns empty, deployment_id: {}, ep_name: {}",
deployment, ep_name
);
);
let msg = format!(
"res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:?}",
res_fmt, headers, body, data
"res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:#?}, data length is {}, err is {:#?}, base64 data is {:#?}, account address is {:#?}, inactive is {}",
res_fmt, headers, body, data, data.len(), err, general_purpose::STANDARD.encode(&data), indexer_string,inactive
);
make_sentry_message(&unique_title, &msg);
"".to_owned()
}
return_data
} else {
let unique_title = format!(
"payg ep_query_handler, inline returns empty, deployment_id: {}, ep_name: {}",
deployment, ep_name
);
let msg = format!(
"res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:?}",
res_fmt, headers, body, data
);
make_sentry_message(&unique_title, &msg);
"".to_owned()
};
(
return_body,
Expand Down
3 changes: 2 additions & 1 deletion apps/indexer-proxy/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ http = "1.1.0"
native-tls = "0.2.12"
once_cell = "1.12"
rand_chacha = "0.3"
reqwest = { version = "0.12", features = ["json", "native-tls"] }
reqwest = { version = "0.12", features = ["json", "native-tls", "stream"] }
rustc-hex = "2.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_with ={ version = "3.0", features = ["json"] }
subql-contracts = { git = "https://github.com/subquery/network-contracts", tag = "v1.5.0" }
tokio-stream = "0.1.16"
uint = "0.10"
42 changes: 19 additions & 23 deletions apps/indexer-proxy/utils/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use serde_json::{json, Value};
use serde_with::skip_serializing_none;
use std::error::Error as StdError;
use std::time::Duration;
use tokio_stream::StreamExt;

pub static REQUEST_CLIENT: Lazy<Client> = Lazy::new(reqwest::Client::new);

Expand Down Expand Up @@ -134,38 +135,33 @@ pub async fn post_request_raw(uri: &str, query: String) -> Result<Vec<u8>, Error
// handle request
#[inline]
async fn handle_request_raw(request: RequestBuilder, query: String) -> Result<Vec<u8>, Error> {
let response_result = request
let res = request
.timeout(Duration::from_secs(REQUEST_TIMEOUT))
.header(CONTENT_TYPE, APPLICATION_JSON)
.header(CONNECTION, KEEP_ALIVE)
.body(query.to_owned())
.header(reqwest::header::CONTENT_TYPE, "application/json")
.header(reqwest::header::CONNECTION, "keep-alive")
.body(query)
.send()
.await;

let res = match response_result {
Ok(res) => res,
Err(_e) => {
return Err(Error::GraphQLInternal(
.await
.or_else(|_e| {
Err(Error::GraphQLInternal(
1010,
"Service exception or timeout".to_owned(),
))
}
};
})?;

let status = res.status();
let body = res
.bytes()
.await
.map(|bytes| bytes.to_vec())
.map_err(|e| Error::GraphQLQuery(1011, e.to_string()))?;
let mut body_stream = res.bytes_stream();
let mut body = Vec::new();

// 200~299
if status.is_success() {
Ok(body)
} else {
let err = String::from_utf8(body).unwrap_or("Internal request error".to_owned());
Err(Error::GraphQLInternal(1011, err))
while let Some(chunk) = body_stream.next().await {
let chunk = chunk.map_err(|e| Error::GraphQLQuery(1011, e.to_string()))?;
body.extend_from_slice(&chunk);
}

status.is_success().then_some(body.clone()).ok_or_else(|| {
let err = String::from_utf8_lossy(&body).to_string();
Error::GraphQLInternal(1011, err)
})
}

// Request to indexer/consumer proxy
Expand Down

0 comments on commit 5b663ee

Please sign in to comment.