Skip to content

Commit

Permalink
Merge branch 'master' into grpcio-1-62-2
Browse files Browse the repository at this point in the history
  • Loading branch information
slicklash authored Jun 25, 2024
2 parents f9a5cb2 + b6cbb3c commit 2c2b9dc
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 8 deletions.
2 changes: 1 addition & 1 deletion gprofiler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
__version__ = "1.48.0"
__version__ = "1.49.0"
21 changes: 21 additions & 0 deletions gprofiler/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def __init__(
remote_logs_handler: Optional[RemoteLogsHandler] = None,
controller_process: Optional[Process] = None,
external_metadata_path: Optional[Path] = None,
heartbeat_file_path: Optional[Path] = None,
):
self._output_dir = output_dir
self._flamegraph = flamegraph
Expand All @@ -154,6 +155,7 @@ def __init__(
self._controller_process = controller_process
self._duration = duration
self._external_metadata_path = external_metadata_path
self._heartbeat_file_path = heartbeat_file_path
if self._collect_metadata:
self._static_metadata = get_static_metadata(self._spawn_time, user_args, self._external_metadata_path)
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
Expand Down Expand Up @@ -393,6 +395,11 @@ def run_continuous(self) -> None:
self._state.init_new_cycle()

snapshot_start = time.monotonic()

if self._heartbeat_file_path:
# --heart-beat flag
self._heartbeat_file_path.touch(mode=644, exist_ok=True)

try:
self._snapshot()
except Exception:
Expand Down Expand Up @@ -808,6 +815,15 @@ def parse_cmd_args() -> configargparse.Namespace:
help="Log extra verbose information, making the debugging of gProfiler easier",
)

parser.add_argument(
"--heartbeat-file",
type=str,
dest="heartbeat_file",
default=None,
help="Heartbeat file used to indicate gProfiler is functioning."
"The file modification indicates the last snapshot time.",
)

args = parser.parse_args()

args.perf_inject = args.nodejs_mode == "perf"
Expand Down Expand Up @@ -1067,6 +1083,10 @@ def main() -> None:
logger.error(f"External metadata file {args.external_metadata} does not exist!")
sys.exit(1)

heartbeat_file_path: Optional[Path] = None
if args.heartbeat_file is not None:
heartbeat_file_path = Path(args.heartbeat_file)

try:
log_system_info()
except Exception:
Expand Down Expand Up @@ -1149,6 +1169,7 @@ def main() -> None:
controller_process=controller_process,
processes_to_profile=processes_to_profile,
external_metadata_path=external_metadata_path,
heartbeat_file_path=heartbeat_file_path,
)
logger.info("gProfiler initialized and ready to start profiling")
if args.continuous:
Expand Down
44 changes: 37 additions & 7 deletions gprofiler/profilers/python_ebpf.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
import glob
import os
import resource
import selectors
import signal
from pathlib import Path
from subprocess import Popen
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple, cast

from granulate_utils.linux.ns import is_running_in_init_pid
from psutil import NoSuchProcess, Process
Expand Down Expand Up @@ -64,7 +65,7 @@ class PythonEbpfProfiler(ProfilerBase):
_DUMP_TIMEOUT = 5 # seconds
_POLL_TIMEOUT = 10 # seconds
_GET_OFFSETS_TIMEOUT = 5 # seconds
_STDERR_READ_SIZE = 65536 # bytes read every cycle from stderr
_OUTPUT_READ_SIZE = 65536 # bytes read every cycle from stderr

def __init__(
self,
Expand All @@ -78,6 +79,7 @@ def __init__(
):
super().__init__(frequency, duration, profiler_state)
self.process: Optional[Popen] = None
self.process_selector: Optional[selectors.BaseSelector] = None
self.output_path = Path(self._profiler_state.storage_dir) / f"pyperf.{random_prefix()}.col"
self.add_versions = add_versions
self.user_stacks_pages = user_stacks_pages
Expand Down Expand Up @@ -213,6 +215,36 @@ def start(self) -> None:
raise
else:
self.process = process
self._register_process_selectors()

def _register_process_selectors(self) -> None:
self.process_selector = selectors.DefaultSelector()
assert self.process_selector and self.process and self.process.stdout and self.process.stderr # for mypy
self.process_selector.register(self.process.stdout, selectors.EVENT_READ)
self.process_selector.register(self.process.stderr, selectors.EVENT_READ)

def _unregister_process_selectors(self) -> None:
assert self.process_selector
self.process_selector.close()
self.process_selector = None

def _read_process_standard_outputs(self) -> Tuple[Optional[str], Optional[str]]:
"""
Read the process standard outputs in a non-blocking manner.
:return: tuple[stdout, stderr]
"""
stdout: Optional[str] = None
stderr: Optional[str] = None
assert self.process_selector and self.process
for key, _ in self.process_selector.select(timeout=0):
output = key.fileobj.read1(self._OUTPUT_READ_SIZE) # type: ignore
output = cast(str, output)
if key.fileobj is self.process.stdout:
stdout = output
elif key.fileobj is self.process.stderr:
stderr = output
return stdout, stderr

def _dump(self) -> Path:
assert self.is_running()
Expand All @@ -224,11 +256,8 @@ def _dump(self) -> Path:
output = wait_for_file_by_prefix(
f"{self.output_path}.", self._DUMP_TIMEOUT, self._profiler_state.stop_event
)
# PyPerf outputs sampling & error counters every interval (after writing the output file), print them.
# also, makes sure its output pipe doesn't fill up.
# using read1() which performs just a single read() call and doesn't read until EOF
# (unlike Popen.communicate())
logger.debug("PyPerf dump output", stderr=self.process.stderr.read1(self._STDERR_READ_SIZE)) # type: ignore
stdout, stderr = self._read_process_standard_outputs()
logger.debug("PyPerf dump output", stdout=stdout, stderr=stderr)
return output
except TimeoutError:
# error flow :(
Expand Down Expand Up @@ -274,6 +303,7 @@ def snapshot(self) -> ProcessToProfileData:
return profiles

def _terminate(self) -> Tuple[Optional[int], str, str]:
self._unregister_process_selectors()
if self.is_running():
assert self.process is not None # for mypy
self.process.terminate() # okay to call even if process is already dead
Expand Down

0 comments on commit 2c2b9dc

Please sign in to comment.