diff --git a/ebpf/ebpf.py b/ebpf/ebpf.py
index 02a2a915..913dd1d1 100644
--- a/ebpf/ebpf.py
+++ b/ebpf/ebpf.py
@@ -1,13 +1,11 @@
 #!/usr/bin/env python3
 
-import argparse
 import json
 import logging
 import os
-import signal
 import resource
-import sys
-from time import sleep, strftime, time
+import signal
+from time import sleep, time
 from typing import Optional, Tuple
 
 import pexpect
@@ -48,31 +46,6 @@ def update(self, size):
         self.size += size
 
 
-def print_outstanding(bpf, pid, top, min_age_ns):
-    print(f"Top {top} stacks with outstanding allocations:")
-    alloc_info = {}
-    allocs = bpf["allocs"]
-    stack_traces = bpf["stack_traces"]
-    for address, info in sorted(allocs.items(), key=lambda a: a[1].size):
-        if BPF.monotonic_time() - min_age_ns < info.timestamp_ns or info.stack_id < 0:
-            continue
-        if info.stack_id in alloc_info:
-            alloc_info[info.stack_id].update(info.size)
-        else:
-            stack = list(stack_traces.walk(info.stack_id))
-            combined = []
-            for addr in stack:
-                func_name = bpf.sym(addr, pid, show_module=True, show_offset=True)
-                formatted_address = ('0x' + format(addr, '016x') + '\t').encode('utf-8')
-                combined.append(formatted_address + func_name)
-            alloc_info[info.stack_id] = Allocation(combined, info.size, address.value)
-        print(f"\taddr = {address.value} size = {info.size}")
-    to_show = sorted(alloc_info.values(), key=lambda a: a.size)[-top:]
-    for alloc in to_show:
-        stack = b"\n\t\t".join(alloc.stack).decode("ascii")
-        print(f"\t{alloc.size} bytes in {alloc.count} allocations from stack\n\t\t{stack}")
-
-
 def get_outstanding(bpf, pid, min_age_ns, top):
     alloc_info = {}
     allocs = bpf["allocs"]
@@ -122,30 +95,6 @@ def is_positive(self):
         return self.alloc_size > self.free_size
 
 
