Skip to content

Commit

Permalink
key generation backup logic and bugfix (#189)
Browse files Browse the repository at this point in the history
- Fixed code for new gateway_mfr result format
- Updated test cases for the new gateway_mfr (>0.2.0) version response format.
- Fixed provisioning being passed as list bug
- Added key generation logic as backup if key provisioning fails.
  • Loading branch information
kashifpk authored Aug 29, 2022
1 parent 50518a0 commit 75700e5
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 146 deletions.
79 changes: 29 additions & 50 deletions hm_pyhelper/miner_param.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ def get_gateway_mfr_command(sub_command: str) -> list:
except (UnknownVariantException, UnknownVariantAttributeException) as e:
LOGGER.warning(str(e) + ' Omitting --device arg.')

command.append(sub_command)
if ' ' in sub_command:
command += sub_command.split(' ')
else:
command.append(sub_command)

else:
raise UnsupportedGatewayMfrVersion(f"Unsupported gateway_mfr version {gateway_mfr_version}")

Expand Down Expand Up @@ -168,73 +172,48 @@ def provision_key():
if did_gateway_mfr_test_result_include_miner_key_pass(test_results):
return True

provisioning_successful = False

try:
gateway_mfr_result = run_gateway_mfr(["provision"])
gateway_mfr_result = run_gateway_mfr("provision")
LOGGER.info("[ECC Provisioning] %s", gateway_mfr_result)
provisioning_successful = True

except subprocess.CalledProcessError:
LOGGER.error("[ECC Provisioning] Exited with a non-zero status")
return False
provisioning_successful = False

except Exception as exp:
LOGGER.error("[ECC Provisioning] Error during provisioning. %s" % str(exp))
return False
provisioning_successful = False

# Try key generation.
if provisioning_successful is False:
try:
gateway_mfr_result = run_gateway_mfr("key --generate")
provisioning_successful = True

except Exception as exp:
LOGGER.error("[ECC Provisioning] key --generate failed: %s" % str(exp))

return True
return provisioning_successful


def did_gateway_mfr_test_result_include_miner_key_pass(
gateway_mfr_test_result
):
"""
Returns true if gateway_mfr_test_result["tests"] has an entry where
"test": "miner_key(0)" and "result": "pass"
Input: {
"result": "pass",
"tests": [
{
"output": "ok",
"result": "pass",
"test": "serial"
},
{
"output": "ok",
"result": "pass",
"test": "zone_locked(data)"
},
{
"output": "ok",
"result": "pass",
"test": "zone_locked(config)"
},
{
"output": "ok",
"result": "pass",
"test": "slot_config(0..=15, ecc)"
},
{
"output": "ok",
"result": "pass",
"test": "key_config(0..=15, ecc)"
},
{
"output": "ok",
"result": "pass",
"test": "miner_key(0)"
}
]
Returns true if gateway_mfr_test_result["tests"] has an key "miner_key(0)" with value
being a dict that contains result key value set to pass.
{
'result': 'pass',
'tests': 'miner_key(0)': {'result': 'pass'}
}
"""

def is_miner_key_and_passed(test_result):
return test_result['test'] == 'miner_key(0)' and \
test_result['result'] == 'pass'

results_is_miner_key_and_passed = map(
is_miner_key_and_passed,
gateway_mfr_test_result['tests']
)
return any(results_is_miner_key_and_passed)
return gateway_mfr_test_result.get(
'tests', {}).get('miner_key(0)', {}).get('result', 'fail') == 'pass'


def get_ethernet_addresses(diagnostics):
Expand Down
138 changes: 43 additions & 95 deletions hm_pyhelper/tests/test_miner_param.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,71 +13,50 @@
get_mac_address, get_public_keys_rust, get_gateway_mfr_version, get_gateway_mfr_command


ALL_PASS_GATEWAY_MFR_TESTS = [
{
"output": "ok",
"result": "pass",
"test": "serial"
},
{
"output": "ok",
"result": "pass",
"test": "zone_locked(data)"
},
{
"output": "ok",
"result": "pass",
"test": "zone_locked(config)"
},
{
"output": "ok",
"result": "pass",
"test": "slot_config(0..=15, ecc)"
},
{
"output": "ok",
"result": "pass",
"test": "key_config(0..=15, ecc)"
},
{
"output": "ok",
"result": "pass",
"test": "miner_key(0)"
}
]

NONE_PASS_GATEWAY_MFR_TESTS = [
{
"output": "timeout/retry error",
"result": "fail",
"test": "serial"
},
{
"output": "timeout/retry error",
"result": "fail",
"test": "zone_locked(data)"
},
{
"output": "timeout/retry error",
"result": "fail",
"test": "zone_locked(config)"
},
{
"output": "timeout/retry error",
"result": "fail",
"test": "slot_config(0..=15, ecc)"
ALL_PASS_GATEWAY_MFR_TESTS = {
'ecdh(0)': {'error': 'decode error\n\nCaused by:\n not a compact key', 'result': 'fail'},
'key_config(0)': {
'checks': {
'auth_key': '0',
'intrusion_disable': 'false',
'key_type': 'ecc',
'lockable': 'true',
'private': 'true',
'pub_info': 'true',
'req_auth': 'false',
'req_random': 'false',
'x509_index': '0'
},
'result': 'pass'
},
{
"output": "timeout/retry error",
"result": "fail",
"test": "key_config(0..=15, ecc)"
'miner_key(0)': {'checks': 'ok', 'result': 'pass'},
'sign(0)': {'checks': 'ok', 'result': 'pass'},
'slot_config(0)': {
'checks': {
'ecdh_operation': 'true',
'encrypt_read': 'false',
'external_signatures': 'true',
'internal_signatures': 'true',
'limited_use': 'false',
'secret': 'true'
},
'result': 'pass'
},
{
"output": "timeout/retry error",
"result": "fail",
"test": "miner_key(0)"
}
]
'zone_locked(config)': {'checks': 'ok', 'result': 'pass'},
'zone_locked(data)': {'checks': 'ok', 'result': 'pass'}
}

ERROR_MESSAGE = 'decode error\n\nCaused by:\n not a compact key'

NONE_PASS_GATEWAY_MFR_TESTS = {
'ecdh(0)': {'error': ERROR_MESSAGE, 'result': 'fail'},
'key_config(0)': {'error': ERROR_MESSAGE, 'result': 'fail'},
'miner_key(0)': {'error': ERROR_MESSAGE, 'result': 'fail'},
'sign(0)': {'error': ERROR_MESSAGE, 'result': 'fail'},
'slot_config(0)': {'error': ERROR_MESSAGE, 'result': 'fail'},
'zone_locked(config)': {'error': ERROR_MESSAGE, 'result': 'fail'},
'zone_locked(data)': {'error': ERROR_MESSAGE, 'result': 'fail'}
}

MOCK_VARIANT_DEFINITIONS = {
'NEBHNT-WITH-ECC-ADDRESS': {
Expand Down Expand Up @@ -374,38 +353,7 @@ def test_did_gateway_mfr_test_result_include_miner_key_fail(
def test_did_gateway_mfr_test_result_include_miner_key_pass(self):
get_gateway_mfr_test_result = {
"result": "fail",
"tests": [
{
"output": "timeout/retry error",
"result": "fail",
"test": "serial"
},
{
"output": "timeout/retry error",
"result": "fail",
"test": "zone_locked(data)"
},
{
"output": "timeout/retry error",
"result": "fail",
"test": "zone_locked(config)"
},
{
"output": "timeout/retry error",
"result": "fail",
"test": "slot_config(0..=15, ecc)"
},
{
"output": "timeout/retry error",
"result": "fail",
"test": "key_config(0..=15, ecc)"
},
{
"output": "ok",
"result": "pass",
"test": "miner_key(0)"
}
]
"tests": ALL_PASS_GATEWAY_MFR_TESTS
}
self.assertTrue(
did_gateway_mfr_test_result_include_miner_key_pass(
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

setup(
name='hm_pyhelper',
version='0.13.34',
version='0.13.35',
author="Nebra Ltd",
author_email="[email protected]",
description="Helium Python Helper",
Expand Down

0 comments on commit 75700e5

Please sign in to comment.