diff --git a/gprofiler/containers_client.py b/gprofiler/containers_client.py index f428383d1..586e93375 100644 --- a/gprofiler/containers_client.py +++ b/gprofiler/containers_client.py @@ -15,8 +15,28 @@ logger = get_logger_adapter(__name__) -class ContainerNamesClient: +class ContainerNamesClientBase: def __init__(self) -> None: + self._current_container_names: Set[str] = set() + + def get_container_name(self, pid: int) -> str: + raise NotImplementedError + + def reset_cache(self) -> None: + self._current_container_names.clear() + + @property + def container_names(self) -> List[str]: + return list(self._current_container_names) + + +class ContainerNamesClient(ContainerNamesClientBase): + """ + Container names are from Docker & CRI containers. + """ + + def __init__(self) -> None: + super().__init__() try: self._containers_client: Optional[ContainersClient] = ContainersClient() logger.info(f"Discovered container runtimes: {self._containers_client.get_runtimes()}") @@ -30,16 +50,11 @@ def __init__(self) -> None: self._containers_client = None self._pid_to_container_name_cache: Dict[int, str] = {} - self._current_container_names: Set[str] = set() self._container_id_to_name_cache: Dict[str, Optional[str]] = {} def reset_cache(self) -> None: + super().reset_cache() self._pid_to_container_name_cache.clear() - self._current_container_names.clear() - - @property - def container_names(self) -> List[str]: - return list(self._current_container_names) def get_container_name(self, pid: int) -> str: if self._containers_client is None: @@ -94,3 +109,34 @@ def _refresh_container_names_cache(self) -> None: self._container_id_to_name_cache.clear() for container in self._containers_client.list_containers() if self._containers_client is not None else []: self._container_id_to_name_cache[container.id] = container.name + + +class SparkContainerNamesClient(ContainerNamesClientBase): + """ + Container names are Spark application ids. + """ + + def get_container_name(self, pid: int) -> str: + if not valid_perf_pid(pid): + return "" + + try: + process = Process(pid) + args = process.cmdline() + except NoSuchProcess: + return "" + + try: + it = iter(args) + for arg in it: + if arg == "--app-id": + name = f"spark: {next(it)}" + self._current_container_names.add(name) + return name + else: + return "" # not found + except StopIteration: + return "" + except Exception: + logger.exception("Unexpected error getting --app-id argument for pid", pid=pid, args=args) + return "" diff --git a/gprofiler/main.py b/gprofiler/main.py index 98a560a53..6ce6ca43a 100644 --- a/gprofiler/main.py +++ b/gprofiler/main.py @@ -36,7 +36,7 @@ ProfilerAPIClient, ) from gprofiler.consts import CPU_PROFILING_MODE -from gprofiler.containers_client import ContainerNamesClient +from gprofiler.containers_client import SparkContainerNamesClient from gprofiler.diagnostics import log_diagnostics, set_diagnostics from gprofiler.exceptions import APIError, NoProfilersEnabledError from gprofiler.gprofiler_types import ProcessToProfileData, UserArgs, integers_list, positive_integer @@ -147,7 +147,8 @@ def __init__( # 2. accessible only by us. # the latter can be root only. the former can not. we should do this separation so we don't expose # files unnecessarily. - container_names_client = ContainerNamesClient() if self._enrichment_options.container_names else None + # container_names_client = ContainerNamesClient() if self._enrichment_options.container_names else None + container_names_client = SparkContainerNamesClient() if self._enrichment_options.container_names else None self._profiler_state = ProfilerState( stop_event=Event(), storage_dir=TEMPORARY_STORAGE_PATH, diff --git a/gprofiler/merge.py b/gprofiler/merge.py index 2a73cf053..862e127c5 100644 --- a/gprofiler/merge.py +++ b/gprofiler/merge.py @@ -11,7 +11,7 @@ from granulate_utils.metadata import Metadata -from gprofiler.containers_client import ContainerNamesClient +from gprofiler.containers_client import ContainerNamesClientBase from gprofiler.gprofiler_types import ProcessToProfileData, ProfileData, ProfilingErrorStack, StackToSampleCount from gprofiler.log import get_logger_adapter from gprofiler.metadata.enrichment import EnrichmentOptions @@ -39,7 +39,7 @@ def scale_sample_counts(stacks: StackToSampleCount, ratio: float) -> StackToSamp def _make_profile_metadata( - container_names_client: Optional[ContainerNamesClient], + container_names_client: Optional[ContainerNamesClientBase], add_container_names: bool, metadata: Metadata, metrics: Metrics, @@ -175,7 +175,7 @@ def concatenate_from_external_file( def concatenate_profiles( process_profiles: ProcessToProfileData, - container_names_client: Optional[ContainerNamesClient], + container_names_client: Optional[ContainerNamesClientBase], enrichment_options: EnrichmentOptions, metadata: Metadata, metrics: Metrics, @@ -211,7 +211,7 @@ def concatenate_profiles( def merge_profiles( perf_pid_to_profiles: ProcessToProfileData, process_profiles: ProcessToProfileData, - container_names_client: Optional[ContainerNamesClient], + container_names_client: Optional[ContainerNamesClientBase], enrichment_options: EnrichmentOptions, metadata: Metadata, metrics: Metrics, diff --git a/gprofiler/profiler_state.py b/gprofiler/profiler_state.py index b919d657b..fde9d4a77 100644 --- a/gprofiler/profiler_state.py +++ b/gprofiler/profiler_state.py @@ -4,7 +4,7 @@ from psutil import Process -from gprofiler.containers_client import ContainerNamesClient +from gprofiler.containers_client import ContainerNamesClientBase from gprofiler.utils import TemporaryDirectoryWithMode @@ -19,7 +19,7 @@ class ProfilerState: profile_spawned_processes: bool insert_dso_name: bool profiling_mode: str - container_names_client: Optional[ContainerNamesClient] + container_names_client: Optional[ContainerNamesClientBase] processes_to_profile: Optional[List[Process]] def __post_init__(self) -> None: