diff --git a/CHANGELOG.md b/CHANGELOG.md index d4543af9..c90f3352 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,10 +31,10 @@ All versions prior to 0.9.0 are untracked. ### Removed * **BREAKING API CHANGE**: `SigningResult` has been removed. - The public signing APIs now return `sigstore.verify.models.Bundle`. + The public signing APIs now return `sigstore.models.Bundle`. * **BREAKING API CHANGE**: `VerificationMaterials` has been removed. - The public verification APIs now accept `sigstore.verify.models.Bundle`. + The public verification APIs now accept `sigstore.models.Bundle`. * **BREAKING API CHANGE**: `Signer.sign(...)` has been removed. Use either `sign_artifact(...)` or `sign_dsse(...)`, depending on whether @@ -55,7 +55,7 @@ All versions prior to 0.9.0 are untracked. a `Hashed` parameter to convey the digest used for Rekor entry lookup ([#904](https://github.com/sigstore/sigstore-python/pull/904)) -* **BREAKING API CHANGE**: `Verifier.verify(...)` now takes a `sigstore.verify.models.Bundle`, +* **BREAKING API CHANGE**: `Verifier.verify(...)` now takes a `sigstore.models.Bundle`, instead of a `VerificationMaterials` ([#937](https://github.com/sigstore/sigstore-python/pull/937)) * sigstore-python now requires inclusion proofs in all signing and verification @@ -74,6 +74,9 @@ All versions prior to 0.9.0 are untracked. an inclusion proof. Passing `--offline` with detached materials will cause an error ([#937](https://github.com/sigstore/sigstore-python/pull/937)) +* API: `sigstore.transparency` has been removed, and its pre-existing APIs + have been re-homed under `sigstore.models` + ([#990](https://github.com/sigstore/sigstore-python/pull/990)) ## [2.1.5] diff --git a/sigstore/_cli.py b/sigstore/_cli.py index 25008bc4..1835bc29 100644 --- a/sigstore/_cli.py +++ b/sigstore/_cli.py @@ -41,6 +41,7 @@ from sigstore._utils import sha256_digest from sigstore.errors import Error, VerificationError from sigstore.hashes import Hashed +from sigstore.models import Bundle from sigstore.oidc import ( DEFAULT_OAUTH_ISSUER_URL, STAGING_OAUTH_ISSUER_URL, @@ -54,7 +55,6 @@ Verifier, policy, ) -from sigstore.verify.models import Bundle logging.basicConfig(format="%(message)s", datefmt="[%X]", handlers=[RichHandler()]) _logger = logging.getLogger(__name__) diff --git a/sigstore/_internal/merkle.py b/sigstore/_internal/merkle.py index 66daf370..b930c7de 100644 --- a/sigstore/_internal/merkle.py +++ b/sigstore/_internal/merkle.py @@ -33,7 +33,7 @@ from sigstore.errors import VerificationError if typing.TYPE_CHECKING: - from sigstore.transparency import LogEntry + from sigstore.models import LogEntry _LEAF_HASH_PREFIX = 0 diff --git a/sigstore/_internal/rekor/__init__.py b/sigstore/_internal/rekor/__init__.py index ab555a60..7c1d3e36 100644 --- a/sigstore/_internal/rekor/__init__.py +++ b/sigstore/_internal/rekor/__init__.py @@ -24,12 +24,7 @@ from sigstore._utils import base64_encode_pem_cert from sigstore.hashes import Hashed -from .checkpoint import SignedCheckpoint -from .client import RekorClient - __all__ = [ - "RekorClient", - "SignedCheckpoint", "_hashedrekord_from_parts", ] diff --git a/sigstore/_internal/rekor/checkpoint.py b/sigstore/_internal/rekor/checkpoint.py index cef9c55c..c49ae67b 100644 --- a/sigstore/_internal/rekor/checkpoint.py +++ b/sigstore/_internal/rekor/checkpoint.py @@ -32,7 +32,7 @@ from sigstore.errors import VerificationError if typing.TYPE_CHECKING: - from sigstore.transparency import LogEntry + from sigstore.models import LogEntry @dataclass(frozen=True) diff --git a/sigstore/_internal/rekor/client.py b/sigstore/_internal/rekor/client.py index 16233e3d..a690938c 100644 --- a/sigstore/_internal/rekor/client.py +++ b/sigstore/_internal/rekor/client.py @@ -28,7 +28,7 @@ import rekor_types import requests -from sigstore.transparency import LogEntry +from sigstore.models import LogEntry _logger = logging.getLogger(__name__) diff --git a/sigstore/verify/models.py b/sigstore/models.py similarity index 51% rename from sigstore/verify/models.py rename to sigstore/models.py index 4c09b46d..cb7f6025 100644 --- a/sigstore/verify/models.py +++ b/sigstore/models.py @@ -13,20 +13,35 @@ # limitations under the License. """ -Common (base) models for the verification APIs. +Common models shared between signing and verification. """ from __future__ import annotations import base64 import logging +import typing from textwrap import dedent +from typing import Any, List, Optional +import rfc8785 from cryptography.hazmat.primitives.serialization import Encoding from cryptography.x509 import ( Certificate, load_der_x509_certificate, ) +from pydantic import ( + BaseModel, + ConfigDict, + Field, + StrictInt, + StrictStr, + TypeAdapter, + ValidationInfo, + field_validator, +) +from pydantic.dataclasses import dataclass +from rekor_types import Dsse, Hashedrekord, ProposedEntry from sigstore_protobuf_specs.dev.sigstore.bundle import v1 as bundle_v1 from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import ( Bundle as _Bundle, @@ -34,23 +49,281 @@ from sigstore_protobuf_specs.dev.sigstore.common import v1 as common_v1 from sigstore_protobuf_specs.dev.sigstore.rekor import v1 as rekor_v1 from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import ( - InclusionPromise, InclusionProof, ) from sigstore import dsse +from sigstore._internal.merkle import verify_merkle_inclusion +from sigstore._internal.rekor.checkpoint import verify_checkpoint from sigstore._utils import ( B64Str, BundleType, + KeyID, cert_is_leaf, cert_is_root_ca, ) -from sigstore.errors import Error -from sigstore.transparency import LogEntry, LogInclusionProof +from sigstore.errors import Error, VerificationError + +if typing.TYPE_CHECKING: + from sigstore._internal.trustroot import RekorKeyring + _logger = logging.getLogger(__name__) +class LogInclusionProof(BaseModel): + """ + Represents an inclusion proof for a transparency log entry. + """ + + model_config = ConfigDict(populate_by_name=True) + + checkpoint: StrictStr = Field(..., alias="checkpoint") + hashes: List[StrictStr] = Field(..., alias="hashes") + log_index: StrictInt = Field(..., alias="logIndex") + root_hash: StrictStr = Field(..., alias="rootHash") + tree_size: StrictInt = Field(..., alias="treeSize") + + @field_validator("log_index") + def _log_index_positive(cls, v: int) -> int: + if v < 0: + raise ValueError(f"Inclusion proof has invalid log index: {v} < 0") + return v + + @field_validator("tree_size") + def _tree_size_positive(cls, v: int) -> int: + if v < 0: + raise ValueError(f"Inclusion proof has invalid tree size: {v} < 0") + return v + + @field_validator("tree_size") + def _log_index_within_tree_size( + cls, v: int, info: ValidationInfo, **kwargs: Any + ) -> int: + if "log_index" in info.data and v <= info.data["log_index"]: + raise ValueError( + "Inclusion proof has log index greater than or equal to tree size: " + f"{v} <= {info.data['log_index']}" + ) + return v + + +@dataclass(frozen=True) +class LogEntry: + """ + Represents a transparency log entry. + + Log entries are retrieved from the transparency log after signing or verification events, + or loaded from "Sigstore" bundles provided by the user. + + This representation allows for either a missing inclusion promise or a missing + inclusion proof, but not both: attempting to construct a `LogEntry` without + at least one will fail. + """ + + uuid: Optional[str] + """ + This entry's unique ID in the log instance it was retrieved from. + + For sharded log deployments, IDs are unique per-shard. + + Not present for `LogEntry` instances loaded from Sigstore bundles. + """ + + body: B64Str + """ + The base64-encoded body of the transparency log entry. + """ + + integrated_time: int + """ + The UNIX time at which this entry was integrated into the transparency log. + """ + + log_id: str + """ + The log's ID (as the SHA256 hash of the DER-encoded public key for the log + at the time of entry inclusion). + """ + + log_index: int + """ + The index of this entry within the log. + """ + + inclusion_proof: LogInclusionProof + """ + An inclusion proof for this log entry. + """ + + inclusion_promise: Optional[B64Str] + """ + An inclusion promise for this log entry, if present. + + Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this + log entry. + """ + + @classmethod + def _from_response(cls, dict_: dict[str, Any]) -> LogEntry: + """ + Create a new `LogEntry` from the given API response. + """ + + # Assumes we only get one entry back + entries = list(dict_.items()) + if len(entries) != 1: + raise ValueError("Received multiple entries in response") + + uuid, entry = entries[0] + return LogEntry( + uuid=uuid, + body=entry["body"], + integrated_time=entry["integratedTime"], + log_id=entry["logID"], + log_index=entry["logIndex"], + inclusion_proof=LogInclusionProof.model_validate( + entry["verification"]["inclusionProof"] + ), + inclusion_promise=entry["verification"]["signedEntryTimestamp"], + ) + + @classmethod + def _from_dict_rekor(cls, dict_: dict[str, Any]) -> LogEntry: + """ + Create a new `LogEntry` from the given Rekor TransparencyLogEntry. + """ + tlog_entry = rekor_v1.TransparencyLogEntry() + tlog_entry.from_dict(dict_) + + inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof + # This check is required by us as the client, not the + # protobuf-specs themselves. + if inclusion_proof is None or inclusion_proof.checkpoint.envelope is None: + raise InvalidBundle("entry must contain inclusion proof") + + parsed_inclusion_proof = LogInclusionProof( + checkpoint=inclusion_proof.checkpoint.envelope, + hashes=[h.hex() for h in inclusion_proof.hashes], + log_index=inclusion_proof.log_index, + root_hash=inclusion_proof.root_hash.hex(), + tree_size=inclusion_proof.tree_size, + ) + + return LogEntry( + uuid=None, + body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()), + integrated_time=tlog_entry.integrated_time, + log_id=tlog_entry.log_id.key_id.hex(), + log_index=tlog_entry.log_index, + inclusion_proof=parsed_inclusion_proof, + inclusion_promise=B64Str( + base64.b64encode( + tlog_entry.inclusion_promise.signed_entry_timestamp + ).decode() + ), + ) + + def _to_dict_rekor(self) -> dict[str, Any]: + inclusion_promise: rekor_v1.InclusionPromise | None = None + if self.inclusion_promise: + inclusion_promise = rekor_v1.InclusionPromise( + signed_entry_timestamp=base64.b64decode(self.inclusion_promise) + ) + + inclusion_proof = rekor_v1.InclusionProof( + log_index=self.inclusion_proof.log_index, + root_hash=bytes.fromhex(self.inclusion_proof.root_hash), + tree_size=self.inclusion_proof.tree_size, + hashes=[bytes.fromhex(hash_) for hash_ in self.inclusion_proof.hashes], + checkpoint=rekor_v1.Checkpoint(envelope=self.inclusion_proof.checkpoint), + ) + + tlog_entry = rekor_v1.TransparencyLogEntry( + log_index=self.log_index, + log_id=common_v1.LogId(key_id=bytes.fromhex(self.log_id)), + integrated_time=self.integrated_time, + inclusion_promise=inclusion_promise, + inclusion_proof=inclusion_proof, + canonicalized_body=base64.b64decode(self.body), + ) + + # Fill in the appropriate kind + body_entry = TypeAdapter(ProposedEntry).validate_json( + tlog_entry.canonicalized_body + ) + if not isinstance(body_entry, (Hashedrekord, Dsse)): + raise InvalidBundle("log entry is not of expected type") + + tlog_entry.kind_version = rekor_v1.KindVersion( + kind=body_entry.kind, version=body_entry.api_version + ) + + tlog_entry_dict: dict[str, Any] = tlog_entry.to_dict() + return tlog_entry_dict + + def encode_canonical(self) -> bytes: + """ + Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry. + + This encoded representation is suitable for verification against + the Signed Entry Timestamp. + """ + payload: dict[str, int | str] = { + "body": self.body, + "integratedTime": self.integrated_time, + "logID": self.log_id, + "logIndex": self.log_index, + } + + return rfc8785.dumps(payload) + + def _verify_set(self, keyring: RekorKeyring) -> None: + """ + Verify the inclusion promise (Signed Entry Timestamp) for a given transparency log + `entry` using the given `keyring`. + + Fails if the given log entry does not contain an inclusion promise. + """ + + if self.inclusion_promise is None: + raise VerificationError("SET: invalid inclusion promise: missing") + + signed_entry_ts = base64.b64decode(self.inclusion_promise) + + try: + keyring.verify( + key_id=KeyID(bytes.fromhex(self.log_id)), + signature=signed_entry_ts, + data=self.encode_canonical(), + ) + except VerificationError as exc: + raise VerificationError(f"SET: invalid inclusion promise: {exc}") + + def _verify(self, keyring: RekorKeyring) -> None: + """ + Verifies this log entry. + + This method performs steps (5), (6), and optionally (7) in + the top-level verify API: + + * Verifies the consistency of the entry with the given bundle; + * Verifies the Merkle inclusion proof and its signed checkpoint; + * Verifies the inclusion promise, if present. + """ + + verify_merkle_inclusion(self) + verify_checkpoint(keyring, self) + + _logger.debug(f"successfully verified inclusion proof: index={self.log_index}") + + if self.inclusion_promise: + self._verify_set(keyring) + _logger.debug( + f"successfully verified inclusion promise: index={self.log_index}" + ) + + class InvalidBundle(Error): """ Raised when the associated `Bundle` is invalid in some way. @@ -72,20 +345,6 @@ def diagnostics(self) -> str: ) -class InvalidRekorEntry(InvalidBundle): - """ - Raised if the effective Rekor entry in `VerificationMaterials.rekor_entry()` - does not match the other materials in `VerificationMaterials`. - - This can only happen in two scenarios: - - * A user has supplied the wrong offline entry, potentially maliciously; - * The Rekor log responded with the wrong entry, suggesting a server error. - """ - - pass - - class Bundle: """ Represents a Sigstore bundle. @@ -182,55 +441,22 @@ def _verify_bundle(self) -> None: # The inclusion promise is NOT required; if present, the client # SHOULD verify it. # - # Beneath all of this, we require that the inclusion proof be present. - inclusion_promise: InclusionPromise | None = tlog_entry.inclusion_promise - inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof + # Before all of this, we require that the inclusion proof be present + # (when constructing the LogEntry). + log_entry = LogEntry._from_dict_rekor(tlog_entry.to_dict()) + if media_type == BundleType.BUNDLE_0_1: - if not inclusion_promise: + if not log_entry.inclusion_promise: raise InvalidBundle("bundle must contain an inclusion promise") - if inclusion_proof and not inclusion_proof.checkpoint.envelope: + if not log_entry.inclusion_proof.checkpoint: _logger.debug( "0.1 bundle contains inclusion proof without checkpoint; ignoring" ) else: - if not inclusion_proof: - raise InvalidBundle("bundle must contain an inclusion proof") - if not inclusion_proof.checkpoint.envelope: + if not log_entry.inclusion_proof.checkpoint: raise InvalidBundle("expected checkpoint in inclusion proof") - parsed_inclusion_proof: InclusionProof | None = None - if ( - inclusion_proof is not None - and inclusion_proof.checkpoint.envelope is not None - ): - parsed_inclusion_proof = LogInclusionProof( - checkpoint=inclusion_proof.checkpoint.envelope, - hashes=[h.hex() for h in inclusion_proof.hashes], - log_index=inclusion_proof.log_index, - root_hash=inclusion_proof.root_hash.hex(), - tree_size=inclusion_proof.tree_size, - ) - - # Sanity: the only way we can hit this is with a v1 bundle without - # an inclusion proof. Putting this check here rather than above makes - # it clear that this check is required by us as the client, not the - # protobuf-specs themselves. - if parsed_inclusion_proof is None: - raise InvalidBundle("bundle must contain inclusion proof") - - self._log_entry = LogEntry( - uuid=None, - body=B64Str(base64.b64encode(tlog_entry.canonicalized_body).decode()), - integrated_time=tlog_entry.integrated_time, - log_id=tlog_entry.log_id.key_id.hex(), - log_index=tlog_entry.log_index, - inclusion_proof=parsed_inclusion_proof, - inclusion_promise=B64Str( - base64.b64encode( - tlog_entry.inclusion_promise.signed_entry_timestamp - ).decode() - ), - ) + self._log_entry = log_entry @property def signing_certificate(self) -> Certificate: @@ -292,30 +518,6 @@ def _from_parts( """ @private """ - inclusion_promise: rekor_v1.InclusionPromise | None = None - if log_entry.inclusion_promise: - inclusion_promise = rekor_v1.InclusionPromise( - signed_entry_timestamp=base64.b64decode(log_entry.inclusion_promise) - ) - - inclusion_proof = rekor_v1.InclusionProof( - log_index=log_entry.inclusion_proof.log_index, - root_hash=bytes.fromhex(log_entry.inclusion_proof.root_hash), - tree_size=log_entry.inclusion_proof.tree_size, - hashes=[bytes.fromhex(hash_) for hash_ in log_entry.inclusion_proof.hashes], - checkpoint=rekor_v1.Checkpoint( - envelope=log_entry.inclusion_proof.checkpoint - ), - ) - - tlog_entry = rekor_v1.TransparencyLogEntry( - log_index=log_entry.log_index, - log_id=common_v1.LogId(key_id=bytes.fromhex(log_entry.log_id)), - integrated_time=log_entry.integrated_time, - inclusion_promise=inclusion_promise, - inclusion_proof=inclusion_proof, - canonicalized_body=base64.b64decode(log_entry.body), - ) inner = _Bundle( media_type=BundleType.BUNDLE_0_3.value, @@ -327,13 +529,11 @@ def _from_parts( # Fill in the appropriate variants. if isinstance(content, common_v1.MessageSignature): inner.message_signature = content - tlog_entry.kind_version = rekor_v1.KindVersion( - kind="hashedrekord", version="0.0.1" - ) else: inner.dsse_envelope = content._inner - tlog_entry.kind_version = rekor_v1.KindVersion(kind="dsse", version="0.0.1") + tlog_entry = rekor_v1.TransparencyLogEntry() + tlog_entry.from_dict(log_entry._to_dict_rekor()) inner.verification_material.tlog_entries = [tlog_entry] return cls(inner) diff --git a/sigstore/sign.py b/sigstore/sign.py index 1967a62c..d7d3a21f 100644 --- a/sigstore/sign.py +++ b/sigstore/sign.py @@ -64,8 +64,8 @@ from sigstore._internal.sct import verify_sct from sigstore._internal.trustroot import KeyringPurpose, TrustedRoot from sigstore._utils import sha256_digest +from sigstore.models import Bundle from sigstore.oidc import ExpiredIdentity, IdentityToken -from sigstore.verify.models import Bundle _logger = logging.getLogger(__name__) diff --git a/sigstore/transparency.py b/sigstore/transparency.py deleted file mode 100644 index f4040e7a..00000000 --- a/sigstore/transparency.py +++ /dev/null @@ -1,226 +0,0 @@ -# Copyright 2022 The Sigstore Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Transparency log data structures. -""" - -from __future__ import annotations - -import base64 -import logging -import typing -from typing import Any, List, Optional - -import rfc8785 -from pydantic import ( - BaseModel, - ConfigDict, - Field, - StrictInt, - StrictStr, - ValidationInfo, - field_validator, -) -from pydantic.dataclasses import dataclass - -from sigstore._internal.merkle import verify_merkle_inclusion -from sigstore._internal.rekor.checkpoint import verify_checkpoint -from sigstore._utils import B64Str, KeyID -from sigstore.errors import VerificationError - -if typing.TYPE_CHECKING: - from sigstore._internal.trustroot import RekorKeyring - - -_logger = logging.getLogger(__name__) - - -class LogInclusionProof(BaseModel): - """ - Represents an inclusion proof for a transparency log entry. - """ - - model_config = ConfigDict(populate_by_name=True) - - checkpoint: StrictStr = Field(..., alias="checkpoint") - hashes: List[StrictStr] = Field(..., alias="hashes") - log_index: StrictInt = Field(..., alias="logIndex") - root_hash: StrictStr = Field(..., alias="rootHash") - tree_size: StrictInt = Field(..., alias="treeSize") - - @field_validator("log_index") - def _log_index_positive(cls, v: int) -> int: - if v < 0: - raise ValueError(f"Inclusion proof has invalid log index: {v} < 0") - return v - - @field_validator("tree_size") - def _tree_size_positive(cls, v: int) -> int: - if v < 0: - raise ValueError(f"Inclusion proof has invalid tree size: {v} < 0") - return v - - @field_validator("tree_size") - def _log_index_within_tree_size( - cls, v: int, info: ValidationInfo, **kwargs: Any - ) -> int: - if "log_index" in info.data and v <= info.data["log_index"]: - raise ValueError( - "Inclusion proof has log index greater than or equal to tree size: " - f"{v} <= {info.data['log_index']}" - ) - return v - - -@dataclass(frozen=True) -class LogEntry: - """ - Represents a transparency log entry. - - Log entries are retrieved from the transparency log after signing or verification events, - or loaded from "Sigstore" bundles provided by the user. - - This representation allows for either a missing inclusion promise or a missing - inclusion proof, but not both: attempting to construct a `LogEntry` without - at least one will fail. - """ - - uuid: Optional[str] - """ - This entry's unique ID in the log instance it was retrieved from. - - For sharded log deployments, IDs are unique per-shard. - - Not present for `LogEntry` instances loaded from Sigstore bundles. - """ - - body: B64Str - """ - The base64-encoded body of the transparency log entry. - """ - - integrated_time: int - """ - The UNIX time at which this entry was integrated into the transparency log. - """ - - log_id: str - """ - The log's ID (as the SHA256 hash of the DER-encoded public key for the log - at the time of entry inclusion). - """ - - log_index: int - """ - The index of this entry within the log. - """ - - inclusion_proof: LogInclusionProof - """ - An inclusion proof for this log entry. - """ - - inclusion_promise: Optional[B64Str] - """ - An inclusion promise for this log entry, if present. - - Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this - log entry. - """ - - @classmethod - def _from_response(cls, dict_: dict[str, Any]) -> LogEntry: - """ - Create a new `LogEntry` from the given API response. - """ - - # Assumes we only get one entry back - entries = list(dict_.items()) - if len(entries) != 1: - raise ValueError("Received multiple entries in response") - - uuid, entry = entries[0] - return LogEntry( - uuid=uuid, - body=entry["body"], - integrated_time=entry["integratedTime"], - log_id=entry["logID"], - log_index=entry["logIndex"], - inclusion_proof=LogInclusionProof.model_validate( - entry["verification"]["inclusionProof"] - ), - inclusion_promise=entry["verification"]["signedEntryTimestamp"], - ) - - def encode_canonical(self) -> bytes: - """ - Returns a canonicalized JSON (RFC 8785) representation of the transparency log entry. - - This encoded representation is suitable for verification against - the Signed Entry Timestamp. - """ - payload: dict[str, int | str] = { - "body": self.body, - "integratedTime": self.integrated_time, - "logID": self.log_id, - "logIndex": self.log_index, - } - - return rfc8785.dumps(payload) - - def _verify_set(self, keyring: RekorKeyring) -> None: - """ - Verify the inclusion promise (Signed Entry Timestamp) for a given transparency log - `entry` using the given `keyring`. - - Fails if the given log entry does not contain an inclusion promise. - """ - - if self.inclusion_promise is None: - raise VerificationError("SET: invalid inclusion promise: missing") - - signed_entry_ts = base64.b64decode(self.inclusion_promise) - - try: - keyring.verify( - key_id=KeyID(bytes.fromhex(self.log_id)), - signature=signed_entry_ts, - data=self.encode_canonical(), - ) - except VerificationError as exc: - raise VerificationError(f"SET: invalid inclusion promise: {exc}") - - def _verify(self, keyring: RekorKeyring) -> None: - """ - Verifies this log entry. - - This method performs steps (5), (6), and optionally (7) in - the top-level verify API: - - * Verifies the consistency of the entry with the given bundle; - * Verifies the Merkle inclusion proof and its signed checkpoint; - * Verifies the inclusion promise, if present. - """ - - verify_merkle_inclusion(self) - verify_checkpoint(keyring, self) - - _logger.debug(f"successfully verified inclusion proof: index={self.log_index}") - - if self.inclusion_promise: - self._verify_set(keyring) - _logger.debug( - f"successfully verified inclusion promise: index={self.log_index}" - ) diff --git a/sigstore/verify/__init__.py b/sigstore/verify/__init__.py index 875f4b07..3a1c01ee 100644 --- a/sigstore/verify/__init__.py +++ b/sigstore/verify/__init__.py @@ -20,7 +20,8 @@ import base64 from pathlib import Path -from sigstore.verify import Bundle, Verifier +from sigstore.models import Bundle +from sigstore.verify import Verifier from sigstore.verify.policy import Identity # The input to verify @@ -42,13 +43,10 @@ ``` """ -from sigstore.verify.models import Bundle from sigstore.verify.verifier import Verifier __all__ = [ - "Bundle", "Verifier", "policy", - "models", "verifier", ] diff --git a/sigstore/verify/verifier.py b/sigstore/verify/verifier.py index 6e0be2a0..bdf48b38 100644 --- a/sigstore/verify/verifier.py +++ b/sigstore/verify/verifier.py @@ -48,7 +48,7 @@ from sigstore._utils import base64_encode_pem_cert, sha256_digest from sigstore.errors import VerificationError from sigstore.hashes import Hashed -from sigstore.verify.models import Bundle +from sigstore.models import Bundle from sigstore.verify.policy import VerificationPolicy _logger = logging.getLogger(__name__) diff --git a/test/unit/conftest.py b/test/unit/conftest.py index 0970f558..4d80d080 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -38,9 +38,9 @@ from sigstore._internal.rekor import _hashedrekord_from_parts from sigstore._internal.rekor.client import RekorClient from sigstore._utils import sha256_digest +from sigstore.models import Bundle from sigstore.oidc import _DEFAULT_AUDIENCE, IdentityToken from sigstore.sign import SigningContext -from sigstore.verify.models import Bundle from sigstore.verify.verifier import Verifier _ASSETS = (Path(__file__).parent / "assets").resolve() diff --git a/test/unit/test_transparency.py b/test/unit/test_models.py similarity index 60% rename from test/unit/test_transparency.py rename to test/unit/test_models.py index 82aa9e1b..93b0d0f1 100644 --- a/test/unit/test_transparency.py +++ b/test/unit/test_models.py @@ -12,12 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json from base64 import b64encode import pytest from pydantic import ValidationError -from sigstore.transparency import LogEntry, LogInclusionProof +from sigstore.models import Bundle, InvalidBundle, LogEntry, LogInclusionProof class TestLogEntry: @@ -33,6 +34,14 @@ def test_missing_inclusion_proof(self): inclusion_promise=None, ) + def test_logentry_roundtrip(self, signing_bundle): + _, bundle = signing_bundle("bundle.txt") + + assert ( + LogEntry._from_dict_rekor(bundle.log_entry._to_dict_rekor()) + == bundle.log_entry + ) + class TestLogInclusionProof: def test_valid(self): @@ -77,3 +86,40 @@ def test_checkpoint_missing(self): tree_size=100, ), ) + + +class TestBundle: + """ + Tests for the `Bundle` wrapper model. + """ + + def test_invalid_bundle_version(self, signing_bundle): + with pytest.raises(InvalidBundle, match="unsupported bundle format"): + signing_bundle("bundle_invalid_version.txt") + + def test_invalid_empty_cert_chain(self, signing_bundle): + with pytest.raises( + InvalidBundle, match="expected non-empty certificate chain in bundle" + ): + signing_bundle("bundle_no_cert_v1.txt") + + def test_invalid_no_log_entry(self, signing_bundle): + with pytest.raises( + InvalidBundle, match="expected exactly one log entry in bundle" + ): + signing_bundle("bundle_no_log_entry.txt") + + def test_verification_materials_offline_no_checkpoint(self, signing_bundle): + with pytest.raises( + InvalidBundle, match="expected checkpoint in inclusion proof" + ): + signing_bundle("bundle_no_checkpoint.txt") + + def test_bundle_roundtrip(self, signing_bundle): + _, bundle = signing_bundle("bundle.txt") + + # Bundles are not directly comparable, but a round-trip preserves their + # underlying object structure. + assert json.loads(Bundle.from_json(bundle.to_json()).to_json()) == json.loads( + bundle.to_json() + ) diff --git a/test/unit/verify/test_models.py b/test/unit/verify/test_models.py deleted file mode 100644 index 7cf3efdc..00000000 --- a/test/unit/verify/test_models.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright 2022 The Sigstore Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json - -import pytest - -from sigstore.verify.models import Bundle, InvalidBundle - - -class TestBundle: - """ - Tests for the `Bundle` wrapper model. - """ - - def test_invalid_bundle_version(self, signing_bundle): - with pytest.raises(InvalidBundle, match="unsupported bundle format"): - signing_bundle("bundle_invalid_version.txt") - - def test_invalid_empty_cert_chain(self, signing_bundle): - with pytest.raises( - InvalidBundle, match="expected non-empty certificate chain in bundle" - ): - signing_bundle("bundle_no_cert_v1.txt") - - def test_invalid_no_log_entry(self, signing_bundle): - with pytest.raises( - InvalidBundle, match="expected exactly one log entry in bundle" - ): - signing_bundle("bundle_no_log_entry.txt") - - def test_verification_materials_offline_no_checkpoint(self, signing_bundle): - with pytest.raises( - InvalidBundle, match="expected checkpoint in inclusion proof" - ): - signing_bundle("bundle_no_checkpoint.txt") - - def test_bundle_roundtrip(self, signing_bundle): - _, bundle = signing_bundle("bundle.txt") - - # Bundles are not directly comparable, but a round-trip preserves their - # underlying object structure. - assert json.loads(Bundle.from_json(bundle.to_json()).to_json()) == json.loads( - bundle.to_json() - ) diff --git a/test/unit/verify/test_verifier.py b/test/unit/verify/test_verifier.py index 2e675f59..1d4b99e5 100644 --- a/test/unit/verify/test_verifier.py +++ b/test/unit/verify/test_verifier.py @@ -20,8 +20,8 @@ from sigstore.dsse import _StatementBuilder, _Subject from sigstore.errors import VerificationError +from sigstore.models import Bundle from sigstore.verify import policy -from sigstore.verify.models import Bundle from sigstore.verify.verifier import Verifier