diff --git a/doc/README-GPG.md b/doc/README-GPG.md index 3542ec48..6d27be80 100644 --- a/doc/README-GPG.md +++ b/doc/README-GPG.md @@ -18,7 +18,8 @@ Thanks! Run ``` - $ (trezor|keepkey|ledger|jade|onlykey)-gpg init "Roman Zeyde " + $ (trezor|keepkey|ledger|jade|onlykey)-gpg init + $ (trezor|keepkey|ledger|jade|onlykey)-gpg add -d "Roman Zeyde " ``` Follow the instructions provided to complete the setup. Keep note of the timestamp value which you'll need if you want to regenerate the key later. @@ -137,13 +138,14 @@ $ gpg2 --export 'john@doe.bit' | gpg2 --list-packets | grep created | head -n1 After your main identity is created, you can add new user IDs using the regular GnuPG commands: ``` -$ trezor-gpg init "Foobar" -vv +$ trezor-gpg init +$ trezor-gpg add -d "Foobar" -vv $ export GNUPGHOME=${HOME}/.gnupg/trezor $ gpg2 -K ------------------------------------------ -sec nistp256/6275E7DA 2017-12-05 [SC] +sec nistp256/6275E7DA 1970-01-01 [SC] uid [ultimate] Foobar -ssb nistp256/35F58F26 2017-12-05 [E] +ssb nistp256/35F58F26 1970-01-01 [E] $ gpg2 --edit Foobar gpg> adduid @@ -159,10 +161,24 @@ gpg> save $ gpg2 -K ------------------------------------------ -sec nistp256/6275E7DA 2017-12-05 [SC] +sec nistp256/6275E7DA 1970-01-01 [SC] uid [ultimate] Xyzzy uid [ultimate] Foobar -ssb nistp256/35F58F26 2017-12-05 [E] +ssb nistp256/35F58F26 1970-01-01 [E] +``` + +This adds new user IDs to the same key. You can also add a new key using the `add` command: +``` +$ trezor-gpg add "Xyzzy" -vv +$ gpg2 -K +------------------------------------------ +sec nistp256/6275E7DA 1970-01-01 [SC] +uid [ultimate] Foobar +ssb nistp256/35F58F26 1970-01-01 [E] + +sec nistp256/BE61C208 1970-01-01 [SC] +uid [ultimate] Xyzzy +ssb nistp256/65088366 1970-01-01 [E] ``` ### Generate GnuPG subkeys @@ -173,7 +189,17 @@ pub rsa2048/90C4064B 2017-10-10 [SC] uid [ultimate] foobar sub rsa2048/4DD05FF0 2017-10-10 [E] -$ trezor-gpg init "foobar" --subkey +$ trezor-gpg add "foobar" --subkey +``` + +If you have already set the new folder as your default profile, and you want to add the subkey to an existing GnuPG from a previous (e.g. non-hardware) profile, you can specify the previous profile location using `--primary-homedir`: +``` +$ gpg2 -k foobar --homedir ~/.gnupg +pub rsa2048/90C4064B 2017-10-10 [SC] +uid [ultimate] foobar +sub rsa2048/4DD05FF0 2017-10-10 [E] + +$ trezor-gpg add "foobar" --subkey --primary-homedir ~/.gnupg ``` [![asciicast](https://asciinema.org/a/Ick5G724zrZRFsGY7ZUdFSnV1.png)](https://asciinema.org/a/Ick5G724zrZRFsGY7ZUdFSnV1) diff --git a/doc/README-Windows.md b/doc/README-Windows.md index 28d08414..8c6ada63 100644 --- a/doc/README-Windows.md +++ b/doc/README-Windows.md @@ -56,11 +56,19 @@ git clone https://github.com/romanz/trezor-agent.git Build and install the library: ``` +pip install ./trezor-agent +``` +If you want to be able to edit it without having to rebuild, use this command instead: +``` pip install -e trezor-agent ``` Build and install the agent of your choice: ``` +pip install ./trezor-agent/agents/ +``` +If you want to be able to edit it without having to rebuild, use this command instead: +``` pip install -e trezor-agent/agents/ ``` @@ -166,7 +174,8 @@ choco install gpg4win You must first create a signing identity: ``` --gpg init -e ed25519 "My Full Name " +-gpg init +-gpg add -d -e ed25519 "My Full Name " ``` You will be asked for confirmation on your device **twice**. diff --git a/libagent/gpg/__init__.py b/libagent/gpg/__init__.py index 281890b4..8b4782f9 100644 --- a/libagent/gpg/__init__.py +++ b/libagent/gpg/__init__.py @@ -28,65 +28,92 @@ import semver from .. import device, formats, server, util -from . import agent, client, encode, keyring, protocol +from . import agent, client, decode, encode, keyring, keystore, protocol log = logging.getLogger(__name__) -async def export_public_key(device_type, args): +async def export_public_key(device_type, homedir, args): """Generate a new pubkey for a new/existing GPG identity.""" + # pylint: disable=too-many-branches log.warning('NOTE: in order to re-generate the exact same GPG key later, ' 'run this command with "--time=%d" commandline flag (to set ' 'the timestamp of the GPG key manually).', args.time) async with await device.ui.UI.create(device_type=device_type, config=vars(args)) as ui: c = client.Client(ui=ui) - identity = client.create_identity(user_id=args.user_id, - curve_name=args.ecdsa_curve) - verifying_key = await c.pubkey(identity=identity, ecdh=False) - decryption_key = await c.pubkey(identity=identity, ecdh=True) - signer_func = functools.partial(c.sign, identity=identity) + if args.derivation_path: + user_id = args.derivation_path + else: + user_id = args.user_id fingerprints = [] + result = None if args.subkey: # add as subkey - log.info('adding %s GPG subkey for "%s" to existing key', - args.ecdsa_curve, args.user_id) - # subkey for signing - signing_key = protocol.PublicKey( - curve_name=args.ecdsa_curve, created=args.time, - verifying_key=verifying_key, ecdh=False) - fingerprints.append(util.hexlify(signing_key.fingerprint())) - # subkey for encryption - encryption_key = protocol.PublicKey( - curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve), - created=args.time, verifying_key=decryption_key, ecdh=True) - fingerprints.append(util.hexlify(encryption_key.fingerprint())) - primary_bytes = await keyring.export_public_key(args.user_id) - result = await encode.create_subkey(primary_bytes=primary_bytes, - subkey=signing_key, - signer_func=signer_func) - result = await encode.create_subkey(primary_bytes=result, - subkey=encryption_key, - signer_func=signer_func) - else: # add as primary - log.info('creating new %s GPG primary key for "%s"', - args.ecdsa_curve, args.user_id) - # primary key for signing - primary = protocol.PublicKey( - curve_name=args.ecdsa_curve, created=args.time, - verifying_key=verifying_key, ecdh=False) - fingerprints.append(util.hexlify(primary.fingerprint())) - # subkey for encryption - subkey = protocol.PublicKey( - curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve), - created=args.time, verifying_key=decryption_key, ecdh=True) - fingerprints.append(util.hexlify(subkey.fingerprint())) - - result = await encode.create_primary(user_id=args.user_id, - pubkey=primary, - signer_func=signer_func) + sign_identity = None + try: + if args.primary_homedir is None: + result = await keyring.export_public_key(args.user_id) + # Check if the key was generated with this device + sign_identity = await decode.identity_for_key(c, result, + os.environ['GNUPGHOME']) + else: + result = await keyring.export_public_key(args.user_id, + env={'GNUPGHOME': + args.primary_homedir}) + # Check if the key was generated with this device + sign_identity = await decode.identity_for_key(c, result, args.primary_homedir) + if sign_identity is None: + if args.primary_homedir is None: + signer_func = await keyring.create_agent_signer( + next(decode.iter_keygrips(result)), + env=os.environ) + else: + signer_func = await keyring.create_agent_signer( + next(decode.iter_keygrips(result)), + env={'GNUPGHOME': args.primary_homedir}) + else: + signer_func = functools.partial(c.sign, identity=sign_identity) + except Exception: # pylint: disable=broad-except + log.warning('Could not find a primary key matching the specified user id. ' + 'Creating a new primary key instead of a subkey') + + if result is None: + identity = client.create_identity(user_id=user_id, + curve_name=args.ecdsa_curve_name) + # No external + signer_func = functools.partial(c.sign, identity=identity) + + if result is None or not args.no_sign: # Signing or certification key + pubkey = await keystore.store_key(c, user_id, args.ecdsa_curve_name, + False, args.time, homedir) + fingerprints.append(util.hexlify(pubkey.fingerprint())) + if result is None: + result = await encode.create_primary(user_id=args.user_id, + pubkey=pubkey, + signer_func=signer_func, + flags=1 if args.no_sign else 3) + else: + result = await encode.create_subkey(primary_bytes=result, + subkey=pubkey, + signer_func=signer_func, + flags=2) + + if args.encrypt != 'none': # Encryption key + if args.encrypt == 'communications': + flags = 4 + elif args.encrypt == 'storage': + flags = 8 + else: + flags = 12 + pubkey = await keystore.store_key(c, user_id, + formats.get_ecdh_curve_name(args.ecdsa_curve_name), + True, args.time, homedir) + fingerprints.append(util.hexlify(pubkey.fingerprint())) + assert result is not None result = await encode.create_subkey(primary_bytes=result, - subkey=subkey, - signer_func=signer_func) + subkey=pubkey, + signer_func=signer_func, + flags=flags) return (fingerprints, protocol.armor(result, 'PUBLIC KEY BLOCK')) @@ -130,9 +157,6 @@ async def run_init(device_type, args): 'remove it manually if required', homedir) sys.exit(1) - # Prepare the key before making any changes - fingerprints, public_key_bytes = await export_public_key(device_type, args) - await trio.Path(homedir).mkdir(mode=0o700, parents=True, exist_ok=True) agent_path = await util.which('{}-gpg-agent'.format(device_name)) @@ -163,11 +187,12 @@ async def run_init(device_type, args): # Prepare GPG configuration file async with await trio.open_file(os.path.join(homedir, 'gpg.conf'), 'w') as f: + # Do not bother escaping or quoting config parameters. + # _gpgrt_argparse simply reads until EOL. await f.write("""# Hardware-based GPG configuration -agent-program "{0}" +agent-program {0} personal-digest-preferences SHA512 -default-key {1} -""".format(util.escape_cmd_quotes(run_agent_script), fingerprints[0])) +""".format(run_agent_script)) # Prepare a helper script for setting up the new identity async with await trio.open_file(os.path.join(homedir, 'env'), 'w') as f: @@ -184,6 +209,37 @@ async def run_init(device_type, args): """.format(homedir)) await trio.Path(f.name).chmod(0o700) + +async def run_add(device_type, args): + """Initialize hardware-based GnuPG identity.""" + util.setup_logging(verbosity=args.verbose) + log.warning('This GPG tool is still in EXPERIMENTAL mode, ' + 'so please note that the API and features may ' + 'change without backwards compatibility!') + + await verify_gpg_version() + + # Add a new hardware-based identity to the GPG home directory + device_name = device_type.package_name().rsplit('-', 1)[0] + log.info('device name: %s', device_name) + homedir = args.homedir + if not homedir: + homedir = os.path.expanduser('~/.gnupg/{}'.format(device_name)) + + log.info('GPG home directory: %s', homedir) + + if not os.path.exists(homedir): + log.error('GPG home directory %s is missing, ' + 'use %s-gpg init first', homedir, device_name) + sys.exit(1) + + # Prepare the keys + fingerprints, public_key_bytes = await export_public_key(device_type, homedir, args) + + if not fingerprints: + log.warning('No keys created') + sys.exit(1) + # Generate new GPG identity and import into GPG keyring verbosity = ('-' + ('v' * args.verbose)) if args.verbose else '--quiet' await check_call(await keyring.gpg_command(['--homedir', homedir, verbosity, @@ -195,9 +251,11 @@ async def run_init(device_type, args): '--import-ownertrust']), input_bytes=(fingerprints[0] + ':6\n').encode()) - # Load agent and make sure it responds with the new identity - await check_call(await keyring.gpg_command(['--homedir', homedir, - '--list-secret-keys', args.user_id])) + if args.default: + # Make new key the default key + await check_call([await util.which('gpgconf'), '--homedir', homedir, + '--change-options', 'gpg'], + input_bytes=('default-key:0:"' + fingerprints[0]).encode()) async def run_unlock(device_type, args): @@ -226,18 +284,16 @@ def run_agent(device_type): p = argparse.ArgumentParser() p.add_argument('--homedir', default=os.environ.get('GNUPGHOME')) p.add_argument('-v', '--verbose', default=0, action='count') - p.add_argument('--server', default=False, action='store_true', - help='Use stdin/stdout for communication with GPG.') if daemon: p.add_argument('--daemon', default=False, action='store_true', - help='Daemonize the agent.') + help='daemonize the agent') p.add_argument('--pin-entry-binary', type=str, default=argparse.SUPPRESS, - help='Path to PIN entry UI helper.') + help='path to PIN entry UI helper') p.add_argument('--passphrase-entry-binary', type=str, default=argparse.SUPPRESS, - help='Path to passphrase entry UI helper.') + help='path to passphrase entry UI helper') p.add_argument('--cache-expiry-seconds', type=float, default=argparse.SUPPRESS, - help='Expire passphrase from cache after this duration.') + help='expire passphrase from cache after this duration') args, _ = p.parse_known_args() @@ -248,10 +304,10 @@ def run_agent(device_type): trio.run(run_agent_internal, args, device_type) -async def handle_connection(conn, ui, pubkey_bytes, quit_event): +async def handle_connection(conn, ui, homedir, quit_event): """Handle a single connection to the agent.""" try: - await agent.Handler(ui=ui, pubkey_bytes=pubkey_bytes).handle(conn) + await agent.Handler(ui=ui, homedir=homedir).handle(conn) except agent.AgentStop: log.info('stopping gpg-agent') quit_event.set() @@ -275,7 +331,6 @@ async def run_agent_internal(args, device_type): log.debug('pid: %d, parent pid: %d', os.getpid(), os.getppid()) try: env = {'GNUPGHOME': args.homedir, 'PATH': os.environ['PATH']} - pubkey_bytes = await keyring.export_public_keys(env=env) async with await device.ui.UI.create(device_type=device_type, config=vars(args)) as ui: sock_server = await _server_from_assuan_fd(os.environ) if sock_server is None: @@ -285,7 +340,7 @@ async def run_agent_internal(args, device_type): quit_event = trio.Event() handle_conn = functools.partial(handle_connection, ui=ui, - pubkey_bytes=pubkey_bytes, + homedir=args.homedir, quit_event=quit_event) try: await server.server_thread(sock, handle_conn, quit_event) @@ -314,25 +369,53 @@ def main(device_type): subparsers.required = True p = subparsers.add_parser('init', - help='initialize hardware-based GnuPG identity') - p.add_argument('user_id') - p.add_argument('-e', '--ecdsa-curve', default='nist256p1') - p.add_argument('-t', '--time', type=int, default=0) + help='initialize a hardware-based GnuPG home directory') p.add_argument('-v', '--verbose', default=0, action='count') - p.add_argument('-s', '--subkey', default=False, action='store_true') p.add_argument('--homedir', type=str, default=os.environ.get('GNUPGHOME'), - help='Customize GnuPG home directory for the new identity.') + help='GnuPG home directory to create') p.add_argument('--pin-entry-binary', type=str, default=argparse.SUPPRESS, - help='Path to PIN entry UI helper.') + help='path to PIN entry UI helper') p.add_argument('--passphrase-entry-binary', type=str, default=argparse.SUPPRESS, - help='Path to passphrase entry UI helper.') + help='path to passphrase entry UI helper') p.add_argument('--cache-expiry-seconds', type=float, default=argparse.SUPPRESS, - help='Expire passphrase from cache after this duration.') + help='expire passphrase from cache after this duration') p.set_defaults(func=run_init) + p = subparsers.add_parser('add', + help='add a hardware-based GnuPG identity or subkey to the profile') + p.add_argument('user_id') + p.add_argument('-e', '--ecdsa-curve-name', default='nist256p1', + choices=sorted(formats.SUPPORTED_CURVES), + help='specify curve name') + p.add_argument('-t', '--time', type=int, default=0, + help='set key creation time. This will modify the key\'s fingerprint, ' + 'but not the associated private key') + p.add_argument('-v', '--verbose', default=0, action='count') + p.add_argument('-d', '--default', default=False, action='store_true', + help='sets the newly created key as the default key for the profile') + p.add_argument('--derivation-path', default=None, + help='custom derivation path for the key. If not specified, ' + 'the user id is used') + p.add_argument('-s', '--subkey', default=False, action='store_true', + help='create a subkey instead of a primary key') + p.add_argument('--primary-homedir', default=None, + help='home directory in which the primary is stored, if creating a subkey. ' + 'Useful for keeping subkey and primary in separate profiles') + p.add_argument('--no-sign', default=False, action='store_true', + help='do not create a signing key. ' + 'If creating a primary key, it will be set to certify-only') + p.add_argument('--encrypt', default='any', choices=['none', 'any', 'communications', 'storage'], + help='select allowed encryption usage for the key. ' + 'If set to none, an encryption key will not be created') + + p.add_argument('--homedir', type=str, default=os.environ.get('GNUPGHOME'), + help='customize GnuPG home directory for the new identity') + + p.set_defaults(func=run_add) + p = subparsers.add_parser('unlock', help='unlock the hardware device') p.add_argument('-v', '--verbose', default=0, action='count') p.set_defaults(func=run_unlock) diff --git a/libagent/gpg/agent.py b/libagent/gpg/agent.py index 66d54ad4..5e373bb6 100644 --- a/libagent/gpg/agent.py +++ b/libagent/gpg/agent.py @@ -3,7 +3,7 @@ import logging from .. import util -from . import client, decode, keyring, protocol +from . import client, keyring, keystore log = logging.getLogger(__name__) @@ -36,17 +36,6 @@ def parse_ecdh(line): return dict(items)[b'e'] -async def _key_info(conn, keygrip, *_): - """ - Dummy reply (mainly for 'gpg --edit' to succeed). - - For details, see GnuPG agent KEYINFO command help. - https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=agent/command.c;h=c8b34e9882076b1b724346787781f657cac75499;hb=refs/heads/master#l1082 - """ - fmt = 'S KEYINFO {0} X - - - - - - -' - await keyring.sendline(conn, fmt.format(keygrip).encode('ascii')) - - class AgentError(Exception): """GnuPG agent-related error.""" @@ -62,7 +51,7 @@ class Handler: def _get_options(self): return self.options - def __init__(self, ui, pubkey_bytes): + def __init__(self, ui, homedir): """C-tor.""" self.keygrip = None self.digest = None @@ -70,8 +59,7 @@ def __init__(self, ui, pubkey_bytes): self.options = [] self.ui = ui self.client = client.Client(ui=ui, options_getter=self._get_options) - # Cache public keys from GnuPG - self.pubkey_bytes = pubkey_bytes + self.homedir = homedir self.handlers = { b'RESET': self.reset, @@ -86,7 +74,8 @@ def __init__(self, ui, pubkey_bytes): b'PKSIGN': self.pksign, b'PKDECRYPT': self.pkdecrypt, b'HAVEKEY': self.have_key, - b'KEYINFO': _key_info, + b'DELETE_KEY': self.delete_key, + b'KEYINFO': self.key_info, b'SCD': self.handle_scd, b'GET_PASSPHRASE': self.handle_get_passphrase, } @@ -146,34 +135,19 @@ async def handle_scd(self, conn, *args): raise AgentError(b'ERR 100696144 No such device ') await keyring.sendline(conn, b'D ' + reply) - @util.memoize_method # global cache for key grips async def get_identity(self, keygrip): """ Returns device.interface.Identity that matches specified keygrip. In case of missing keygrip, KeyError will be raised. """ - keygrip_bytes = binascii.unhexlify(keygrip) - pubkey_dict, user_ids = decode.load_by_keygrip( - pubkey_bytes=self.pubkey_bytes, keygrip=keygrip_bytes) - # We assume the first user ID is used to generate TREZOR-based GPG keys. - user_id = user_ids[0]['value'].decode('utf-8') - curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid']) - ecdh = pubkey_dict['algo'] == protocol.ECDH_ALGO_ID - - identity = client.create_identity(user_id=user_id, curve_name=curve_name) - verifying_key = await self.client.pubkey(identity=identity, ecdh=ecdh) - pubkey = protocol.PublicKey( - curve_name=curve_name, created=pubkey_dict['created'], - verifying_key=verifying_key, ecdh=ecdh) - assert pubkey.key_id() == pubkey_dict['key_id'] - assert pubkey.keygrip() == keygrip_bytes - return identity + key = await keystore.load_key(self.client, binascii.unhexlify(keygrip), self.homedir) + return key['identity'] async def pksign(self, conn, *_): """Sign a message digest using a private EC key.""" log.debug('signing %r digest (algo #%s)', self.digest, self.algo) - identity = await self.get_identity(keygrip=self.keygrip) + identity = await self.get_identity(self.keygrip) r, s = await self.client.sign(identity=identity, digest=binascii.unhexlify(self.digest)) result = sig_encode(r, s) @@ -189,16 +163,14 @@ async def pkdecrypt(self, conn, *_): assert await keyring.recvline(conn) == b'END' remote_pubkey = parse_ecdh(line) - identity = await self.get_identity(keygrip=self.keygrip) + identity = await self.get_identity(self.keygrip) ec_point = await self.client.ecdh(identity=identity, pubkey=remote_pubkey) await keyring.sendline(conn, b'D ' + _serialize_point(ec_point)) async def have_key(self, conn, *keygrips): """Check if any keygrip corresponds to a TREZOR-based key.""" if len(keygrips) == 1 and keygrips[0].startswith(b"--list="): - # Support "fast-path" key listing: - # https://dev.gnupg.org/rG40da61b89b62dcb77847dc79eb159e885f52f817 - keygrips = list(decode.iter_keygrips(pubkey_bytes=self.pubkey_bytes)) + keygrips = await keystore.list_keys(self.client, self.homedir) log.debug('keygrips: %r', keygrips) await keyring.sendline(conn, b'D ' + util.assuan_serialize(b''.join(keygrips))) return @@ -212,6 +184,30 @@ async def have_key(self, conn, *keygrips): else: raise AgentError(b'ERR 67108881 No secret key ') + async def delete_key(self, _, *keygrips): + """Remove the specified keys from the key database.""" + for keygrip in keygrips: + try: + if keygrip in ('--force', '--stub'): + continue + await keystore.delete_key(binascii.unhexlify(keygrip), self.homedir) + except IOError as e: + log.warning('DELETE_KEY(%s) failed: %s', keygrip, e) + + async def key_info(self, conn, keygrip, *_): + """ + Dummy reply (mainly for 'gpg --edit' to succeed). + + For details, see GnuPG agent KEYINFO command help. + https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=agent/command.c;h=c8b34e9882076b1b724346787781f657cac75499;hb=refs/heads/master#l1082 + """ + try: + await self.get_identity(keygrip=keygrip) + except KeyError as e: + raise AgentError(b'ERR 67108891 Not found ') from e + fmt = 'S KEYINFO {0} X - - - - - - -' + await keyring.sendline(conn, fmt.format(keygrip.decode('ascii')).encode('ascii')) + async def set_key(self, _conn, keygrip, *_): """Set hexadecimal keygrip for next operation.""" self.keygrip = keygrip diff --git a/libagent/gpg/decode.py b/libagent/gpg/decode.py index 1d03b4b1..2332e12c 100644 --- a/libagent/gpg/decode.py +++ b/libagent/gpg/decode.py @@ -10,7 +10,7 @@ import nacl.signing from .. import util -from . import protocol +from . import keystore, protocol log = logging.getLogger(__name__) @@ -95,12 +95,6 @@ def _parse_embedded_signatures(subpackets): yield _parse_signature(util.Reader(stream)) -def has_custom_subpacket(signature_packet): - """Detect our custom public keys by matching subpacket data.""" - return any(protocol.CUSTOM_KEY_LABEL == subpacket[1:] - for subpacket in signature_packet['unhashed_subpackets']) - - def _parse_signature(stream): """See https://tools.ietf.org/html/rfc4880#section-5.2 for details.""" p = {'type': 'signature'} @@ -166,7 +160,8 @@ def _parse_pubkey(stream, packet_type='pubkey'): p['secret'] = leftover.read() parse_func, keygrip_func = SUPPORTED_CURVES[oid] - keygrip = keygrip_func(parse_func(mpi)) + p['verifying_key'] = parse_func(mpi) + keygrip = keygrip_func(p['verifying_key']) log.debug('keygrip: %s', util.hexlify(keygrip)) p['keygrip'] = keygrip @@ -293,16 +288,6 @@ def _parse_pubkey_packets(pubkey_bytes): return packets_per_pubkey -def load_by_keygrip(pubkey_bytes, keygrip): - """Return public key and first user ID for specified keygrip.""" - for packets in _parse_pubkey_packets(pubkey_bytes): - user_ids = [p for p in packets if p['type'] == 'user_id'] - for p in packets: - if p.get('keygrip') == keygrip: - return p, user_ids - raise KeyError('{} keygrip not found'.format(util.hexlify(keygrip))) - - def iter_keygrips(pubkey_bytes): """Iterate over all keygrips in this pubkey.""" for packets in _parse_pubkey_packets(pubkey_bytes): @@ -312,6 +297,25 @@ def iter_keygrips(pubkey_bytes): yield keygrip +async def identity_for_key(client, pubkey_bytes, homedir): + """Returns the identity used to produce the associated primary key. + + If a key matching the specified public key is not found in the keystore, ``None`` is returned. + """ + packets = parse_packets(io.BytesIO(pubkey_bytes)) + pubkey_dict = next(packets, None) + if pubkey_dict is None or pubkey_dict['type'] != 'pubkey' or 'verifying_key' not in pubkey_dict: + return None + try: + key = await keystore.load_key(client, pubkey_dict['keygrip'], homedir) + except Exception: # pylint: disable=broad-except + return None + # Check that it's the same key + if key['pubkey'].data_to_hash() != pubkey_dict['_to_hash']: + return None + return key['identity'] + + def load_signature(stream, original_data): """Load signature from stream, and compute GPG digest for verification.""" signature, = list(parse_packets((stream))) @@ -326,7 +330,7 @@ def remove_armor(armored_data): """Decode armored data into its binary form.""" stream = io.BytesIO(armored_data) lines = stream.readlines()[3:-1] - data = base64.b64decode(b''.join(lines)) - payload, checksum = data[:-3], data[-3:] + payload = base64.b64decode(b''.join(lines[:-1])) + checksum = base64.b64decode(lines[-1]) assert util.crc24(payload) == checksum return payload diff --git a/libagent/gpg/encode.py b/libagent/gpg/encode.py index 3053a483..f53f60f2 100644 --- a/libagent/gpg/encode.py +++ b/libagent/gpg/encode.py @@ -3,12 +3,12 @@ import logging from .. import util -from . import decode, keyring, protocol +from . import decode, protocol log = logging.getLogger(__name__) -async def create_primary(user_id, pubkey, signer_func, secret_bytes=b''): +async def create_primary(user_id, pubkey, signer_func, flags, secret_bytes=b''): """Export new primary GPG public key, ready for "gpg2 --import".""" pubkey_packet = protocol.packet(tag=(5 if secret_bytes else 6), blob=pubkey.data() + secret_bytes) @@ -21,7 +21,7 @@ async def create_primary(user_id, pubkey, signer_func, secret_bytes=b''): # https://tools.ietf.org/html/rfc4880#section-5.2.3.7 protocol.subpacket_byte(0x0B, 9), # preferred symmetric algo (AES-256) # https://tools.ietf.org/html/rfc4880#section-5.2.3.4 - protocol.subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign) + protocol.subpacket_byte(0x1B, flags), # key flags # https://tools.ietf.org/html/rfc4880#section-5.2.3.21 protocol.subpacket_bytes(0x15, [8, 9, 10]), # preferred hash # https://tools.ietf.org/html/rfc4880#section-5.2.3.8 @@ -32,9 +32,7 @@ async def create_primary(user_id, pubkey, signer_func, secret_bytes=b''): protocol.subpacket_byte(0x1E, 0x01), # advanced features (MDC) # https://tools.ietf.org/html/rfc4880#section-5.2.3.24 ] - unhashed_subpackets = [ - protocol.subpacket(16, pubkey.key_id()), # issuer key id - protocol.CUSTOM_SUBPACKET] + unhashed_subpackets = [protocol.subpacket(16, pubkey.key_id())] # issuer key id signature = await protocol.make_signature( signer_func=signer_func, @@ -48,12 +46,13 @@ async def create_primary(user_id, pubkey, signer_func, secret_bytes=b''): return pubkey_packet + user_id_packet + sign_packet -async def create_subkey(primary_bytes, subkey, signer_func, secret_bytes=b''): +async def create_subkey(primary_bytes, subkey, signer_func, flags, secret_bytes=b''): """Export new subkey to GPG primary key.""" + # pylint: disable=too-many-arguments subkey_packet = protocol.packet(tag=(7 if secret_bytes else 14), blob=subkey.data() + secret_bytes) - packets = list(decode.parse_packets(io.BytesIO(primary_bytes))) - primary, user_id, signature = packets[:3] + primary = next(decode.parse_packets(io.BytesIO(primary_bytes))) + assert primary['type'] == 'pubkey' data_to_sign = primary['_to_hash'] + subkey.data_to_hash() @@ -75,10 +74,6 @@ async def create_subkey(primary_bytes, subkey, signer_func, secret_bytes=b''): # Subkey Binding Signature - # Key flags: https://tools.ietf.org/html/rfc4880#section-5.2.3.21 - # (certify & sign) (encrypt) - flags = (2) if (not subkey.ecdh) else (4 | 8) - hashed_subpackets = [ protocol.subpacket_time(subkey.created), # signature time protocol.subpacket_byte(0x1B, flags)] @@ -87,10 +82,6 @@ async def create_subkey(primary_bytes, subkey, signer_func, secret_bytes=b''): unhashed_subpackets.append(protocol.subpacket(16, primary['key_id'])) if embedded_sig is not None: unhashed_subpackets.append(protocol.subpacket(32, embedded_sig)) - unhashed_subpackets.append(protocol.CUSTOM_SUBPACKET) - - if not decode.has_custom_subpacket(signature): - signer_func = await keyring.create_agent_signer(user_id['value']) signature = await protocol.make_signature( signer_func=signer_func, diff --git a/libagent/gpg/keyring.py b/libagent/gpg/keyring.py index be53cee6..520474aa 100644 --- a/libagent/gpg/keyring.py +++ b/libagent/gpg/keyring.py @@ -39,8 +39,8 @@ async def get_agent_sock_path(env=None, run_process=trio.run_process): async def connect_to_agent(env=None, run_process=trio.run_process): """Connect to GPG agent's UNIX socket.""" sock_path = get_agent_sock_path(run_process=run_process, env=env) - # Make sure the original gpg-agent is running. - await check_output(args=['gpg-connect-agent', '/bye'], run_process=run_process) + # This forces the gpg-agent configured for this environment to run. + await check_output(args=gpg_command(['--list-secret-keys']), run_process=run_process, env=env) if sys.platform == 'win32': sock = await win_server.Client.open(sock_path) else: @@ -228,13 +228,6 @@ async def gpg_command(args, env=None): return [cmd] + args -async def get_keygrip(user_id, run_process=trio.run_process): - """Get a keygrip of the primary GPG key of the specified user.""" - args = await gpg_command(['--list-keys', '--with-keygrip', user_id]) - output = await check_output(args=args, run_process=run_process).decode('utf-8') - return re.findall(r'Keygrip = (\w+)', output)[0] - - async def gpg_version(run_process=trio.run_process): """Get a keygrip of the primary GPG key of the specified user.""" args = await gpg_command(['--version']) @@ -247,7 +240,7 @@ async def gpg_version(run_process=trio.run_process): async def export_public_key(user_id, env=None, run_process=trio.run_process): """Export GPG public key for specified `user_id`.""" - args = await gpg_command(['--export', user_id]) + args = await gpg_command(['--export', '--export-filter', 'select=uid=' + user_id]) result = await check_output(args=args, env=env, run_process=run_process) if not result: log.error('could not find public key %r in local GPG keyring', user_id) @@ -264,10 +257,15 @@ async def export_public_keys(env=None, run_process=trio.run_process): return result -async def create_agent_signer(user_id): +async def delete_public_key(key_id, env=None, run_process=trio.run_process): + """Export all GPG public keys.""" + args = await gpg_command(['--delete-keys', '--expert', '--batch', '--yes', key_id]) + await check_output(args=args, env=env, run_process=run_process) + + +async def create_agent_signer(keygrip, env): """Sign digest with existing GPG keys using gpg-agent tool.""" - sock = await connect_to_agent(env=os.environ) - keygrip = await get_keygrip(user_id) + sock = await connect_to_agent(env=env) async def sign(digest): """Sign the digest and return an ECDSA/RSA/DSA signature.""" diff --git a/libagent/gpg/keystore.py b/libagent/gpg/keystore.py new file mode 100644 index 00000000..cf20a8b8 --- /dev/null +++ b/libagent/gpg/keystore.py @@ -0,0 +1,114 @@ +"""Storage for information needed to regenerate GPG private keys.""" + +import binascii +import logging +import os + +import trio + +from . import client, protocol, util + +log = logging.getLogger(__name__) + + +async def store_key(c, user_id, curve_name, ecdh, created, homedir, open_file=trio.open_file): + """Stores the specified key parameters into a matching file in the keystore. + + The file name is based on the keygrip generated from the specified key parameters. + Returns the generated public key object for the key parameters. + """ + # pylint: disable=too-many-arguments + assert curve_name in protocol.SUPPORTED_CURVES + identity = client.create_identity(user_id=user_id, curve_name=curve_name) + verifying_key = await c.pubkey(identity=identity, ecdh=ecdh) + pubkey = protocol.PublicKey( + curve_name=curve_name, created=created, + verifying_key=verifying_key, ecdh=ecdh) + keygrip = util.hexlify(pubkey.keygrip()) + keydir = os.path.join(homedir, 'private-keys-v1.d') + await trio.Path(keydir).mkdir(mode=0o700, parents=True, exist_ok=True) + async with await open_file(os.path.join(keydir, '{0}.key'.format(keygrip)), 'w') as f: + await f.write('Path: {0}\n'.format(util.hexlify(user_id.encode('utf-8')))) + await f.write('Curve: {0}\n'.format(curve_name)) + await f.write('ECDH: {0}\n'.format('True' if ecdh else 'False')) + await f.write('Created: {0}\n'.format(created)) + await trio.Path(f.name).chmod(0o700) + return pubkey + + +async def load_key(c, keygrip, homedir, open_file=trio.open_file): + """Loads the key matching the specified keygrip from the keystore. + + Verifies that the loaded parameters match a key generated with the current device. + Returns a dictionary containing the parameters used to generate the key, + as well as the identity and public key generated from the key. + """ + user_id = None + curve_name = None + ecdh = None + created = None + keyfile = os.path.join(homedir, 'private-keys-v1.d', '{0}.key'.format(util.hexlify(keygrip))) + try: + async with await open_file(keyfile, 'r') as f: + async for line in f: + if line.startswith('Path: '): + try: + user_id = binascii.unhexlify(line[6:-1].encode('ascii')).decode('utf-8') + except binascii.Error: + pass + except UnicodeDecodeError: + pass + elif line.startswith('Curve: '): + curve_name = line[7:-1] + elif line == 'ECDH: True\n': + ecdh = True + elif line == 'ECDH: False\n': + ecdh = False + elif line.startswith('Created: '): + created = int(line[9:-1]) + except IOError as e: + raise KeyError('key file "{0}" could not be read'.format(keyfile)) from e + if user_id is None or curve_name is None or ecdh is None or created is None: + raise KeyError('key file "{0}" is corrupt or incomplete'.format(keyfile)) + if curve_name not in protocol.SUPPORTED_CURVES: + raise KeyError('key file "{0}" has invalid curve name "{1}"'.format(keyfile, curve_name)) + identity = client.create_identity(user_id=user_id, curve_name=curve_name) + verifying_key = await c.pubkey(identity=identity, ecdh=ecdh) + pubkey = protocol.PublicKey( + curve_name=curve_name, created=created, + verifying_key=verifying_key, ecdh=ecdh) + if keygrip != pubkey.keygrip(): + # Sanity check: If the key file is invalid, or was generated by a different device, fail + raise KeyError( + 'key file "{0}" generated mismatched keygrip "{1}". Did you use the wrong device?' + .format(keyfile, util.hexlify(pubkey.keygrip()))) + return { + 'user_id': user_id, + 'curve_name': curve_name, + 'ecdh': ecdh, + 'created': created, + 'identity': identity, + 'verifying_key': verifying_key, + 'pubkey': pubkey + } + + +async def delete_key(keygrip, homedir, trio_path=trio.Path): + """Deletes the key matching the specified key grip from the key store.""" + keyfile = os.path.join(homedir, 'private-keys-v1.d', '{0}.key'.format(util.hexlify(keygrip))) + await trio_path(keyfile).unlink() + + +async def list_keys(c, homedir, open_file=trio.open_file, trio_path=trio.Path): + """Lists all available keys in the key store.""" + result = [] + keyglob = os.path.join(homedir, 'private-keys-v1.d') + for keyfile in await trio_path(keyglob).glob('*.key'): + try: + keygrip = binascii.unhexlify(keyfile.stem) + # Actually testing the keys takes too long. Better to return it unconditionally. + # await load_key(c, keygrip, homedir, open_file) + result.append(keygrip) + except Exception as e: # pylint: disable=broad-except + log.warning('Invalid key file "%s" in key store: %s', keyfile, e) + return result diff --git a/libagent/gpg/protocol.py b/libagent/gpg/protocol.py index 8941be02..0e54130c 100644 --- a/libagent/gpg/protocol.py +++ b/libagent/gpg/protocol.py @@ -174,10 +174,6 @@ def keygrip_curve25519(vk): ECDH_ALGO_ID = 18 -CUSTOM_KEY_LABEL = b'TREZOR-GPG' # marks "our" pubkey -CUSTOM_SUBPACKET_ID = 26 # use "policy URL" subpacket -CUSTOM_SUBPACKET = subpacket(CUSTOM_SUBPACKET_ID, CUSTOM_KEY_LABEL) - def get_curve_name_by_oid(oid): """Return curve name matching specified OID, or raise KeyError.""" diff --git a/libagent/gpg/tests/test_decode.py b/libagent/gpg/tests/test_decode.py index 7cd240eb..1daff5d1 100644 --- a/libagent/gpg/tests/test_decode.py +++ b/libagent/gpg/tests/test_decode.py @@ -41,38 +41,3 @@ def public_key_path(request): def test_gpg_files(public_key_path): # pylint: disable=redefined-outer-name with open(public_key_path, 'rb') as f: assert list(decode.parse_packets(f)) - - -def test_has_custom_subpacket(): - sig = {'unhashed_subpackets': []} - assert not decode.has_custom_subpacket(sig) - - custom_markers = [ - protocol.CUSTOM_SUBPACKET, - protocol.subpacket(10, protocol.CUSTOM_KEY_LABEL), - ] - for marker in custom_markers: - sig = {'unhashed_subpackets': [marker]} - assert decode.has_custom_subpacket(sig) - - -def test_load_by_keygrip_missing(): - with pytest.raises(KeyError): - decode.load_by_keygrip(pubkey_bytes=b'', keygrip=b'') - - -def test_keygrips(): - pubkey_bytes = (cwd / "romanz-pubkey.gpg").open("rb").read() - keygrips = list(decode.iter_keygrips(pubkey_bytes)) - assert [k.hex() for k in keygrips] == [ - '7b2497258d76bc6539ed88d018cd1c739e2dbb6c', - '30ae97f3d8e0e34c5ed80e1715fd442ca24c0a8e', - ] - - for keygrip in keygrips: - pubkey_dict, user_ids = decode.load_by_keygrip(pubkey_bytes, keygrip) - assert pubkey_dict['keygrip'] == keygrip - assert [u['value'] for u in user_ids] == [ - b'Roman Zeyde ', - b'Roman Zeyde ', - ] diff --git a/libagent/server.py b/libagent/server.py index 1fb9ede9..586eb5ff 100644 --- a/libagent/server.py +++ b/libagent/server.py @@ -1,4 +1,4 @@ -"""UNIX-domain socket server for ssh-agent implementation.""" +"""UNIX-domain socket server and related utility functions.""" import contextlib import functools import logging