Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first version - Test domains for CAA record [website test] #673

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions checks/categories.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def __init__(self, name="web-tls"):
WebTlsCertPubkey,
WebTlsCertSignature,
WebTlsCertHostmatch,
WebCaa,
WebTlsDaneExists,
WebTlsDaneValid,
WebTlsZeroRTT,
Expand Down Expand Up @@ -221,6 +222,7 @@ def __init__(self, name="mail-tls"):
MailTlsCertPubkey,
MailTlsCertSignature,
MailTlsCertHostmatch,
MailCaa,
MailTlsDaneExists,
MailTlsDaneValid,
MailTlsDaneRollover,
Expand Down Expand Up @@ -1041,6 +1043,35 @@ def result_bad(self, tech_data):
self.tech_data = tech_data


class WebCaa(Subtest):
def __init__(self):
super(WebCaa, self).__init__(
name="cert_caa",
label="detail web cert-caa label",
explanation="detail web cert-caa exp",
tech_string="",
init_tech_type="",
worst_status=scoring.CAA_WORST_STATUS,
full_score=scoring.CAA_GOOD,
model_score_field="cert_caa_score",
)

def result_good(self,tech_data):
self._status(STATUS_SUCCESS)
self.verdict = "detail web cert-caa verdict good"
self.tech_data = ""

def result_info(self, tech_data):
self._status(STATUS_INFO)
self.verdict = "detail web cert-caa verdict warning"
self.tech_data = ""

def result_bad(self, tech_data):
self._status(STATUS_FAIL)
self.verdict = "detail web cert-caa verdict bad"
self.tech_data = ""


class WebTlsDaneExists(Subtest):
def __init__(self):
super(WebTlsDaneExists, self).__init__(
Expand Down Expand Up @@ -1619,6 +1650,38 @@ def result_has_daneTA(self, tech_data):
self.tech_data = tech_data


class MailCaa(Subtest):
def __init__(self):
super(MailCaa, self).__init__(
name="cert_caa",
label="detail mail cert-caa label",
explanation="detail mail cert-caa exp",
tech_string="detail mail cert-caa tech table",
init_tech_type="",
worst_status=scoring.CAA_WORST_STATUS,
full_score=scoring.CAA_GOOD,
model_score_field="cert_caa_score",
)

def result_good(self, tech_data):
self._status(STATUS_SUCCESS)
self.verdict = "detail mail cert-caa verdict good"
self.tech_data = ""

def result_info(self, tech_data):
self._status(STATUS_INFO)
self.verdict = "detail mail cert-caa verdict warning"
self.tech_data = ""

def result_bad(self, tech_data):
self._status(STATUS_FAIL)
self.verdict = "detail mail cert-caa verdict bad"
self.tech_data = ""





class MailTlsZeroRTT(Subtest):
def __init__(self):
super(MailTlsZeroRTT, self).__init__(
Expand Down
9 changes: 7 additions & 2 deletions checks/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ class DomainTestTls(BaseTestModel):
cert_hostmatch_bad = ListField(null=True)
cert_hostmatch_score = models.IntegerField(null=True)

cert_caa_score = models.IntegerField(null=True)
cert_caa_record = ListField(default=[])

score = models.IntegerField(null=True)

def __dir__(self):
Expand All @@ -505,7 +508,7 @@ def __dir__(self):
'hsts_score', 'cert_chain', 'cert_trusted', 'cert_trusted_score',
'cert_pubkey_bad', 'cert_pubkey_phase_out', 'cert_pubkey_score',
'cert_signature_bad', 'cert_signature_score', 'cert_hostmatch_bad',
'cert_hostmatch_score', 'score', 'protocols_good',
'cert_hostmatch_score', 'cert_caa_score','cert_caa_record','score', 'protocols_good',
]

def get_web_api_details(self):
Expand Down Expand Up @@ -536,6 +539,7 @@ def get_web_api_details(self):
'cert_pubkey_phase_out': self.cert_pubkey_phase_out,
'cert_signature_bad': self.cert_signature_bad,
'cert_hostmatch_bad': self.cert_hostmatch_bad,
'cert_caa_bad': self.cert_caa_bad,
}

def get_mail_api_details(self):
Expand All @@ -562,7 +566,8 @@ def get_mail_api_details(self):
'cert_pubkey_phase_out': self.cert_pubkey_phase_out,
'cert_signature_bad': self.cert_signature_bad,
'cert_hostmatch_bad': self.cert_hostmatch_bad,
}
'cert_caa_bad': self.cert_caa_bad,
}


class WebTestAppsecpriv(DomainServersModel):
Expand Down
4 changes: 4 additions & 0 deletions checks/scoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@
WEB_TLS_HOSTMATCH_BAD = NO_POINTS
WEB_TLS_HOSTMATCH_WORST_STATUS = STATUS_FAIL

