Skip to content

refactor: remove depends_on fields from heuristics and handle skips in problog #1133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""Define and initialize the base analyzer."""
Expand All @@ -18,13 +18,9 @@ def __init__(
self,
name: str,
heuristic: Heuristics,
depends_on: list[tuple[Heuristics, HeuristicResult]] | None,
) -> None:
self.name: str = name
self.heuristic: Heuristics = heuristic
self.depends_on: list[tuple[Heuristics, HeuristicResult]] | None = (
depends_on # Contains the dependent heuristics and the expected result of each heuristic
)

@abstractmethod
def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
Expand Down
6 changes: 2 additions & 4 deletions src/macaron/malware_analyzer/pypi_heuristics/heuristics.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ class HeuristicResult(str, Enum):
#: Indicates that suspicious activity was detected.
FAIL = "FAIL"

#: Indicates that the heuristic check could not be performed due to missing metadata.
#: The `SKIP` result occurs when the necessary metadata is not available. This often happens
#: when fetching data through the PyPI API and the relevant data, such as the maintainer's
#: join date or release information, is missing or unavailable.
#: Indicates that this heuristic is not applicable to this package.
#: Please use HeuristicAnalyzerValueError for malformed package data.
SKIP = "SKIP"
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,7 @@ class AnomalousVersionAnalyzer(BaseHeuristicAnalyzer):
DIGIT_DATE_FORMATS: list[str] = ["%Y%m%d", "%Y%d%m", "%d%m%Y", "%m%d%Y", "%y%m%d", "%y%d%m", "%d%m%y", "%m%d%y"]

def __init__(self) -> None:
super().__init__(
name="anomalous_version_analyzer",
heuristic=Heuristics.ANOMALOUS_VERSION,
depends_on=[(Heuristics.ONE_RELEASE, HeuristicResult.FAIL)],
)
super().__init__(name="anomalous_version_analyzer", heuristic=Heuristics.ANOMALOUS_VERSION)
self.major_threshold, self.epoch_threshold, self.day_publish_error = self._load_defaults()

def _load_defaults(self) -> tuple[int, int, int]:
Expand Down Expand Up @@ -110,13 +106,8 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

if len(releases) != 1:
error_msg = (
"This heuristic depends on a single release, but somehow there are multiple when the one release"
+ " heuristic failed."
)
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)
if len(releases) != 1: # We only analyze packages with a single release, this heuristic does not apply.
return HeuristicResult.SKIP, {}

# Since there is only one release, the latest version should be that release
release = pypi_package_json.get_latest_version()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@

"""Analyzer checks whether the maintainers' join date is close to the package's latest release date."""

import logging
from datetime import datetime, timedelta

from macaron.config.defaults import defaults
from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType
from macaron.malware_analyzer.datetime_parser import parse_datetime
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset, PyPIRegistry

logger: logging.Logger = logging.getLogger(__name__)


class CloserReleaseJoinDateAnalyzer(BaseHeuristicAnalyzer):
"""Check whether the maintainers' join date is close to the package's latest release date.
Expand All @@ -20,9 +24,7 @@ class CloserReleaseJoinDateAnalyzer(BaseHeuristicAnalyzer):
"""

def __init__(self) -> None:
super().__init__(
name="closer_release_join_date_analyzer", heuristic=Heuristics.CLOSER_RELEASE_JOIN_DATE, depends_on=None
)
super().__init__(name="closer_release_join_date_analyzer", heuristic=Heuristics.CLOSER_RELEASE_JOIN_DATE)
self.gap_threshold: int = self._load_defaults()

def _load_defaults(self) -> int:
Expand Down Expand Up @@ -97,17 +99,27 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
maintainers_join_date: list[datetime] | None = self._get_maintainers_join_date(
pypi_package_json.pypi_registry, pypi_package_json.component_name
)
# If there is no maintainer join date information, then it is malformed package metadata
if not maintainers_join_date:
error_msg = "Metadata has no maintainers or join dates for them"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

latest_release_date: datetime | None = self._get_latest_release_date(pypi_package_json)
# Upload time is standardized by PyPI, so if it is not in the expected format then it is
# malformed package metadata
if not latest_release_date:
error_msg = "Unable to parse latest upload time"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

detail_info: dict[str, JsonType] = {
"maintainers_join_date": (
[date.strftime("%Y-%m-%d %H:%M:%S") for date in maintainers_join_date] if maintainers_join_date else []
),
"latest_release_date": latest_release_date.strftime("%Y-%m-%d %H:%M:%S") if latest_release_date else "",
}

if maintainers_join_date is None or latest_release_date is None:
return HeuristicResult.SKIP, detail_info

for date in maintainers_join_date:
difference = abs(latest_release_date - date)
threshold_delta = timedelta(days=self.gap_threshold)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@

"""Analyzer checks whether the package has no project links."""

import logging

from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset

logger: logging.Logger = logging.getLogger(__name__)


class EmptyProjectLinkAnalyzer(BaseHeuristicAnalyzer):
"""Check whether the PyPI package has no project links."""

def __init__(self) -> None:
super().__init__(name="empty_project_link_analyzer", heuristic=Heuristics.EMPTY_PROJECT_LINK, depends_on=None)
super().__init__(name="empty_project_link_analyzer", heuristic=Heuristics.EMPTY_PROJECT_LINK)

def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
"""Analyze the package.
Expand All @@ -30,10 +34,7 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
"""
project_links = pypi_package_json.get_project_links()

if project_links is None:
return HeuristicResult.FAIL, {}

if len(project_links) == 0: # Total.
if project_links is None or len(project_links) == 0:
return HeuristicResult.FAIL, {}

