Skip to content

Commit

Permalink
extract labels from domain
Browse files: browse the repository at this point in the history
  • Loading branch information
dtrai2 committed Dec 9, 2024
1 parent 1070483 commit 8af6cb7
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 16 deletions.
7 changes: 6 additions & 1 deletion logprep/processor/domain_label_extractor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import ipaddress
import logging
from urllib.parse import urlsplit

from attr import define, field, validators

Expand Down Expand Up @@ -95,7 +96,11 @@ def _apply_rules(self, event, rule: DomainLabelExtractorRule):
)
return

labels = Domain(domain)
urlsplit_result = urlsplit(domain)
if urlsplit_result.hostname is not None:
labels = Domain(urlsplit_result.hostname)
else:
labels = Domain(domain)
if labels.suffix != "":
fields = {
f"{rule.target_field}.registered_domain": f"{labels.domain}.{labels.suffix}",
Expand Down
38 changes: 26 additions & 12 deletions logprep/util/url/url.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""

import re
from pathlib import Path
from urllib.parse import urlsplit

valid_schemes = [
Expand Down Expand Up @@ -132,21 +133,34 @@ def is_valid_scheme(value: str) -> bool:
return scheme in valid_schemes


# Location of the bundled suffix list, resolved relative to this module.
TLDLIST_PATH = Path(__file__).parent / "tldlist" / "public_suffix_list.dat"

# Every rule of the suffix list as a set for constant-time membership tests.
# Blank lines and ``//`` comment lines are skipped. Only the first
# whitespace-delimited token of a line is kept, so trailing whitespace or an
# indented comment can no longer end up inside a rule.
TLD_SET = {
    fields[0]
    for fields in map(str.split, TLDLIST_PATH.read_text(encoding="utf8").splitlines())
    if fields and not fields[0].startswith("//")
}


class Domain:
    """Domain object for easy access to domain parts.

    The name is split into three attributes:

    * ``suffix`` - the longest trailing part of the name that is contained in
      the suffix collection (e.g. ``co.uk``),
    * ``domain`` - the single label directly left of the suffix,
    * ``subdomain`` - everything left of ``domain``.

    All three stay empty strings when no listed suffix is found. The complete
    name is never tested as a suffix itself, only what follows its first dot.
    Matching ignores letter case and trailing dots; the labels are returned
    exactly as they are spelled in ``fqdn``.

    NOTE(review): entries are matched literally, so wildcard (``*.``) and
    exception (``!``) rules of a public suffix list are not interpreted.
    """

    def __init__(self, fqdn: str, *, suffixes=None):
        """Split ``fqdn`` into subdomain, domain and suffix.

        Parameters
        ----------
        fqdn : str
            Host name to split, e.g. ``www.example.co.uk``.
        suffixes : collection of str, optional
            Lower-case suffixes to match against. When omitted, the
            module-level ``TLD_SET`` is used.
        """
        self.fqdn = fqdn
        self.subdomain = ""
        self.domain = ""
        self.suffix = ""
        self._set_labels(suffixes)

    def _set_labels(self, suffixes=None):
        """Populate ``suffix``, ``domain`` and ``subdomain`` from ``fqdn``."""
        # Resolved at call time so an explicit collection never needs TLD_SET.
        known_suffixes = TLD_SET if suffixes is None else suffixes
        # Trailing dots only mark the DNS root and must not block a match.
        name = self.fqdn.rstrip(".")
        candidate = name
        # Drop the leftmost label step by step: the first hit is the longest
        # listed suffix.
        while candidate:
            _, _, candidate = candidate.partition(".")
            # Suffix lists are lower-case while host names are case-insensitive.
            if candidate.lower() in known_suffixes:
                break
        if not candidate:
            return
        self.suffix = candidate
        # The candidate is always the tail of ``name``, so slicing is exact.
        head = name[: -len(candidate)].strip(".")
        self.subdomain, _, self.domain = head.rpartition(".")

    def __repr__(self):
        """Return the dotted name built from the non-empty labels.

        Falls back to the raw ``fqdn`` when no label could be determined.
        """
        labels = [label for label in (self.subdomain, self.domain, self.suffix) if label]
        return ".".join(labels) if labels else self.fqdn
22 changes: 19 additions & 3 deletions tests/unit/util/test_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,22 @@ def test_extract_urls_with_large_domain_label(self):
domain_label = "a" * 64
assert extract_urls(f"http://www.{domain_label}.com") == []

def test_get_suffix_from_domain(self):
    """The suffix of a plain three-label host name is its last label."""
    parsed = Domain("www.google.com")
    assert parsed.suffix == "com"
@pytest.mark.parametrize(
    "domain, expected_subdomain, expected_domain, expected_suffix",
    [
        ("www.thedomain.com", "www", "thedomain", "com"),
        ("www.thedomain.co", "www", "thedomain", "co"),
        ("www.thedomain.com.ua", "www", "thedomain", "com.ua"),
        ("www.thedomain.co.uk", "www", "thedomain", "co.uk"),
        ("save.edu.ao", "", "save", "edu.ao"),
        ("thedomain.sport", "", "thedomain", "sport"),
        ("thedomain.联通", "", "thedomain", "联通"),
        ("www.thedomain.foobar", "", "", ""),
    ],
)
def test_get_labels_from_domain(
    self, domain, expected_subdomain, expected_domain, expected_suffix
):
    """Each host name is split into the expected subdomain, domain and suffix."""
    # Parse once and inspect the same instance for all three labels.
    parsed = Domain(domain)
    assert parsed.suffix == expected_suffix
    assert parsed.domain == expected_domain
    assert parsed.subdomain == expected_subdomain

0 comments on commit 8af6cb7

Please sign in to comment.