diff --git a/gcp_pilot/iam.py b/gcp_pilot/iam.py index ff21623..7ff0b0d 100644 --- a/gcp_pilot/iam.py +++ b/gcp_pilot/iam.py @@ -1,7 +1,7 @@ # More Information: import base64 import json -from datetime import datetime +from datetime import UTC, datetime, timedelta from typing import Any, Generator import requests @@ -15,6 +15,9 @@ KeyType = dict[str, Any] +IDP_JWT_AUDIENCE = "https://identitytoolkit.googleapis.com/google.identity.identitytoolkit.v1.IdentityToolkit" + + class IdentityAccessManager(AccountManagerMixin, DiscoveryMixin, GoogleCloudPilotAPI): def __init__(self, **kwargs): super().__init__( @@ -228,6 +231,41 @@ def decode_jwt(cls, token: str, issuer_email: str, audience: str | None, verify: ) ) + @classmethod + def decode_id_token(cls, token: str, issuer_email: str, verify: bool = True) -> dict[str, Any]: + return cls.decode_jwt( + token=token, + issuer_email=issuer_email, + audience=IDP_JWT_AUDIENCE, + verify=verify, + ) + + def generate_custom_token( + self, + uid: str, + expires_in_seconds: int, + tenant_id: str | None = None, + auth_email: str | None = None, + claims: dict | None = None, + ) -> str: + authenticator_email = auth_email or self.service_account_email + payload = { + "iat": datetime.now(tz=UTC).timestamp(), + "exp": (datetime.now(tz=UTC) + timedelta(seconds=expires_in_seconds)).timestamp(), + "aud": IDP_JWT_AUDIENCE, + "iss": authenticator_email, + "sub": authenticator_email, + "email": authenticator_email, + "uid": uid, + "tenant_id": tenant_id, + "claims": claims or {}, + } + + return self.encode_jwt( + payload=payload, + service_account_email=authenticator_email, + ) + @classmethod def _fetch_public_certs(cls, email: str) -> dict: url = f"https://www.googleapis.com/robot/v1/metadata/x509/{email}"