return HeuristicResult.PASS, {"project_links": project_links}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@ class FakeEmailAnalyzer(BaseHeuristicAnalyzer):
)

def __init__(self) -> None:
super().__init__(
name="fake_email_analyzer",
heuristic=Heuristics.FAKE_EMAIL,
depends_on=None,
)
super().__init__(name="fake_email_analyzer", heuristic=Heuristics.FAKE_EMAIL)
self.check_deliverability: bool = self._load_defaults()

def _load_defaults(self) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime

from macaron.config.defaults import defaults
from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.datetime_parser import parse_datetime
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
Expand All @@ -20,11 +21,7 @@ class HighReleaseFrequencyAnalyzer(BaseHeuristicAnalyzer):
"""Check whether the release frequency is high."""

def __init__(self) -> None:
super().__init__(
name="high_release_frequency_analyzer",
heuristic=Heuristics.HIGH_RELEASE_FREQUENCY,
depends_on=[(Heuristics.ONE_RELEASE, HeuristicResult.PASS)], # Analyzing when this heuristic pass
)
super().__init__(name="high_release_frequency_analyzer", heuristic=Heuristics.HIGH_RELEASE_FREQUENCY)
self.average_gap_threshold: int = self._load_defaults() # Days

def _load_defaults(self) -> int:
Expand All @@ -49,7 +46,13 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
The result and related information collected during the analysis.
"""
version_to_releases: dict | None = pypi_package_json.get_releases()
if version_to_releases is None or len(version_to_releases) == 1:
if version_to_releases is None:
error_msg = "Metadata has no release information"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

if len(version_to_releases) == 1:
# This heuristic only applies to packages with multiple releases, so it is skipped for a single release.
return HeuristicResult.SKIP, {}

extract_data: dict[str, datetime] = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@

"""Analyzer checks whether the package contains only one release."""

import logging

from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset

logger: logging.Logger = logging.getLogger(__name__)


class OneReleaseAnalyzer(BaseHeuristicAnalyzer):
"""Determine if there is only one release of the package."""

def __init__(self) -> None:
super().__init__(name="one_release_analyzer", heuristic=Heuristics.ONE_RELEASE, depends_on=None)
super().__init__(name="one_release_analyzer", heuristic=Heuristics.ONE_RELEASE)

def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
"""Analyze the package.
Expand All @@ -31,7 +36,9 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
"""
releases: dict | None = pypi_package_json.get_releases()
if releases is None:
return HeuristicResult.SKIP, {"releases": {}}
error_msg = "Metadata has no release information"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

if len(releases) == 1:
return HeuristicResult.FAIL, {"releases": releases} # Higher false positive, so we keep it MEDIUM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def __init__(self) -> None:
super().__init__(
name="source_code_repo_analyzer",
heuristic=Heuristics.SOURCE_CODE_REPO,
depends_on=[(Heuristics.EMPTY_PROJECT_LINK, HeuristicResult.PASS)],
)

def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ class TyposquattingPresenceAnalyzer(BaseHeuristicAnalyzer):
}

def __init__(self, popular_packages_path: str | None = None) -> None:
super().__init__(
name="typosquatting_presence_analyzer", heuristic=Heuristics.TYPOSQUATTING_PRESENCE, depends_on=None
)
super().__init__(name="typosquatting_presence_analyzer", heuristic=Heuristics.TYPOSQUATTING_PRESENCE)
self.default_path = os.path.join(MACARON_PATH, "resources/popular_packages.txt")
if popular_packages_path:
self.default_path = popular_packages_path
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""Heuristics analyzer to check unchanged content in multiple releases."""
import logging
from collections import Counter

from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType, json_extract
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
Expand All @@ -17,11 +18,7 @@ class UnchangedReleaseAnalyzer(BaseHeuristicAnalyzer):
"""Analyze whether the content of the package is updated by the maintainer."""

def __init__(self) -> None:
super().__init__(
name="unchanged_release_analyzer",
heuristic=Heuristics.UNCHANGED_RELEASE,
depends_on=[(Heuristics.HIGH_RELEASE_FREQUENCY, HeuristicResult.FAIL)],
)
super().__init__(name="unchanged_release_analyzer", heuristic=Heuristics.UNCHANGED_RELEASE)
self.hash_algo: str = "sha256"

def _get_digests(self, pypi_package_json: PyPIPackageJsonAsset) -> list[str] | None:
Expand Down Expand Up @@ -68,6 +65,12 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
"""
digests: list[str] | None = self._get_digests(pypi_package_json)
if digests is None:
error_msg = "Metadata has no digest information"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

if len(digests) == 1:
# This heuristic only applies to packages with multiple releases, so it is skipped when there is a single digest.
return HeuristicResult.SKIP, {}

frequency = Counter(digests)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def __init__(self) -> None:
super().__init__(
name="wheel_absence_analyzer",
heuristic=Heuristics.WHEEL_ABSENCE,
depends_on=None,
)

def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,6 @@ def __init__(self, resources_path: str | None = None) -> None:
super().__init__(
name="suspicious_patterns_analyzer",
heuristic=Heuristics.SUSPICIOUS_PATTERNS,
# We include the SKIP condition here as we want to consider the case where EMPTY_PROJECT_LINK fails,
# meaning SOURCE_CODE_REPO is skipped, as this is still a scenario where the source code repository
# is not available, so we want to run source code analysis.
depends_on=[
(Heuristics.SOURCE_CODE_REPO, HeuristicResult.FAIL),
(Heuristics.SOURCE_CODE_REPO, HeuristicResult.SKIP),
],
)
if resources_path is None:
resources_path = global_config.resources_path
Expand Down
Loading
Loading