-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdesec_dns_api.py
326 lines (258 loc) · 9.97 KB
/
desec_dns_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
#!/usr/bin/python
"""
Author Gerhard Steinbeis
Version: 0.2.0
"""
from __future__ import print_function
import json
import requests
class deSEC_DNS_API(object):
    """
    Class to handle the deSEC DNS API requests

    Every request method stores the outcome of the last HTTP exchange on the
    instance (http_code, http_body, http_errmsg) and returns a boolean.
    The JSON payload of the last response can be read with
    get_response_dict().

    Requires: api_url, api_token
    """

    def __init__(self, api_url, api_token, debug=False):
        """
        Initially set the base url and the auth header

        Keyword arguments:
        api_url -- The API url used to connect to; zone names and paths are
                   appended to it verbatim
        api_token -- The API token used to authenticate on the API
        debug -- Enable / Disable debug output (default False)
        """
        super(deSEC_DNS_API, self).__init__()
        self.url_base = api_url
        self.header = {'Authorization': 'Token ' + api_token}
        self.debug = debug
        # State of the most recent request, filled in by http_request()
        self.http_body = None
        self.http_code = None
        self.http_errmsg = None
        # True when the last request addressed a single object, so that
        # get_response_dict() wraps the result into a list
        self.single_result = False

    def _json_header(self):
        """
        Function to build the headers for requests carrying a JSON body
        Return: a new dict (auth header plus Content-Type)
        """
        # Work on a copy: extending self.header in place would leak the
        # Content-Type header into every later GET / DELETE request.
        headers = dict(self.header)
        headers['Content-Type'] = "application/json"
        return headers

    def _succeeded(self):
        """
        Function to evaluate the status code of the last request
        Return: boolean (True if a status code below 300 was received)
        """
        # http_code is None when no response was received at all; comparing
        # None with an int raises on Python 3 and is True on Python 2.
        return self.http_code is not None and self.http_code < 300

    def http_request(self, url, header, method='GET', data=None):
        """
        Function performing http requests
        Return: boolean (True if a response was received, False if the
                request itself failed)

        The status code, body and error message are stored in http_code,
        http_body and http_errmsg.

        Keyword arguments:
        url -- The api url to send the request to
        header -- Headers to send with the HTTP request
        method -- The HTTP method used for the request (default 'GET')
        data -- The request data to be sent with the request (default None)
        """
        self.http_errmsg = ''

        if self.debug:
            print("*** DEBUG: http-request : http-url : " + url)
            print("*** DEBUG: http-request : http-method : " + method)
            print("*** DEBUG: http-request : http-header : " + str(header))
            print("*** DEBUG: http-request : http-data : " + str(data))

        # encode data if passed to the function; bytes are sent unchanged
        if data is None or isinstance(data, bytes):
            req_data = data
        else:
            req_data = data.encode('utf-8')

        # Send the request (GET, POST, PATCH, DELETE)
        try:
            ret = requests.request(method=method, url=url, data=req_data, headers=header)
        except requests.exceptions.RequestException as err:
            # A requests exception only optionally carries a response
            # object, so the status code and body may be unavailable.
            response = getattr(err, 'response', None)
            if response is not None:
                self.http_code = response.status_code
                self.http_body = response.text
            else:
                self.http_code = None
                self.http_body = ''
            self.http_errmsg = str(err)
            if self.debug:
                print("*** DEBUG: http-response: http-code : " + str(self.http_code))
                print("*** DEBUG: http-response: http-error : '" + str(self.http_code) + ": " + self.http_errmsg + "'")
                print("*** DEBUG: http-response: http-body :\n" + self.http_body + "\n")
            return False

        self.http_body = ret.text
        self.http_code = ret.status_code

        if self.debug:
            print("*** DEBUG: http-request : url : " + ret.url)
            print("*** DEBUG: http-response: http-code : " + str(self.http_code))
            print("*** DEBUG: http-response: http-header :\n" + str(ret.headers))
            print("*** DEBUG: http-response: http-body :\n" + self.http_body + "\n")
        return True

    def get_response_dict(self):
        """
        Function to get json response parsed
        Return: array of dicts (empty if the last response had no body)
        """
        if not self.http_body:
            # e.g. after a DELETE, or before any request was made
            ret_dict = []
        else:
            # decode http_body from json to dict
            ret_dict = json.loads(self.http_body)
            # if single result is expected, create an array to remain structure
            if self.single_result:
                ret_dict = [ret_dict]

        if self.debug:
            print("*** DEBUG: json2dict : ret_dict : " + str(ret_dict))
        return ret_dict

    def domain_list(self, zone=None):
        """
        Function to request the domain list
        Return: boolean (based on http_code)

        Keyword arguments:
        zone -- The domain name that should be filtered for
        """
        # check for zone to filter result
        url_addition = ''
        self.single_result = False
        if zone:
            url_addition = zone + "/"
            self.single_result = True

        # compile request url
        req_url = self.url_base + url_addition

        # request the list from the api
        self.http_request(url=req_url, header=self.header, data=None, method='GET')

        # return code indicates success
        return self._succeeded()

    def domain_create(self, zone):
        """
        Function to create a new domain
        Return: boolean (based on http_code)

        Keyword arguments:
        zone -- The domain name that should be created
        """
        self.single_result = True

        # compose POST data
        data = json.dumps({'name': zone})

        # compile request url
        req_url = self.url_base

        # send the create request to the api
        self.http_request(url=req_url, header=self._json_header(), data=data, method='POST')

        # return code indicates success
        return self._succeeded()

    def domain_delete(self, zone):
        """
        Function to delete a domain
        Return: boolean (based on http_code)

        Keyword arguments:
        zone -- The domain name that should be deleted
        """
        # compile request url, the zone specifies the domain
        req_url = self.url_base + zone + "/"

        # send the delete request to the api
        self.http_request(url=req_url, header=self.header, method='DELETE')

        # return code indicates success
        return self._succeeded()

    def rrset_list(self, zone, type=None, subname=None):
        """
        Function to request the rrset list
        Return: boolean (based on http_code)

        Keyword arguments:
        zone -- The domain that should be used
        type -- The type of rrsets that should be shown (default None)
        subname -- The subname of rrset that should be shown (default None)
        """
        # check for filter arguments
        url_addition = ''
        self.single_result = False
        if type and subname:
            # both given: address the single rrset directly
            url_addition = subname + ".../" + type + "/"
            self.single_result = True
        elif type:
            url_addition = "?type=" + type
        elif subname:
            url_addition = "?subname=" + subname

        # compile request url
        req_url = self.url_base + zone + "/rrsets/" + url_addition

        # request the list from the api
        self.http_request(url=req_url, header=self.header, data=None, method='GET')

        # return code indicates success
        return self._succeeded()

    def rrset_create(self, zone, type, subname, records, ttl):
        """
        Function to create a new rrset
        Return: boolean (based on http_code)

        Keyword arguments:
        zone -- The domain that should be used
        type -- The type of rrsets that should be created
        subname -- The subname of rrset that should be created
        records -- The records that should be set for this rrset
                   (comma separated string)
        ttl -- The ttl that should be set for this rrset
        """
        self.single_result = True

        # compose POST data
        post_data = dict()
        post_data['subname'] = subname
        post_data['type'] = type
        post_data['ttl'] = ttl
        post_data['records'] = records.split(",")
        data = json.dumps(post_data)
        if self.debug:
            print("*** DEBUG: data=" + data)

        # compile request url
        req_url = self.url_base + zone + "/rrsets/"

        # send the create request to the api
        self.http_request(url=req_url, header=self._json_header(), data=data, method='POST')

        # return code indicates success
        return self._succeeded()

    def rrset_delete(self, zone, type, subname):
        """
        Function to delete an rrset
        Return: boolean (based on http_code)

        Keyword arguments:
        zone -- The domain that should be used
        type -- The type of rrsets that should be deleted
        subname -- The subname of rrset that should be deleted
        """
        self.single_result = True

        # compile request url
        req_url = self.url_base + zone + "/rrsets/" + subname + ".../" + type + "/"

        # send the delete request to the api
        self.http_request(url=req_url, header=self.header, data=None, method='DELETE')

        # return code indicates success
        return self._succeeded()

    def rrset_modify(self, zone, type, subname, records=None, ttl=None):
        """
        Function to modify an existing rrset
        Return: boolean (based on http_code)

        Keyword arguments:
        zone -- The domain that should be used
        type -- The type of rrsets that should be modified
        subname -- The subname of rrset that should be modified
        records -- The records that should be set for this rrset
                   (comma separated string, default None)
        ttl -- The ttl that should be set for this rrset (default None)
        """
        self.single_result = True

        # compose PATCH data, only the given fields are sent
        patch_data = dict()
        if ttl:
            patch_data['ttl'] = ttl
        if records:
            patch_data['records'] = records.split(",")
        data = json.dumps(patch_data)

        # compile request url
        req_url = self.url_base + zone + "/rrsets/" + subname + ".../" + type + "/"

        # send the modify request to the api
        self.http_request(url=req_url, header=self._json_header(), data=data, method='PATCH')

        # return code indicates success
        return self._succeeded()