Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pyproject: bump sigstore-protobuf-specs #705

Merged
merged 18 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 9 additions & 20 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ description = "A tool for signing Python package distributions"
readme = "README.md"
license = { file = "LICENSE" }
authors = [
{ name = "Sigstore Authors", email = "[email protected]" }
{ name = "Sigstore Authors", email = "[email protected]" },
]
classifiers = [
"License :: OSI Approved :: Apache Software License",
Expand All @@ -30,12 +30,12 @@ dependencies = [
"cryptography >= 39",
"id >= 1.0.0",
"importlib_resources ~= 5.7; python_version < '3.11'",
"pydantic ~= 1.10",
"pydantic >= 2,< 3",
"pyjwt >= 2.1",
"pyOpenSSL >= 23.0.0",
"requests",
"securesystemslib",
"sigstore-protobuf-specs ~= 0.1.0",
"sigstore-protobuf-specs ~= 0.2.0",
"tuf >= 2.1,< 4.0",
]
requires-python = ">=3.7"
Expand All @@ -50,12 +50,7 @@ Source = "https://github.com/sigstore/sigstore-python"
Documentation = "https://sigstore.github.io/sigstore-python/"

[project.optional-dependencies]
test = [
"pytest",
"pytest-cov",
"pretend",
"coverage[toml]",
]
test = ["pytest", "pytest-cov", "pretend", "coverage[toml]"]
lint = [
"bandit",
"black",
Expand All @@ -72,14 +67,8 @@ lint = [
# See: https://github.com/python/typeshed/issues/8699
# "types-pyOpenSSL",
]
doc = [
"pdoc",
]
dev = [
"build",
"bump >= 1.3.2",
"sigstore[doc,test,lint]",
]
doc = ["pdoc"]
dev = ["build", "bump >= 1.3.2", "sigstore[doc,test,lint]"]

[tool.isort]
multi_line_output = 3
Expand All @@ -100,9 +89,9 @@ omit = ["sigstore/_cli.py"]

[tool.coverage.report]
exclude_lines = [
"@abc.abstractmethod",
"@typing.overload",
"if typing.TYPE_CHECKING",
"@abc.abstractmethod",
"@typing.overload",
"if typing.TYPE_CHECKING",
]

[tool.interrogate]
Expand Down
10 changes: 6 additions & 4 deletions sigstore/_internal/fulcio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
SignedCertificateTimestamp,
Version,
)
from pydantic import BaseModel, Field, validator
from pydantic import BaseModel, ConfigDict, Field, validator

from sigstore._utils import B64Str
from sigstore.oidc import IdentityToken
Expand Down Expand Up @@ -96,15 +96,17 @@ class DetachedFulcioSCT(BaseModel):
Represents a "detached" SignedCertificateTimestamp from Fulcio.
"""

model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)

version: Version = Field(..., alias="sct_version")
log_id: bytes = Field(..., alias="id")
timestamp: datetime.datetime
digitally_signed: bytes = Field(..., alias="signature")
extension_bytes: bytes = Field(..., alias="extensions")

class Config:
allow_population_by_field_name = True
arbitrary_types_allowed = True
@validator("timestamp")
def _validate_timestamp(cls, v: datetime.datetime) -> datetime.datetime:
return v.replace(tzinfo=datetime.timezone.utc)

@validator("digitally_signed", pre=True)
def _validate_digitally_signed(cls, v: bytes) -> bytes:
Expand Down
7 changes: 6 additions & 1 deletion sigstore/_internal/set.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,13 @@ class InvalidSETError(Exception):

def verify_set(client: RekorClient, entry: LogEntry) -> None:
"""
Verify the Signed Entry Timestamp for a given Rekor `entry` using the given `client`.
Verify the inclusion promise (Signed Entry Timestamp) for a given transparency log
`entry` using the given `client`.

Fails if the given log entry does not contain an inclusion promise.
"""
if entry.inclusion_promise is None:
raise InvalidSETError("invalid log entry: no inclusion promise")

signed_entry_ts = base64.b64decode(entry.inclusion_promise)

Expand Down
6 changes: 4 additions & 2 deletions sigstore/sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,9 @@ def _to_bundle(self) -> Bundle:
signed_entry_timestamp=base64.b64decode(
self.log_entry.inclusion_promise
)
),
)
if self.log_entry.inclusion_promise
else None,
inclusion_proof=inclusion_proof,
canonicalized_body=base64.b64decode(self.log_entry.body),
)
Expand All @@ -364,7 +366,7 @@ def _to_bundle(self) -> Bundle:
)

bundle = Bundle(
media_type="application/vnd.dev.sigstore.bundle+json;version=0.1",
media_type="application/vnd.dev.sigstore.bundle+json;version=0.2",
verification_material=material,
message_signature=MessageSignature(
message_digest=HashOutput(
Expand Down
103 changes: 60 additions & 43 deletions sigstore/transparency.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,70 @@

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field, StrictInt, StrictStr, validator
from pydantic import (
BaseModel,
ConfigDict,
Field,
StrictInt,
StrictStr,
validator,
)
from pydantic.dataclasses import dataclass
from securesystemslib.formats import encode_canonical

from sigstore._utils import B64Str


class LogInclusionProof(BaseModel):
"""
Represents an inclusion proof for a transparency log entry.
"""

model_config = ConfigDict(populate_by_name=True)

checkpoint: StrictStr = Field(..., alias="checkpoint")
hashes: List[StrictStr] = Field(..., alias="hashes")
log_index: StrictInt = Field(..., alias="logIndex")
root_hash: StrictStr = Field(..., alias="rootHash")
tree_size: StrictInt = Field(..., alias="treeSize")

@validator("log_index")
def _log_index_positive(cls, v: int) -> int:
if v < 0:
raise ValueError(f"Inclusion proof has invalid log index: {v} < 0")
return v

@validator("tree_size")
def _tree_size_positive(cls, v: int) -> int:
if v < 0:
raise ValueError(f"Inclusion proof has invalid tree size: {v} < 0")
return v

@validator("tree_size")
def _log_index_within_tree_size(
cls, v: int, values: Dict[str, Any], **kwargs: Any
) -> int:
if "log_index" in values and v <= values["log_index"]:
raise ValueError(
"Inclusion proof has log index greater than or equal to tree size: "
f"{v} <= {values['log_index']}"
)
return v


@dataclass(frozen=True)
class LogEntry:
"""
Represents a transparency log entry.

Log entries are retrieved from the transparency log after signing or verification events,
or loaded from "Sigstore" bundles provided by the user.

This representation allows for either a missing inclusion promise or a missing
inclusion proof, but not both: attempting to construct a `LogEntry` without
at least one will fail.
"""

uuid: Optional[str]
Expand Down Expand Up @@ -68,17 +116,24 @@ class LogEntry:

inclusion_proof: Optional[LogInclusionProof]
"""
An optional inclusion proof for this log entry.
An inclusion proof for this log entry, if present.
"""

inclusion_promise: B64Str
inclusion_promise: Optional[B64Str]
"""
An inclusion promise for this log entry.
An inclusion promise for this log entry, if present.

Internally, this is a base64-encoded Signed Entry Timestamp (SET) for this
log entry.
"""

def __post_init__(self) -> None:
"""
Invariant preservation.
"""
if self.inclusion_proof is None and self.inclusion_promise is None:
raise ValueError("Log entry must have either inclusion proof or promise")

@classmethod
def _from_response(cls, dict_: dict[str, Any]) -> LogEntry:
"""
Expand Down Expand Up @@ -118,41 +173,3 @@ def encode_canonical(self) -> bytes:
}

