Skip to content

Commit

Permalink
Fix Support for ssh-certificates
Browse files Browse the repository at this point in the history
  • Loading branch information
Senjuu committed Mar 19, 2024
1 parent dace3eb commit fc7684d
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 11 deletions.
76 changes: 76 additions & 0 deletions libagent/ssh/certificate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""Utilities for signing ssh-certificates."""
import io

from . import formats, util


def _parse_stringlist(i):
res = []
length, = util.recv(i, '>L')
while length >= 4:
size, = util.recv(i, '>L')
length -= 4
size = min(size, length)
if size == 0:
continue
res.append(util.recv(i, size).decode('utf8'))
length -= size
return res


def parse(blob):
"""Parses a data blob to a to-be-signed ssh-certificate."""
# https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.certkeys
res = {}
i = io.BytesIO(blob)
firstString = util.read_frame(i)
if firstString.endswith(b'[email protected]'):
res['isCertificate'] = True
_certificate_key_type = firstString
_nonce = util.read_frame(i)
if (_certificate_key_type.startswith(b'ssh-rsa')):
_pub_key = {}
_pub_key['e'] = util.read_frame(i)
_pub_key['n'] = util.read_frame(i)
elif (_certificate_key_type.startswith(b'ssh-dsa')):
_pub_key = {}
_pub_key['p'] = util.read_frame(i)
_pub_key['q'] = util.read_frame(i)
_pub_key['g'] = util.read_frame(i)
_pub_key['y'] = util.read_frame(i)
elif (_certificate_key_type.startswith(b'ecdsa-sha2-nistp')):
_curve = util.read_frame(i)
_pub_key = util.read_frame(i)
elif (_certificate_key_type.startswith(b'ssh-ed25519')):
_pub_key = util.read_frame(i)
else:
raise ValueError('unknown certificate key type: '+_certificate_key_type.decode('utf8'))
_serial_number, = util.recv(i, '>Q')
res['certificate_type'], = util.recv(i, '>L')
_key_id_ = util.read_frame(i)
res['principals'] = _parse_stringlist(i)
res['principals'] = ', '.join(res['principals'])
_valid_after, = util.recv(i, '>Q')
_valid_before, = util.recv(i, '>Q')
_critical_options = _parse_stringlist(i)
_extensions = _parse_stringlist(i)
_reserved = util.read_frame(i)
_signature_key = util.read_frame(i)
assert not i.read()
return res
res['isCertificate'] = False
i.close()
return res


def format(certificate):
"""
Makes certificate better human readable.
Formats list properties to comma seperated strings and
the signature key to human readable string.
"""
certificate['principals'] = ', '.join(certificate['principals'])
certificate['critical_options'] = ', '.join(certificate['critical_options'])
certificate['extensions'] = ', '.join(certificate['extensions'])
certificate['signature_key'] = formats.parse_pubkey(certificate['signature_key'])
40 changes: 29 additions & 11 deletions libagent/ssh/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io
import logging

from . import formats, util
from . import certificate, formats, util

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -36,6 +36,15 @@ def sign_ssh_challenge(self, blob, identity):
if msg['sshsig']:
log.info('please confirm "%s" signature for "%s" using %s...',
msg['namespace'], identity.to_string(), self.device)
elif msg['sshcertsign']:
entity = 'unknown'
if msg['certificate_type'] == 1:
entity = 'user'
elif msg['certificate_type'] == 2:
entity = 'host'
log.info('please confirm signing public key for %s "%s" with "%s" using %s...',
entity, msg['principals'], identity.to_string(),
self.device)
else:
log.debug('%s: user %r via %r (%r)',
msg['conn'], msg['user'], msg['auth'], msg['key_type'])
Expand All @@ -59,22 +68,31 @@ def parse_ssh_blob(data):
i = io.BytesIO(data[6:])
# https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.sshsig
res['sshsig'] = True
res['sshcertsign'] = False
res['namespace'] = util.read_frame(i)
res['reserved'] = util.read_frame(i)
res['hashalg'] = util.read_frame(i)
res['message'] = util.read_frame(i)
else:
i = io.BytesIO(data)
res['sshsig'] = False
res['nonce'] = util.read_frame(i)
i.read(1) # SSH2_MSG_USERAUTH_REQUEST == 50 (from ssh2.h, line 108)
res['user'] = util.read_frame(i)
res['conn'] = util.read_frame(i)
res['auth'] = util.read_frame(i)
i.read(1) # have_sig == 1 (from sshconnect2.c, line 1056)
res['key_type'] = util.read_frame(i)
public_key = util.read_frame(i)
res['public_key'] = formats.parse_pubkey(public_key)
_certificate = certificate.parse(data)
if _certificate['isCertificate']:
certificate.format(_certificate)
_certificate['sshsig'] = res['sshsig']
_certificate['sshcertsign'] = True
return _certificate
else:
res['sshcertsign'] = False
i = io.BytesIO(data)
res['nonce'] = util.read_frame(i)
i.read(1) # SSH2_MSG_USERAUTH_REQUEST == 50 (from ssh2.h, line 108)
res['user'] = util.read_frame(i)
res['conn'] = util.read_frame(i)
res['auth'] = util.read_frame(i)
i.read(1) # have_sig == 1 (from sshconnect2.c, line 1056)
res['key_type'] = util.read_frame(i)
public_key = util.read_frame(i)
res['public_key'] = formats.parse_pubkey(public_key)

unparsed = i.read()
if unparsed:
Expand Down
2 changes: 2 additions & 0 deletions libagent/ssh/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def test_parse_ssh_challenge():
'fingerprint': '47:a3:26:af:0b:5d:a2:c3:91:ed:26:36:94:be:3a:d5',
'type': b'ssh-ed25519'},
'sshsig': False,
'sshcertsign': False,
'user': b'git',
}

Expand All @@ -122,4 +123,5 @@ def test_parse_ssh_signature():
'namespace': b'file',
'reserved': b'',
'sshsig': True,
'sshcertsign': False,
}

0 comments on commit fc7684d

Please sign in to comment.