Skip to content

Commit

Permalink
Issue #1930 Replaced for code based on ssl.get_server_certificate().
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikitin, Alexander committed Jan 14, 2024
1 parent 9718381 commit b81b9c8
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
3 changes: 3 additions & 0 deletions changelogs/fragments/1930-fix_ssl_deprecation_function.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
bugfixes:
- module_utils/vmware.py - remove ssl.wrap_socet() function. Replaced for code based on ssl.get_server_certificate().
https://github.com/ansible-collections/community.vmware/issues/1930
23 changes: 11 additions & 12 deletions plugins/module_utils/vmware.py
Original file line number Diff line number Diff line change
Expand Up @@ -1174,9 +1174,9 @@ def vcenter_version_at_least(self, version=None):
self.module.fail_json(msg='The passed vCenter version: %s is None.' % version)

def get_cert_fingerprint(self, fqdn, port, proxy_host=None, proxy_port=None):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
if proxy_host:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.connect((
proxy_host,
proxy_port))
Expand All @@ -1191,17 +1191,16 @@ def get_cert_fingerprint(self, fqdn, port, proxy_host=None, proxy_port=None):
der_cert_bin = ctx.wrap_socket(sock, server_hostname=fqdn).getpeercert(True)
sock.close()
else:
wrapped_socket = ssl.wrap_socket(sock)
try:
wrapped_socket.connect((fqdn, port))
except socket.error as socket_error:
self.module.fail_json(msg="Cannot connect to host : %s" % socket_error)
else:
der_cert_bin = wrapped_socket.getpeercert(True)
wrapped_socket.close()

string = str(hashlib.sha1(der_cert_bin).hexdigest())
return ':'.join(a + b for a, b in zip(string[::2], string[1::2]))
pem = ssl.get_server_certificate((fqdn, port))
except Exception:
self.module.fail_json(msg=f"Cannot connect to host: {fqdn}")
der_cert_bin = ssl.PEM_cert_to_DER_cert(pem)
if der_cert_bin:
string = str(hashlib.sha1(der_cert_bin).hexdigest())
return ':'.join(a + b for a, b in zip(string[::2], string[1::2]))
else:
self.module.fail_json(msg=f"Unable to obtain certificate fingerprint for host: {fqdn}")

def get_managed_objects_properties(self, vim_type, properties=None):
"""
Expand Down

0 comments on commit b81b9c8

Please sign in to comment.