From 2e32f4d4068b91e371e78cd358bf5da5f4a279c4 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Fri, 14 Jun 2024 12:05:45 +0800 Subject: [PATCH 1/4] better error format: failed to connect to target database... --- .../main/java/com/risingwave/connector/JDBCSinkFactory.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java index 835c7b969366..875bda89681d 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java @@ -75,7 +75,11 @@ public void validate( } catch (SQLException e) { LOG.error("failed to connect to target database. jdbcUrl: {}", jdbcUrl, e); throw Status.INVALID_ARGUMENT - .withDescription("failed to connect to target database: " + e.getSQLState()) + .withDescription( + "failed to connect to target database: " + + e.getSQLState() + + ": " + + e.getMessage()) .asRuntimeException(); } From 78e8c49a82af6ebaa4cd6c72b4fe14b09a54479c Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Fri, 14 Jun 2024 12:06:17 +0800 Subject: [PATCH 2/4] print stack trace from JVM --- .../com/risingwave/connector/api/Monitor.java | 17 +++++++++ proto/monitor_service.proto | 3 +- src/common/src/util/prost.rs | 22 ++++++++++++ .../src/rpc/service/monitor_service.rs | 13 +++++++ src/jni_core/src/jvm_runtime.rs | 35 ++++++++++++++++++- 5 files changed, 88 insertions(+), 2 deletions(-) create mode 100644 java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/Monitor.java diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/Monitor.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/Monitor.java new file mode 100644 index 000000000000..17e1cec263ac --- /dev/null +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/Monitor.java @@ -0,0 +1,17 @@ +package com.risingwave.connector.api; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; + +public class Monitor { + + public static String dumpStackTrace() { + StringBuilder builder = new StringBuilder(); + ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean(); + for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) { + builder.append(ti.toString()); + } + return builder.toString(); + } +} diff --git a/proto/monitor_service.proto b/proto/monitor_service.proto index b9238c8d3686..03013c127fd8 100644 --- a/proto/monitor_service.proto +++ b/proto/monitor_service.proto @@ -12,7 +12,8 @@ message StackTraceResponse { map rpc_traces = 2; map compaction_task_traces = 3; map inflight_barrier_traces = 4; - map barrier_worker_state = 5; + map barrier_worker_state = 5; // key: worker id + map jvm_stack_traces = 6; // key: worker id. Might be empty if the worker doesn't run JVM. } // CPU profiling diff --git a/src/common/src/util/prost.rs b/src/common/src/util/prost.rs index d5d8501b8b81..8145a37a8a20 100644 --- a/src/common/src/util/prost.rs +++ b/src/common/src/util/prost.rs @@ -77,6 +77,15 @@ impl<'a> Display for StackTraceResponseOutput<'a> { writeln!(s, ">> Worker {worker_id}")?; writeln!(s, "{state}\n")?; } + + if !self.jvm_stack_traces.is_empty() { + writeln!(s, "\n\n--- JVM Stack Traces ---")?; + for (worker_id, state) in &self.jvm_stack_traces { + writeln!(s, ">> Worker {worker_id}")?; + writeln!(s, "{state}\n")?; + } + } + Ok(()) } } @@ -102,6 +111,19 @@ impl StackTraceResponse { } } } + for (worker_id, worker_state) in b.jvm_stack_traces { + match self.jvm_stack_traces.entry(worker_id) { + Entry::Occupied(_entry) => { + warn!( + worker_id, + worker_state, "duplicate jvm stack trace. skipped" + ); + } + Entry::Vacant(entry) => { + entry.insert(worker_state); + } + } + } } pub fn output(&self) -> StackTraceResponseOutput<'_> { diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs index 6723528fcd1b..e1c6247296b0 100644 --- a/src/compute/src/rpc/service/monitor_service.rs +++ b/src/compute/src/rpc/service/monitor_service.rs @@ -22,6 +22,7 @@ use itertools::Itertools; use prometheus::core::Collector; use risingwave_common::config::{MetricLevel, ServerConfig}; use risingwave_common_heap_profiling::{AUTO_DUMP_SUFFIX, COLLAPSED_SUFFIX, MANUALLY_DUMP_SUFFIX}; +use risingwave_jni_core::jvm_runtime::dump_jvm_stack_traces; use risingwave_pb::monitor_service::monitor_service_server::MonitorService; use risingwave_pb::monitor_service::{ AnalyzeHeapRequest, AnalyzeHeapResponse, BackPressureInfo, GetBackPressureRequest, @@ -102,6 +103,12 @@ impl MonitorService for MonitorServiceImpl { let barrier_worker_state = self.stream_mgr.inspect_barrier_state().await?; + let jvm_stack_traces = match dump_jvm_stack_traces() { + Ok(None) => None, + Err(err) => Some(err.as_report().to_string()), + Ok(Some(stack_traces)) => Some(stack_traces), + }; + Ok(Response::new(StackTraceResponse { actor_traces, rpc_traces, @@ -111,6 +118,12 @@ impl MonitorService for MonitorServiceImpl { self.stream_mgr.env.worker_id(), barrier_worker_state, )]), + jvm_stack_traces: match jvm_stack_traces { + Some(stack_traces) => { + BTreeMap::from_iter([(self.stream_mgr.env.worker_id(), stack_traces)]) + } + None => BTreeMap::new(), + }, })) } diff --git a/src/jni_core/src/jvm_runtime.rs b/src/jni_core/src/jvm_runtime.rs index e596d5664dac..53818c17e40d 100644 --- a/src/jni_core/src/jvm_runtime.rs +++ b/src/jni_core/src/jvm_runtime.rs @@ -25,7 +25,7 @@ use risingwave_common::util::resource_util::memory::system_memory_available_byte use thiserror_ext::AsReport; use tracing::error; -use crate::call_method; +use crate::{call_method, call_static_method}; /// Use 10% of compute total memory by default. Compute node uses 0.7 * system memory by default. const DEFAULT_MEMORY_PROPORTION: f64 = 0.07; @@ -241,3 +241,36 @@ pub fn jobj_to_str(env: &mut JNIEnv<'_>, obj: JObject<'_>) -> anyhow::Result anyhow::Result> { + match JVM.get() { + None => Ok(None), + Some(jvm) => { + let mut env = jvm + .attach_current_thread() + .with_context(|| "Failed to attach thread to JVM")?; + + let result = call_static_method!( + env, + {com.risingwave.connector.api.Monitor}, + {String dumpStackTrace()} + ) + .with_context(|| "Failed to call Java function")?; + let result = JString::from(result); + let result = env + .get_string(&result) + .with_context(|| "Failed to convert JString")?; + let result = result + .to_str() + .with_context(|| "Failed to convert JavaStr")?; + Ok(Some(result.to_string())) + } + } +} From 9de40406fb0771a78b8556cd23c9ed44fca2b6b0 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Fri, 14 Jun 2024 13:44:13 +0800 Subject: [PATCH 3/4] web dashboard --- dashboard/pages/await_tree.tsx | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dashboard/pages/await_tree.tsx b/dashboard/pages/await_tree.tsx index 8163d4acc23c..6babcccf7e91 100644 --- a/dashboard/pages/await_tree.tsx +++ b/dashboard/pages/await_tree.tsx @@ -88,10 +88,14 @@ export default function AwaitTreeDump() { .join("\n") const barrierWorkerState = _(response.barrierWorkerState) .entries() - .map(([k, v]) => `[BarrierWorkerState ${k}]\n${v}`) + .map(([k, v]) => `[BarrierWorkerState (Worker ${k})]\n${v}`) + .join("\n") + const jvmStackTraces = _(response.jvmStackTraces) + .entries() + .map(([k, v]) => `[JVM (Worker ${k})]\n${v}`) .join("\n") - result = `${title}\n\n${actorTraces}\n${rpcTraces}\n${compactionTraces}\n${barrierTraces}\n${barrierWorkerState}` + result = `${title}\n\n${actorTraces}\n${rpcTraces}\n${compactionTraces}\n${barrierTraces}\n${barrierWorkerState}\n\n${jvmStackTraces}` } catch (e: any) { result = `${title}\n\nERROR: ${e.message}\n${e.cause}` } From 42f436b44a492f68bb381f6022124534d2b4fb82 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Fri, 14 Jun 2024 14:33:37 +0800 Subject: [PATCH 4/4] add license header Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- .../com/risingwave/connector/api/Monitor.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/Monitor.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/Monitor.java index 17e1cec263ac..5b89cfeb6e5d 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/Monitor.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/Monitor.java @@ -1,3 +1,19 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.risingwave.connector.api; import java.lang.management.ManagementFactory;