|
1 | | -# SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD |
| 1 | +# SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD |
2 | 2 | # SPDX-License-Identifier: Unlicense OR CC0-1.0 |
3 | 3 | import logging |
4 | 4 | import re |
@@ -92,10 +92,58 @@ def check_record(self, name, query_type, expected=True, expect=None): |
92 | 92 | if expect is None: |
93 | 93 | expect = name |
94 | 94 | if expected: |
95 | | - assert any(expect in answer for answer in answers), f"Expected record '{expect}' not found in answer section" |
| 95 | + assert any(expect in answer for answer in answers), f"Expected record '{expect}' not in answer section" |
96 | 96 | else: |
97 | 97 | assert not any(expect in answer for answer in answers), f"Unexpected record '{expect}' found in answer section" |
98 | 98 |
|
| 99 | + def parse_section(self, response, section: str, rdtype_text: str): |
| 100 | + """Parse a specific response section (answer, authority, additional) for given rdtype. |
| 101 | +
|
| 102 | + Returns list of textual records for that rdtype. |
| 103 | + """ |
| 104 | + out = [] |
| 105 | + if not response: |
| 106 | + return out |
| 107 | + rrsets = [] |
| 108 | + if section == 'answer': |
| 109 | + rrsets = response.answer |
| 110 | + elif section == 'authority': |
| 111 | + rrsets = response.authority |
| 112 | + elif section == 'additional': |
| 113 | + rrsets = response.additional |
| 114 | + else: |
| 115 | + raise ValueError('invalid section') |
| 116 | + for rr in rrsets: |
| 117 | + if dns.rdatatype.to_text(rr.rdtype) != rdtype_text: |
| 118 | + continue |
| 119 | + for item in rr.items: |
| 120 | + full = ( |
| 121 | + f'{rr.name} {rr.ttl} ' |
| 122 | + f'{dns.rdataclass.to_text(rr.rdclass)} ' |
| 123 | + f'{dns.rdatatype.to_text(rr.rdtype)} ' |
| 124 | + f'{item.to_text()}' |
| 125 | + ) |
| 126 | + out.append(full) |
| 127 | + return out |
| 128 | + |
| 129 | + def check_additional(self, response, rdtype_text: str, owner_contains: str, expected: bool = True, expect_substr: str | None = None): |
| 130 | + """Check Additional section for an RR of type rdtype_text whose owner includes owner_contains. |
| 131 | +
|
| 132 | + If expect_substr is provided, also require it to appear in the textual RR. |
| 133 | + """ |
| 134 | + records = self.parse_section(response, 'additional', rdtype_text) |
| 135 | + logger.info(f'additional({rdtype_text}): {records}') |
| 136 | + |
| 137 | + def _matches(line: str) -> bool: |
| 138 | + in_owner = owner_contains in line |
| 139 | + has_val = (expect_substr in line) if expect_substr else True |
| 140 | + return in_owner and has_val |
| 141 | + found = any(_matches(r) for r in records) |
| 142 | + if expected: |
| 143 | + assert found, f"Expected {rdtype_text} for {owner_contains} in Additional not found" |
| 144 | + else: |
| 145 | + assert not found, f"Unexpected {rdtype_text} for {owner_contains} found in Additional" |
| 146 | + |
99 | 147 |
|
100 | 148 | if __name__ == '__main__': |
101 | 149 | if len(sys.argv) < 3: |
|
0 commit comments