Skip to content

Commit

Permalink
check_zone_dnssec.py
Browse files Browse the repository at this point in the history
  • Loading branch information
shuque committed Jun 2, 2024
1 parent 9e510eb commit 1caf744
Showing 1 changed file with 282 additions and 0 deletions.
282 changes: 282 additions & 0 deletions check_zone_dnssec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
#!/usr/bin/env python3
#

"""
Query each nameserver address for a zone and determine whether
DNSSEC signed responses for a given record and type within the
zone are correct.
This program assumes the parent zone has a valid DS record installed.
It first authenticates that DS record from the root down. It then
individually queries each nameserver for the zone. For each nameserver,
it queries the DNSKEY RRset, verifies the self signature(s) on that set,
matches the DS RRset to the KSKs. It then queries the specified record
name and type within the zone and authenticates its signature.
This program is useful for checking that _every_ authoritative server
for a target zone is responding with correctly signed answers.
"""

import argparse
import json
import dns.name
import dns.resolver
import dns.message
import dns.query

from reslib.prefs import Prefs
from reslib.cache import RootZone
from reslib.query import Query
from reslib.exception import ResError
from reslib.dnssec import check_self_signature, ds_rr_matches_dnskey
from reslib.dnssec import key_cache, load_keys, validate_all
from reslib.lookup import initialize_dnssec, resolve_name


__version__ = "0.0.1"
__description__ = f"""\
Version {__version__}
Query all nameserver addresses for a given zone and validate DNSSEC"""

DEFAULT_TIMEOUT = 3
DEFAULT_RETRIES = 2
DEFAULT_EDNS_BUFSIZE = 1420
DEFAULT_IP_RRTYPES = [dns.rdatatype.AAAA, dns.rdatatype.A]

# We default to some public validating DNS resolvers for looking up the NS names
# and addresses. Ideally, you should only talk to resolver that you have a secured
# channel to, but since DNSSEC does not really depend on the security of nameserver
# names and addresses, this is okay.
RESOLVER_LIST = ['8.8.8.8', '1.1.1.1']

def query_type(qtype):
"""Check qtype argument value is well formed"""
try:
dns.rdatatype.from_text(qtype)
except Exception as catchall_except:
raise ValueError(f"invalid query type: {qtype}") from catchall_except
return qtype.upper()


def process_arguments(arguments=None):
"""Process command line arguments"""

parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=__description__,
allow_abbrev=False)
parser.add_argument("zone", help="DNS zone name")
parser.add_argument("recname", help="Record name in the zone")
parser.add_argument("rectype", help="Record type for that name")

parser.add_argument("-v", "--verbose", action="count", default=0,
help="increase output verbosity")

ip_rrtypes = parser.add_mutually_exclusive_group()
ip_rrtypes.add_argument("-4", dest='ip_rrtypes',
action='store_const', const=[dns.rdatatype.A],
default=DEFAULT_IP_RRTYPES,
help="Use IPv4 transport only")
ip_rrtypes.add_argument("-6", dest='ip_rrtypes',
action='store_const', const=[dns.rdatatype.AAAA],
default=DEFAULT_IP_RRTYPES,
help="Use IPv6 transport only")

parser.add_argument("--bufsize", type=int, metavar='N',
default=DEFAULT_EDNS_BUFSIZE,
help="Set EDNS buffer size in octets (default: %(default)d)")

parser.add_argument("--text", dest='text', action='store_true',
help="Emit abbreviated text output (default is json)")
parser.add_argument("--timeout", type=int, metavar='N',
default=DEFAULT_TIMEOUT,
help="Query timeout in secs (default: %(default)d)")
parser.add_argument("--retries", type=int, metavar='N',
default=DEFAULT_RETRIES,
help="Number of UDP retries (default: %(default)d)")

if arguments is not None:
return parser.parse_args(args=arguments)
return parser.parse_args()


def get_resolver(addresses=None, dnssec_ok=False, timeout=5):
"""return an appropriately configured Resolver object"""

res = dns.resolver.Resolver()
# Set query flags to RD=1, AD=1, CD=1: # binary 0000 0001 0011 0000
res.set_flags(0x0130)
res.lifetime = timeout
if dnssec_ok:
res.use_edns(edns=0, ednsflags=dns.flags.DO, payload=CONFIG.payload)
if addresses is not None:
res.nameservers = addresses
return res


def get_ds_data(zone):
"""Get secured DS recordset data"""

query = Query(zone, 'DS', 'IN')
resolve_name(query, RootZone, addResults=query)
if not query.is_secure():
raise ValueError(f'{zone}/DS returned insecure answer')
for entry in query.full_answer_rrset:
if entry.rrname == zone and entry.rrtype == dns.rdatatype.DS:
return entry.rrset
raise ValueError(f'{zone}/DS not found')


def get_ns_list(resolver, zone):
"""Query and return list of nameservers for given zone"""

