From 0a16b09a56687cd7b21126e981bb0c5bc251eac2 Mon Sep 17 00:00:00 2001 From: Emerson Rocha Date: Tue, 30 Mar 2021 16:21:17 -0300 Subject: [PATCH] hdp-integrity (#17), hdp-spec (#16): is_not_acceptable_load_this() created --- hxlm/core/hdp/hazmat/policy.py | 73 +++++++++++++++++++++++++++++++++- hxlm/core/hdp/project.py | 22 ++++++++-- 2 files changed, 89 insertions(+), 6 deletions(-) diff --git a/hxlm/core/hdp/hazmat/policy.py b/hxlm/core/hdp/hazmat/policy.py index 4fd0c78..80b2064 100644 --- a/hxlm/core/hdp/hazmat/policy.py +++ b/hxlm/core/hdp/hazmat/policy.py @@ -15,6 +15,15 @@ """ import os +from urllib.parse import urlparse + +from typing import ( + Union +) + +from hxlm.core.io.util import ( + get_entrypoint_type +) from hxlm.core.types import ( EntryPointType, @@ -28,10 +37,13 @@ # os.environ["HDP_DEBUG"] = "1" _IS_DEBUG = bool(os.getenv('HDP_DEBUG', '')) +__all__ = ['get_policy_HDSL1', 'get_policy_HDSL4', + 'is_not_acceptable_load_this'] + def _get_bunker() -> HDPPolicyLoad: """bunker""" - policy = HDPPolicyLoad + policy = HDPPolicyLoad() policy.enforce_startup_generic_tests = True @@ -64,7 +76,7 @@ def _get_user_know_what_is_doing() -> HDPPolicyLoad: Returns: HDPPolicyLoad: The policy """ - policy = HDPPolicyLoad + policy = HDPPolicyLoad() # Note: in general the default generic HDPPolicyLoad somewhat already # is flexible (with exception that does not have hardcoded names @@ -124,3 +136,60 @@ def get_policy_HDSL4() -> HDPPolicyLoad: """ # pylint: disable=invalid-name return _get_bunker() + + +def is_not_acceptable_load_this( + entrypoint_str: str, + policy: HDPPolicyLoad) -> Union[bool, str]: + """Checj if an entrypoint string is not acceptable by an policy + + Args: + entrypoint_str (str): An full entrypoint string (not an object) + policy (HDPPolicyLoad): the reference policy + + Returns: + Union[bool, str]: False if ok. String with explanation if not. + + >>> url_INT = 'https://example.int/data/data.lat.urn.yml' + >>> url_com = 'git://example.com/data/data.lat.urn.yml' + >>> url_SSH = 'ssh://example.int/home/user/data/data.lat.urn.yml' + >>> pHDSL1 = get_policy_HDSL1() + >>> pHDSL4 = get_policy_HDSL4() + >>> is_not_acceptable_load_this(url_INT, pHDSL1) + False + >>> is_not_acceptable_load_this(url_com, pHDSL1) + '∉ policy.allowed_entrypoint_type' + >>> is_not_acceptable_load_this(url_SSH, pHDSL1) + False + >>> is_not_acceptable_load_this(url_SSH, pHDSL4) + '¬ policy.allowed_entrypoint_type' + """ + + if policy.debug_no_restrictions: + return False + + if entrypoint_str.find('://') == -1: + return '∄ (RFC3986 protocol)' + + etype = get_entrypoint_type(entrypoint_str) + + # print(entrypoint_str, etype, policy.allowed_entrypoint_type) + if etype not in policy.allowed_entrypoint_type: + # print(entrypoint_str, etype, policy.allowed_entrypoint_type) + return '¬ policy.allowed_entrypoint_type' + + if etype in [EntryPointType.HTTP, EntryPointType.FTP, + EntryPointType.GIT, EntryPointType.SSH]: + result = urlparse(entrypoint_str).netloc + # print('result', result) + if policy.custom_allowed_domains is None or \ + len(policy.custom_allowed_domains) == 0: + return '∅ policy.custom_allowed_domains' + # return False + for domain_suffix in policy.custom_allowed_domains: + if result.endswith(domain_suffix): + return False + return '∉ policy.allowed_entrypoint_type' + # return False + + return False diff --git a/hxlm/core/hdp/project.py b/hxlm/core/hdp/project.py index d5373e2..97bdfcd 100644 --- a/hxlm/core/hdp/project.py +++ b/hxlm/core/hdp/project.py @@ -39,6 +39,7 @@ ) from hxlm.core.hdp.datamodel import ( + HDPPolicyLoad, HSiloWrapper, HDPRaw ) @@ -52,6 +53,10 @@ is_raw_hdp_item_syntax, ResourceWrapper ) +from hxlm.core.hdp.hazmat.policy import ( + get_policy_HDSL1, + is_not_acceptable_load_this +) from hxlm.core.localization.util import ( l10n @@ -98,11 +103,18 @@ class HDPProject: allow user correct in running time) """ - def __init__(self, entrypoint: Any, user_l10n: L10NContext): + def __init__(self, entrypoint: Any, + user_l10n: L10NContext, + policy: HDPPolicyLoad): # self._entry_point = entrypoint self._l10n = user_l10n self._parse_entrypoint(entrypoint) + if is_not_acceptable_load_this(entrypoint, policy): + raise SyntaxError('[' + entrypoint + + '] ¬ is_acceptable_load_this [' + + str(policy) + ']') + def _parse_entrypoint(self, entrypoint: Any): """Generic parser for the initial entrypoint @@ -198,7 +210,9 @@ def project(entry_point: str) -> HDPProject: HDPProject: An HDPProject instance """ user_l10n = l10n() - # raise SyntaxError(l10n_user.know_languages) - # raise SyntaxError(l10n_user.about()) - result = HDPProject(entry_point, user_l10n=user_l10n) + + # TODO: eventually the policy should be configurable also on startup + # not only when running + policy = get_policy_HDSL1() + result = HDPProject(entry_point, user_l10n=user_l10n, policy=policy) return result