-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathddnsv6.py
executable file
·157 lines (131 loc) · 4.7 KB
/
ddnsv6.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import os
import argparse
import socket
from time import sleep
import sys
from dnspod import DnsPod
import pyroute2
import pyroute2.netlink.rtnl
# Module-level containers. In the code visible here nothing reads or fills
# them: main() binds its own local variable named ``all_domains``.
records = []
all_domains = []
# One rtnetlink handle for the whole process: get_ipv6() queries it and
# main() binds it to receive address-change notifications.
ipr = pyroute2.IPRoute()
class Record:
    """Handle for one AAAA record managed through a DNSPod API client.

    Stores the identifiers needed to address the record and offers
    :meth:`set` to point the record at a new address.
    """

    # Class-level defaults; every instance overwrites all of them in __init__.
    dnspod = None
    domain_id = 0
    record_id = 0
    line_id = 0
    fqdn = ""
    sub_domain = ""

    def __init__(self, dnspod, fqdn, domain_id, sub_domain, record_id, line_id):
        """
        :param dnspod: API client; :meth:`set` calls ``dnspod.Record.Modify`` on it
        :param fqdn: name used in log output and in ``str()``
        :param domain_id: id of the domain (zone) that holds the record
        :param sub_domain: host part of the record inside that domain
        :param record_id: id of the record itself
        :param line_id: value sent as ``record_line_id`` when modifying the record
        """
        self.dnspod = dnspod
        self.fqdn = fqdn
        self.domain_id = domain_id
        self.sub_domain = sub_domain
        self.record_id = record_id
        self.line_id = line_id

    def __str__(self):
        # Plain "{}" rather than "{:d}": the ids are passed through from API
        # responses and may be strings, for which "{:d}" raises ValueError.
        # Integer ids render exactly as before.
        return "{}({}:{})".format(self.fqdn, self.domain_id, self.record_id)

    def set(self, ip):
        """
        Point the record at ``ip`` and log the change on stdout.

        :param ip: new value for the AAAA record
        """
        # NOTE(review): the response of Record.Modify is not inspected here;
        # confirm whether the client raises on API errors.
        self.dnspod.Record.Modify(
            domain_id=self.domain_id,
            record_type="AAAA",
            record_id=self.record_id,
            sub_domain=self.sub_domain,
            record_line_id=self.line_id,
            value=ip,
        )
        print("{}: updated to {}".format(self.fqdn, ip))
def parse_fqdn(dnspod, all_domains, fqdn, ip):
    """
    Find (or create) the AAAA record for ``fqdn`` and make it point at ``ip``.

    If no AAAA record exists for the name, one is created with ``ip`` as its
    value. Otherwise the first listed record is updated to ``ip``.
    The process exits with status 1 when the API rejects the creation.

    :param dnspod: API client exposing ``Record.List``, ``Record.Create``
        and ``Record.Modify``
    :param all_domains: iterable of domain dicts carrying ``"name"`` and ``"id"``
    :param str fqdn: fully qualified host name to manage
    :param ip: address to store in the record
    :return: the :class:`Record` handle for the AAAA record
    :raises LookupError: if ``fqdn`` does not belong to any entry of ``all_domains``
    """
    # Pick the most specific (longest) matching zone, so that a name inside a
    # nested zone such as "sub.example.com" is not attributed to "example.com"
    # merely because that entry happens to be listed first.
    domain = None
    for candidate in all_domains:
        name = candidate["name"]
        if fqdn == name or fqdn.endswith("." + name):
            if domain is None or len(name) > len(domain["name"]):
                domain = candidate
    if domain is None:
        raise LookupError("No such domain name {}".format(fqdn))

    if fqdn == domain["name"]:
        sub_domain = "@"  # "@" addresses the zone apex itself
    else:
        # Strip ".<zone name>" from the end to obtain the host part.
        sub_domain = fqdn[:-(len(domain["name"]) + 1)]

    rep = dnspod.Record.List(domain_id=domain["id"], sub_domain=sub_domain, record_type="AAAA")
    count = int(rep["info"]["record_total"])
    if count == 0:
        created = dnspod.Record.Create(
            domain_id=domain["id"],
            sub_domain=sub_domain,
            record_type="AAAA",
            value=ip,
            record_line_id=0,
        )
        if created["status"]["code"] != "1":
            print(created["status"]["message"], file=sys.stderr)
            sys.exit(1)
        print("Created new AAAA record {} = {}".format(fqdn, ip))
        record_id = created["record"]["id"]
        line_id = 0
        needs_update = False  # the record was created with the right value
    else:
        # NOTE(review): only the first AAAA record is managed when several exist.
        existing = rep["records"][0]
        record_id = existing["id"]
        line_id = existing["line_id"]
        needs_update = True

    # Label used in log output; the apex is shown as the bare domain instead
    # of the meaningless "@.<domain>".
    zone_name = rep["domain"]["punycode"]
    if sub_domain == "@":
        label = zone_name
    else:
        label = "{}.{}".format(sub_domain, zone_name)

    record = Record(
        dnspod,
        label,
        domain_id=domain["id"],
        sub_domain=sub_domain,
        record_id=record_id,
        line_id=line_id,
    )
    if needs_update:
        record.set(ip)
    return record
