Skip to content

Commit

Permalink
handle grpc error (#419)
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu authored Sep 7, 2023
1 parent dd34500 commit 5ac72f2
Showing 1 changed file with 47 additions and 1 deletion.
48 changes: 47 additions & 1 deletion src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,27 @@ where
) -> Result<<Self as Plan>::Result> {
// limit concurrent requests
let permit = permits.acquire().await.unwrap();
let mut resp = plan.execute().await?;
let res = plan.execute().await;
drop(permit);

let is_grpc_error = |e: &Error| matches!(e, Error::GrpcAPI(_) | Error::Grpc(_));
let mut resp = match res {
Ok(resp) => resp,
Err(e) if is_grpc_error(&e) => {
return Self::handle_grpc_error(
pd_client,
plan,
region_store,
backoff,
permits,
preserve_region_results,
e,
)
.await;
}
Err(e) => return Err(e),
};

if let Some(e) = resp.key_errors() {
Ok(vec![Err(Error::MultipleKeyErrors(e))])
} else if let Some(e) = resp.region_error() {
Expand Down Expand Up @@ -272,6 +290,34 @@ where
pd_client.invalidate_region_cache(ver_id).await;
Ok(false)
}

async fn handle_grpc_error(
pd_client: Arc<PdC>,
plan: P,
region_store: RegionStore,
mut backoff: Backoff,
permits: Arc<Semaphore>,
preserve_region_results: bool,
e: Error,
) -> Result<<Self as Plan>::Result> {
debug!("handle grpc error: {:?}", e);
let ver_id = region_store.region_with_leader.ver_id();
pd_client.invalidate_region_cache(ver_id).await;
match backoff.next_delay_duration() {
Some(duration) => {
sleep(duration).await;
Self::single_plan_handler(
pd_client,
plan,
backoff,
permits,
preserve_region_results,
)
.await
}
None => Err(e),
}
}
}

impl<P: Plan, PdC: PdClient> Clone for RetryableMultiRegion<P, PdC> {
Expand Down

0 comments on commit 5ac72f2

Please sign in to comment.