Skip to content

Commit

Permalink
feat(diagnosis): Dump JVM stack trace in await-tree-dump (#17254)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
fuyufjh and github-actions[bot] authored Jun 14, 2024
1 parent 43d9f43 commit 39e79db
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 5 deletions.
8 changes: 6 additions & 2 deletions dashboard/pages/await_tree.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,14 @@ export default function AwaitTreeDump() {
.join("\n")
const barrierWorkerState = _(response.barrierWorkerState)
.entries()
.map(([k, v]) => `[BarrierWorkerState ${k}]\n${v}`)
.map(([k, v]) => `[BarrierWorkerState (Worker ${k})]\n${v}`)
.join("\n")
const jvmStackTraces = _(response.jvmStackTraces)
.entries()
.map(([k, v]) => `[JVM (Worker ${k})]\n${v}`)
.join("\n")

result = `${title}\n\n${actorTraces}\n${rpcTraces}\n${compactionTraces}\n${barrierTraces}\n${barrierWorkerState}`
result = `${title}\n\n${actorTraces}\n${rpcTraces}\n${compactionTraces}\n${barrierTraces}\n${barrierWorkerState}\n\n${jvmStackTraces}`
} catch (e: any) {
result = `${title}\n\nERROR: ${e.message}\n${e.cause}`
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2024 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.risingwave.connector.api;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class Monitor {

public static String dumpStackTrace() {
StringBuilder builder = new StringBuilder();
ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {
builder.append(ti.toString());
}
return builder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ public void validate(
} catch (SQLException e) {
LOG.error("failed to connect to target database. jdbcUrl: {}", jdbcUrl, e);
throw Status.INVALID_ARGUMENT
.withDescription("failed to connect to target database: " + e.getSQLState())
.withDescription(
"failed to connect to target database: "
+ e.getSQLState()
+ ": "
+ e.getMessage())
.asRuntimeException();
}

Expand Down
3 changes: 2 additions & 1 deletion proto/monitor_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ message StackTraceResponse {
map<string, string> rpc_traces = 2;
map<string, string> compaction_task_traces = 3;
map<uint64, string> inflight_barrier_traces = 4;
map<uint32, string> barrier_worker_state = 5;
map<uint32, string> barrier_worker_state = 5; // key: worker id
map<uint32, string> jvm_stack_traces = 6; // key: worker id. Might be empty if the worker doesn't run JVM.
}

// CPU profiling
Expand Down
22 changes: 22 additions & 0 deletions src/common/src/util/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ impl<'a> Display for StackTraceResponseOutput<'a> {
writeln!(s, ">> Worker {worker_id}")?;
writeln!(s, "{state}\n")?;
}

if !self.jvm_stack_traces.is_empty() {
writeln!(s, "\n\n--- JVM Stack Traces ---")?;
for (worker_id, state) in &self.jvm_stack_traces {
writeln!(s, ">> Worker {worker_id}")?;
writeln!(s, "{state}\n")?;
}
}

Ok(())
}
}
Expand All @@ -102,6 +111,19 @@ impl StackTraceResponse {
}
}
}
for (worker_id, worker_state) in b.jvm_stack_traces {
match self.jvm_stack_traces.entry(worker_id) {
Entry::Occupied(_entry) => {
warn!(
worker_id,
worker_state, "duplicate jvm stack trace. skipped"
);
}
Entry::Vacant(entry) => {
entry.insert(worker_state);
}
}
}
}

pub fn output(&self) -> StackTraceResponseOutput<'_> {
Expand Down
13 changes: 13 additions & 0 deletions src/compute/src/rpc/service/monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use itertools::Itertools;
use prometheus::core::Collector;
use risingwave_common::config::{MetricLevel, ServerConfig};
use risingwave_common_heap_profiling::{AUTO_DUMP_SUFFIX, COLLAPSED_SUFFIX, MANUALLY_DUMP_SUFFIX};
use risingwave_jni_core::jvm_runtime::dump_jvm_stack_traces;
use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
use risingwave_pb::monitor_service::{
AnalyzeHeapRequest, AnalyzeHeapResponse, BackPressureInfo, GetBackPressureRequest,
Expand Down Expand Up @@ -102,6 +103,12 @@ impl MonitorService for MonitorServiceImpl {

let barrier_worker_state = self.stream_mgr.inspect_barrier_state().await?;

let jvm_stack_traces = match dump_jvm_stack_traces() {
Ok(None) => None,
Err(err) => Some(err.as_report().to_string()),
Ok(Some(stack_traces)) => Some(stack_traces),
};

Ok(Response::new(StackTraceResponse {
actor_traces,
rpc_traces,
Expand All @@ -111,6 +118,12 @@ impl MonitorService for MonitorServiceImpl {
self.stream_mgr.env.worker_id(),
barrier_worker_state,
)]),
jvm_stack_traces: match jvm_stack_traces {
Some(stack_traces) => {
BTreeMap::from_iter([(self.stream_mgr.env.worker_id(), stack_traces)])
}
None => BTreeMap::new(),
},
}))
}

Expand Down
35 changes: 34 additions & 1 deletion src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::util::resource_util::memory::system_memory_available_byte
use thiserror_ext::AsReport;
use tracing::error;

use crate::call_method;
use crate::{call_method, call_static_method};

/// Use 10% of compute total memory by default. Compute node uses 0.7 * system memory by default.
const DEFAULT_MEMORY_PROPORTION: f64 = 0.07;
Expand Down Expand Up @@ -241,3 +241,36 @@ pub fn jobj_to_str(env: &mut JNIEnv<'_>, obj: JObject<'_>) -> anyhow::Result<Str
let java_str = env.get_string(&jstr)?;
Ok(java_str.to_str()?.to_string())
}

/// Dumps the JVM stack traces.
///
/// # Returns
///
/// - `Ok(None)` if JVM is not initialized.
/// - `Ok(Some(String))` if JVM is initialized and stack traces are dumped.
/// - `Err` if failed to dump stack traces.
pub fn dump_jvm_stack_traces() -> anyhow::Result<Option<String>> {
match JVM.get() {
None => Ok(None),
Some(jvm) => {
let mut env = jvm
.attach_current_thread()
.with_context(|| "Failed to attach thread to JVM")?;

let result = call_static_method!(
env,
{com.risingwave.connector.api.Monitor},
{String dumpStackTrace()}
)
.with_context(|| "Failed to call Java function")?;
let result = JString::from(result);
let result = env
.get_string(&result)
.with_context(|| "Failed to convert JString")?;
let result = result
.to_str()
.with_context(|| "Failed to convert JavaStr")?;
Ok(Some(result.to_string()))
}
}
}

0 comments on commit 39e79db

Please sign in to comment.