Skip to content

Commit

Permalink
hdp-integrity (#17), hdp-spec (#16): is_not_acceptable_load_this() cr…
Browse files Browse the repository at this point in the history
…eated
  • Loading branch information
fititnt committed Mar 30, 2021
1 parent fc29c96 commit 0a16b09
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 6 deletions.
73 changes: 71 additions & 2 deletions hxlm/core/hdp/hazmat/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@
"""

import os
from urllib.parse import urlparse

from typing import (
Union
)

from hxlm.core.io.util import (
get_entrypoint_type
)

from hxlm.core.types import (
EntryPointType,
Expand All @@ -28,10 +37,13 @@
# os.environ["HDP_DEBUG"] = "1"
_IS_DEBUG = bool(os.getenv('HDP_DEBUG', ''))

__all__ = ['get_policy_HDSL1', 'get_policy_HDSL4',
'is_not_acceptable_load_this']


def _get_bunker() -> HDPPolicyLoad:
"""bunker"""
policy = HDPPolicyLoad
policy = HDPPolicyLoad()

policy.enforce_startup_generic_tests = True

Expand Down Expand Up @@ -64,7 +76,7 @@ def _get_user_know_what_is_doing() -> HDPPolicyLoad:
Returns:
HDPPolicyLoad: The policy
"""
policy = HDPPolicyLoad
policy = HDPPolicyLoad()

# Note: in general the default generic HDPPolicyLoad somewhat already
# is flexible (with exception that does not have hardcoded names
Expand Down Expand Up @@ -124,3 +136,60 @@ def get_policy_HDSL4() -> HDPPolicyLoad:
"""
# pylint: disable=invalid-name
return _get_bunker()


def is_not_acceptable_load_this(
entrypoint_str: str,
policy: HDPPolicyLoad) -> Union[bool, str]:
"""Checj if an entrypoint string is not acceptable by an policy
Args:
entrypoint_str (str): An full entrypoint string (not an object)
policy (HDPPolicyLoad): the reference policy
Returns:
Union[bool, str]: False if ok. String with explanation if not.
>>> url_INT = 'https://example.int/data/data.lat.urn.yml'
>>> url_com = 'git://example.com/data/data.lat.urn.yml'
>>> url_SSH = 'ssh://example.int/home/user/data/data.lat.urn.yml'
>>> pHDSL1 = get_policy_HDSL1()
>>> pHDSL4 = get_policy_HDSL4()
>>> is_not_acceptable_load_this(url_INT, pHDSL1)
False
>>> is_not_acceptable_load_this(url_com, pHDSL1)
'∉ policy.allowed_entrypoint_type'
>>> is_not_acceptable_load_this(url_SSH, pHDSL1)
False
>>> is_not_acceptable_load_this(url_SSH, pHDSL4)
'¬ policy.allowed_entrypoint_type'
"""

if policy.debug_no_restrictions:
return False

if entrypoint_str.find('://') == -1:
return '∄ (RFC3986 protocol)'

etype = get_entrypoint_type(entrypoint_str)

# print(entrypoint_str, etype, policy.allowed_entrypoint_type)
if etype not in policy.allowed_entrypoint_type:
# print(entrypoint_str, etype, policy.allowed_entrypoint_type)
return '¬ policy.allowed_entrypoint_type'

if etype in [EntryPointType.HTTP, EntryPointType.FTP,
EntryPointType.GIT, EntryPointType.SSH]:
result = urlparse(entrypoint_str).netloc
# print('result', result)
if policy.custom_allowed_domains is None or \
len(policy.custom_allowed_domains) == 0:
return '∅ policy.custom_allowed_domains'
# return False
for domain_suffix in policy.custom_allowed_domains:
if result.endswith(domain_suffix):
return False
return '∉ policy.allowed_entrypoint_type'
# return False

return False
22 changes: 18 additions & 4 deletions hxlm/core/hdp/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
)

from hxlm.core.hdp.datamodel import (
HDPPolicyLoad,
HSiloWrapper,
HDPRaw
)
Expand All @@ -52,6 +53,10 @@
is_raw_hdp_item_syntax,
ResourceWrapper
)
from hxlm.core.hdp.hazmat.policy import (
get_policy_HDSL1,
is_not_acceptable_load_this
)

from hxlm.core.localization.util import (
l10n
Expand Down Expand Up @@ -98,11 +103,18 @@ class HDPProject:
allow user correct in running time)
"""

def __init__(self, entrypoint: Any, user_l10n: L10NContext):
def __init__(self, entrypoint: Any,
user_l10n: L10NContext,
policy: HDPPolicyLoad):
# self._entry_point = entrypoint
self._l10n = user_l10n
self._parse_entrypoint(entrypoint)

if is_not_acceptable_load_this(entrypoint, policy):
raise SyntaxError('[' + entrypoint +
'] ¬ is_acceptable_load_this [' +
str(policy) + ']')

def _parse_entrypoint(self, entrypoint: Any):
"""Generic parser for the initial entrypoint
Expand Down Expand Up @@ -198,7 +210,9 @@ def project(entry_point: str) -> HDPProject:
HDPProject: An HDPProject instance
"""
user_l10n = l10n()
# raise SyntaxError(l10n_user.know_languages)
# raise SyntaxError(l10n_user.about())
result = HDPProject(entry_point, user_l10n=user_l10n)

# TODO: eventually the policy should be configurable also on startup
# not only when running
policy = get_policy_HDSL1()
result = HDPProject(entry_point, user_l10n=user_l10n, policy=policy)
return result

0 comments on commit 0a16b09

Please sign in to comment.