diff --git a/gprofiler/__init__.py b/gprofiler/__init__.py index 27a235aa6..d57fc92a3 100644 --- a/gprofiler/__init__.py +++ b/gprofiler/__init__.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__version__ = "1.48.0" +__version__ = "1.49.0" diff --git a/gprofiler/main.py b/gprofiler/main.py index 498a2ad76..9e845e891 100644 --- a/gprofiler/main.py +++ b/gprofiler/main.py @@ -136,6 +136,7 @@ def __init__( remote_logs_handler: Optional[RemoteLogsHandler] = None, controller_process: Optional[Process] = None, external_metadata_path: Optional[Path] = None, + heartbeat_file_path: Optional[Path] = None, ): self._output_dir = output_dir self._flamegraph = flamegraph @@ -154,6 +155,7 @@ def __init__( self._controller_process = controller_process self._duration = duration self._external_metadata_path = external_metadata_path + self._heartbeat_file_path = heartbeat_file_path if self._collect_metadata: self._static_metadata = get_static_metadata(self._spawn_time, user_args, self._external_metadata_path) self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=10) @@ -393,6 +395,11 @@ def run_continuous(self) -> None: self._state.init_new_cycle() snapshot_start = time.monotonic() + + if self._heartbeat_file_path: + # set via --heartbeat-file; the file's mtime marks the start of the latest snapshot + self._heartbeat_file_path.touch(mode=0o644, exist_ok=True) + try: self._snapshot() except Exception: @@ -808,6 +815,15 @@ def parse_cmd_args() -> configargparse.Namespace: help="Log extra verbose information, making the debugging of gProfiler easier", ) + parser.add_argument( + "--heartbeat-file", + type=str, + dest="heartbeat_file", + default=None, + help="Heartbeat file used to indicate gProfiler is functioning. "
+ "The file modification indicates the last snapshot time.", + ) + args = parser.parse_args() args.perf_inject = args.nodejs_mode == "perf" @@ -1067,6 +1083,10 @@ def main() -> None: logger.error(f"External metadata file {args.external_metadata} does not exist!") sys.exit(1) + heartbeat_file_path: Optional[Path] = None + if args.heartbeat_file is not None: + heartbeat_file_path = Path(args.heartbeat_file) + try: log_system_info() except Exception: @@ -1149,6 +1169,7 @@ def main() -> None: controller_process=controller_process, processes_to_profile=processes_to_profile, external_metadata_path=external_metadata_path, + heartbeat_file_path=heartbeat_file_path, ) logger.info("gProfiler initialized and ready to start profiling") if args.continuous: diff --git a/gprofiler/profilers/python_ebpf.py b/gprofiler/profilers/python_ebpf.py index e2b44bc9e..05535cb50 100644 --- a/gprofiler/profilers/python_ebpf.py +++ b/gprofiler/profilers/python_ebpf.py @@ -16,10 +16,11 @@ import glob import os import resource +import selectors import signal from pathlib import Path from subprocess import Popen -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, cast from granulate_utils.linux.ns import is_running_in_init_pid from psutil import NoSuchProcess, Process @@ -64,7 +65,7 @@ class PythonEbpfProfiler(ProfilerBase): _DUMP_TIMEOUT = 5 # seconds _POLL_TIMEOUT = 10 # seconds _GET_OFFSETS_TIMEOUT = 5 # seconds - _STDERR_READ_SIZE = 65536 # bytes read every cycle from stderr + _OUTPUT_READ_SIZE = 65536 # max bytes read every cycle from each of stdout and stderr def __init__( self, @@ -78,6 +79,7 @@ def __init__( ): super().__init__(frequency, duration, profiler_state) self.process: Optional[Popen] = None + self.process_selector: Optional[selectors.BaseSelector] = None self.output_path = Path(self._profiler_state.storage_dir) / f"pyperf.{random_prefix()}.col" self.add_versions = add_versions self.user_stacks_pages = user_stacks_pages @@ -213,6 +215,36 @@ def start(self)
-> None: raise else: self.process = process + self._register_process_selectors() + + def _register_process_selectors(self) -> None: + self.process_selector = selectors.DefaultSelector() + assert self.process_selector and self.process and self.process.stdout and self.process.stderr # for mypy + self.process_selector.register(self.process.stdout, selectors.EVENT_READ) + self.process_selector.register(self.process.stderr, selectors.EVENT_READ) + + def _unregister_process_selectors(self) -> None: + if self.process_selector is not None: # idempotent: _terminate() calls this even when nothing is registered + self.process_selector.close() + self.process_selector = None + + def _read_process_standard_outputs(self) -> Tuple[Optional[str], Optional[str]]: + """ + Read the process standard outputs in a non-blocking manner. + + :return: tuple[stdout, stderr] + """ + stdout: Optional[str] = None + stderr: Optional[str] = None + assert self.process_selector and self.process + for key, _ in self.process_selector.select(timeout=0): + output = key.fileobj.read1(self._OUTPUT_READ_SIZE) # type: ignore + output = cast(str, output) + if key.fileobj is self.process.stdout: + stdout = output + elif key.fileobj is self.process.stderr: + stderr = output + return stdout, stderr def _dump(self) -> Path: assert self.is_running() @@ -224,11 +256,8 @@ def _dump(self) -> Path: output = wait_for_file_by_prefix( f"{self.output_path}.", self._DUMP_TIMEOUT, self._profiler_state.stop_event ) - # PyPerf outputs sampling & error counters every interval (after writing the output file), print them. - # also, makes sure its output pipe doesn't fill up.
- # using read1() which performs just a single read() call and doesn't read until EOF - # (unlike Popen.communicate()) - logger.debug("PyPerf dump output", stderr=self.process.stderr.read1(self._STDERR_READ_SIZE)) # type: ignore + stdout, stderr = self._read_process_standard_outputs() + logger.debug("PyPerf dump output", stdout=stdout, stderr=stderr) return output except TimeoutError: # error flow :( @@ -274,6 +303,7 @@ def snapshot(self) -> ProcessToProfileData: return profiles def _terminate(self) -> Tuple[Optional[int], str, str]: + self._unregister_process_selectors() if self.is_running(): assert self.process is not None # for mypy self.process.terminate() # okay to call even if process is already dead