From 75700e5c9e7f82d4726aef32699ef11ce8338d56 Mon Sep 17 00:00:00 2001 From: Kashif Iftikhar Date: Tue, 30 Aug 2022 01:04:33 +0500 Subject: [PATCH] key generation backup logic and bugfix (#189) - Fixed code for new gateway_mfr result format - Updated test cases for the new gateway_mfr (>0.2.0) version response format. - Fixed provisioning being passed as list bug - Added key generation logic as backup if key provisioning fails. --- hm_pyhelper/miner_param.py | 79 ++++++--------- hm_pyhelper/tests/test_miner_param.py | 138 ++++++++------------------ setup.py | 2 +- 3 files changed, 73 insertions(+), 146 deletions(-) diff --git a/hm_pyhelper/miner_param.py b/hm_pyhelper/miner_param.py index 492bc5d..174a831 100644 --- a/hm_pyhelper/miner_param.py +++ b/hm_pyhelper/miner_param.py @@ -132,7 +132,11 @@ def get_gateway_mfr_command(sub_command: str) -> list: except (UnknownVariantException, UnknownVariantAttributeException) as e: LOGGER.warning(str(e) + ' Omitting --device arg.') - command.append(sub_command) + if ' ' in sub_command: + command += sub_command.split(' ') + else: + command.append(sub_command) + else: raise UnsupportedGatewayMfrVersion(f"Unsupported gateway_mfr version {gateway_mfr_version}") @@ -168,73 +172,48 @@ def provision_key(): if did_gateway_mfr_test_result_include_miner_key_pass(test_results): return True + provisioning_successful = False + try: - gateway_mfr_result = run_gateway_mfr(["provision"]) + gateway_mfr_result = run_gateway_mfr("provision") LOGGER.info("[ECC Provisioning] %s", gateway_mfr_result) + provisioning_successful = True except subprocess.CalledProcessError: LOGGER.error("[ECC Provisioning] Exited with a non-zero status") - return False + provisioning_successful = False except Exception as exp: LOGGER.error("[ECC Provisioning] Error during provisioning. %s" % str(exp)) - return False + provisioning_successful = False + + # Try key generation. + if provisioning_successful is False: + try: + gateway_mfr_result = run_gateway_mfr("key --generate") + provisioning_successful = True + + except Exception as exp: + LOGGER.error("[ECC Provisioning] key --generate failed: %s" % str(exp)) - return True + return provisioning_successful def did_gateway_mfr_test_result_include_miner_key_pass( gateway_mfr_test_result ): """ - Returns true if gateway_mfr_test_result["tests"] has an entry where - "test": "miner_key(0)" and "result": "pass" - Input: { - "result": "pass", - "tests": [ - { - "output": "ok", - "result": "pass", - "test": "serial" - }, - { - "output": "ok", - "result": "pass", - "test": "zone_locked(data)" - }, - { - "output": "ok", - "result": "pass", - "test": "zone_locked(config)" - }, - { - "output": "ok", - "result": "pass", - "test": "slot_config(0..=15, ecc)" - }, - { - "output": "ok", - "result": "pass", - "test": "key_config(0..=15, ecc)" - }, - { - "output": "ok", - "result": "pass", - "test": "miner_key(0)" - } - ] + Returns true if gateway_mfr_test_result["tests"] has an key "miner_key(0)" with value + being a dict that contains result key value set to pass. + + { + 'result': 'pass', + 'tests': 'miner_key(0)': {'result': 'pass'} } """ - def is_miner_key_and_passed(test_result): - return test_result['test'] == 'miner_key(0)' and \ - test_result['result'] == 'pass' - - results_is_miner_key_and_passed = map( - is_miner_key_and_passed, - gateway_mfr_test_result['tests'] - ) - return any(results_is_miner_key_and_passed) + return gateway_mfr_test_result.get( + 'tests', {}).get('miner_key(0)', {}).get('result', 'fail') == 'pass' def get_ethernet_addresses(diagnostics): diff --git a/hm_pyhelper/tests/test_miner_param.py b/hm_pyhelper/tests/test_miner_param.py index c0f5440..c4736e2 100644 --- a/hm_pyhelper/tests/test_miner_param.py +++ b/hm_pyhelper/tests/test_miner_param.py @@ -13,71 +13,50 @@ get_mac_address, get_public_keys_rust, get_gateway_mfr_version, get_gateway_mfr_command -ALL_PASS_GATEWAY_MFR_TESTS = [ - { - "output": "ok", - "result": "pass", - "test": "serial" - }, - { - "output": "ok", - "result": "pass", - "test": "zone_locked(data)" - }, - { - "output": "ok", - "result": "pass", - "test": "zone_locked(config)" - }, - { - "output": "ok", - "result": "pass", - "test": "slot_config(0..=15, ecc)" - }, - { - "output": "ok", - "result": "pass", - "test": "key_config(0..=15, ecc)" - }, - { - "output": "ok", - "result": "pass", - "test": "miner_key(0)" - } -] - -NONE_PASS_GATEWAY_MFR_TESTS = [ - { - "output": "timeout/retry error", - "result": "fail", - "test": "serial" - }, - { - "output": "timeout/retry error", - "result": "fail", - "test": "zone_locked(data)" - }, - { - "output": "timeout/retry error", - "result": "fail", - "test": "zone_locked(config)" - }, - { - "output": "timeout/retry error", - "result": "fail", - "test": "slot_config(0..=15, ecc)" +ALL_PASS_GATEWAY_MFR_TESTS = { + 'ecdh(0)': {'error': 'decode error\n\nCaused by:\n not a compact key', 'result': 'fail'}, + 'key_config(0)': { + 'checks': { + 'auth_key': '0', + 'intrusion_disable': 'false', + 'key_type': 'ecc', + 'lockable': 'true', + 'private': 'true', + 'pub_info': 'true', + 'req_auth': 'false', + 'req_random': 'false', + 'x509_index': '0' + }, + 'result': 'pass' }, - { - "output": "timeout/retry error", - "result": "fail", - "test": "key_config(0..=15, ecc)" + 'miner_key(0)': {'checks': 'ok', 'result': 'pass'}, + 'sign(0)': {'checks': 'ok', 'result': 'pass'}, + 'slot_config(0)': { + 'checks': { + 'ecdh_operation': 'true', + 'encrypt_read': 'false', + 'external_signatures': 'true', + 'internal_signatures': 'true', + 'limited_use': 'false', + 'secret': 'true' + }, + 'result': 'pass' }, - { - "output": "timeout/retry error", - "result": "fail", - "test": "miner_key(0)" - } - ] + 'zone_locked(config)': {'checks': 'ok', 'result': 'pass'}, + 'zone_locked(data)': {'checks': 'ok', 'result': 'pass'} +} + +ERROR_MESSAGE = 'decode error\n\nCaused by:\n not a compact key' + +NONE_PASS_GATEWAY_MFR_TESTS = { + 'ecdh(0)': {'error': ERROR_MESSAGE, 'result': 'fail'}, + 'key_config(0)': {'error': ERROR_MESSAGE, 'result': 'fail'}, + 'miner_key(0)': {'error': ERROR_MESSAGE, 'result': 'fail'}, + 'sign(0)': {'error': ERROR_MESSAGE, 'result': 'fail'}, + 'slot_config(0)': {'error': ERROR_MESSAGE, 'result': 'fail'}, + 'zone_locked(config)': {'error': ERROR_MESSAGE, 'result': 'fail'}, + 'zone_locked(data)': {'error': ERROR_MESSAGE, 'result': 'fail'} +} MOCK_VARIANT_DEFINITIONS = { 'NEBHNT-WITH-ECC-ADDRESS': { @@ -374,38 +353,7 @@ def test_did_gateway_mfr_test_result_include_miner_key_fail( def test_did_gateway_mfr_test_result_include_miner_key_pass(self): get_gateway_mfr_test_result = { "result": "fail", - "tests": [ - { - "output": "timeout/retry error", - "result": "fail", - "test": "serial" - }, - { - "output": "timeout/retry error", - "result": "fail", - "test": "zone_locked(data)" - }, - { - "output": "timeout/retry error", - "result": "fail", - "test": "zone_locked(config)" - }, - { - "output": "timeout/retry error", - "result": "fail", - "test": "slot_config(0..=15, ecc)" - }, - { - "output": "timeout/retry error", - "result": "fail", - "test": "key_config(0..=15, ecc)" - }, - { - "output": "ok", - "result": "pass", - "test": "miner_key(0)" - } - ] + "tests": ALL_PASS_GATEWAY_MFR_TESTS } self.assertTrue( did_gateway_mfr_test_result_include_miner_key_pass( diff --git a/setup.py b/setup.py index e25e49b..14c844a 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name='hm_pyhelper', - version='0.13.34', + version='0.13.35', author="Nebra Ltd", author_email="support@nebra.com", description="Helium Python Helper",