CAA_GOOD = FULL_WEIGHT_POINTS
CAA_BAD = NO_POINTS
CAA_WORST_STATUS = STATUS_INFO

# DANE_EXISTS has no score. It is combined with
# DANE_VALID below.
WEB_TLS_DANE_EXISTS_WORST_STATUS = STATUS_INFO
Expand Down
59 changes: 59 additions & 0 deletions checks/tasks/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import socket
import ssl
import time
import unbound
from enum import Enum
from timeit import default_timer as timer

Expand Down Expand Up @@ -576,6 +577,8 @@ def save_results(model, results, addr, domain, category):
model.cert_signature_score = result.get("sigalg_score")
model.cert_hostmatch_score = result.get("hostmatch_score")
model.cert_hostmatch_bad = result.get("hostmatch_bad")
model.cert_caa_score = result.get("caa_score")
model.cert_caa_record = result.get("caa_record")
model.dane_log = result.get("dane_log")
model.dane_score = result.get("dane_score")
model.dane_status = result.get("dane_status")
Expand Down Expand Up @@ -645,6 +648,7 @@ def save_results(model, results, addr, domain, category):
model.cert_signature_score = result.get("sigalg_score")
model.cert_hostmatch_score = result.get("hostmatch_score")
model.cert_hostmatch_bad = result.get("hostmatch_bad")
model.cert_caa_score = result.get("caa_score")
model.dane_log = result.get("dane_log")
model.dane_score = result.get("dane_score")
model.dane_status = result.get("dane_status")
Expand Down Expand Up @@ -802,6 +806,13 @@ def annotate_and_combine(bad_items, phaseout_items):
else:
category.subtests['cert_hostmatch'].result_good()

if dttls.cert_caa_score is None:
category.subtests['cert_caa'].result_info(dttls.cert_caa_record)
elif dttls.cert_caa_score is scoring.CAA_WORST_STATUS:
category.subtests['cert_caa'].result_info(dttls.cert_caa_record)
else:
category.subtests['cert_caa'].result_good(dttls.cert_caa_record)

if dttls.dane_status == DaneStatus.none:
category.subtests['dane_exists'].result_bad()
elif dttls.dane_status == DaneStatus.none_bogus:
Expand Down Expand Up @@ -1545,6 +1556,51 @@ def do_web_cert(af_ip_pairs, url, task, *args, **kwargs):
return ('cert', results)


def as_txt(data):
try:
txt = "".join(unbound.ub_data.dname2str(data))
except UnicodeError:
txt = "<Non ASCII characters found>"
return txt


def caa_callback(data, status, r):
data['score'] = scoring.CAA_WORST_STATUS
data['available'] = False
data['record'] = []
if status == 0:
available = False
if r.rcode == unbound.RCODE_NOERROR and r.havedata == 1:
available = True
score = scoring.CAA_GOOD
for d in r.data.data:
txt = as_txt(d)
data['record'].append(txt)
elif r.rcode == unbound.RCODE_NXDOMAIN:
# we know for sure there is no DKIM pubkey
score = scoring.CAA_WORST_STATUS
else:
# resolving problems, servfail probably
score = scoring.CAA_WORST_STATUS
data['score'] = score
data['available'] = available
data['done'] = True


def check_caa(task,url):
caa_score = scoring.CAA_GOOD
try:
cb_data = task.async_resolv(url, unbound.RR_TYPE_CAA, caa_callback)
result = dict(available='available' in cb_data and cb_data['available'], score=cb_data['score'])
caa_score = cb_data['score']
# KeyError is due to score missing, happens in case of timeout on non resolving domain
except (SoftTimeLimitExceeded, KeyError):
result = dict(available=False, score=scoring.scoring.CAA_WORST_STATUS)
caa_score = scoring.CAA_WORST_STATUS
return ( caa_score, cb_data['record'])



def cert_checks(
url, mode, task, af_ip_pair=None, starttls_details=None,
*args, **kwargs):
Expand Down Expand Up @@ -1610,6 +1666,7 @@ def cert_checks(
pubkey_score, pubkey_bad, pubkey_phase_out = debug_chain.check_pubkey()
sigalg_score, sigalg_bad = debug_chain.check_sigalg()
chain_str = debug_chain.chain_str()
caa_score, caa_record = check_caa(task,url)

if starttls_details:
dane_results = debug_chain.check_dane(
Expand All @@ -1631,6 +1688,8 @@ def cert_checks(
sigalg_score=sigalg_score,
hostmatch_bad=hostmatch_bad,
hostmatch_score=hostmatch_score,
caa_score=caa_score,
caa_record=caa_record,
)
results.update(dane_results)

Expand Down