Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions tools/scxtop/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3031,6 +3031,9 @@ impl<'a> App<'a> {

/// Renders the perf top view with symbolized samples.
fn render_perf_top(&mut self, frame: &mut Frame) -> Result<()> {
// Process async symbolization responses to update symbol names
self.symbol_data.process_async_responses();

let area = frame.area();

// Split the area into left (table) and right (details) sections
Expand Down Expand Up @@ -5100,6 +5103,9 @@ impl<'a> App<'a> {
pub fn on_perf_sample(&mut self, action: &crate::PerfSampleAction) {
// Only process perf samples when in PerfTop state
if self.state == AppState::PerfTop {
// Process any pending async symbolization responses first
self.symbol_data.process_async_responses();

// Get layer ID from BPF sample (negative if not present)
let layer_id = if action.layer_id >= 0 {
Some(action.layer_id)
Expand Down
301 changes: 283 additions & 18 deletions tools/scxtop/src/symbol_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,191 @@ use blazesym::symbolize::{Input, Sym, Symbolizer};
use blazesym::Pid;
use std::collections::{BTreeMap, HashMap};
use std::path::PathBuf;
use std::sync::mpsc;
use std::thread;

/// Upper bound on the number of tracked symbols.
/// NOTE(review): the code that enforces this limit is outside the visible
/// part of the file — confirm what exactly is being capped.
const MAX_SYMBOLS: usize = 5000;
/// Maximum number of distinct stack traces retained for a single symbol; once
/// reached, the trace with the lowest count is evicted to make room.
const MAX_STACK_TRACES_PER_SYMBOL: usize = 10; // Limit stack traces per symbol
/// Maximum number of frames taken from each of the kernel and user stacks of
/// a sample, both for storage and for symbolization requests.
const MAX_STACK_DEPTH: usize = 255;

/// Request for symbolization sent to the worker thread.
///
/// One request covers exactly one address; the worker answers each request
/// with a `SymbolizationResponse` carrying the same `request_id`.
#[derive(Debug, Clone)]
pub struct SymbolizationRequest {
/// Absolute address to resolve.
pub address: u64,
/// Process the address was sampled in. Only consulted for non-kernel
/// addresses; a pid of 0 is symbolized against the kernel.
pub pid: u32,
/// Forces kernel symbolization regardless of `pid`.
pub is_kernel: bool,
/// Caller-assigned identifier echoed back in the response.
pub request_id: u64,
}

/// Response from the symbolization worker thread.
///
/// Every request produces a response: when symbolization fails or yields no
/// symbol, `symbol_info` holds a placeholder named after the hex address.
#[derive(Debug, Clone)]
pub struct SymbolizationResponse {
/// Identifier copied from the originating `SymbolizationRequest`.
pub request_id: u64,
/// Address that was symbolized (copied from the request).
pub address: u64,
/// Resolved symbol, or the placeholder described above.
pub symbol_info: SymbolInfo,
}

/// Handle for the async symbolization worker.
///
/// Owns the sending half of the request channel and the receiving half of
/// the response channel; the worker loop ends once the request channel is
/// closed or a response can no longer be delivered.
pub struct AsyncSymbolizer {
/// Channel used to hand requests to the worker thread.
request_sender: mpsc::Sender<SymbolizationRequest>,
/// Channel on which the worker thread delivers results.
response_receiver: mpsc::Receiver<SymbolizationResponse>,
/// Id assigned to the next request; starts at 1 and grows by one per request.
next_request_id: u64,
}

/// Manual `Debug` implementation that prints only `next_request_id`; the
/// channel endpoints are left out of the output.
impl std::fmt::Debug for AsyncSymbolizer {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = formatter.debug_struct("AsyncSymbolizer");
        repr.field("next_request_id", &self.next_request_id);
        repr.finish()
    }
}

impl AsyncSymbolizer {
pub fn new() -> Self {
let (req_tx, req_rx) = mpsc::channel::<SymbolizationRequest>();
let (resp_tx, resp_rx) = mpsc::channel::<SymbolizationResponse>();

// Spawn the worker thread
thread::spawn(move || {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worth switching to tokio::spawn and tokio::mpsc for uniformity. You can still receive in a sync context with try_recv, which I think is appropriate for this use case.

let symbolizer = Symbolizer::new();
let mut symbol_cache: HashMap<u64, SymbolInfo> = HashMap::new();

while let Ok(request) = req_rx.recv() {
let symbol_info = if let Some(cached_info) = symbol_cache.get(&request.address) {
cached_info.clone()
} else {
let info = Self::symbolize_address_sync(
&symbolizer,
request.address,
request.pid,
request.is_kernel,
);
symbol_cache.insert(request.address, info.clone());
info
};

let response = SymbolizationResponse {
request_id: request.request_id,
address: request.address,
symbol_info,
};

if resp_tx.send(response).is_err() {
// Receiver is closed, exit the thread
break;
}
}
});

Self {
request_sender: req_tx,
response_receiver: resp_rx,
next_request_id: 1,
}
}

/// Hands one address to the worker thread and returns the id assigned to the
/// request.
///
/// Ids are issued sequentially. Delivery is best effort: if the worker has
/// already exited the request is dropped, but the id is still returned.
pub fn request_symbolization(&mut self, address: u64, pid: u32, is_kernel: bool) -> u64 {
    let id = self.next_request_id;
    self.next_request_id = id + 1;

    // A send error only means the worker is gone; there is nothing to recover.
    let _ = self.request_sender.send(SymbolizationRequest {
        address,
        pid,
        is_kernel,
        request_id: id,
    });

    id
}

/// Collects every response that is ready right now, without blocking.
///
/// Returns an empty vector when nothing is pending or the worker has hung up.
pub fn try_recv_responses(&self) -> Vec<SymbolizationResponse> {
    // `try_iter` stops at the first point where `try_recv` would fail.
    self.response_receiver.try_iter().collect()
}

/// Resolves a single address synchronously (runs on the worker thread).
///
/// Kernel addresses, and any address reported with pid 0, are symbolized
/// against the kernel; everything else against the process `pid`. Any failure
/// or unknown result yields the hex-address placeholder instead of an error.
fn symbolize_address_sync(
    symbolizer: &Symbolizer,
    address: u64,
    pid: u32,
    is_kernel: bool,
) -> SymbolInfo {
    let source = if is_kernel || pid == 0 {
        Source::Kernel(Kernel::default())
    } else {
        Source::Process(Process::new(Pid::from(pid)))
    };

    let addrs: &[u64] = &[address];

    match symbolizer.symbolize(&source, Input::AbsAddr(addrs)) {
        Ok(results) => match results.first() {
            Some(blazesym::symbolize::Symbolized::Sym(sym)) => {
                Self::extract_symbol_info_sync(sym, address, is_kernel)
            }
            // Either the symbolizer did not know the address or it returned
            // no entry at all.
            Some(blazesym::symbolize::Symbolized::Unknown(_)) | None => {
                Self::unknown_symbol_info_sync(address, is_kernel)
            }
        },
        Err(_) => Self::unknown_symbol_info_sync(address, is_kernel),
    }
}

/// Converts a resolved blazesym symbol into the crate's `SymbolInfo`.
///
/// The module name is the final path component of the symbol's module; when
/// the symbol has no module it is "kernel" for kernel addresses and "unknown"
/// otherwise. Source file and line are filled in only when code info exists.
fn extract_symbol_info_sync(sym: &Sym, address: u64, is_kernel: bool) -> SymbolInfo {
    let module_name = match &sym.module {
        Some(module) => PathBuf::from(module)
            .file_name()
            .and_then(|name| name.to_str())
            .unwrap_or("unknown")
            .to_string(),
        None if is_kernel => "kernel".to_string(),
        None => "unknown".to_string(),
    };

    let (file_name, line_number) = match &sym.code_info {
        Some(code_info) => (
            Some(code_info.file.to_string_lossy().into_owned()),
            code_info.line,
        ),
        None => (None, None),
    };

    SymbolInfo {
        symbol_name: sym.name.to_string(),
        module_name,
        file_name,
        line_number,
        address,
    }
}

/// Builds the placeholder used when an address cannot be resolved: the symbol
/// name is the address in lowercase hex, the module is "kernel" or "unknown",
/// and no source location is attached.
fn unknown_symbol_info_sync(address: u64, is_kernel: bool) -> SymbolInfo {
    let module = match is_kernel {
        true => "kernel",
        false => "unknown",
    };
    let symbol_name = format!("0x{address:x}");

    SymbolInfo {
        symbol_name,
        module_name: module.to_string(),
        file_name: None,
        line_number: None,
        address,
    }
}
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct SymbolInfo {
Expand Down Expand Up @@ -46,8 +231,6 @@ pub struct SymbolSample {
pub layer_id: Option<i32>,
}

const MAX_SYMBOLS: usize = 5000;

#[derive(Debug)]
pub struct SymbolData {
/// Map from address to symbol information
Expand All @@ -56,10 +239,14 @@ pub struct SymbolData {
symbol_samples: BTreeMap<String, SymbolSample>,
/// Total sample count for percentage calculation
total_samples: u64,
/// Symbolizer instance
/// Async symbolizer for background symbolization
async_symbolizer: AsyncSymbolizer,
/// Symbolizer instance for immediate symbolization when needed
symbolizer: Symbolizer,
/// Counter for insertion order (FIFO)
insertion_counter: u64,
/// Pending symbolization requests
pending_requests: HashMap<u64, (u64, u32, bool)>, // request_id -> (address, pid, is_kernel)
}

impl Default for SymbolData {
Expand All @@ -74,8 +261,45 @@ impl SymbolData {
symbol_cache: HashMap::new(),
symbol_samples: BTreeMap::new(),
total_samples: 0,
async_symbolizer: AsyncSymbolizer::new(),
symbolizer: Symbolizer::new(),
insertion_counter: 0,
pending_requests: HashMap::new(),
}
}

/// Process any pending async symbolization responses
pub fn process_async_responses(&mut self) {
let responses = self.async_symbolizer.try_recv_responses();
for response in responses {
// Update the symbol cache with the symbolized result
self.symbol_cache
.insert(response.address, response.symbol_info);

// Remove from pending requests
self.pending_requests.remove(&response.request_id);
}
}

/// Queues `address` for background symbolization unless it is already
/// resolved or already in flight.
///
/// The number of in-flight requests is capped. The request channel is
/// unbounded and send failures are ignored, so without a cap a slow or dead
/// worker would let `pending_requests` grow without limit, and every call
/// would scan that ever-growing table. Requests rejected by the cap are not
/// lost for good: the address stays uncached, so the next sample that
/// contains it asks again.
///
/// * `address` - absolute address to resolve.
/// * `pid` - process the address was sampled in.
/// * `is_kernel` - whether the address belongs to the kernel.
fn request_async_symbolization(&mut self, address: u64, pid: u32, is_kernel: bool) {
    // Upper bound on outstanding requests; also bounds the duplicate scan below.
    const MAX_PENDING_REQUESTS: usize = 4096;

    // Already resolved: nothing to do.
    if self.symbol_cache.contains_key(&address) {
        return;
    }

    // Back-pressure: the worker is behind (or gone), so do not pile on more.
    if self.pending_requests.len() >= MAX_PENDING_REQUESTS {
        return;
    }

    // Avoid queueing the same address twice while a request is in flight.
    let already_pending = self
        .pending_requests
        .values()
        .any(|&(pending_addr, _, _)| pending_addr == address);
    if already_pending {
        return;
    }

    let request_id = self
        .async_symbolizer
        .request_symbolization(address, pid, is_kernel);
    self.pending_requests
        .insert(request_id, (address, pid, is_kernel));
}

Expand All @@ -92,29 +316,58 @@ impl SymbolData {
) {
self.total_samples += 1;

// Process any pending async symbolization responses first
self.process_async_responses();

// Try to get symbol info from cache first
let symbol_info = if let Some(cached_info) = self.symbol_cache.get(&address) {
cached_info.clone()
} else {
// Symbolize the address
let symbol_info = self.symbolize_address(address, pid, is_kernel);
self.symbol_cache.insert(address, symbol_info.clone());
symbol_info
// Request async symbolization for this address
self.request_async_symbolization(address, pid, is_kernel);

// For immediate display, create a temporary symbol info with the address
// The real symbolized name will be updated when the async response arrives
SymbolInfo {
symbol_name: format!("0x{address:x}"),
module_name: if is_kernel { "kernel" } else { "unknown" }.to_string(),
file_name: None,
line_number: None,
address,
}
};

// Also request async symbolization for stack addresses (limited depth to prevent overload)
for &stack_addr in kernel_stack.iter().take(MAX_STACK_DEPTH) {
if stack_addr != 0 {
self.request_async_symbolization(stack_addr, pid, true);
}
}
for &stack_addr in user_stack.iter().take(MAX_STACK_DEPTH) {
if stack_addr != 0 {
self.request_async_symbolization(stack_addr, pid, false);
}
}

// Truncate stacks to prevent excessive memory usage
let kernel_stack_filtered: Vec<u64> = kernel_stack
.iter()
.take(MAX_STACK_DEPTH)
.filter(|&&addr| addr != 0)
.copied()
.collect();
let user_stack_filtered: Vec<u64> = user_stack
.iter()
.take(MAX_STACK_DEPTH)
.filter(|&&addr| addr != 0)
.copied()
.collect();

// Create raw stack trace if we have stack data (don't symbolize yet)
let stack_trace = if !kernel_stack.is_empty() || !user_stack.is_empty() {
let stack_trace = if !kernel_stack_filtered.is_empty() || !user_stack_filtered.is_empty() {
Some(RawStackTrace {
kernel_stack: kernel_stack
.iter()
.filter(|&&addr| addr != 0)
.copied()
.collect(),
user_stack: user_stack
.iter()
.filter(|&&addr| addr != 0)
.copied()
.collect(),
kernel_stack: kernel_stack_filtered,
user_stack: user_stack_filtered,
count: 1,
pid,
})
Expand Down Expand Up @@ -178,6 +431,18 @@ impl SymbolData {
}
}
if !found_existing {
// Limit the number of stack traces per symbol to prevent OOMs
if sample.stack_traces.len() >= MAX_STACK_TRACES_PER_SYMBOL {
// Remove the least common stack trace (lowest count)
if let Some((min_idx, _)) = sample
.stack_traces
.iter()
.enumerate()
.min_by_key(|(_, trace)| trace.count)
{
sample.stack_traces.remove(min_idx);
}
}
sample.stack_traces.push(new_trace);
}
}
Expand Down