return encode_canonical(payload).encode() # type: ignore


class LogInclusionProof(BaseModel):
"""
Represents an inclusion proof for a transparency log entry.
"""

checkpoint: StrictStr = Field(..., alias="checkpoint")
hashes: List[StrictStr] = Field(..., alias="hashes")
log_index: StrictInt = Field(..., alias="logIndex")
root_hash: StrictStr = Field(..., alias="rootHash")
tree_size: StrictInt = Field(..., alias="treeSize")

class Config:
allow_population_by_field_name = True

@validator("log_index")
def _log_index_positive(cls, v: int) -> int:
if v < 0:
raise ValueError(f"Inclusion proof has invalid log index: {v} < 0")
return v

@validator("tree_size")
def _tree_size_positive(cls, v: int) -> int:
if v < 0:
raise ValueError(f"Inclusion proof has invalid tree size: {v} < 0")
return v

@validator("tree_size")
def _log_index_within_tree_size(
cls, v: int, values: Dict[str, Any], **kwargs: Any
) -> int:
if "log_index" in values and v <= values["log_index"]:
raise ValueError(
"Inclusion proof has log index greater than or equal to tree size: "
f"{v} <= {values['log_index']}"
)
return v
62 changes: 47 additions & 15 deletions sigstore/verify/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
)
from pydantic import BaseModel
from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import Bundle
from sigstore_protobuf_specs.dev.sigstore.rekor.v1 import (
InclusionPromise,
InclusionProof,
)

