Skip to content

Commit

Permalink
feat: Only allow inserts and deletes operations to be executed in par…
Browse files Browse the repository at this point in the history
…allel (#3257)

* feat: Only allow inserts and deletes operations to be executed in parallel.

* feat: add comment
  • Loading branch information
fengjiachun authored Jan 29, 2024
1 parent 691b649 commit e5a2b04
Showing 1 changed file with 38 additions and 14 deletions.
52 changes: 38 additions & 14 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,29 +217,53 @@ impl RegionServer {
#[async_trait]
impl RegionServerHandler for RegionServer {
async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponse> {
let is_parallel = matches!(
request,
region_request::Body::Inserts(_) | region_request::Body::Deletes(_)
);
let requests = RegionRequest::try_from_request_body(request)
.context(BuildRegionRequestsSnafu)
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
let tracing_context = TracingContext::from_current_span();
let join_tasks = requests.into_iter().map(|(region_id, req)| {
let self_to_move = self.clone();
let span = tracing_context.attach(info_span!(
"RegionServer::handle_region_request",
region_id = region_id.to_string()
));
async move {
self_to_move
let results = if is_parallel {
let join_tasks = requests.into_iter().map(|(region_id, req)| {
let self_to_move = self.clone();
let span = tracing_context.attach(info_span!(
"RegionServer::handle_region_request",
region_id = region_id.to_string()
));
async move {
self_to_move
.handle_request(region_id, req)
.trace(span)
.await
}
});

try_join_all(join_tasks)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?
} else {
let mut results = Vec::with_capacity(requests.len());
// FIXME(jeremy, ruihang): Once the engine supports merged calls, we should immediately
// modify this part to avoid inefficient serial loop calls.
for (region_id, req) in requests {
let span = tracing_context.attach(info_span!(
"RegionServer::handle_region_request",
region_id = region_id.to_string()
));
let result = self
.handle_request(region_id, req)
.trace(span)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
results.push(result);
}
});

let results = try_join_all(join_tasks)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
results
};

// merge results by simply sum up affected rows.
// only insert/delete will have multiple results.
Expand Down

0 comments on commit e5a2b04

Please sign in to comment.