def get_ipv6():
    """
    Find a public IPv6 address on an interface that carries a default route.

    Walks the IPv6 default routes (destination prefix length 0), and for each
    route's output interface returns the first address that has scope 0 and
    does not start with ``fc``, ``fd`` or ``fe``.

    :return: the address as a string, or ``None`` after printing a message to
        stderr when no default route or no suitable address exists
    """
    # list() keeps the emptiness test valid even if the call hands back a
    # lazy iterable instead of a list.
    default_routes = list(ipr.get_routes(socket.AF_INET6, lambda x: x["dst_len"] == 0))
    if not default_routes:
        print("Unable to find the default ipv6 gateway", file=sys.stderr)
        return None

    # Textual prefixes to skip: fc/fd are unique-local (fc00::/7); fe covers
    # link-local (fe80::/10) and the deprecated site-local range (fec0::/10).
    skipped_prefixes = ("fc", "fd", "fe")

    for route in default_routes:
        oif = route.get_attr('RTA_OIF')
        if oif is not None:
            interfaces = [oif]
        else:
            # Multipath route: there is no single output interface, so try the
            # interface of every next hop in order (an absent or empty hop
            # list simply yields no candidates).
            hops = route.get_attr('RTA_MULTIPATH') or []
            interfaces = [hop['oif'] for hop in hops]

        for index in interfaces:
            for addr in ipr.get_addr(socket.AF_INET6, index=index):
                if addr['scope'] != 0:  # keep only scope 0 (global) addresses
                    continue
                ipa = addr.get_attr('IFA_ADDRESS')
                # A message without IFA_ADDRESS cannot be used; skip it rather
                # than crash on None.startswith().
                if ipa is None or ipa.startswith(skipped_prefixes):
                    continue
                return ipa

    print("Unable to get a valid ipv6 address", file=sys.stderr)
    return None
def parse_args():
    """
    Read the command line, which takes exactly one positional domain name.

    :return: the argparse namespace; ``domains`` is a one-element list
    """
    cli = argparse.ArgumentParser(description="DDns Script for dnspod")
    domain_options = dict(metavar="domain", type=str, nargs=1, help="The domain to update")
    cli.add_argument('domains', **domain_options)
    namespace = cli.parse_args()
    return namespace
def main():
    """
    Keep the AAAA record of the domain given on the command line in sync
    with this host's public IPv6 address.

    Requires the API token in the ``DNSPOD_TOKEN`` environment variable.
    After an initial create/update of the record, the function listens for
    IPv6 address notifications on the netlink handle and pushes every change
    to the record. It never returns normally. It exits with status 1 when
    the token is missing or no usable address is found at startup.
    """
    if "DNSPOD_TOKEN" not in os.environ:
        # Errors go to stderr like everywhere else in this file. sys.exit()
        # is used because the bare exit() helper is installed by the site
        # module for interactive use and is not guaranteed to exist.
        print("You should provider the user's token by environment variable \"DNSPOD_TOKEN\"",
              file=sys.stderr)
        sys.exit(1)
    args = parse_args()

    old_ip = get_ipv6()
    if old_ip is None:
        # get_ipv6() has already reported the reason on stderr. Going on
        # would publish None as the record value.
        sys.exit(1)
    print("IPv6 Address Detected: {}".format(old_ip))

    dnspod = DnsPod(os.environ["DNSPOD_TOKEN"])
    print("Listing domains...")
    all_domains = dnspod.Domain.List()["domains"]
    print("Finding records...")
    record = parse_fqdn(dnspod, all_domains, args.domains[0], old_ip)

    # Subscribe to IPv6 address notifications; ipr.get() below then waits
    # for them.
    ipr.bind(pyroute2.netlink.rtnl.RTMGRP_IPV6_IFADDR)
    while True:
        for m in ipr.get():
            print(m)
        ip = get_ipv6()
        if ip is None:
            # The address is gone for the moment; keep the last published
            # value and wait for the next notification.
            continue
        if old_ip != ip:
            print("IPv6 Address Changed: {}".format(ip))
            record.set(ip)
            old_ip = ip
# Start the updater only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()