msg = resolver.resolve(zone, dns.rdatatype.NS).response
rrset = msg.get_rrset(msg.answer, zone, dns.rdataclass.IN, dns.rdatatype.NS)
return [x.target for x in rrset.to_rdataset()]


def get_addresses(resolver, name, ip_rrtypes):
"""Get list of addresses for given domain name"""

address_list = []
for rrtype in ip_rrtypes:
msg = resolver.resolve(name, rrtype).response
rrset = msg.get_rrset(msg.answer, name, dns.rdataclass.IN, rrtype)
for entry in rrset.to_rdataset():
address_list.append(entry.address)
return address_list


def get_rrset_and_signature(rrname, rrtype, address):
"""Get RRset and signature for name and type at given nameserver address"""

msg = dns.message.make_query(rrname,
rrtype,
dns.rdataclass.IN,
use_edns=True,
want_dnssec=True,
payload=1460)
response, _ = dns.query.udp_with_fallback(msg, address, timeout=5,
ignore_unexpected=True)
rrset = response.get_rrset(response.answer,
rrname,
dns.rdataclass.IN,
rrtype)
rrsig = response.get_rrset(response.answer,
rrname,
dns.rdataclass.IN,
dns.rdatatype.RRSIG,
covers=rrtype)
return rrset, rrsig


def ds_rrset_matches_ksk_set(ds_set, ksk_set):
"""Check that a DS matches a KSK"""

result = False
for ds_rdata in ds_set:
for key in ksk_set:
if not key.zone_flag:
continue
if ds_rr_matches_dnskey(ds_rdata, key):
result = True
break
return result


class ZoneChecker:
"""Zone class"""

def __init__(self, zonename, recname, rectype):
self.status = []
self.name = zonename
self.recname = recname
self.rectype = rectype
self.resolver = get_resolver(dnssec_ok=False, addresses=RESOLVER_LIST)
self.dsdata = get_ds_data(self.name)
self.nslist = get_ns_list(self.resolver, self.name)
self.server_count_all = 0
self.server_count_good = 0

def check_nslist(self):
"""Check nameservers"""
for nsname in self.nslist:
alist = get_addresses(self.resolver, nsname, CONFIG.ip_rrtypes)
for nsaddress in alist:
self.server_count_all += 1
self.check_dnskey(nsname, nsaddress)
self.check_record(nsname, nsaddress)

def check_dnskey(self, nsname, address):
"""Check a single nameserver address"""
dnskey_set, dnskey_sig = get_rrset_and_signature(self.name, dns.rdatatype.DNSKEY, address)
try:
_, ksklist = check_self_signature(dnskey_set, dnskey_sig)
except ResError as err:
self.status.append((nsname, address, False, err))
return
if not ds_rrset_matches_ksk_set(self.dsdata, ksklist):
self.status.append((nsname, address, False, "DS did not match any DNSKEY"))
return
key_cache.install(self.name, load_keys(dnskey_set)[0])

def check_record(self, nsname, address):
"""Check data record at single nameserver address"""
rec_set, rec_sig = get_rrset_and_signature(self.recname, self.rectype, address)
if not rec_set:
self.status.append((nsname, address, False, "Non existent record"))
return
if not rec_sig:
self.status.append((nsname, address, False, "Non existent record signature"))
return
try:
verified, failed = validate_all(rec_set, rec_sig)
except ResError as err:
self.status.append((nsname, address, False, err))
return
if not verified:
self.status.append((nsname, address, False,
"No valid record signatures found: " + str(failed)))
else:
self.status.append((nsname, address, True, None))
self.server_count_good += 1

def print_status(self):
"""Print Status"""
if CONFIG.text:
for nsname, address, success, error in self.status:
prefix = "DNSSEC SUCCESS" if success else "DNSSEC FAILED"
print(prefix, nsname, address, error)
else:
result_dict = {}
result_dict['zone'] = self.name.to_text()
result_dict['recname'] = self.recname.to_text()
result_dict['rectype'] = dns.rdatatype.to_text(self.rectype)
result_dict['server_count_total'] = self.server_count_all
result_dict['server_count_good'] = self.server_count_good
result_dict['servers'] = []
for nsname, address, success, error in self.status:
entry = {}
entry['nsname'] = nsname.to_text()
entry['nsip'] = address
entry['dnssec'] = success
if error:
entry['error'] = error
result_dict['servers'].append(entry)
print(json.dumps(result_dict, indent=2))


if __name__ == '__main__':

CONFIG = process_arguments()
ZONENAME = dns.name.from_text(CONFIG.zone)
RECNAME = dns.name.from_text(CONFIG.recname)
RECTYPE = dns.rdatatype.from_text(CONFIG.rectype)

Prefs.DNSSEC = True
initialize_dnssec()

CHECKER = ZoneChecker(ZONENAME, RECNAME, RECTYPE)
CHECKER.check_nslist()
CHECKER.print_status()

0 comments on commit 1caf744

Please sign in to comment.