Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sigstore, test: break apart DSSE/artifact sign APIs #956

Merged
merged 12 commits into from
Apr 12, 2024
26 changes: 9 additions & 17 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,12 @@ All versions prior to 0.9.0 are untracked.

### Added

* API: `Signer.sign()` can now take a `Hashed` as an input,
performing a signature on a pre-computed hash value
([#860](https://github.com/sigstore/sigstore-python/pull/860))
* API: `Signer.sign_artifact()` has been added, replacing the removed
`Signer.sign()` API

* API: `Signer.sign()` can now take an in-toto `Statement` as an input,
producing a DSSE-formatted signature rather than a "bare" signature
([#804](https://github.com/sigstore/sigstore-python/pull/804))

* API: `SigningResult.content` has been added, representing either the
`hashedrekord` entry's message signature or the `dsse` entry's envelope
([#804](https://github.com/sigstore/sigstore-python/pull/804))
* API: `Signer.sign_intoto()` has been added. It takes an in-toto `Statement`
as an input, producing a DSSE-formatted signature rather than a "bare"
signature ([#804](https://github.com/sigstore/sigstore-python/pull/804))

* API: "v3" Sigstore bundles are now supported during verification
([#901](https://github.com/sigstore/sigstore-python/pull/901))
Expand All @@ -41,15 +36,16 @@ All versions prior to 0.9.0 are untracked.
* **BREAKING API CHANGE**: `VerificationMaterials` has been removed.
The public verification APIs now accept `sigstore.verify.models.Bundle`.

* **BREAKING API CHANGE**: `Signer.sign(...)` has been removed. Use
either `sign_artifact(...)` or `sign_intoto(...)`, depending on whether
you're signing opaque bytes or an in-toto statement.

* **BREAKING API CHANGE**: `VerificationResult` has been removed.
The public verification and policy APIs now raise
`sigstore.errors.VerificationError` on failure.

### Changed

* **BREAKING API CHANGE**: The `Signer.sign(...)` API now returns a `sigstore.verify.models.Bundle`,
instead of a `SigningResult` ([#862](https://github.com/sigstore/sigstore-python/pull/862))

* **BREAKING API CHANGE**: `Verifier.verify(...)` now takes a `bytes | Hashed`
as its verification input, rather than implicitly receiving the input through
the `VerificationMaterials` parameter
Expand All @@ -59,10 +55,6 @@ All versions prior to 0.9.0 are untracked.
a `Hashed` parameter to convey the digest used for Rekor entry lookup
([#904](https://github.com/sigstore/sigstore-python/pull/904))

* **BREAKING API CHANGE**: `Signer.sign(...)` now takes a `bytes` instead of
an `IO[bytes]` for input. Other input types (such as `Hashed` and
`Statement`) are unchanged ([#921](https://github.com/sigstore/sigstore-python/pull/921))

* **BREAKING API CHANGE**: `Verifier.verify(...)` now takes a `sigstore.verify.models.Bundle`,
instead of a `VerificationMaterials` ([#937](https://github.com/sigstore/sigstore-python/pull/937))

Expand Down
2 changes: 1 addition & 1 deletion sigstore/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ def _sign(args: argparse.Namespace) -> None:
# digest and sign the prehash rather than buffering it fully.
digest = sha256_digest(io)
try:
result = signer.sign(input_=digest)
result = signer.sign_artifact(input_=digest)
except ExpiredIdentity as exp_identity:
print("Signature failed: identity token has expired")
raise exp_identity
Expand Down
194 changes: 111 additions & 83 deletions sigstore/sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

signing_ctx = SigningContext.production()
with signing_ctx.signer(identity, cache=True) as signer:
result = signer.sign(artifact)
result = signer.sign_artifact(artifact)
print(result)
```
"""
Expand All @@ -58,7 +58,6 @@
from sigstore import hashes as sigstore_hashes
from sigstore._internal.fulcio import (
ExpiredCertificate,
FulcioCertificateSigningResponse,
FulcioClient,
)
from sigstore._internal.rekor.client import RekorClient
Expand Down Expand Up @@ -98,14 +97,12 @@ def __init__(
self._identity_token = identity_token
self._signing_ctx: SigningContext = signing_ctx
self.__cached_private_key: Optional[ec.EllipticCurvePrivateKey] = None
self.__cached_signing_certificate: Optional[
FulcioCertificateSigningResponse
] = None
self.__cached_signing_certificate: Optional[x509.Certificate] = None
if cache:
_logger.debug("Generating ephemeral keys...")
self.__cached_private_key = ec.generate_private_key(ec.SECP256R1())
_logger.debug("Requesting ephemeral certificate...")
self.__cached_signing_certificate = self._signing_cert(self._private_key)
self.__cached_signing_certificate = self._signing_cert()

@property
def _private_key(self) -> ec.EllipticCurvePrivateKey:
Expand All @@ -117,12 +114,22 @@ def _private_key(self) -> ec.EllipticCurvePrivateKey:

def _signing_cert(
self,
private_key: ec.EllipticCurvePrivateKey,
) -> FulcioCertificateSigningResponse:
"""Get or request a signing certificate from Fulcio."""
) -> x509.Certificate:
"""
Get or request a signing certificate from Fulcio.

Internally, this performs a CSR against Fulcio and verifies that
the returned certificate is present in Fulcio's CT log.
"""

# Our CSR cannot possibly succeed if our underlying identity token
# is expired.
if not self._identity_token.in_validity_period():
raise ExpiredIdentity

# If it exists, verify if the current certificate is expired
if self.__cached_signing_certificate:
not_valid_after = self.__cached_signing_certificate.cert.not_valid_after_utc
not_valid_after = self.__cached_signing_certificate.not_valid_after_utc
if datetime.now(timezone.utc) > not_valid_after:
raise ExpiredCertificate
return self.__cached_signing_certificate
Expand All @@ -147,110 +154,131 @@ def _signing_cert(
critical=True,
)
)
certificate_request = builder.sign(private_key, hashes.SHA256())
certificate_request = builder.sign(self._private_key, hashes.SHA256())

certificate_response = self._signing_ctx._fulcio.signing_cert.post(
certificate_request, self._identity_token
)

return certificate_response
# Verify the SCT
sct = certificate_response.sct
cert = certificate_response.cert
chain = certificate_response.chain

verify_sct(sct, cert, chain, self._signing_ctx._trusted_root.ct_keyring())

def sign(
_logger.debug("Successfully verified SCT...")

return cert

def _finalize_sign(
self,
input_: bytes | dsse.Statement | sigstore_hashes.Hashed,
cert: x509.Certificate,
content: MessageSignature | dsse.Envelope,
proposed_entry: rekor_types.Hashedrekord | rekor_types.Dsse,
) -> Bundle:
"""
Sign an input, and return a `Bundle` corresponding to the signed result.
Perform the common "finalizing" steps in a Sigstore signing flow.
"""
# Submit the proposed entry to the transparency log
entry = self._signing_ctx._rekor.log.entries.post(proposed_entry)

The input can be one of three forms:
_logger.debug(f"Transparency log entry created with index: {entry.log_index}")

1. A `bytes` buffer;
2. A `Hashed` object, containing a pre-hashed input (e.g., for inputs
that are too large to buffer into memory);
3. An in-toto `Statement` object.
return Bundle._from_parts(cert, content, entry)

In cases (1) and (2), the signing operation will produce a `hashedrekord`
entry within the bundle. In case (3), the signing operation will produce
a DSSE envelope and corresponding `dsse` entry within the bundle.
def sign_intoto(
self,
input_: dsse.Statement,
) -> Bundle:
"""
private_key = self._private_key
Sign the given in-toto statement, and return a `Bundle` containing
the signed result.

if not self._identity_token.in_validity_period():
raise ExpiredIdentity
This API is **only** for in-toto statements; to sign arbitrary artifacts,
use `sign_artifact` instead.
"""
cert = self._signing_cert()

try:
certificate_response = self._signing_cert(private_key)
except ExpiredCertificate as e:
raise e
# Prepare inputs
b64_cert = base64.b64encode(
cert.public_bytes(encoding=serialization.Encoding.PEM)
)

# Verify the SCT
sct = certificate_response.sct
cert = certificate_response.cert
chain = certificate_response.chain
# Sign the statement, producing a DSSE envelope
content = dsse._sign(self._private_key, input_)

verify_sct(sct, cert, chain, self._signing_ctx._trusted_root.ct_keyring())
# Create the proposed DSSE log entry
proposed_entry = rekor_types.Dsse(
spec=rekor_types.dsse.DsseV001Schema(
proposed_content=rekor_types.dsse.ProposedContent(
envelope=content.to_json(),
verifiers=[b64_cert.decode()],
),
),
)

_logger.debug("Successfully verified SCT...")
return self._finalize_sign(cert, content, proposed_entry)

def sign_artifact(
self,
input_: bytes | sigstore_hashes.Hashed,
) -> Bundle:
"""
Sign an artifact, and return a `Bundle` corresponding to the signed result.

The input can be one of two forms:

1. A `bytes` buffer;
2. A `Hashed` object, containing a pre-hashed input (e.g., for inputs
that are too large to buffer into memory).

Regardless of the input format, the signing operation will produce a
`hashedrekord` entry within the bundle. No other entry types
are supported by this API.
"""

cert = self._signing_cert()

# Prepare inputs
b64_cert = base64.b64encode(
cert.public_bytes(encoding=serialization.Encoding.PEM)
)

# Sign artifact
content: MessageSignature | dsse.Envelope
proposed_entry: rekor_types.Hashedrekord | rekor_types.Dsse
if isinstance(input_, dsse.Statement):
content = dsse._sign(private_key, input_)

# Create the proposed DSSE entry
proposed_entry = rekor_types.Dsse(
spec=rekor_types.dsse.DsseV001Schema(
proposed_content=rekor_types.dsse.ProposedContent(
envelope=content.to_json(),
verifiers=[b64_cert.decode()],
),
),
)
else:
hashed_input = sha256_digest(input_)
hashed_input = sha256_digest(input_)

artifact_signature = private_key.sign(
hashed_input.digest, ec.ECDSA(hashed_input._as_prehashed())
)
artifact_signature = self._private_key.sign(
hashed_input.digest, ec.ECDSA(hashed_input._as_prehashed())
)

content = MessageSignature(
message_digest=HashOutput(
algorithm=hashed_input.algorithm,
digest=hashed_input.digest,
),
signature=artifact_signature,
)
content = MessageSignature(
message_digest=HashOutput(
algorithm=hashed_input.algorithm,
digest=hashed_input.digest,
),
signature=artifact_signature,
)

# Create the proposed hashedrekord entry
proposed_entry = rekor_types.Hashedrekord(
spec=rekor_types.hashedrekord.HashedrekordV001Schema(
signature=rekor_types.hashedrekord.Signature(
content=base64.b64encode(artifact_signature).decode(),
public_key=rekor_types.hashedrekord.PublicKey(
content=b64_cert.decode()
),
),
data=rekor_types.hashedrekord.Data(
hash=rekor_types.hashedrekord.Hash(
algorithm=hashed_input._as_hashedrekord_algorithm(),
value=hashed_input.digest.hex(),
)
# Create the proposed hashedrekord entry
proposed_entry = rekor_types.Hashedrekord(
spec=rekor_types.hashedrekord.HashedrekordV001Schema(
signature=rekor_types.hashedrekord.Signature(
content=base64.b64encode(artifact_signature).decode(),
public_key=rekor_types.hashedrekord.PublicKey(
content=b64_cert.decode()
),
),
)

# Submit the proposed entry to the transparency log
entry = self._signing_ctx._rekor.log.entries.post(proposed_entry)

_logger.debug(f"Transparency log entry created with index: {entry.log_index}")
data=rekor_types.hashedrekord.Data(
hash=rekor_types.hashedrekord.Hash(
algorithm=hashed_input._as_hashedrekord_algorithm(),
value=hashed_input.digest.hex(),
)
),
),
)

return Bundle._from_parts(cert, content, entry)
return self._finalize_sign(cert, content, proposed_entry)


class SigningContext:
Expand Down
12 changes: 6 additions & 6 deletions test/unit/test_sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def test_sign_rekor_entry_consistent(signer_and_ident):

payload = secrets.token_bytes(32)
with ctx.signer(identity) as signer:
expected_entry = signer.sign(payload).log_entry
expected_entry = signer.sign_artifact(payload).log_entry

actual_entry = ctx._rekor.log.entries.get(log_index=expected_entry.log_index)

Expand All @@ -74,7 +74,7 @@ def test_sct_verify_keyring_lookup_error(signer_and_ident, monkeypatch):
payload = secrets.token_bytes(32)
with pytest.raises(VerificationError, match=r"SCT verify failed:"):
with ctx.signer(identity) as signer:
signer.sign(payload)
signer.sign_artifact(payload)


@pytest.mark.online
Expand All @@ -95,7 +95,7 @@ def test_sct_verify_keyring_error(signer_and_ident, monkeypatch):

with pytest.raises(VerificationError):
with ctx.signer(identity) as signer:
signer.sign(payload)
signer.sign_artifact(payload)


@pytest.mark.online
Expand All @@ -112,7 +112,7 @@ def test_identity_proof_claim_lookup(signer_and_ident, monkeypatch):
payload = secrets.token_bytes(32)

with ctx.signer(identity) as signer:
expected_entry = signer.sign(payload).log_entry
expected_entry = signer.sign_artifact(payload).log_entry
actual_entry = ctx._rekor.log.entries.get(log_index=expected_entry.log_index)

assert expected_entry.body == actual_entry.body
Expand All @@ -135,7 +135,7 @@ def test_sign_prehashed(staging):
)

with sign_ctx.signer(identity) as signer:
bundle = signer.sign(hashed)
bundle = signer.sign_artifact(hashed)

assert bundle._inner.message_signature.message_digest.algorithm == hashed.algorithm
assert bundle._inner.message_signature.message_digest.digest == hashed.digest
Expand Down Expand Up @@ -167,6 +167,6 @@ def test_sign_dsse(staging):
).build()

with ctx.signer(identity) as signer:
bundle = signer.sign(stmt)
bundle = signer.sign_intoto(stmt)
# Ensures that all of our inner types serialize as expected.
bundle.to_json()
Loading