Skip to content

chore: handle non-utf8 tags #1143

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions src/macaron/provenance/provenance_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from macaron.config.defaults import defaults
from macaron.repo_finder.commit_finder import AbstractPurlType, determine_abstract_purl_type
from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder
from macaron.repo_finder.repo_utils import get_repo_tags
from macaron.slsa_analyzer.analyze_context import AnalyzeContext
from macaron.slsa_analyzer.checks.provenance_available_check import ProvenanceAvailableException
from macaron.slsa_analyzer.ci_service import GitHubActions
Expand Down Expand Up @@ -378,15 +379,10 @@ def find_provenance_from_ci(
if not digest:
logger.debug("Cannot retrieve asset provenance without commit digest.")
return None
tags = git_obj.repo.tags
for _tag in tags:
try:
tag_commit = str(_tag.commit)
except ValueError as error:
logger.debug("Commit of tag is a blob or tree: %s", error)
continue
if tag_commit and tag_commit == digest:
tag = str(_tag)
tags = get_repo_tags(git_obj)
for key, value in tags.items():
if value == digest:
tag = key
break

if not tag:
Expand Down
54 changes: 12 additions & 42 deletions src/macaron/repo_finder/commit_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
from enum import Enum
from re import Pattern

from git import TagReference
from gitdb.exc import BadName
from packageurl import PackageURL
from pydriller import Commit, Git

from macaron.repo_finder import repo_finder_deps_dev, to_domain_from_known_purl_types
from macaron.repo_finder.repo_finder_enums import CommitFinderInfo
from macaron.repo_finder.repo_utils import get_repo_tags
from macaron.slsa_analyzer.git_service import GIT_SERVICES

logger: logging.Logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -247,27 +247,13 @@ def find_commit_from_version_and_name(git_obj: Git, name: str, version: str) ->
logger.debug("Searching for commit of artifact version using tags: %s@%s", name, version)

# Only consider tags that have a commit.
repo_tags = git_obj.repo.tags
repo_tags = get_repo_tags(git_obj)
if not repo_tags:
logger.debug("No tags found for %s", name)
return None, CommitFinderInfo.NO_TAGS

valid_tags = {}
for tag in repo_tags:
commit = _get_tag_commit(tag)
if not commit:
logger.debug("No commit found for tag: %s", tag)
continue

tag_name = str(tag)
valid_tags[tag_name] = tag

if not valid_tags:
logger.debug("No tags with commits found for %s", name)
return None, CommitFinderInfo.NO_TAGS_WITH_COMMITS

# Match tags.
matched_tags, outcome = match_tags(list(valid_tags.keys()), name, version)
matched_tags, outcome = match_tags(list(repo_tags.keys()), name, version)

if not matched_tags:
logger.debug("No tags matched for %s", name)
Expand All @@ -279,25 +265,21 @@ def find_commit_from_version_and_name(git_obj: Git, name: str, version: str) ->
logger.debug("Up to 5 others: %s", matched_tags[1:6])

tag_name = matched_tags[0]
tag = valid_tags[tag_name]
if not tag:
# Tag names are taken from valid_tags and should always exist within it.
logger.debug("Missing tag name from tag dict: %s not in %s", tag_name, valid_tags.keys())

try:
hexsha = tag.commit.hexsha
except ValueError:
logger.debug("Error trying to retrieve digest of commit: %s", tag.commit)
return None, CommitFinderInfo.NO_TAG_COMMIT
commit = None
if tag_name not in repo_tags:
# Tag names are taken from repo_tags and should always exist within it.
logger.debug("Missing tag name from tag dict: %s not in %s", tag_name, repo_tags.keys())
else:
commit = repo_tags[tag_name]

logger.debug(
"Found tag %s with commit %s for artifact version %s@%s",
tag,
hexsha,
tag_name,
commit,
name,
version,
)
return hexsha if hexsha else None, CommitFinderInfo.MATCHED
return commit if commit else None, CommitFinderInfo.MATCHED


def _split_name(name: str) -> list[str]:
Expand Down Expand Up @@ -907,15 +889,3 @@ def _create_suffix_tag_comparison_pattern(tag_part: str) -> Pattern | None:

# Combine the alphabetic and zero-extended numeric parts.
return re.compile(f"{versioned_string_result.group(1)}(0*){versioned_string_result.group(3)}", re.IGNORECASE)


def _get_tag_commit(tag: TagReference) -> Commit | None:
    """Resolve the commit that the passed tag points to.

    Reading the ``commit`` property of a tag can raise a ``ValueError``; isolating the access in this
    function keeps that error handling in a single place.

    Parameters
    ----------
    tag: TagReference
        The tag to resolve.

    Returns
    -------
    Commit | None
        The commit of the tag, or None if accessing it raised a ``ValueError``.
    """
    try:
        resolved = tag.commit
    except ValueError:
        resolved = None
    return resolved
3 changes: 0 additions & 3 deletions src/macaron/repo_finder/repo_finder_enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ class CommitFinderInfo(Enum):
#: Reported if the repository has no Git tags.
NO_TAGS = "No Git tags"

#: Reported if the repository has no Git tags with associated commits.
NO_TAGS_WITH_COMMITS = "No Git tags with commits"

#: Reported if the tag selected from the repository fails to resolve to a commit despite having one associated with
# it.
NO_TAG_COMMIT = "No valid commit found for Git tag"
Expand Down
64 changes: 63 additions & 1 deletion src/macaron/repo_finder/repo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
import logging
import os
import string
import subprocess # nosec B404
from urllib.parse import urlparse

from packageurl import PackageURL
from pydriller import Git

from macaron.config.global_config import global_config
from macaron.slsa_analyzer.git_service import GIT_SERVICES, BaseGitService
from macaron.slsa_analyzer.git_service.base_git_service import NoneGitService
from macaron.slsa_analyzer.git_url import GIT_REPOS_DIR
from macaron.slsa_analyzer.git_url import GIT_REPOS_DIR, decode_git_tags, parse_git_tags

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -169,3 +171,63 @@ def check_repo_urls_are_equivalent(repo_1: str, repo_2: str) -> bool:
return False

return True


def get_repo_tags(git_obj: Git) -> dict[str, str]:
    """Retrieve the tags of the passed repo.

    This will be attempted using the related Pydriller Git function, but will fall back to a Git subprocess for
    repositories that contain non utf-8 tags.

    Parameters
    ----------
    git_obj: Git
        The Git object of the repository.

    Returns
    -------
    dict[str, str]
        A dictionary of tags mapped to commits. Tags whose commit cannot be resolved are omitted, and an empty
        dictionary is returned if the tags cannot be retrieved at all.
    """
    tags = None
    try:
        tags = git_obj.repo.tags
    except UnicodeDecodeError as error:
        logger.debug("Failed to retrieve tags in utf-8 encoding: %s", error)

    # Only fall back to the subprocess when the lookup itself failed. A repository that simply has no tags
    # yields an empty result here, so there is no reason to spawn Git for it.
    if tags is not None:
        tag_dict: dict[str, str] = {}
        for tag in tags:
            try:
                tag_commit = str(tag.commit)
            except ValueError as error:
                logger.debug("Commit of tag is a blob or tree: %s", error)
                continue
            tag_dict[tag.name] = tag_commit
        return tag_dict

    # Retrieve tags using a Git subprocess.
    repository_path = git_obj.repo.working_tree_dir
    # The working tree directory can be None (e.g. for a bare repository), which os.path.isdir does not accept.
    if not repository_path or not os.path.isdir(repository_path):
        logger.debug("Repository path is not a valid directory: %s", repository_path)
        return {}
    try:
        result = subprocess.run(  # nosec B603
            args=["git", "show-ref", "--tags", "-d"],
            capture_output=True,
            cwd=repository_path,
            check=False,
        )
    except OSError as error:
        # With check=False a non-zero exit status is reported through the return code rather than an exception,
        # so only failures to launch the process need to be caught here.
        logger.debug("Failed to retrieve repository tags: %s", error)
        return {}

    if result.returncode != 0:
        logger.debug("Failed to retrieve repository tags.")
        return {}

    # The output is raw bytes because the tag names may not be valid utf-8.
    decoded_data = decode_git_tags(result.stdout)
    if not decoded_data:
        return {}

    return parse_git_tags(decoded_data)
90 changes: 64 additions & 26 deletions src/macaron/slsa_analyzer/git_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from macaron.config.global_config import global_config
from macaron.environment_variables import get_patched_env
from macaron.errors import CloneError, GitTagError
from macaron.util import BytesDecoder

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -412,7 +413,7 @@ def list_remote_references(arguments: list[str], repo: str) -> str | None:
logger.error("Failed to retrieve remote references from repo: %s", repo)
return None

return result.stdout.decode("utf-8")
return decode_git_tags(result.stdout)


def resolve_local_path(start_dir: str, local_path: str) -> str:
Expand Down Expand Up @@ -944,7 +945,7 @@ def is_commit_hash(value: str) -> bool:
return bool(re.match(pattern, value))


def get_tags_via_git_remote(repo: str) -> dict[str, str] | None:
def get_tags_via_git_remote(repo: str) -> dict[str, str]:
"""Retrieve all tags from a given repository using ls-remote.

Parameters
Expand All @@ -955,35 +956,14 @@ def get_tags_via_git_remote(repo: str) -> dict[str, str] | None:
Returns
-------
dict[str]
A dictionary of tags mapped to their commits, or None if the operation failed..
A dictionary of tags mapped to their commits.
"""
tag_data = list_remote_references(["--tags"], repo)
if not tag_data:
return None
tags = {}

for tag_line in tag_data.splitlines():
tag_line = tag_line.strip()
if not tag_line:
continue
split = tag_line.split("\t")
if len(split) != 2:
continue
possible_tag = split[1]
if possible_tag.endswith("^{}"):
possible_tag = possible_tag[:-3]
elif possible_tag in tags:
# If a tag already exists, it must be the annotated reference of an annotated tag.
# In that case we skip the tag as it does not point to the proper source commit.
# Note that this should only happen if the tags are received out of standard order.
continue
possible_tag = possible_tag.replace("refs/tags/", "")
if not possible_tag:
continue
tags[possible_tag] = split[0]
return {}

tags = parse_git_tags(tag_data)
logger.debug("Found %s tags via ls-remote of %s", len(tags), repo)

return tags


Expand Down Expand Up @@ -1055,3 +1035,61 @@ def find_highest_git_tag(tags: set[str]) -> str:
raise GitTagError("No valid version tag found.")

return highest_tag


def parse_git_tags(tag_data: str) -> dict[str, str]:
    """Parse the tags and commits found within the passed data.

    Each non-empty line is expected to hold an object hash followed by a reference name, separated by a tab or
    a space. Lines that do not split into exactly two parts are ignored.

    Annotated tags can appear twice: once for the tag object itself, and once with a ``^{}`` suffix for the
    commit that the tag points to. The commit of the ``^{}`` entry always takes precedence, regardless of the
    order in which the two lines are received.

    Parameters
    ----------
    tag_data: str
        The tag data to parse.

    Returns
    -------
    dict[str, str]
        A dictionary of tags mapped to commits.
    """
    tags: dict[str, str] = {}
    for tag_line in tag_data.splitlines():
        tag_line = tag_line.strip()
        if not tag_line:
            continue
        split = re.split("[\t ]", tag_line, maxsplit=1)
        if len(split) != 2:
            continue
        commit, possible_tag = split

        is_dereferenced = possible_tag.endswith("^{}")
        if is_dereferenced:
            possible_tag = possible_tag[:-3]

        # Strip only the leading reference namespace so that tag names that happen to contain the same text
        # further along are left intact.
        possible_tag = possible_tag.removeprefix("refs/tags/")
        if not possible_tag:
            continue

        if not is_dereferenced and possible_tag in tags:
            # If a tag already exists, it must be the annotated reference of an annotated tag.
            # In that case we skip the tag as it does not point to the proper source commit.
            # Note that this should only happen if the tags are received out of standard order.
            # The comparison is made on the stripped name because that is how the keys are stored.
            continue

        tags[possible_tag] = commit

    return tags


def decode_git_tags(data: bytes) -> str | None:
    """Convert raw Git tag output into a string.

    UTF-8 is tried first. If the data is not valid UTF-8, the result of ``BytesDecoder.decode`` is returned
    instead.

    Parameters
    ----------
    data: bytes
        The data to decode.

    Returns
    -------
    str | None
        The decoded data, or None if an error occurred.
    """
    try:
        decoded = data.decode("utf-8")
    except UnicodeDecodeError as error:
        logger.debug("Error decoding stdout as utf-8: %s", error)
        # Try other character encodings.
        decoded = BytesDecoder.decode(data)
    return decoded
46 changes: 46 additions & 0 deletions src/macaron/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,3 +419,49 @@ def copy_file_bulk(file_list: list, src_path: str, target_path: str) -> bool:
return False

return True


class BytesDecoder:
"""This class aims to decode some non-UTF8 bytes to a valid string.

The aim is not to 'correctly' parse the passed data. Only to successfully do so.
It is assumed that an attempt to decode using UTF8 has already failed.
The top 10 most common encodings (after UTF-8) are tried.
"""

# Taken from https://w3techs.com/technologies/overview/character_encoding.
COMMON_ENCODINGS = [
"ISO-8859-1",
"cp1252",
"cp1251",
"euc-jp",
"euc-kr",
"shift_jis",
"gb2312",
"cp1250",
"ISO-8859-2",
"big5",
]

@staticmethod
def decode(data: bytes) -> str | None:
"""Attempt to decode the passed bytes using common encodings.

Parameters
----------
data: bytes
The data to decode.

Returns
-------
str | None
The data as a string if successful, or None.
"""
for encoding in BytesDecoder.COMMON_ENCODINGS:
try:
return data.decode(encoding)
except UnicodeDecodeError:
pass

logger.debug("Failed to decode bytes using most common character encodings.")
return None
Loading
Loading