class Domain:
    """Domain object for easy access to domain parts.

    The FQDN is split into ``subdomain``, ``domain`` and ``suffix`` by
    searching for the longest trailing run of labels that is a known public
    suffix. The whole name is never reported as a bare suffix: at least one
    label must remain in front of it. When nothing matches, all three
    attributes stay empty strings.

    Matching is case-insensitive and ignores a single trailing root dot, but
    the extracted labels keep the spelling used in ``fqdn``.

    Parameters
    ----------
    fqdn : str
        Host name to analyse, e.g. ``"www.example.co.uk"``. Stored unchanged
        in ``self.fqdn``.
    public_suffixes : collection of str, optional
        Suffix rules to match against; anything supporting ``in`` works.
        Defaults to the module-level ``TLD_SET``.
        NOTE(review): wildcard (``*.ck``) and exception (``!www.ck``) handling
        assumes the entries follow the Public Suffix List format - confirm
        against the bundled data file.
    """

    def __init__(self, fqdn: str, *, public_suffixes=None):
        self.fqdn = fqdn
        self.subdomain = ""
        self.domain = ""
        self.suffix = ""
        self._set_labels(public_suffixes)

    def _set_labels(self, public_suffixes=None) -> None:
        """Populate ``subdomain``, ``domain`` and ``suffix`` from ``self.fqdn``."""
        if public_suffixes is None:
            # Resolved at call time so the module-level set is only touched
            # when the caller did not supply a collection.
            public_suffixes = TLD_SET
        # A single trailing dot only denotes the DNS root; it is not a label.
        name = self.fqdn[:-1] if self.fqdn.endswith(".") else self.fqdn
        labels = name.split(".")
        # Compare lowercased labels, but slice the original ones so the
        # caller's spelling is preserved in the result.
        suffix_size = self._count_suffix_labels(
            [label.lower() for label in labels], public_suffixes
        )
        if suffix_size == 0:
            return
        self.suffix = ".".join(labels[-suffix_size:])
        # strip(".") discards empty labels produced by stray dots.
        remainder = ".".join(labels[:-suffix_size]).strip(".")
        self.subdomain, _, self.domain = remainder.rpartition(".")

    @staticmethod
    def _count_suffix_labels(labels, public_suffixes) -> int:
        """Return how many trailing ``labels`` form the public suffix (0 if none)."""
        total = len(labels)
        # Longest candidate first, so "co.uk" wins over "uk".
        for size in range(total, 0, -1):
            candidate = ".".join(labels[-size:])
            if f"!{candidate}" in public_suffixes:
                # Exception rule: the suffix is the rule minus its first label.
                return size - 1
            if size == total:
                # The complete name may only match an exception rule.
                continue
            if candidate in public_suffixes:
                return size
            # Wildcard rule: "*.<parent>" makes any "<label>.<parent>" a suffix.
            if size > 1 and "*." + ".".join(labels[-(size - 1):]) in public_suffixes:
                return size
        return 0

    def __repr__(self) -> str:
        # Skip empty parts so no leading or doubled dots appear; fall back to
        # the raw input when no labels could be extracted.
        parts = [part for part in (self.subdomain, self.domain, self.suffix) if part]
        return ".".join(parts) if parts else self.fqdn