From 1991aacca968dd9de1f81382b832619a93d0ec6a Mon Sep 17 00:00:00 2001 From: hov1417 Date: Mon, 22 May 2023 11:46:58 +0400 Subject: [PATCH] Add: eBPF profiling --- .gitignore | 2 + CMakeLists.txt | 4 +- Dockerfile | 22 +- README.md | 6 + cmake/leveldb.cmake | 2 + ebpf/ebpf.py | 521 ++++++++++++++++++++++++++++++++++++++++++ ebpf/ebpf_main.c | 419 +++++++++++++++++++++++++++++++++ run.py | 88 +++++-- src/bench.cxx | 30 +++ src/core/settings.hpp | 1 + 10 files changed, 1065 insertions(+), 30 deletions(-) create mode 100644 ebpf/ebpf.py create mode 100644 ebpf/ebpf_main.c diff --git a/.gitignore b/.gitignore index 314bde06..f1ed02af 100644 --- a/.gitignore +++ b/.gitignore @@ -56,3 +56,5 @@ code-review*.bak *.exe *.out *.app + +**/__pycache__/ diff --git a/CMakeLists.txt b/CMakeLists.txt index e57a7d3f..3d654f29 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -131,11 +131,11 @@ add_executable(ucsb_bench ./src/bench.cxx) if(${UCSB_BUILD_USTORE}) # Choose engine. Available engines: UCSET, ROCKSDB, LEVELDB, UDISK - set(USTORE_ENGINE_NAME UCSET) + set(USTORE_ENGINE_NAME UCSET) include("${CMAKE_MODULE_PATH}/ustore.cmake") list(APPEND UCSB_DB_LIBS "ustore") - target_compile_definitions(ucsb_bench PUBLIC UCSB_HAS_USTORE=1) + target_compile_definitions(ucsb_bench PUBLIC UCSB_HAS_USTORE=1) endif() if(${UCSB_BUILD_ROCKSDB}) diff --git a/Dockerfile b/Dockerfile index 140f2847..06249493 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,36 +4,34 @@ MAINTAINER unum@cloud.com WORKDIR ./ -RUN apt-get update +RUN apt update ENV DEBIAN_FRONTEND noninteractive # Install tools RUN apt install -y python3-pip RUN pip3 install cmake -RUN pip3 install conan RUN apt install -y gcc-10 RUN apt install -y g++-10 -RUN apt-get install -y libexplain-dev -RUN apt-get install -y libsnappy-dev -RUN apt-get install -yq pkg-config -RUN apt-get install -y git +RUN apt install -y libexplain-dev +RUN apt install -y libsnappy-dev +RUN apt install -y pkg-config +RUN apt install -y git # Build WiredTiger (latest) -RUN git clone git://github.com/wiredtiger/wiredtiger.git +RUN git clone --depth 1 https://github.com/wiredtiger/wiredtiger.git RUN mkdir ./wiredtiger/build WORKDIR "./wiredtiger/build" -RUN cmake ../. +RUN cmake .. RUN make install WORKDIR / # Build UCSB -RUN git clone https://github.com/unum-cloud/ucsb.git +RUN git clone --depth 1 https://github.com/unum-cloud/ucsb.git WORKDIR "./ucsb/" RUN update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-10 10 RUN update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-10 10 -RUN bash -i setup.sh RUN bash -i build_release.sh -RUN rm -rf ./bench +RUN rm -rf ./bench/results -ENTRYPOINT ["./build_release/bin/ucsb_bench"] +ENTRYPOINT ["./build_release/build/bin/ucsb_bench"] diff --git a/README.md b/README.md index 9dc279a4..46596e6d 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,12 @@ The outputs will be placed in the `bench/results/` folder. git clone https://github.com/unum-cloud/ucsb.git && cd ucsb && ./run.py ``` +### eBPF + +UCSB supports optional eBPF-based profiling, enabled with the `--with-ebpf` and `--with-ebpf-memory` flags. +Before running, make sure `bcc` version 0.21.0 or newer is [installed](https://github.com/iovisor/bcc/blob/master/INSTALL.md). + + ## Supported Databases Key-Value Stores and NoSQL databases differ in supported operations.
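For example, a profiled run over the databases and sizes configured in `run.py` can be started like this (root privileges are required; the flags correspond to the `-eb`/`-em` options added to `run.py`):

```
sudo ./run.py --with-ebpf --with-ebpf-memory
```

Profiling snapshots are written as JSON files under `bench/ebpf/snapshots/<db>_<size>/`, one per harvest interval.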
diff --git a/cmake/leveldb.cmake b/cmake/leveldb.cmake index ea21444f..9e210799 100644 --- a/cmake/leveldb.cmake +++ b/cmake/leveldb.cmake @@ -27,4 +27,6 @@ if(NOT leveldb_POPULATED) add_subdirectory(${leveldb_SOURCE_DIR} ${leveldb_BINARY_DIR} EXCLUDE_FROM_ALL) endif() +add_compile_options(-g) + include_directories(${leveldb_SOURCE_DIR}/include) diff --git a/ebpf/ebpf.py b/ebpf/ebpf.py new file mode 100644 index 00000000..0e233e5a --- /dev/null +++ b/ebpf/ebpf.py @@ -0,0 +1,521 @@ +#!/usr/bin/env python3 + +import argparse +import json +import logging +import os +import signal +import resource +import sys +from time import sleep, strftime, time +from typing import Optional, Tuple + +import pexpect +from bcc import BPF +from bcc.syscall import syscall_name +from pexpect import spawn + +logging.basicConfig(filename='/tmp/ebpf.log', encoding='utf-8', level=logging.DEBUG) + + +def get_size_filter(min_size, max_size): + if min_size is not None and max_size is not None: + return "if (size < %d || size > %d) return 0;" % (min_size, max_size) + elif min_size is not None: + return "if (size < %d) return 0;" % min_size + elif max_size is not None: + return "if (size > %d) return 0;" % max_size + else: + return "" + + +class Allocation(object): + def __init__(self, stack, size, address): + self.stack = stack + self.count = 1 + self.size = size + self.address = address + + def update(self, size): + self.count += 1 + self.size += size + + +def print_outstanding(bpf, pid, top, min_age_ns): + print(f"Top {top} stacks with outstanding allocations:") + alloc_info = {} + allocs = bpf["allocs"] + stack_traces = bpf["stack_traces"] + for address, info in sorted(allocs.items(), key=lambda a: a[1].size): + if BPF.monotonic_time() - min_age_ns < info.timestamp_ns or info.stack_id < 0: + continue + if info.stack_id in alloc_info: + alloc_info[info.stack_id].update(info.size) + else: + stack = list(stack_traces.walk(info.stack_id)) + combined = [] + for addr in stack: + func_name = bpf.sym(addr, pid, show_module=True, show_offset=True) + formatted_address = ('0x' + format(addr, '016x') + '\t').encode('utf-8') + combined.append(formatted_address + func_name) + alloc_info[info.stack_id] = Allocation(combined, info.size, address.value) + print(f"\taddr = {address.value} size = {info.size}") + to_show = sorted(alloc_info.values(), key=lambda a: a.size)[-top:] + for alloc in to_show: + stack = b"\n\t\t".join(alloc.stack).decode("ascii") + print(f"\t{alloc.size} bytes in {alloc.count} allocations from stack\n\t\t{stack}") + + +def get_outstanding(bpf, pid, min_age_ns, top): + alloc_info = {} + allocs = bpf["allocs"] + stack_traces = bpf["stack_traces"] + for address, info in sorted(allocs.items(), key=lambda a: a[1].size): + if BPF.monotonic_time() - min_age_ns < info.timestamp_ns or info.stack_id < 0: + continue + if info.stack_id in alloc_info: + alloc_info[info.stack_id].update(info.size) + else: + stack = list(stack_traces.walk(info.stack_id)) + combined = [] + for addr in stack: + func_name = bpf.sym(addr, pid, show_module=True, show_offset=True) + formatted_address = ('0x' + format(addr, '016x') + ' ').encode('utf-8') + combined.append(formatted_address + func_name) + alloc_info[info.stack_id] = Allocation(combined, info.size, address.value) + + sorted_stacks = sorted(alloc_info.values(), key=lambda a: -a.size)[:top] + return list( + map(lambda alloc: {'stack': [s.decode('ascii') for s in alloc.stack], 'size': alloc.size, 'count': alloc.count}, + sorted_stacks)) + + +class CombinedAlloc(object): + def 
__init__(self, item): + self.stack_id = item[0] + self.free_size = item[1].free_size + self.alloc_size = item[1].alloc_size + self.number_of_frees = item[1].number_of_frees + self.number_of_allocs = item[1].number_of_allocs + + def key(self): + return self.alloc_size - self.free_size + + def __str__(self): + return f"CombinedAlloc(stack_id={self.stack_id},\n" \ + f"\t free_size={self.free_size},\n" \ + f"\t alloc_size={self.alloc_size},\n" \ + f"\t number_of_frees={self.number_of_frees},\n" \ + f"\t number_of_allocs={self.number_of_allocs})\n" + + def __repr__(self): + return self.__str__() + + def is_positive(self): + return self.alloc_size > self.free_size + + +def print_memory_statistics(bpf, pid, top): + stack_traces = bpf["stack_traces"] + print("stack traces", len(list(stack_traces.items()))) + combined_alloc = list( + sorted( + map(CombinedAlloc, bpf["combined_allocs"].items()), + key=lambda a: a.key(), + ) + ) + memory = sum((c.alloc_size - c.free_size for c in combined_alloc)) / 1024 + print("overall, allocated", memory, "kb in", len(combined_alloc), "allocations") + entries = [] + for allocation in combined_alloc[:top]: + trace = get_trace_info(bpf, pid, stack_traces, allocation.stack_id.value) + entry = f"\t{allocation.alloc_size - allocation.free_size} bytes in " \ + f"{allocation.number_of_allocs - allocation.number_of_frees}" \ + f" allocations from stack ({allocation.number_of_allocs + allocation.number_of_frees} allocs/frees)" \ + f"\n\t\t{trace}" + entries.append(entry) + + print(f"Top {top} stacks with outstanding allocations:") + print('\n'.join(reversed(entries))) + + +def get_statistics(bpf, pid): + stack_traces = bpf["stack_traces"] + combined_alloc = list( + sorted( + map(CombinedAlloc, bpf["combined_allocs"].items()), + key=lambda a: a.key(), + ) + ) + memory = sum((c.alloc_size - c.free_size for c in combined_alloc)) + entries = [] + for allocation in combined_alloc: + entries.append({ + 'alloc_size': allocation.alloc_size, + 'free_size': allocation.free_size, + 'number_of_allocs': allocation.number_of_allocs, + 'number_of_frees': allocation.number_of_frees, + 'trace': get_trace_info_as_list(bpf, pid, stack_traces, allocation.stack_id.value), + }) + return { + "memory": memory, + "combined_allocs": list(reversed(entries)), + "stack_traces": len(list(stack_traces.items())) + } + + +def get_trace_info(bpf, pid, stack_traces, stack_id): + trace = [] + for addr in walk_trace(stack_traces, stack_id): + sym = bpf.sym(addr, pid, show_module=False, show_offset=True) + trace.append(sym.decode()) + + trace = "\n\t\t".join(trace) + if not trace: + trace = "stack information lost" + return trace + + +def get_trace_info_as_list(bpf, pid, stack_traces, stack_id): + trace = [] + for addr in walk_trace(stack_traces, stack_id): + sym = bpf.sym(addr, pid, show_module=False, show_offset=True) + trace.append(sym.decode()) + + return trace + + +def walk_trace(stack_traces, stack_id): + try: + return stack_traces.walk(stack_id) + except KeyError: + return [] + + +def print_outstanding_kernel_cache(bpf, top): + kernel_cache_allocs = list( + sorted(filter(lambda a: a[1].alloc_size > a[1].free_size, bpf['kernel_cache_counts'].items()), + key=lambda a: a[1].alloc_size - a[1].free_size) + )[:top] + if not kernel_cache_allocs: + return + print("---------------- Kernel Caches ---------------") + for (k, v) in kernel_cache_allocs: + print("Cache", str(k.name, "utf-8"), v.alloc_count - v.free_count, v.alloc_size - v.free_size) + + +def gernel_kernel_cache(bpf, top): + kernel_cache_allocs = list( 
+ sorted(filter(lambda a: a[1].alloc_size > a[1].free_size, bpf['kernel_cache_counts'].items()), + key=lambda a: a[1].alloc_size - a[1].free_size) + )[:top] + if not kernel_cache_allocs: + return + caches = [] + to_remove = [] + for (k, v) in kernel_cache_allocs: + caches.append({ + 'name': str(k.name, "utf-8"), + 'alloc_count': v.alloc_count, + 'free_count': v.free_count, + 'alloc_size': v.alloc_size, + 'free_size': v.free_size, + }) + if v.alloc_count >= v.free_count: + to_remove.append(k) + if len(to_remove) > 0: + arr = (type(to_remove[0]) * len(to_remove))(*to_remove) + bpf['kernel_cache_counts'].items_delete_batch(arr) + return caches + + +def print_syscalls(bpf, top): + syscall_counts = bpf["syscall_counts"] + print("SYSCALL COUNT TIME") + for k, v in sorted(syscall_counts.items(), key=lambda kv: -kv[1].total_ns)[:top]: + print("%-22s %8d %16.3f" % (system_call_name(k), v.count, v.total_ns / 1e3)) + syscall_counts.clear() + + +def get_syscalls(bpf, top): + syscall_counts = bpf["syscall_counts"] + syscalls = [] + for k, v in sorted(syscall_counts.items(), key=lambda kv: -kv[1].total_ns)[:top]: + syscalls.append({ + 'name': system_call_name(k), + 'count': v.count, + 'total_ns': v.total_ns, + }) + syscall_counts.clear() + return syscalls + + +def system_call_name(k): + return syscall_name(k.value).decode('ascii') + + +def print_time(): + print("[%s]" % strftime("%H:%M:%S")) + + +def save_snapshot( + bpf: BPF, + pid: int, + min_age_ns: int, + top: int, + snapshot_dir: str, + with_memory: bool, + snapshot_prefix: Optional[str] = None +): + current_time_millis = int(round(time() * 1000)) + stats = get_statistics(bpf, pid) if with_memory else {} + outstanding = get_outstanding(bpf, pid, min_age_ns, top) if with_memory else [] + kernel_caches = gernel_kernel_cache(bpf, top) if with_memory else [] + syscalls = get_syscalls(bpf, top) + snapshot = { + 'time': current_time_millis, + 'stats': stats, + 'outstanding': outstanding, + 'kernel_caches': kernel_caches, + 'syscalls': syscalls, + } + os.makedirs(snapshot_dir, exist_ok=True) + with open(f'{snapshot_dir}/{snapshot_prefix or "snapshot"}-{current_time_millis}.json', 'w') as outfile: + json.dump(snapshot, outfile) + + +class Arguments: + def __init__(self, args): + self.pid = args.pid + self.command = args.command + self.interval = args.interval + self.min_age_ns = 1e6 * args.older + self.alloc_sample_every_n = args.alloc_sample_rate + self.top = args.top + self.min_alloc_size = args.min_alloc_size + self.max_alloc_size = args.max_alloc_size + + if args.snapshots is None: + self.save_snapshots = './snapshots' + elif args.snapshots == -1: + self.save_snapshots = None + else: + self.save_snapshots = args.snapshots + + if self.min_alloc_size is not None and self.max_alloc_size is not None \ + and self.min_alloc_size > self.max_alloc_size: + print("min_size (-z) can't be greater than max_size (-Z)") + exit(1) + + if self.command is None and self.pid is None: + print("Either -p or -c must be specified") + exit(1) + + +def parse(): + parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("-p", "--pid", type=int, default=-1, + help="the PID to trace; if not specified, trace kernel allocs") + parser.add_argument("interval", nargs="?", default=5, type=int, + help="interval in seconds to print outstanding allocations") + parser.add_argument("-o", "--older", default=500, type=int, + help="prune allocations younger than this age in milliseconds") + parser.add_argument("-c", "--command", + 
help="execute and trace the specified command") + parser.add_argument("-s", "--alloc-sample-rate", default=1, type=int, + help="sample every N-th allocation to decrease the overhead") + parser.add_argument("-T", "--top", type=int, default=10, + help="display only this many top stats") + parser.add_argument("-z", "--min-alloc-size", type=int, + help="capture only allocations larger than this size") + parser.add_argument("-Z", "--max-alloc-size", type=int, + help="capture only allocations smaller than this size") + parser.add_argument("-S", "--snapshots", default=-1, type=str, nargs='?', + help="save statistics snapshots to the specified directory") + return Arguments(parser.parse_args()) + + +def attach_ebpf(pid: int = -1, + process: Optional[spawn] = None, + command: Optional[str] = None, + top: int = 10, + interval: int = 5, + alloc_sample_every_n: int = 1, + max_alloc_size: Optional[int] = None, + min_age_ns: int = 500, + min_alloc_size: Optional[int] = None, + save_snapshots: Optional[str] = None, + with_memory: bool = False): + (bpf, pid, process) = attach_probes(pid, process, command, alloc_sample_every_n, max_alloc_size, min_alloc_size, + with_memory) + + harvest_ebpf(bpf, pid, process, top, interval, min_age_ns, save_snapshots) + + +def attach_probes( + pid: int = -1, + process: Optional[spawn] = None, + command: Optional[str] = None, + alloc_sample_every_n: int = 1, + max_alloc_size: Optional[int] = None, + min_alloc_size: Optional[int] = None, + with_memory: bool = False, + communicate_with_signals: bool = False +) -> Optional[Tuple[BPF, int, spawn]]: + if pid == -1 and command is None and process is None: + logging.info("Either pid or command or process must be specified") + return + + if command is not None: + logging.info(f"Executing '{command}' and tracing the resulting process.") + process = pexpect.spawn(command) + pid = process.pid + elif process is not None: + pid = process.pid + + if communicate_with_signals: + signal.signal(signal.SIGUSR2, signal_handler) + + # Constructing probes + bpf = BPF(src_file='./ebpf/ebpf_main.c', + usdt_contexts=[], + cflags=[ + "-Wno-macro-redefined", + f"-DPROCESS_ID={pid}", + f"-DSAMPLE_EVERY_N={alloc_sample_every_n}", + f"-DPAGE_SIZE={resource.getpagesize()}", + f"-DFILTER_BY_SIZE={get_size_filter(min_alloc_size, max_alloc_size)}", + "-DWITH_MEMORY" if with_memory else "", + ]) + + # Attaching probes + + logging.info(f"Attaching to pid {pid}") + + wait_time = 0 + wait_interval = 0.01 + timeout = 5 + while not process.isalive(): + sleep(wait_interval) + wait_time += wait_interval + if process.terminated: + print(process.readline().decode('utf-8')) + raise Exception(f'Process is already terminated, with status code {process.exitstatus}') + if wait_time > timeout: + raise Exception('Process is not alive') + + if with_memory: + for sym in ["malloc", "calloc", "realloc", "mmap", "posix_memalign", "valloc", "memalign", "pvalloc", + "aligned_alloc"]: + bpf.attach_uprobe(name="c", sym=sym, fn_name=sym + "_enter", pid=pid) + bpf.attach_uretprobe(name="c", sym=sym, fn_name=sym + "_exit", pid=pid) + + bpf.attach_uprobe(name="c", sym="free", fn_name="free_enter", pid=pid) + bpf.attach_uprobe(name="c", sym="munmap", fn_name="munmap_enter", pid=pid) + + # kernel cache probes + bpf.attach_kprobe(event='kmem_cache_alloc_lru', fn_name='trace_cache_alloc') + bpf.attach_kprobe(event='kmem_cache_alloc_bulk', fn_name='trace_cache_alloc') + bpf.attach_kprobe(event='kmem_cache_alloc_node', fn_name='trace_cache_alloc') + + 
bpf.attach_kprobe(event='kmem_cache_free', fn_name='trace_cache_free') + bpf.attach_kprobe(event='kmem_cache_free_bulk', fn_name='trace_cache_free') + + return bpf, pid, process + + +SIGUSR2_received = False + + +def signal_handler(_sig_no, _stack_frame): + global SIGUSR2_received + SIGUSR2_received = True + + +def is_terminated(pid: int, process: Optional[spawn]): + if process is None: + try: + # Sending signal 0 to a pid will raise OSError if the process does not exist + os.kill(pid, 0) + except OSError: + return True + else: + return False + return SIGUSR2_received or not process.isalive() + + +def sleep_and_check( + interval: float, + pid: int, + process: Optional[spawn], + check_delay: float +): + wait_til = time() + interval + while time() < wait_til: + sleep(check_delay) + if is_terminated(pid, process): + break + + +def print_ebpf_info(bpf, pid, min_age_ns, top): + print_time() + print_memory_statistics(bpf, pid, top) + print_outstanding(bpf, pid, top, min_age_ns) + print_outstanding_kernel_cache(bpf, top) + print_syscalls(bpf, top) + print() + sys.stdout.flush() + + +def harvest_ebpf( + bpf: BPF, + pid: int = -1, + process: Optional[spawn] = None, + top: int = 10, + interval: int = 5, + min_age_ns: int = 500, + with_memory: bool = False, + save_snapshots: Optional[str] = None, + snapshot_prefix: Optional[str] = None, + communicate_with_signals: bool = False, +): + if pid == -1 and process is None: + raise ValueError("Either pid or process must be specified") + + if process is not None: + pid = process.pid + + while True: + logging.info(f"Sleeping for {interval} seconds...") + try: + sleep_and_check(interval, pid, process, check_delay=0.2) + except KeyboardInterrupt: + break + if save_snapshots: + save_snapshot(bpf, pid, min_age_ns, top, save_snapshots, with_memory, snapshot_prefix) + else: + print_ebpf_info(bpf, pid, min_age_ns, top) + if is_terminated(pid, process): + break + + if communicate_with_signals: + # Sending SIGUSR1 to the process will notify that the tracing is done + os.kill(pid, signal.SIGUSR1) + + logging.info("Detaching...") + bpf.cleanup() + + +if __name__ == "__main__": + arguments = parse() + attach_ebpf( + pid=arguments.pid, + command=arguments.command, + top=arguments.top, + interval=arguments.interval, + alloc_sample_every_n=arguments.alloc_sample_every_n, + max_alloc_size=arguments.max_alloc_size, + min_age_ns=arguments.min_age_ns, + min_alloc_size=arguments.min_alloc_size, + save_snapshots=arguments.save_snapshots, + ) diff --git a/ebpf/ebpf_main.c b/ebpf/ebpf_main.c new file mode 100644 index 00000000..da9974fe --- /dev/null +++ b/ebpf/ebpf_main.c @@ -0,0 +1,419 @@ +#include +#include +#include +#include +#include +#include + +#define FILTER_BY_PID u32 __pid = bpf_get_current_pid_tgid() >> 32;if (__pid != PROCESS_ID) {return 0;} + +#ifdef WITH_MEMORY + +struct alloc_info_t { + u64 size; + u64 timestamp_ns; + int stack_id; +}; + +struct combined_alloc_info_t { + u64 alloc_size; + u64 free_size; + u64 number_of_allocs; + u64 number_of_frees; +}; + +// count of allocations per stack trace +BPF_HASH(combined_allocs, u64, struct combined_alloc_info_t, 10240); + +BPF_HASH(sizes, u64); +BPF_HASH(allocs, u64, struct alloc_info_t, 1000000); +BPF_HASH(memptrs, u64, u64); + +BPF_STACK_TRACE(stack_traces, 10240); + +static inline void update_statistics_add(u64 stack_id, u64 sz) { + struct combined_alloc_info_t *existing_cinfo; + struct combined_alloc_info_t cinfo = {0}; + + existing_cinfo = combined_allocs.lookup(&stack_id); + if (existing_cinfo != 0) + cinfo = 
*existing_cinfo; + + lock_xadd(&cinfo.alloc_size, sz); + lock_xadd(&cinfo.number_of_allocs, 1); + + combined_allocs.update(&stack_id, &cinfo); +} + +static inline void update_statistics_del(u64 stack_id, u64 sz) { + struct combined_alloc_info_t *existing_cinfo; + struct combined_alloc_info_t cinfo = {0}; + + existing_cinfo = combined_allocs.lookup(&stack_id); + if (existing_cinfo != 0) + cinfo = *existing_cinfo; + + lock_xadd(&cinfo.free_size, sz); + lock_xadd(&cinfo.number_of_frees, 1); + + + combined_allocs.update(&stack_id, &cinfo); +} + +static inline int gen_alloc_enter(struct pt_regs *ctx, size_t size) { + FILTER_BY_PID + + FILTER_BY_SIZE + if (SAMPLE_EVERY_N > 1) { + u64 ts = bpf_ktime_get_ns(); + if (ts % SAMPLE_EVERY_N != 0) + return 0; + } + + u64 pid = bpf_get_current_pid_tgid(); + u64 size64 = size; + sizes.update(&pid, &size64); + + return 0; +} + +static inline int gen_alloc_exit2(struct pt_regs *ctx, u64 address) { + FILTER_BY_PID + + u64 pid = bpf_get_current_pid_tgid(); + u64 * size64 = sizes.lookup(&pid); + struct alloc_info_t info = {0}; + + if (size64 == 0) + return 0; // missed alloc entry + + info.size = *size64; + sizes.delete(&pid); + + if (address != 0) { + info.timestamp_ns = bpf_ktime_get_ns(); + info.stack_id = stack_traces.get_stackid(ctx, BPF_F_USER_STACK); + allocs.update(&address, &info); + update_statistics_add(info.stack_id, info.size); + } + + return 0; +} + +static inline int gen_alloc_exit(struct pt_regs *ctx) { + return gen_alloc_exit2(ctx, PT_REGS_RC(ctx)); +} + +static inline int gen_free_enter(struct pt_regs *ctx, void *address) { + FILTER_BY_PID + + u64 addr = (u64)address; + struct alloc_info_t *info = allocs.lookup(&addr); + if (info == 0) + return 0; + + allocs.delete(&addr); + update_statistics_del(info->stack_id, info->size); + + return 0; +} + +/** Probes */ + +int malloc_enter(struct pt_regs *ctx, size_t size) { + return gen_alloc_enter(ctx, size); +} + +int malloc_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +int free_enter(struct pt_regs *ctx, void *address) { + return gen_free_enter(ctx, address); +} + +int calloc_enter(struct pt_regs *ctx, size_t nmemb, size_t size) { + return gen_alloc_enter(ctx, nmemb * size); +} + +int calloc_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +int realloc_enter(struct pt_regs *ctx, void *ptr, size_t size) { + gen_free_enter(ctx, ptr); + return gen_alloc_enter(ctx, size); +} + +int realloc_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +int mmap_enter(struct pt_regs *ctx) { + size_t size = (size_t) PT_REGS_PARM2(ctx); + return gen_alloc_enter(ctx, size); +} + +int mmap_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +int munmap_enter(struct pt_regs *ctx, void *address) { + return gen_free_enter(ctx, address); +} + +int posix_memalign_enter(struct pt_regs *ctx, void **memptr, size_t alignment, + size_t size) { + u64 memptr64 = (u64)(size_t)memptr; + u64 pid = bpf_get_current_pid_tgid(); + + memptrs.update(&pid, &memptr64); + return gen_alloc_enter(ctx, size); +} + +int posix_memalign_exit(struct pt_regs *ctx) { + u64 pid = bpf_get_current_pid_tgid(); + u64 *memptr64 = memptrs.lookup(&pid); + void *addr; + + if (memptr64 == 0) + return 0; + + memptrs.delete(&pid); + + if (bpf_probe_read_user(&addr, sizeof(void *), (void *) (size_t) * memptr64)) + return 0; + + u64 addr64 = (u64)(size_t) addr; + return gen_alloc_exit2(ctx, addr64); +} + +int aligned_alloc_enter(struct pt_regs *ctx, size_t alignment, size_t size) { + return gen_alloc_enter(ctx, size); +} + 
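/* Each remaining libc allocator wrapper follows the same split as malloc above: the *_enter uprobe records the requested size keyed by pid/tgid via gen_alloc_enter(), and the *_exit uretprobe pairs that size with the returned address via gen_alloc_exit(). */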
+int aligned_alloc_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +int valloc_enter(struct pt_regs *ctx, size_t size) { + return gen_alloc_enter(ctx, size); +} + +int valloc_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +int memalign_enter(struct pt_regs *ctx, size_t alignment, size_t size) { + return gen_alloc_enter(ctx, size); +} + +int memalign_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +int pvalloc_enter(struct pt_regs *ctx, size_t size) { + return gen_alloc_enter(ctx, size); +} + +int pvalloc_exit(struct pt_regs *ctx) { return gen_alloc_exit(ctx); } + +/** Tracepoints + Function names are in tracepoint__{{category}}__{{event}} + alternatively we can use attach_tracepoint function in python api +*/ + +int tracepoint__kmem__kmalloc(struct tracepoint__kmem__kmalloc *args) { + gen_alloc_enter((struct pt_regs *) args, args->bytes_alloc); + return gen_alloc_exit2((struct pt_regs *) args, (size_t) args->ptr); +} + +int tracepoint__kmem__kfree(struct tracepoint__kmem__kfree *args) { + return gen_free_enter((struct pt_regs *) args, (void *) args->ptr); +} + +int tracepoint__kmem__kmem_cache_alloc( + struct tracepoint__kmem__kmem_cache_alloc *args) { + gen_alloc_enter((struct pt_regs *) args, args->bytes_alloc); + return gen_alloc_exit2((struct pt_regs *) args, (size_t) args->ptr); +} + +int tracepoint__kmem__kmem_cache_free( + struct tracepoint__kmem__kmem_cache_free *args) { + return gen_free_enter((struct pt_regs *) args, (void *) args->ptr); +} + +int tracepoint__kmem__mm_page_alloc( + struct tracepoint__kmem__mm_page_alloc *args) { + gen_alloc_enter((struct pt_regs *) args, PAGE_SIZE << args->order); + return gen_alloc_exit2((struct pt_regs *) args, args->pfn); +} + +int tracepoint__kmem__mm_page_free( + struct tracepoint__kmem__mm_page_free *args) { + return gen_free_enter((struct pt_regs *) args, (void *) args->pfn); +} + +int tracepoint__kmem__kmalloc_node(struct tracepoint__kmem__kmalloc_node *args) { + gen_alloc_enter((struct pt_regs *) args, args->bytes_alloc); + return gen_alloc_exit2((struct pt_regs *) args, (size_t) args->ptr); +} + +int tracepoint__kmem__kmem_cache_alloc_node(struct tracepoint__kmem__kmem_cache_alloc_node *args) { + gen_alloc_enter((struct pt_regs *) args, args->bytes_alloc); + return gen_alloc_exit2((struct pt_regs *) args, (size_t) args->ptr); +} + +/** kernel cache */ + +// to resolve undefined error +// taken from bcc slabratetop tool +struct slab { + unsigned long __page_flags; +#if defined(CONFIG_SLAB) + struct kmem_cache *slab_cache; + union { + struct { + struct list_head slab_list; + void *freelist; /* array of free object indexes */ + void *s_mem; /* first object */ + }; + struct rcu_head rcu_head; + }; + unsigned int active; +#elif defined(CONFIG_SLUB) + struct kmem_cache *slab_cache; + union { + struct { + union { + struct list_head slab_list; +#ifdef CONFIG_SLUB_CPU_PARTIAL + struct { + struct slab *next; + int slabs; /* Nr of slabs left */ + }; +#endif + }; + /* Double-word boundary */ + void *freelist; /* first free object */ + union { + unsigned long counters; + struct { + unsigned inuse:16; + unsigned objects:15; + unsigned frozen:1; + }; + }; + }; + struct rcu_head rcu_head; + }; + unsigned int __unused; +#elif defined(CONFIG_SLOB) + struct list_head slab_list; + void *__unused_1; + void *freelist; /* first free block */ + long units; + unsigned int __unused_2; +#else +#error "Unexpected slab allocator configured" +#endif + atomic_t __page_refcount; +#ifdef CONFIG_MEMCG + unsigned long memcg_data; 
+#endif +}; + +// slab_address() will not be used, and NULL will be returned directly, which +// can avoid adaptation of different kernel versions +static inline void *slab_address(const struct slab *slab) { + return NULL; +} + +#ifdef CONFIG_SLUB + +#include + +#else + +#include + +#endif + +struct key_t { + char name[32]; +}; + +struct val_t { + u64 alloc_count; + u64 alloc_size; + u64 free_count; + u64 free_size; +}; + +BPF_HASH(kernel_cache_counts, +struct key_t, struct val_t); + +int trace_cache_alloc(struct pt_regs *ctx, struct kmem_cache *cachep) { + FILTER_BY_PID + + u64 size = cachep->size; + + FILTER_BY_SIZE + + struct key_t key = {}; + bpf_probe_read_kernel(&key.name, sizeof(key.name), cachep->name); + + struct val_t empty_val_t = {}; + + struct val_t *val = kernel_cache_counts.lookup_or_try_init(&key, &empty_val_t); + if (val) { + val->alloc_count++; + val->alloc_size += size; + } + + return 0; +} + +int trace_cache_free(struct pt_regs *ctx, struct kmem_cache *cachep) { + FILTER_BY_PID + + u64 size = cachep->size; + + FILTER_BY_SIZE + + struct key_t key = {}; + bpf_probe_read_kernel(&key.name, sizeof(key.name), cachep->name); + + struct val_t empty_val_t = {}; + struct val_t *val = kernel_cache_counts.lookup_or_try_init(&key, &empty_val_t); + if (val) { + val->free_count ++; + val->free_size += size; + } + + return 0; +} + +#endif + + +/** System Calls */ + +struct sys_call_data_t { + u64 count; + u64 total_ns; +}; +BPF_HASH(syscall_start, u64, u64); +BPF_HASH(syscall_counts, u32, struct sys_call_data_t); + +int tracepoint__raw_syscalls__sys_enter(struct tracepoint__raw_syscalls__sys_enter *args) { + FILTER_BY_PID + u64 pid_tgid = bpf_get_current_pid_tgid(); + u64 t = bpf_ktime_get_ns(); + syscall_start.update(&pid_tgid, &t); + return 0; +} + +int tracepoint__raw_syscalls__sys_exit(struct tracepoint__raw_syscalls__sys_exit *args) { + FILTER_BY_PID + + u64 pid_tgid = bpf_get_current_pid_tgid(); + + struct sys_call_data_t *val, zero = {}; + u64 *start_ns = syscall_start.lookup(&pid_tgid); + if (!start_ns) + return 0; + u32 key = args->id; + val = syscall_counts.lookup_or_try_init(&key, &zero); + if (val) { + lock_xadd(&val->count, 1); + lock_xadd(&val->total_ns, bpf_ktime_get_ns() - *start_ns); + } + return 0; +} \ No newline at end of file diff --git a/run.py b/run.py index 6d0af99c..cf110982 100755 --- a/run.py +++ b/run.py @@ -1,13 +1,16 @@ #!/usr/bin/env python3 +import argparse import os -import sys -import time +import pathlib import shutil import signal +import sys +import time +from threading import Thread +from typing import List + import pexpect -import pathlib -import argparse import termcolor """ @@ -53,6 +56,8 @@ drop_caches = True cleanup_previous = True run_in_docker_container = False +with_ebpf = False +with_ebpf_memory = False main_dir_path = './db_main/' storage_disk_paths = [ @@ -76,14 +81,16 @@ def get_db_main_dir_path(db_name: str, size: str, main_dir_path: str) -> str: return os.path.join(main_dir_path, db_name, size, '') -def get_db_storage_dir_paths(db_name: str, size: str, storage_disk_paths: str) -> list: +def get_db_storage_dir_paths(db_name: str, size: str, storage_disk_paths: List[str]) -> list: db_storage_dir_paths = [] for storage_disk_path in storage_disk_paths: db_storage_dir_paths.append(os.path.join(storage_disk_path, db_name, size, '')) return db_storage_dir_paths -def get_results_file_path(db_name: str, size: str, drop_caches: bool, transactional: bool, storage_disk_paths: str, threads_count: int) -> str: +def get_results_file_path(db_name: 
str, size: str, drop_caches: bool, transactional: bool, + storage_disk_paths: List[str], + threads_count: int) -> str: root_dir_path = '' if drop_caches: if transactional: @@ -106,7 +113,7 @@ def drop_system_caches(): try: with open('/proc/sys/vm/drop_caches', 'w') as stream: stream.write('3\n') - time.sleep(8) # Wait for other apps to reload its caches + time.sleep(8) # Wait for other apps to reload its caches except KeyboardInterrupt: print(termcolor.colored('Terminated by user', 'yellow')) exit(1) @@ -115,14 +122,28 @@ def drop_system_caches(): exit(1) -def run(db_name: str, size: str, workload_names: list, main_dir_path: str, storage_disk_paths: str, transactional: bool, drop_caches: bool, run_in_docker_container: bool, threads_count: bool, run_index: int, runs_count: int) -> None: +def run(db_name: str, + size: str, + workload_names: list, + main_dir_path: str, + storage_disk_paths: List[str], + transactional: bool, + drop_caches: bool, + run_in_docker_container: bool, + threads_count: int, + run_index: int, + runs_count: int, + with_ebpf: bool, + with_ebpf_memory: bool) -> None: db_config_file_path = get_db_config_file_path(db_name, size) workloads_file_path = get_workloads_file_path(size) db_main_dir_path = get_db_main_dir_path(db_name, size, main_dir_path) db_storage_dir_paths = get_db_storage_dir_paths(db_name, size, storage_disk_paths) - results_file_path = get_results_file_path(db_name, size, drop_caches, transactional, storage_disk_paths, threads_count) + results_file_path = get_results_file_path(db_name, size, drop_caches, transactional, storage_disk_paths, + threads_count) transactional_flag = '-t' if transactional else '' + lazy_flag = '-l' if with_ebpf else '' filter = ','.join(workload_names) db_storage_dir_paths = ','.join(db_storage_dir_paths) @@ -131,9 +152,10 @@ def run(db_name: str, size: str, workload_names: list, main_dir_path: str, stora runner = f'docker run -v {os.getcwd()}/bench:/ucsb/bench -v {os.getcwd()}/tmp:/ucsb/tmp -it ucsb-image-dev' else: runner = './build_release/build/bin/ucsb_bench' - process = pexpect.spawn(f'{runner} \ + command = f'{runner} \ -db {db_name} \ {transactional_flag} \ + {lazy_flag} \ -cfg "{db_config_file_path}" \ -wl "{workloads_file_path}" \ -md "{db_main_dir_path}" \ @@ -143,10 +165,33 @@ def run(db_name: str, size: str, workload_names: list, main_dir_path: str, stora -fl {filter} \ -ri {run_index} \ -rc {runs_count}' - ) + process = pexpect.spawn(command) + thread = None + if with_ebpf: + from ebpf.ebpf import attach_probes, harvest_ebpf + bpf, pid, process = attach_probes( + process=process, + with_memory=with_ebpf_memory, + communicate_with_signals=True, + ) + # Send SIGUSR1 to the process to notify it that the probes are attached + os.kill(process.pid, signal.SIGUSR1) + thread = Thread(target=harvest_ebpf, args=(bpf,), kwargs={ + 'process': process, + 'interval': 5, + 'with_memory': with_ebpf_memory, + 'snapshot_prefix': "-".join(workload_names), + 'save_snapshots': f'./bench/ebpf/snapshots/{db_name}_{size}', + 'communicate_with_signals': True, + }) + thread.start() + process.interact() process.close() + if with_ebpf: + thread.join() + # Handle signal if process.signalstatus: sig = signal.Signals(process.signalstatus) @@ -158,7 +203,6 @@ def run(db_name: str, size: str, workload_names: list, main_dir_path: str, stora def parse_args(): - global db_names global sizes global workload_names @@ -169,6 +213,8 @@ def parse_args(): global cleanup_previous global drop_caches global run_in_docker_container + global with_ebpf + global 
with_ebpf_memory parser = argparse.ArgumentParser() parser.add_argument('-db', '--db-names', help="Database name(s)", nargs='+', required=False, default=db_names) @@ -181,6 +227,8 @@ def parse_args(): parser.add_argument('-cl', '--cleanup-previous', help="Drops existing database before start the new benchmark", action=argparse.BooleanOptionalAction, default=cleanup_previous) parser.add_argument('-dp', '--drop-caches', help="Drops system cashes before each benchmark", action=argparse.BooleanOptionalAction, default=drop_caches) parser.add_argument('-rd', '--run-docker', help="Runs the benchmark in a docker container", action=argparse.BooleanOptionalAction, default=run_in_docker_container) + parser.add_argument('-eb', '--with-ebpf', help="Runs ebpf benchmarks", default=with_ebpf, dest="with_ebpf", action=argparse.BooleanOptionalAction) + parser.add_argument('-em', '--with-ebpf-memory', help="Enable memory related ebpf benchmarks", default=with_ebpf_memory, dest="with_ebpf_memory", action=argparse.BooleanOptionalAction) args = parser.parse_args() db_names = args.db_names @@ -193,6 +241,8 @@ def parse_args(): cleanup_previous = args.cleanup_previous drop_caches = args.drop_caches run_in_docker_container = args.run_docker + with_ebpf = args.with_ebpf + with_ebpf_memory = args.with_ebpf_memory def check_args(): @@ -206,10 +256,13 @@ def check_args(): sys.exit('Database size(s) not specified') if not workload_names: sys.exit('Workload name(s) not specified') + if run_in_docker_container and with_ebpf: + sys.exit('Running ebpf benchmarks in docker container is not supported') + if with_ebpf_memory and not with_ebpf: + sys.exit('Memory related ebpf benchmarks require ebpf benchmarks to be enabled, run with --with-ebpf flag') def main() -> None: - if os.geteuid() != 0: print(termcolor.colored(f'Run as sudo!', 'red')) sys.exit(-1) @@ -245,7 +298,8 @@ def main() -> None: pathlib.Path(db_storage_dir_path).mkdir( parents=True, exist_ok=True) # Create results dir paths - results_file_path = get_results_file_path(db_name, size, drop_caches, transactional, storage_disk_paths, threads_count) + results_file_path = get_results_file_path(db_name, size, drop_caches, transactional, storage_disk_paths, + threads_count) pathlib.Path(results_file_path).parent.mkdir(parents=True, exist_ok=True) # Run benchmark @@ -253,9 +307,11 @@ def main() -> None: for i, workload_name in enumerate(workload_names): if not run_in_docker_container: drop_system_caches() - run(db_name, size, [workload_name], main_dir_path, storage_disk_paths, transactional, drop_caches, run_in_docker_container, threads_count, i, len(workload_names)) + run(db_name, size, [workload_name], main_dir_path, storage_disk_paths, transactional, drop_caches, + run_in_docker_container, threads_count, i, len(workload_names), with_ebpf, with_ebpf_memory) else: - run(db_name, size, workload_names, main_dir_path, storage_disk_paths, transactional, drop_caches, run_in_docker_container, threads_count, 0, 1) + run(db_name, size, workload_names, main_dir_path, storage_disk_paths, transactional, drop_caches, + run_in_docker_container, threads_count, 0, 1, with_ebpf, with_ebpf_memory) if __name__ == '__main__': diff --git a/src/bench.cxx b/src/bench.cxx index 9f5f3900..24cf0af8 100644 --- a/src/bench.cxx +++ b/src/bench.cxx @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -37,6 +38,7 @@ void parse_and_validate_args(int argc, char* argv[], settings_t& settings) { argparse::ArgumentParser program(argv[0]); program.add_argument("-db", 
"--db-name").required().help("Database name"); program.add_argument("-t", "--transaction").default_value(false).implicit_value(true).help("Transactional"); + program.add_argument("-l", "--lazy").default_value(false).implicit_value(true).help("Wait for a SIGUSR1"); program.add_argument("-cfg", "--config-path").required().help("Database configuration file path"); program.add_argument("-wl", "--workload-path").required().help("Workloads file path"); program.add_argument("-res", "--results-path").required().help("Results file path"); @@ -53,6 +55,7 @@ void parse_and_validate_args(int argc, char* argv[], settings_t& settings) { settings.db_name = program.get("db-name"); settings.transactional = program.get("transaction"); + settings.lazy = program.get("lazy"); settings.db_config_file_path = program.get("config-path"); settings.workloads_file_path = program.get("workload-path"); settings.results_file_path = program.get("results-path"); @@ -502,6 +505,21 @@ void bench(bm::State& state, workload_t const& workload, db_t& db, bool transact } } +void wait_for_SIGUSR1() { + int sig; + sigset_t set; + + // Create a signal set containing SIGUSR1 + sigemptyset(&set); + sigaddset(&set, SIGUSR1); + + // Block all signals in the set so that they don't terminate the program + sigprocmask(SIG_BLOCK, &set, NULL); + + // Wait for SIGUSR1 signal + sigwait(&set, &sig); +} + int main(int argc, char** argv) { try { @@ -509,6 +527,10 @@ int main(int argc, char** argv) { settings_t settings; parse_and_validate_args(argc, argv, settings); + if (settings.lazy) { + wait_for_SIGUSR1(); + } + // Resolve results paths fs::path final_results_file_path = settings.results_file_path; if (final_results_file_path.string().back() == '/') @@ -597,6 +619,14 @@ int main(int argc, char** argv) { file_reporter_t::merge_results(in_progress_results_file_path, final_results_file_path); fs::remove(in_progress_results_file_path); + + if (settings.lazy) { + __pid_t parent_pid = getppid(); + // Notify parent that we finished + kill(parent_pid, SIGUSR2); + wait_for_SIGUSR1(); + } + } catch (exception_t const& ex) { fmt::print("UCSB exception: {}\n", ex.what()); diff --git a/src/core/settings.hpp b/src/core/settings.hpp index 96703cfd..37de7622 100644 --- a/src/core/settings.hpp +++ b/src/core/settings.hpp @@ -9,6 +9,7 @@ namespace ucsb { struct settings_t { std::string db_name; bool transactional = false; + bool lazy = false; fs::path db_config_file_path; fs::path db_main_dir_path; std::vector db_storage_dir_paths;