-def print_memory_statistics(bpf, pid, top):
-    stack_traces = bpf["stack_traces"]
-    print("stack traces", len(list(stack_traces.items())))
-    combined_alloc = list(
-        sorted(
-            map(CombinedAlloc, bpf["combined_allocs"].items()),
-            key=lambda a: a.key(),
-        )
-    )
-    memory = sum((c.alloc_size - c.free_size for c in combined_alloc)) / 1024
-    print("overall, allocated", memory, "kb in", len(combined_alloc), "allocations")
-    entries = []
-    for allocation in combined_alloc[:top]:
-        trace = get_trace_info(bpf, pid, stack_traces, allocation.stack_id.value)
-        entry = f"\t{allocation.alloc_size - allocation.free_size} bytes in " \
-                f"{allocation.number_of_allocs - allocation.number_of_frees}" \
-                f" allocations from stack ({allocation.number_of_allocs + allocation.number_of_frees} allocs/frees)" \
-                f"\n\t\t{trace}"
-        entries.append(entry)
-
-    print(f"Top {top} stacks with outstanding allocations:")
-    print('\n'.join(reversed(entries)))
-
-
 def get_statistics(bpf, pid, min_age_ns, top):
     stack_traces = bpf["stack_traces"]
     combined_alloc = list(
@@ -200,18 +149,6 @@ def walk_trace(stack_traces, stack_id):
     return []
 
 
-def print_outstanding_kernel_cache(bpf, top):
-    kernel_cache_allocs = list(
-        sorted(filter(lambda a: a[1].alloc_size > a[1].free_size, bpf['kernel_cache_counts'].items()),
-               key=lambda a: a[1].alloc_size - a[1].free_size)
-    )[:top]
-    if not kernel_cache_allocs:
-        return
-    print("---------------- Kernel Caches ---------------")
-    for (k, v) in kernel_cache_allocs:
-        print("Cache", str(k.name, "utf-8"), v.alloc_count - v.free_count, v.alloc_size - v.free_size)
-
-
 def gernel_kernel_cache(bpf, top):
     kernel_cache_allocs = list(
         sorted(filter(lambda a: a[1].alloc_size > a[1].free_size, bpf['kernel_cache_counts'].items()),
@@ -237,21 +174,13 @@ def gernel_kernel_cache(bpf, top):
     return caches
 
 
-def print_syscalls(bpf, top):
-    syscall_counts = bpf["syscall_counts"]
-    print("SYSCALL COUNT TIME")
-    for k, v in sorted(syscall_counts.items(), key=lambda kv: -kv[1].total_ns)[:top]:
-        print("%-22s %8d %16.3f" % (system_call_name(k.value), v.count, v.total_ns / 1e3))
-    syscall_counts.clear()
-
-
 def get_syscalls(bpf):
     syscall_counts = bpf["syscall_counts"]
     syscalls = {}
     for k, v in syscall_counts.items():
         key = k.value
         syscall_id = key >> 32
-        thread_id = (1 << 32) & key
+        thread_id = ((1 << 32) - 1) & key
 
         syscalls.setdefault(thread_id, []).append({
             'name': system_call_name(syscall_id),
@@ -268,28 +197,28 @@ def system_call_name(k):
     return syscall_name(k).decode('ascii')
 
 
-def print_time():
-    print("[%s]" % strftime("%H:%M:%S"))
-
-
-def get_syscall_stacks(bpf, pid):
+def get_additional_syscall_info(bpf, pid):
     syscall_counts_stacks = bpf['syscall_counts_stacks']
     syscalls_per_thread = {}
-    stack_ids = []
-    pid_to_syscall_times = {}
+    tid_to_syscall_timestamps = {}
     for k, v in syscall_counts_stacks.items():
-        stack_set = syscalls_per_thread.setdefault(v.pid_tgid, {}).setdefault(system_call_name(v.id),
-                                                                              {'stacks': set(), 'number': 0})
-        stack_set['stacks'].add(v.stack_id)
-        stack_set['number'] += 1
-        stack_ids.append(v.stack_id)
-        pid_to_syscall_times.setdefault(v.pid_tgid, []).append(k.value)
-
-    stacks = {stack_id: get_trace_info_as_list(bpf, pid, bpf['stack_traces'], stack_id) for stack_id in stack_ids}
+        thread_id = ((1 << 32) - 1) & v.pid_tgid
+        stack_dict = syscalls_per_thread.setdefault(thread_id, {}).setdefault(system_call_name(v.id), {})
+        stack_trace = '\n\t'.join(get_trace_info_as_list(bpf, pid, bpf['stack_traces'], v.stack_id))
+        stack_dict[stack_trace] = stack_dict.get(stack_trace, 0) + 1
+        tid_to_syscall_timestamps.setdefault(thread_id, []).append(k.value)
+
+    syscalls_per_thread = {thread_id: {
+        call_name: [
+            {
+                'stack_trace': trace.split('\n\t'),
+                'count': count,
+            } for (trace, count) in per_syscall.items()
+        ] for (call_name, per_syscall) in calls.items()}
+        for (thread_id, calls) in syscalls_per_thread.items()}
 
     return {
-        'syscalls': syscalls_per_thread,
-        'stacks': stacks,
-        'pid_to_syscall_times': pid_to_syscall_times
+        'syscall_stacks': syscalls_per_thread,
+        'tid_to_syscall_timestamps': tid_to_syscall_timestamps
     }
 
@@ -300,19 +229,20 @@ def save_snapshot(
     bpf,
     pid: int,
     min_age_ns: int,
     top: int,
     snapshot_dir: str,
     with_memory: bool,
-    with_syscall_stacks: bool,
+    with_syscall_details: bool,
     snapshot_prefix: Optional[str] = None
 ):
     current_time_millis = int(round(time() * 1000))
     snapshot = {'time': current_time_millis}
+
     if with_memory:
         snapshot['memory_stats'] = get_statistics(bpf, pid, min_age_ns, top)
         snapshot['kernel_caches'] = gernel_kernel_cache(bpf, top)
     snapshot['syscalls'] = get_syscalls(bpf)
-    if with_syscall_stacks:
-        snapshot['syscall_stacks'] = get_syscall_stacks(bpf, pid)
+    if with_syscall_details:
+        snapshot['syscall_details'] = get_additional_syscall_info(bpf, pid)
     os.makedirs(snapshot_dir, exist_ok=True)
     with open(get_result_file_name(snapshot_dir, snapshot_prefix or 'snapshot', 'json'), 'w') as outfile:
@@ -328,74 +258,6 @@ def get_result_file_name(dir_name: str, prefix: str, suffix: str):
     return path
 
 
-class Arguments:
-    def __init__(self, args):
-        self.pid = args.pid
-        self.command = args.command
-        self.interval = args.interval
-        self.min_age_ns = 1e6 * args.older
-        self.alloc_sample_every_n = args.alloc_sample_rate
-        self.top = args.top
-        self.min_alloc_size = args.min_alloc_size
-        self.max_alloc_size = args.max_alloc_size
-
-        if args.snapshots is None:
-            self.save_snapshots = './snapshots'
-        elif args.snapshots == -1:
-            self.save_snapshots = None
-        else:
-            self.save_snapshots = args.snapshots
-
-        if self.min_alloc_size is not None and self.max_alloc_size is not None \
-                and self.min_alloc_size > self.max_alloc_size:
-            print("min_size (-z) can't be greater than max_size (-Z)")
-            exit(1)
-
-        if self.command is None and self.pid is None:
-            print("Either -p or -c must be specified")
-            exit(1)
-
-
-def parse():
-    parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter)
-    parser.add_argument("-p", "--pid", type=int, default=-1,
-                        help="the PID to trace; if not specified, trace kernel allocs")
-    parser.add_argument("interval", nargs="?", default=5, type=int,
-                        help="interval in seconds to print outstanding allocations")
-    parser.add_argument("-o", "--older", default=500, type=int,
-                        help="prune allocations younger than this age in milliseconds")
-    parser.add_argument("-c", "--command",
-                        help="execute and trace the specified command")
-    parser.add_argument("-s", "--alloc-sample-rate", default=1, type=int,
-                        help="sample every N-th allocation to decrease the overhead")
-    parser.add_argument("-T", "--top", type=int, default=10,
-                        help="display only this many top stats")
-    parser.add_argument("-z", "--min-alloc-size", type=int,
-                        help="capture only allocations larger than this size")
-    parser.add_argument("-Z", "--max-alloc-size", type=int,
-                        help="capture only allocations smaller than this size")
-    parser.add_argument("-S", "--snapshots", default=-1, type=str, nargs='?',
-                        help="save statistics snapshots to the specified directory")
-    return Arguments(parser.parse_args())
-
-
-def attach_ebpf(pid: int = -1,
-                process: Optional[spawn] = None,
-                command: Optional[str] = None,
-                top: int = 10,
-                interval: int = 5,
-                alloc_sample_every_n: int = 1,
-                max_alloc_size: Optional[int] = None,
-                min_age_ns: int = 500,
-                min_alloc_size: Optional[int] = None,
-                save_snapshots: Optional[str] = None,
-                with_memory: bool = False):
-    (bpf, pid, process) = attach_probes(pid, process, command, alloc_sample_every_n, max_alloc_size, min_alloc_size,
-                                        with_memory)
-
-    harvest_ebpf(bpf, pid, process, top, interval, min_age_ns, save_snapshots)
-
-
 def attach_probes(
     pid: int = -1,
     process: Optional[spawn] = None,
@@ -404,7 +266,7 @@ def attach_probes(
     max_alloc_size: Optional[int] = None,
     min_alloc_size: Optional[int] = None,
     with_memory: bool = False,
-    syscall_stacks: bool = False,
+    syscall_details: bool = False,
     communicate_with_signals: bool = False
 ) -> Optional[Tuple[BPF, int, spawn]]:
     if pid == -1 and command is None and process is None:
@@ -431,7 +293,7 @@ def attach_probes(
         f"-DPAGE_SIZE={resource.getpagesize()}",
         f"-DFILTER_BY_SIZE={get_size_filter(min_alloc_size, max_alloc_size)}",
         "-DWITH_MEMORY" if with_memory else "",
-        "-DCOLLECT_SYSCALL_STACK_INFO" if syscall_stacks else "",
+        "-DCOLLECT_SYSCALL_STACK_INFO" if syscall_details else "",
     ])
 
     # Attaching probes
@@ -445,7 +307,6 @@ def attach_probes(
             sleep(wait_interval)
             wait_time += wait_interval
         if process.terminated:
-            print(process.readline().decode('utf-8'))
             raise Exception(f'Process is already terminated, with status code {process.exitstatus}')
         if wait_time > timeout:
             raise Exception('Process is not alive')
@@ -503,16 +364,6 @@ def sleep_and_check(
             break
 
 
-def print_ebpf_info(bpf, pid, min_age_ns, top):
-    print_time()
-    print_memory_statistics(bpf, pid, top)
-    print_outstanding(bpf, pid, top, min_age_ns)
-    print_outstanding_kernel_cache(bpf, top)
-    print_syscalls(bpf, top)
-    print()
-    sys.stdout.flush()
-
-
 def harvest_ebpf(
     bpf: BPF,
     pid: int = -1,
@@ -521,7 +372,7 @@ def harvest_ebpf(
     interval: int = 5,
     min_age_ns: int = 500,
     with_memory: bool = False,
-    with_syscall_stacks: bool = False,
+    with_syscall_details: bool = False,
     save_snapshots: Optional[str] = None,
     snapshot_prefix: Optional[str] = None,
     communicate_with_signals: bool = False,
@@ -538,10 +389,7 @@ def harvest_ebpf(
             sleep_and_check(interval, pid, process, check_delay=0.2)
         except KeyboardInterrupt:
             break
-        if save_snapshots:
-            save_snapshot(bpf, pid, min_age_ns, top, save_snapshots, with_memory, with_syscall_stacks, snapshot_prefix)
-        else:
-            print_ebpf_info(bpf, pid, min_age_ns, top)
+        save_snapshot(bpf, pid, min_age_ns, top, save_snapshots, with_memory, with_syscall_details, snapshot_prefix)
 
         if is_terminated(pid, process):
             break
@@ -551,18 +399,3 @@ def harvest_ebpf(
 
     logging.info("Detaching...")
     bpf.cleanup()
-
-
-if __name__ == "__main__":
-    arguments = parse()
-    attach_ebpf(
-        pid=arguments.pid,
-        command=arguments.command,
-        top=arguments.top,
-        interval=arguments.interval,
-        alloc_sample_every_n=arguments.alloc_sample_every_n,
-        max_alloc_size=arguments.max_alloc_size,
-        min_age_ns=arguments.min_age_ns,
-        min_alloc_size=arguments.min_alloc_size,
-        save_snapshots=arguments.save_snapshots,
-    )
diff --git a/run.py b/run.py
index eaa8cc72..dbe638e8 100755
--- a/run.py
+++ b/run.py
@@ -58,7 +58,7 @@
 run_in_docker_container = False
 with_ebpf = False
 with_ebpf_memory = False
-with_syscall_stacks = False
+with_syscall_details = False
 
 main_dir_path = "./db_main/"
 storage_disk_paths = [
@@ -145,9 +145,8 @@ def run(
         run_index: int,
         runs_count: int,
         with_ebpf: bool,
-        with_ebpf_memory: bool
-,
-        with_syscall_stacks: bool) -> None:
+        with_ebpf_memory: bool,
+        with_syscall_details: bool) -> None:
     db_config_file_path = get_db_config_file_path(db_name, size)
     workloads_file_path = get_workloads_file_path(size)
     db_main_dir_path = get_db_main_dir_path(db_name, size, main_dir_path)
@@ -157,7 +156,7 @@ def run(
     )
 
     transactional_flag = "-t" if transactional else ""
-    lazy_flag = '-l' if with_ebpf else ''
+    lazy_flag = "-l" if with_ebpf else ""
     filter = ",".join(workload_names)
     db_storage_dir_paths = ",".join(db_storage_dir_paths)
 
@@ -177,19 +176,20 @@ def run(
         from ebpf.ebpf import attach_probes, harvest_ebpf
         bpf, pid, process = attach_probes(
             process=process,
+            syscall_details=with_syscall_details,
             with_memory=with_ebpf_memory,
             communicate_with_signals=True,
         )
         # Send SIGUSR1 to the process to notify it that the probes are attached
         os.kill(process.pid, signal.SIGUSR1)
         thread = Thread(target=harvest_ebpf, args=(bpf,), kwargs={
-            'process': process,
-            'interval': 5,
-            'with_memory': with_ebpf_memory,
-            'with_syscall_stacks': with_syscall_stacks,
-            'snapshot_prefix': "-".join(workload_names),
-            'save_snapshots': f'./bench/ebpf/snapshots/{db_name}_{size}',
-            'communicate_with_signals': True,
+            "process": process,
+            "interval": 5,
+            "with_memory": with_ebpf_memory,
+            "with_syscall_details": with_syscall_details,
+            "snapshot_prefix": "-".join(workload_names),
+            "save_snapshots": f"./bench/ebpf/snapshots/{db_name}_{size}",
+            "communicate_with_signals": True,
         })
         thread.start()
     if with_ebpf:
@@ -223,7 +223,7 @@ def parse_args():
     global run_in_docker_container
     global with_ebpf
     global with_ebpf_memory
-    global with_syscall_stacks
+    global with_syscall_details
 
     parser = argparse.ArgumentParser()
 
@@ -321,10 +321,10 @@ def parse_args():
     )
     parser.add_argument(
         "-es",
-        "--with-ebpf-syscall-stacks",
-        help="Collect eBPF syscall stack traces",
-        default=with_syscall_stacks,
-        dest="with_syscall_stacks",
+        "--with-ebpf-syscall-details",
+        help="Collect detailed eBPF syscall information (stack traces and timestamps)",
+        default=with_syscall_details,
+        dest="with_syscall_details",
         action=argparse.BooleanOptionalAction
     )
 
@@ -341,7 +341,7 @@ def parse_args():
     run_in_docker_container = args.run_docker
     with_ebpf = args.with_ebpf
     with_ebpf_memory = args.with_ebpf_memory
-    with_syscall_stacks = args.with_syscall_stacks
+    with_syscall_details = args.with_syscall_details
 
 
 def check_args():
@@ -356,9 +356,9 @@ def check_args():
     if not workload_names:
         sys.exit("Workload name(s) not specified")
     if run_in_docker_container and with_ebpf:
-        sys.exit('Running ebpf benchmarks in docker container is not supported')
+        sys.exit("Running ebpf benchmarks in docker container is not supported")
     if with_ebpf_memory and not with_ebpf:
-        sys.exit('Memory related ebpf benchmarks require ebpf benchmarks to be enabled, run with --with-ebpf flag')
+        sys.exit("Memory related ebpf benchmarks require ebpf benchmarks to be enabled, run with --with-ebpf flag")
 
 
 def main() -> None:
@@ -432,7 +432,7 @@ def main() -> None:
                 len(workload_names),
                 with_ebpf,
                 with_ebpf_memory,
-                with_syscall_stacks
+                with_syscall_details
             )
         else:
             run(
@@ -449,7 +449,7 @@ def main() -> None:
                 1,
                 with_ebpf,
                 with_ebpf_memory,
-                with_syscall_stacks
+                with_syscall_details
             )
 
 
diff --git a/src/leveldb/leveldb.hpp b/src/leveldb/leveldb.hpp
index 4b72918d..e0aca0e3 100644
--- a/src/leveldb/leveldb.hpp
+++ b/src/leveldb/leveldb.hpp
@@ -195,7 +195,7 @@ operation_result_t leveldb_t::remove(key_t key) {
 
 operation_result_t leveldb_t::read(key_t key, value_span_t value) const {
-    // Unlike RocksDB, we can't read into some form fo a `PinnableSlice`,
+    // Unlike RocksDB, we can't read into some form of a `PinnableSlice`,
     // just `std::string`, causing heap allocations.
     std::string data;
     leveldb::Status status = db_->Get(read_options_, to_slice(key), &data);