from sigstore._internal.rekor import RekorClient
from sigstore._utils import (
Expand All @@ -48,6 +52,13 @@

logger = logging.getLogger(__name__)

_BUNDLE_0_1 = "application/vnd.dev.sigstore.bundle+json;version=0.1"
_BUNDLE_0_2 = "application/vnd.dev.sigstore.bundle+json;version=0.2"
_KNOWN_BUNDLE_TYPES = {
_BUNDLE_0_1,
_BUNDLE_0_2,
}


class VerificationResult(BaseModel):
"""
Expand Down Expand Up @@ -242,6 +253,9 @@ def from_bundle(

Effect: `input_` is consumed as part of construction.
"""
if bundle.media_type not in _KNOWN_BUNDLE_TYPES:
raise InvalidMaterials(f"unsupported bundle format: {bundle.media_type}")

certs = bundle.verification_material.x509_certificate_chain.certificates

if len(certs) == 0:
Expand Down Expand Up @@ -280,22 +294,39 @@ def from_bundle(
)
tlog_entry = tlog_entries[0]

# NOTE: Bundles are not required to include inclusion proofs,
# since offline (or non-gossiped) verification of an inclusion proof is
# only as strong as verification of the inclusion promise, which
# is always provided.
inclusion_proof = tlog_entry.inclusion_proof
parsed_inclusion_proof: LogInclusionProof | None = None
if inclusion_proof:
checkpoint = inclusion_proof.checkpoint

# If the inclusion proof is provided, it must include its
# checkpoint.
if not checkpoint.envelope:
# Handling of inclusion promises and proofs varies between bundle
# format versions:
#
# * For 0.1, an inclusion promise is required; the client
# MUST verify the inclusion promise.
# The inclusion proof is NOT required. If provided, it might NOT
# contain a checkpoint; in this case, we ignore it (since it's
# useless without one).
#
# * For 0.2, an inclusion proof is required; the client MUST
# verify the inclusion proof. The inclusion prof MUST contain
# a checkpoint.
# The inclusion promise is NOT required; if present, the client
# SHOULD verify it.

inclusion_promise: InclusionPromise | None = tlog_entry.inclusion_promise
inclusion_proof: InclusionProof | None = tlog_entry.inclusion_proof
if bundle.media_type == _BUNDLE_0_1:
if not inclusion_promise:
raise InvalidMaterials("bundle must contain an inclusion promise")
elif bundle.media_type == _BUNDLE_0_2:
if not inclusion_proof:
raise InvalidMaterials("bundle must contain an inclusion proof")
if not inclusion_proof.checkpoint.envelope:
raise InvalidMaterials("expected checkpoint in inclusion proof")

parsed_inclusion_proof: InclusionProof | None = None
if (
inclusion_proof is not None
and inclusion_proof.checkpoint.envelope is not None
):
parsed_inclusion_proof = LogInclusionProof(
checkpoint=checkpoint.envelope,
checkpoint=inclusion_proof.checkpoint.envelope,
hashes=[h.hex() for h in inclusion_proof.hashes],
log_index=inclusion_proof.log_index,
root_hash=inclusion_proof.root_hash.hex(),
Expand Down Expand Up @@ -336,7 +367,7 @@ def has_rekor_entry(self) -> bool:

def rekor_entry(self, client: RekorClient) -> LogEntry:
"""
Returns a `RekorEntry` for the current signing materials.
Returns a `LogEntry` for the current signing materials.
"""

# The Rekor entry we use depends on a few different states:
Expand All @@ -349,7 +380,8 @@ def rekor_entry(self, client: RekorClient) -> LogEntry:
offline = self._offline
has_rekor_entry = self.has_rekor_entry
has_inclusion_proof = (
self.has_rekor_entry and self._rekor_entry.inclusion_proof is not None # type: ignore
self.has_rekor_entry
and self._rekor_entry.inclusion_proof is not None # type: ignore
)

entry: LogEntry | None
Expand Down
Loading