Skip to content

Commit

Permalink
setup django based url validation
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Dec 6, 2024
1 parent 442f9dc commit ea0dfe1
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 2 deletions.
5 changes: 3 additions & 2 deletions logprep/processor/pseudonymizer/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import re
from functools import cached_property, lru_cache
from itertools import chain
from typing import Generator, Optional, Pattern
from typing import Optional, Pattern
from urllib.parse import parse_qs, urlencode, urlparse

from attrs import define, field, validators
Expand Down Expand Up @@ -293,11 +293,12 @@ def _pseudonymize_field(
field_value = re.sub(re.escape(clear_value), pseudonymized_value, field_value)
return field_value

def _gen_urls(self, field_value: str) -> Generator:
def _gen_urls(self, field_value: str) -> list:
url_pattern = re.compile(
r"(?:http[s]?://)?(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F])|[/?=]|#)+"
)
matches = url_pattern.findall(field_value)
matches = list(filter(lambda url: urlparse(url).scheme in ["http", "https", ""], matches))
return matches

def _pseudonymize_string(self, value: str) -> str:
Expand Down
83 changes: 83 additions & 0 deletions logprep/util/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,3 +398,86 @@ def get_versions_string(config: "Configuration" = None) -> str:
config_version = f"no configuration found in {', '.join([DEFAULT_CONFIG_LOCATION])}"
version_string += f"\n{'configuration version:'.ljust(padding)}{config_version}"
return version_string


def extract_urls(field_value: str) -> list:
    """
    Find URL-like substrings in a text and keep those with an accepted scheme.

    Parameters
    ----------
    field_value: str
        The text that is searched for URLs.

    Returns
    -------
    list
        The matched URL strings in order of appearance, restricted to those
        for which ``filter_valid_schemes`` returns True.
    """
    # Non-ASCII letter range. This must stay a normal (non-raw) string so the
    # escapes turn into the actual characters inside the character classes.
    unicode_range = "\u00a1-\uffff"
    label_chars = "a-z" + unicode_range + "0-9"

    # IP address literals; the bracketed IPv6 form is only a coarse
    # character-level check.
    octet = r"(?:0|25[0-5]|2[0-4][0-9]|1[0-9]?[0-9]?|[1-9][0-9]?)"
    ipv4 = octet + r"(?:\." + octet + r"){3}"
    ipv6 = r"\[[0-9a-f:.]+\]"

    # Host name: first label, optional further labels (at most 63 characters
    # each, RFC 1034 sec. 3.1) and a mandatory top-level label.
    first_label = "[" + label_chars + "](?:[" + label_chars + "-]{0,61}[" + label_chars + "])?"
    further_labels = r"(?:\.(?!-)[" + label_chars + r"-]{1,63}(?<!-))*"
    top_level = "".join(
        (
            r"\.",  # dot
            r"(?!-)",  # must not start with a dash
            "(?:[a-z" + unicode_range + "-]{2,63}",  # domain label
            r"|xn--[a-z0-9]{1,59})",  # or punycode label
            r"(?<!-)",  # must not end with a dash
            r"\.?",  # optional trailing dot
        )
    )
    host = first_label + further_labels + top_level

    url_regex = re.compile(
        "".join(
            (
                r"(?:(?:[a-z0-9.+-]*)://)?",  # optional scheme, checked after matching
                r"(?:[^\s:@/]+(?::[^\s:@/]*)?@)?",  # optional user:pass authentication
                "(?:" + "|".join((ipv4, ipv6, host)) + ")",
                r"(?::[0-9]{1,5})?",  # optional port
                r"(?:[/?#][^\s]*)?",  # optional resource path
            )
        ),
        flags=re.IGNORECASE,
    )
    # Only non-capturing groups are used, so findall returns whole matches.
    return [url for url in url_regex.findall(field_value) if filter_valid_schemes(url)]


def filter_valid_schemes(value: str) -> bool:
    """
    Check whether a URL uses an accepted scheme or no scheme at all.

    Parameters
    ----------
    value: str
        The URL (with or without a leading ``scheme://``) to be checked.

    Returns
    -------
    bool
        True if the URL has no scheme or its scheme (compared
        case-insensitively) is one of the accepted schemes, False otherwise.
    """
    valid_schemes = frozenset(
        (
            "http",
            "https",
            "ftp",
            "sftp",
            "ssh",
            "file",
            "git",
            "svn",
            "svn+ssh",
            "git+ssh",
            "scp",
            "rsync",
        )
    )
    scheme, separator, _ = value.partition("://")
    if not separator:
        return True
    # A "://" that only shows up after the path, query or fragment has begun
    # (e.g. "www.test.de/redirect?to=ftp://host") belongs to an embedded URL
    # and is not the scheme separator of this one, so the URL is schemeless.
    if any(delimiter in scheme for delimiter in "/?#"):
        return True
    return scheme.lower() in valid_schemes
46 changes: 46 additions & 0 deletions tests/unit/util/test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from logprep.util.configuration import Configuration
from logprep.util.helper import (
camel_to_snake,
extract_urls,
get_dotted_field_value,
get_versions_string,
pop_dotted_field_value,
Expand Down Expand Up @@ -271,3 +272,48 @@ def test_get_version_string_without_config(self):

result = get_versions_string(None)
assert re.search(expected_pattern, result)


class TestExtractUrls:
    """Parametrized checks for ``extract_urls``."""

    @pytest.mark.parametrize(
        "field_value, expected",
        [
            # single URL
            ("https://www.test.de", ["https://www.test.de"]),
            # several URLs separated by whitespace
            (
                "https://www.test.de https://www.test.de",
                ["https://www.test.de", "https://www.test.de"],
            ),
            # URLs embedded in surrounding text
            (
                "some text https://www.test.de other text https://www.test.de",
                ["https://www.test.de", "https://www.test.de"],
            ),
            # trailing comma directly after the host is not part of the URL
            (
                "some text https://www.test.de, other text https://www.test.de",
                ["https://www.test.de", "https://www.test.de"],
            ),
            # URLs without a scheme
            (
                "some text www.test.de other text https://www.test.de",
                ["www.test.de", "https://www.test.de"],
            ),
            (
                "www.test.de",
                ["www.test.de"],
            ),
            # URL with a long path (this case was listed twice before)
            (
                "https://stackoverflow.com/questions/520031/whats-the-cleanest-way-to-extract-urls-from-a-string-using-python",
                [
                    "https://stackoverflow.com/questions/520031/whats-the-cleanest-way-to-extract-urls-from-a-string-using-python"
                ],
            ),
            # scheme that is not accepted
            ("fail://www.test.de", []),
        ],
    )
    def test_extract_urls(self, field_value, expected):
        assert extract_urls(field_value) == expected

0 comments on commit ea0dfe1

Please sign in to comment.