Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
roi-granulate committed Jun 13, 2024
1 parent 3084a60 commit 9152021
Showing 1 changed file with 37 additions and 7 deletions.
44 changes: 37 additions & 7 deletions gprofiler/profilers/python_ebpf.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
import glob
import os
import resource
import selectors
import signal
from pathlib import Path
from subprocess import Popen
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple, cast

from granulate_utils.linux.ns import is_running_in_init_pid
from psutil import NoSuchProcess, Process
Expand Down Expand Up @@ -64,7 +65,7 @@ class PythonEbpfProfiler(ProfilerBase):
_DUMP_TIMEOUT = 5 # seconds
_POLL_TIMEOUT = 10 # seconds
_GET_OFFSETS_TIMEOUT = 5 # seconds
_STDERR_READ_SIZE = 65536 # bytes read every cycle from stderr
_OUTPUT_READ_SIZE = 65536 # bytes read every cycle from stderr

def __init__(
self,
Expand All @@ -78,6 +79,7 @@ def __init__(
):
super().__init__(frequency, duration, profiler_state)
self.process: Optional[Popen] = None
self.process_selector: Optional[selectors.DefaultSelector] = None
self.output_path = Path(self._profiler_state.storage_dir) / f"pyperf.{random_prefix()}.col"
self.add_versions = add_versions
self.user_stacks_pages = user_stacks_pages
Expand Down Expand Up @@ -213,6 +215,36 @@ def start(self) -> None:
raise
else:
self.process = process
self._register_process_selectors()

def _register_process_selectors(self) -> None:
self.process_selector = selectors.DefaultSelector()
assert self.process_selector and self.process and self.process.stdout and self.process.stderr # for mypy
self.process_selector.register(self.process.stdout, selectors.EVENT_READ)
self.process_selector.register(self.process.stderr, selectors.EVENT_READ)

def _unregister_process_selectors(self) -> None:
assert self.process_selector
for selector_key in self.process_selector.get_map().values():
self.process_selector.unregister(selector_key.fileobj)

def _read_process_standard_outputs(self) -> Tuple[Optional[str], Optional[str]]:
"""
Read the process standard outputs in a non-blocking manner.
:return: tuple[stdout, stderr]
"""
stdout: Optional[str] = None
stderr: Optional[str] = None
assert self.process_selector and self.process
for key, _ in self.process_selector.select(timeout=0):
output = key.fileobj.read1(self._OUTPUT_READ_SIZE) # type: ignore
output = cast(str, output)
if key.fileobj is self.process.stdout:
stdout = output
if key.fileobj is self.process.stderr:
stderr = output
return stdout, stderr

def _dump(self) -> Path:
assert self.is_running()
Expand All @@ -224,11 +256,8 @@ def _dump(self) -> Path:
output = wait_for_file_by_prefix(
f"{self.output_path}.", self._DUMP_TIMEOUT, self._profiler_state.stop_event
)
# PyPerf outputs sampling & error counters every interval (after writing the output file), print them.
# also, makes sure its output pipe doesn't fill up.
# using read1() which performs just a single read() call and doesn't read until EOF
# (unlike Popen.communicate())
logger.debug("PyPerf dump output", stderr=self.process.stderr.read1(self._STDERR_READ_SIZE)) # type: ignore
stdout, stderr = self._read_process_standard_outputs()
logger.debug("PyPerf dump output", stdout=stdout, stderr=stderr)
return output
except TimeoutError:
# error flow :(
Expand Down Expand Up @@ -274,6 +303,7 @@ def snapshot(self) -> ProcessToProfileData:
return profiles

def _terminate(self) -> Tuple[Optional[int], str, str]:
self._unregister_process_selectors()
if self.is_running():
assert self.process is not None # for mypy
self.process.terminate() # okay to call even if process is already dead
Expand Down

0 comments on commit 9152021

Please sign in to comment.