feat(executor): support output query plan profiles to logs (databendl…
zhang2014 authored Dec 12, 2023
1 parent 680a74c commit 676dc82
Showing 25 changed files with 242 additions and 51 deletions.
5 changes: 5 additions & 0 deletions src/query/catalog/src/table_context.rs
@@ -34,6 +34,7 @@ use common_meta_app::principal::OnErrorMode;
use common_meta_app::principal::RoleInfo;
use common_meta_app::principal::UserDefinedConnection;
use common_meta_app::principal::UserInfo;
+use common_pipeline_core::processors::profile::PlanProfile;
use common_pipeline_core::processors::profile::Profile;
use common_pipeline_core::InputError;
use common_settings::Settings;
@@ -230,6 +231,10 @@ pub trait TableContext: Send + Sync {
/// Get license key from context, return empty if license is not found or error happened.
fn get_license_key(&self) -> String;

+fn add_query_profiles(&self, profiles: &[PlanProfile]);
+
+fn get_query_profiles(&self) -> Vec<PlanProfile>;
+
fn set_runtime_filter(&self, filters: (usize, Vec<Expr<String>>));

fn get_runtime_filter_with_id(&self, id: usize) -> Vec<Expr<String>>;
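A minimal sketch of how an implementer might back these two new trait methods, assuming a `Mutex<Vec<PlanProfile>>` field (the `DemoContext` type and its field are hypothetical; in this commit the real implementor is `QueryContext`):

```rust
use std::sync::Mutex;

use common_pipeline_core::processors::profile::PlanProfile;

// Hypothetical context type; the real implementor is QueryContext.
struct DemoContext {
    query_profiles: Mutex<Vec<PlanProfile>>,
}

impl DemoContext {
    // Called when a node (local or remote) contributes merged plan profiles.
    fn add_query_profiles(&self, profiles: &[PlanProfile]) {
        self.query_profiles.lock().unwrap().extend_from_slice(profiles);
    }

    // Snapshot for consumers such as the query log at query end.
    fn get_query_profiles(&self) -> Vec<PlanProfile> {
        self.query_profiles.lock().unwrap().clone()
    }
}
```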
1 change: 1 addition & 0 deletions src/query/pipeline/core/Cargo.toml
@@ -20,6 +20,7 @@ async-trait = { workspace = true }
futures = { workspace = true }
minitrace = { workspace = true }
petgraph = "0.6.2"
+serde = { workspace = true }

[dev-dependencies]
serde = { workspace = true }
9 changes: 6 additions & 3 deletions src/query/pipeline/core/src/pipeline.rs
@@ -24,6 +24,7 @@ use common_exception::Result;
use crate::pipe::Pipe;
use crate::pipe::PipeItem;
use crate::processors::profile::PlanScope;
+use crate::processors::profile::Profile;
use crate::processors::DuplicateProcessor;
use crate::processors::InputPort;
use crate::processors::OutputPort;
@@ -74,7 +75,7 @@ impl Debug for Pipeline {
pub type InitCallback = Box<dyn FnOnce() -> Result<()> + Send + Sync + 'static>;

pub type FinishedCallback =
-Box<dyn FnOnce(&Option<ErrorCode>) -> Result<()> + Send + Sync + 'static>;
+Box<dyn FnOnce(&Result<Vec<Arc<Profile>>, ErrorCode>) -> Result<()> + Send + Sync + 'static>;

impl Pipeline {
pub fn create() -> Pipeline {
@@ -416,7 +417,9 @@ impl Pipeline {
self.on_init = Some(Box::new(f));
}

-pub fn set_on_finished<F: FnOnce(&Option<ErrorCode>) -> Result<()> + Send + Sync + 'static>(
+pub fn set_on_finished<
+F: FnOnce(&Result<Vec<Arc<Profile>>, ErrorCode>) -> Result<()> + Send + Sync + 'static,
+>(
&mut self,
f: F,
) {
@@ -465,7 +468,7 @@ impl Drop for Pipeline {
fn drop(&mut self) {
// An error may have occurred before the executor was created.
if let Some(on_finished) = self.on_finished.take() {
-let cause = Some(ErrorCode::Internal(
+let cause = Err(ErrorCode::Internal(
"Pipeline illegal state: not successfully shutdown.",
));

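The `FinishedCallback` argument changes from `&Option<ErrorCode>` (where `None` meant success) to `&Result<Vec<Arc<Profile>>, ErrorCode>`, so a clean shutdown now hands the callback the per-processor profiles. A hedged usage sketch (the helper name and log line are illustrative, not from this commit):

```rust
use std::sync::Arc;

use common_exception::{ErrorCode, Result};
use common_pipeline_core::processors::profile::Profile;
use common_pipeline_core::Pipeline;

// Illustrative helper: log the profile count on success, propagate errors.
fn install_profile_logger(pipeline: &mut Pipeline) {
    pipeline.set_on_finished(|result: &Result<Vec<Arc<Profile>>, ErrorCode>| match result {
        // Success now carries one Arc<Profile> per processor in the pipeline.
        Ok(profiles) => {
            log::info!("pipeline finished with {} processor profiles", profiles.len());
            Ok(())
        }
        // Failure still surfaces the original error to the caller.
        Err(cause) => Err(cause.clone()),
    });
}
```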
35 changes: 35 additions & 0 deletions src/query/pipeline/core/src/processors/profile.rs
@@ -49,6 +49,41 @@ impl Profile {
}
}

#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct PlanProfile {
pub id: Option<u32>,
pub name: Option<String>,
pub parent_id: Option<u32>,

/// The time spent to process in nanoseconds
pub cpu_time: usize,
/// The time spent to wait in nanoseconds, usually used to
/// measure the time spent on waiting for I/O
pub wait_time: usize,
}

impl PlanProfile {
pub fn create(profile: &Profile) -> PlanProfile {
PlanProfile {
id: profile.plan_id,
name: profile.plan_name.clone(),
parent_id: profile.plan_parent_id,
cpu_time: profile.cpu_time.load(Ordering::SeqCst) as usize,
wait_time: profile.wait_time.load(Ordering::SeqCst) as usize,
}
}

pub fn accumulate(&mut self, profile: &Profile) {
self.cpu_time += profile.cpu_time.load(Ordering::SeqCst) as usize;
self.wait_time += profile.wait_time.load(Ordering::SeqCst) as usize;
}

pub fn merge(&mut self, profile: &PlanProfile) {
self.cpu_time += profile.cpu_time;
self.wait_time += profile.wait_time;
}
}

pub struct PlanScopeGuard {
idx: usize,
scope_size: Arc<AtomicUsize>,
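Of the three methods above, `create` snapshots one processor's atomic counters, `accumulate` folds further processors of the same plan node in, and `merge` combines already-reduced `PlanProfile`s, for example ones arriving from different executor nodes. A sketch of that last step (the function and map shape are illustrative, not part of this commit):

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

use common_pipeline_core::processors::profile::PlanProfile;

// Combine per-node PlanProfiles keyed by plan id: a plan node that ran
// on several executors contributes cpu_time/wait_time from each of them.
fn merge_remote_profiles(
    merged: &mut HashMap<Option<u32>, PlanProfile>,
    remote: &[PlanProfile],
) {
    for profile in remote {
        match merged.entry(profile.id) {
            // First time this plan node is seen: keep the remote copy.
            Entry::Vacant(v) => {
                v.insert(profile.clone());
            }
            // Already seen: add the remote counters in.
            Entry::Occupied(mut o) => o.get_mut().merge(profile),
        }
    }
}
```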
10 changes: 6 additions & 4 deletions src/query/service/src/api/rpc/exchange/exchange_manager.rs
@@ -371,15 +371,15 @@ impl DataExchangeManager {
let query_id = ctx.get_id();
let mut statistics_receiver = statistics_receiver.lock();

-statistics_receiver.shutdown(may_error.is_some());
+statistics_receiver.shutdown(may_error.is_err());
ctx.get_exchange_manager().on_finished_query(&query_id);
statistics_receiver.wait_shutdown()?;

on_finished(may_error)?;

match may_error {
-None => Ok(()),
-Some(error_code) => Err(error_code.clone()),
+Ok(_) => Ok(()),
+Err(error_code) => Err(error_code.clone()),
}
});

@@ -772,7 +772,9 @@ impl QueryCoordinator {

Thread::named_spawn(Some(String::from("Distributed-Executor")), move || {
let _g = span.set_local_parent();
-statistics_sender.shutdown(executor.execute().err());
+let res = executor.execute().err();
+let profiles = executor.get_inner().get_profiles();
+statistics_sender.shutdown(res, profiles);
query_ctx
.get_exchange_manager()
.on_finished_query(&query_id);
@@ -122,10 +122,10 @@ impl BlockMetaTransform<ExchangeDeserializeMeta> for TransformExchangeDeserializ
match meta.packet.pop().unwrap() {
DataPacket::ErrorCode(v) => Err(v),
DataPacket::Dictionary(_) => unreachable!(),
-DataPacket::FetchProgress => unreachable!(),
DataPacket::SerializeProgress { .. } => unreachable!(),
DataPacket::CopyStatus { .. } => unreachable!(),
DataPacket::MergeStatus { .. } => unreachable!(),
+DataPacket::QueryProfiles(_) => unreachable!(),
DataPacket::FragmentData(v) => self.recv_data(meta.packet, v),
}
}
@@ -121,14 +121,17 @@ impl StatisticsReceiver {
Ok(Some(DataPacket::ErrorCode(error))) => Err(error),
Ok(Some(DataPacket::Dictionary(_))) => unreachable!(),
Ok(Some(DataPacket::FragmentData(_))) => unreachable!(),
-Ok(Some(DataPacket::FetchProgress)) => unreachable!(),
Ok(Some(DataPacket::SerializeProgress(progress))) => {
for progress_info in progress {
progress_info.inc(ctx);
}

Ok(false)
}
+Ok(Some(DataPacket::QueryProfiles(profiles))) => {
+ctx.add_query_profiles(&profiles);
+Ok(false)
+}
Ok(Some(DataPacket::CopyStatus(status))) => {
log::info!("merge CopyStatus for {} files", status.files.len());
ctx.get_copy_status().merge(status);
48 changes: 43 additions & 5 deletions src/query/service/src/api/rpc/exchange/statistics_sender.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

@@ -22,6 +24,8 @@ use common_base::runtime::TrySpawn;
use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
+use common_pipeline_core::processors::profile::PlanProfile;
+use common_pipeline_core::processors::profile::Profile;
use common_storage::MergeStatus;
use futures_util::future::Either;
use log::warn;
@@ -34,7 +38,7 @@ use crate::sessions::QueryContext;

pub struct StatisticsSender {
_spawner: Arc<QueryContext>,
-shutdown_flag_sender: Sender<Option<ErrorCode>>,
+shutdown_flag_sender: Sender<(Option<ErrorCode>, Vec<Arc<Profile>>)>,
join_handle: Option<JoinHandle<()>>,
}

@@ -55,12 +59,17 @@ impl StatisticsSender {
let mut sleep_future = Box::pin(sleep(Duration::from_millis(100)));
let mut notified = Box::pin(shutdown_flag_receiver.recv());

+let mut query_profiles = vec![];
loop {
match futures::future::select(sleep_future, notified).await {
-Either::Right((Ok(None), _)) | Either::Right((Err(_), _)) => {
+Either::Right((Err(_), _)) => {
break;
}
-Either::Right((Ok(Some(error_code)), _recv)) => {
+Either::Right((Ok((None, profiles)), _)) => {
+query_profiles = profiles;
+break;
+}
+Either::Right((Ok((Some(error_code), _profiles)), _recv)) => {
let data = DataPacket::ErrorCode(error_code);
if let Err(error_code) = tx.send(data).await {
warn!(
@@ -83,6 +92,10 @@
}
}

+if let Err(error) = Self::send_profile(query_profiles, &tx).await {
+warn!("Profiles send has error, cause: {:?}.", error);
+}

if let Err(error) = Self::send_copy_status(&ctx, &tx).await {
warn!("CopyStatus send has error, cause: {:?}.", error);
}
@@ -104,12 +117,12 @@
}
}

-pub fn shutdown(&mut self, error: Option<ErrorCode>) {
+pub fn shutdown(&mut self, error: Option<ErrorCode>, profiles: Vec<Arc<Profile>>) {
let shutdown_flag_sender = self.shutdown_flag_sender.clone();

let join_handle = self.join_handle.take();
futures::executor::block_on(async move {
-if let Err(error_code) = shutdown_flag_sender.send(error).await {
+if let Err(error_code) = shutdown_flag_sender.send((error, profiles)).await {
warn!(
"Cannot send data via flight exchange, cause: {:?}",
error_code
@@ -161,6 +174,31 @@ impl StatisticsSender {
Ok(())
}

async fn send_profile(profiles: Vec<Arc<Profile>>, flight_sender: &FlightSender) -> Result<()> {
let mut merged_profiles = HashMap::new();

for proc_profile in profiles {
if proc_profile.plan_id.is_some() {
match merged_profiles.entry(proc_profile.plan_id) {
Entry::Vacant(v) => {
v.insert(PlanProfile::create(&proc_profile));
}
Entry::Occupied(mut v) => {
v.get_mut().accumulate(&proc_profile);
}
};
}
}

if !merged_profiles.is_empty() {
let data_packet =
DataPacket::QueryProfiles(merged_profiles.into_values().collect::<Vec<_>>());
flight_sender.send(data_packet).await?;
}

Ok(())
}

fn fetch_progress(ctx: &Arc<QueryContext>) -> Result<Vec<ProgressInfo>> {
let mut progress_info = vec![];

14 changes: 9 additions & 5 deletions src/query/service/src/api/rpc/packets/packet_data.rs
@@ -22,6 +22,7 @@ use byteorder::WriteBytesExt;
use common_arrow::arrow_format::flight::data::FlightData;
use common_exception::ErrorCode;
use common_exception::Result;
+use common_pipeline_core::processors::profile::PlanProfile;
use common_storage::CopyStatus;
use common_storage::MergeStatus;
use log::error;
@@ -53,7 +54,7 @@ pub enum DataPacket {
ErrorCode(ErrorCode),
Dictionary(FlightData),
FragmentData(FragmentData),
-FetchProgress,
+QueryProfiles(Vec<PlanProfile>),
SerializeProgress(Vec<ProgressInfo>),
CopyStatus(CopyStatus),
MergeStatus(MergeStatus),
@@ -67,12 +68,12 @@ impl DataPacket {
pub fn bytes_size(&self) -> usize {
match self {
DataPacket::ErrorCode(_) => 0,
-DataPacket::FetchProgress => 0,
DataPacket::CopyStatus(_) => 0,
DataPacket::MergeStatus(_) => 0,
DataPacket::SerializeProgress(_) => 0,
DataPacket::Dictionary(v) => calc_size(v),
DataPacket::FragmentData(v) => calc_size(&v.data) + v.meta.len(),
+DataPacket::QueryProfiles(_) => 0,
}
}
}
@@ -87,9 +88,9 @@ impl TryFrom<DataPacket> for FlightData {
FlightData::from(error)
}
DataPacket::FragmentData(fragment_data) => FlightData::from(fragment_data),
-DataPacket::FetchProgress => FlightData {
+DataPacket::QueryProfiles(profiles) => FlightData {
app_metadata: vec![0x03],
-data_body: vec![],
+data_body: serde_json::to_vec(&profiles)?,
data_header: vec![],
flight_descriptor: None,
},
@@ -154,7 +155,10 @@
flight_data,
)?)),
0x02 => Ok(DataPacket::ErrorCode(ErrorCode::try_from(flight_data)?)),
-0x03 => Ok(DataPacket::FetchProgress),
+0x03 => {
+let status = serde_json::from_slice::<Vec<PlanProfile>>(&flight_data.data_body)?;
+Ok(DataPacket::QueryProfiles(status))
+}
0x04 => {
let mut bytes = flight_data.data_body.as_slice();
let progress_size = bytes.read_u64::<BigEndian>()?;
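On the wire, app_metadata byte `0x03` previously tagged the body-less `FetchProgress` packet; it now carries a JSON-encoded `Vec<PlanProfile>` in `data_body`. A hedged roundtrip sketch of the encoding above (assuming, as elsewhere in this codebase, that `serde_json` errors convert into `ErrorCode` through `?`):

```rust
use common_arrow::arrow_format::flight::data::FlightData;
use common_exception::Result;
use common_pipeline_core::processors::profile::PlanProfile;

// Encode profiles into a 0x03-tagged FlightData, then decode them back.
fn roundtrip(profiles: &[PlanProfile]) -> Result<Vec<PlanProfile>> {
    // Mirrors TryFrom<DataPacket> for FlightData above.
    let flight = FlightData {
        app_metadata: vec![0x03],
        data_body: serde_json::to_vec(profiles)?,
        data_header: vec![],
        flight_descriptor: None,
    };

    // Mirrors TryFrom<FlightData> for DataPacket above: dispatch on the
    // first metadata byte, then parse the JSON body.
    assert_eq!(flight.app_metadata[0], 0x03);
    Ok(serde_json::from_slice::<Vec<PlanProfile>>(&flight.data_body)?)
}
```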
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/common/compact_hook.rs
@@ -77,7 +77,7 @@ async fn do_hook_compact(
metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64);

let compact_start_at = Instant::now();
-if err.is_none() {
+if err.is_ok() {
info!("execute {op_name} finished successfully. running table optimization job.");
match GlobalIORuntime::instance().block_on({
compact_table(ctx, compact_target, need_lock)
9 changes: 8 additions & 1 deletion src/query/service/src/interpreters/common/query_log.rs
@@ -185,10 +185,16 @@ impl InterpreterQueryLog {
server_version: "".to_string(),
session_settings,
extra: "".to_string(),
+has_profiles: false,
})
}

-pub fn log_finish(ctx: &QueryContext, now: SystemTime, err: Option<ErrorCode>) -> Result<()> {
+pub fn log_finish(
+ctx: &QueryContext,
+now: SystemTime,
+err: Option<ErrorCode>,
+has_profiles: bool,
+) -> Result<()> {
ctx.set_finish_time(now);
// User.
let handler_type = ctx.get_current_session().get_type().to_string();
@@ -316,6 +322,7 @@ impl InterpreterQueryLog {
server_version: "".to_string(),
session_settings,
extra: "".to_string(),
+has_profiles,
})
}
}
@@ -57,7 +57,7 @@ pub async fn hook_refresh_agg_index(
.get_enable_refresh_aggregating_index_after_write()?
{
pipeline.set_on_finished(move |err| {
-if err.is_none() {
+if err.is_ok() {
info!("execute pipeline finished successfully, starting run generate aggregating index job.");
match GlobalIORuntime::instance().block_on({
refresh_agg_index(ctx, desc)