diff --git a/README.md b/README.md index 9f68110..c1fa709 100644 --- a/README.md +++ b/README.md @@ -37,28 +37,36 @@ pip install predeldomain-X.X.X-py3-none-any.whl ```bash » predeldomain --help -usage: predeldomain [-h] [-l [1-10]] [-m {1,2,3}] [-s {cn,top}] [-t {text,json}] [-w WHOIS] +usage: predeldomain [-h] [-d [1-30]] [-l [1-10]] [-m {1,2,3}] [-o OUPUT] [-s {cn,top}] [-t {text,json}] [-w WHOIS] [-v] The domain to be pre-deleted. options: -h, --help show this help message and exit + -d [1-30], --delay [1-30] + Delay: 1s to 30s -l [1-10], --length [1-10] Length: 1 to 10 -m {1,2,3}, --mode {1,2,3} Mode: 1. Alphanumeric, 2. Numeric, 3. Alphabetic + -o OUPUT, --ouput OUPUT + Output: print data to stdout -s {cn,top}, --suffix {cn,top} Suffix: 'cn' or 'top' -t {text,json}, --type {text,json} Save type: 'text' or 'json' -w WHOIS, --whois WHOIS - Whois: whois, isp, none + Whois: whois, isp, nic, none + -v, --version Print version ``` 1. length: 长度,不含后缀 2. mode: 模式, 1. 数字 + 字母, 2. 数字, 3. 字母 3. suffix: 域名后缀, 'cn' 或者 'top' 4. type: 保存类型, 'text' 或者 'json' (数据保存和发送通知的格式) 5. whois: whois, isp,查询可用的方式。`留空`,则不查询,而是直接根据官网提供的数据判断;`whois`,则使用 `whois` 库查询;`isp` 则使用官方接口(`.top`)、腾讯云(`.cn`)的 API 查询。 +6. version: 版本信息 +7. delay: 接口查询延时,单位秒,默认为 3 +8. ouput: 是否输出到控制台,默认为 False 结果将会通过 PUSH 通知,和保存到本地文件。本地文件将会以 `后缀_日期.log` 的格式保存(`_next`则是明天及以后预删除的域名)。 diff --git a/pyproject.toml b/pyproject.toml index 6d1a389..f4e7b1d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "predeldomain" -version = "0.1.0" +version = "0.2.0" description = "预删除域名查询" authors = [ { name = "Jetsung Chan", email = "jetsungchan@gmail.com" } @@ -14,6 +14,7 @@ dependencies = [ readme = "README.md" keywords = ["domain"] requires-python = ">= 3.10" +license = { text = "Apache-2.0" } classifiers = [ 'Development Status :: 4 - Beta', 'Environment :: Console', diff --git a/src/predeldomain/__init__.py b/src/predeldomain/__init__.py index 8e057e1..dd43546 100644 --- a/src/predeldomain/__init__.py +++ b/src/predeldomain/__init__.py @@ -3,10 +3,21 @@ from predeldomain.app.entry import entry +version = '0.2.0' + def parse_arguments(): parser = argparse.ArgumentParser(description='The domain to be pre-deleted.') + parser.add_argument( + '-d', + '--delay', + type=int, + default=3, + metavar='[1-30]', + choices=range(1, 30), + help='Delay: 1s to 30s', + ) parser.add_argument( '-l', '--length', @@ -24,6 +35,13 @@ def parse_arguments(): default=1, help='Mode: 1. Alphanumeric, 2. Numeric, 3. Alphabetic', ) + parser.add_argument( + '-o', + '--ouput', + type=bool, + default=False, + help='Output: print data to stdout', + ) parser.add_argument( '-s', '--suffix', @@ -45,7 +63,14 @@ def parse_arguments(): '--whois', type=str, default='', - help='Whois: whois, isp, none', + help='Whois: whois, isp, nic, none', + ) + parser.add_argument( + '-v', + '--version', + action='version', + version='%(prog)s ' + version, + help='Print version', ) args = parser.parse_args() diff --git a/src/predeldomain/app/entry.py b/src/predeldomain/app/entry.py index fb10c1a..59e17ad 100644 --- a/src/predeldomain/app/entry.py +++ b/src/predeldomain/app/entry.py @@ -1,4 +1,5 @@ import json +import sys from datetime import datetime from os import environ @@ -10,59 +11,65 @@ from predeldomain.provider.provider_top import TOP -def write_log(data_list, suffix, type='text'): +def write_log(provider, suffix, type='text'): """ 处理数据 """ - if len(data_list) == 0: + if len(provider.data) < 2: return today = datetime.now().date() file_log = f'{suffix}_{today}.log' + file_log_prev = f'{suffix}_{today}_prev.log' file_log_next = f'{suffix}_{today}_next.log' - # print(data_list) + data = provider.data_all() + data_early = provider.data_early() + data_today = provider.data_today() if type == 'text': - data_str = f'.{suffix}\n'.join(data_list[0]) + f'.{suffix}\n' - with open(file_log, 'w') as f: - f.write(data_str) - - for i in range(1, len(data_list)): - if i == 1: + if len(data_early) > 0: + with open(file_log_prev, 'w') as f: + f.write(f'.{suffix}\n'.join(data_early) + f'.{suffix}\n') + if len(data_today) > 0: + with open(file_log, 'w') as f: + f.write(f'.{suffix}\n'.join(data_today) + f'.{suffix}\n') + + for i in range(2, len(data)): + if i == 2: wmode = 'w' + title = '明天过期' else: wmode = 'a' + title = '明天以后过期' - data_str = f'.{suffix}\n'.join(data_list[i]) + f'.{suffix}\n' + data_str = f'.{suffix}\n'.join(data[i]) + f'.{suffix}\n' with open(file_log_next, wmode) as f: - f.write(f'===========================================\n{data_str}') + f.write(f'============={title}=====================\n{data_str}') elif type == 'json': - data_json = json.dumps(data_list[0], indent=4) + data_json = json.dumps(data, indent=4) with open(file_log, 'w') as f: f.write(data_json) - merged_list = sum(data_list[1:], []) - data_json = json.dumps(merged_list, indent=4) - with open(file_log_next, 'w') as f: - f.write(data_json) - -def notify(data_list, suffix): +def notify(provider, suffix): """ 发送通知 """ - if len(data_list) == 0: + if len(provider.data) < 2: return + data_today = provider.data_today() + data_tomorrow = provider.data_tomorrow() + content = '' content_markdown = '' content_text = '' # 今天数据 - if len(data_list[0]) > 0: - content = '\n'.join(data_list[0]) + if len(data_today) > 0: + content = '\n'.join(data_today) content_markdown = f'**域名 `{suffix}` 今天过期:**\n```bash\n{content}\n```' content_text = f'域名 {suffix} 今天过期:\n{content}\n' @@ -70,8 +77,8 @@ def notify(data_list, suffix): content_next = '' content_next_markdown = '' content_next_text = '' - if len(data_list) > 1: - content_next = '\n'.join(data_list[1]) + if len(data_tomorrow) > 1: + content_next = '\n'.join(data_tomorrow) content_next_markdown = ( f'**域名 `{suffix}` 明天过期:**\n```bash\n{content_next}\n```' ) @@ -106,13 +113,22 @@ def entry(args): functions = {'top': TOP, 'cn': CN} + if args.suffix not in functions: + raise Exception(f'Unsupported suffix: {args.suffix}') + + if args.whois == 'nic': + if args.suffix != 'top': + print('nic.top only', file=sys.stderr) + sys.exit(1) + # print( # f'Domain Suffix: {args.suffix}, Length: {args.length}, Mode: {args.mode}, Whois: {args.whois}' # ) - provider = functions.get(args.suffix, Provider)(args.length, args.mode, args.whois) + provider = functions.get(args.suffix, Provider)( + args.length, args.mode, args.whois, args.delay, args.ouput + ) provider.entry() - data_list = provider.data_all() - write_log(data_list, args.suffix, args.type) + write_log(provider, args.suffix, args.type) - notify(data_list, args.suffix) + notify(provider, args.suffix) diff --git a/src/predeldomain/provider/provider.py b/src/predeldomain/provider/provider.py index 468cc2e..99e75ff 100644 --- a/src/predeldomain/provider/provider.py +++ b/src/predeldomain/provider/provider.py @@ -1,6 +1,7 @@ import os import re import sys +import time from datetime import datetime import requests @@ -23,16 +24,14 @@ def __init__( length=3, mode=Mode.ALPHABETIC.value, # noqa: F821 whois='', + delay=3, + ouput=False, ): self.length = length self.mode = mode self.whois = whois - - def is_domain_available(self, domain: str) -> bool: - """ - 判断是否可注册 - """ - return False + self.delay = delay + self.ouput = ouput def entry(self): # noqa: B027 """ @@ -46,17 +45,29 @@ def data_all(self): """ return self.data + def data_early(self): + """ + 获取昨日数据 + """ + return self.data[0] if len(self.data) > 0 else [] + def data_today(self): """ 获取今日数据 """ - return self.data[0] if len(self.data) > 0 else [] + return self.data[1] if len(self.data) > 1 else [] + + def data_tomorrow(self): + """ + 获取明日数据 + """ + return self.data[2] if len(self.data) > 2 else [] def data_future(self): """ 获取未来数据 """ - return self.data[1:] if len(self.data) > 1 else [] + return self.data[3:] if len(self.data) > 3 else [] def match_mode(self, data): """ @@ -88,6 +99,47 @@ def should_download_file(self, file_name: str): file_time = datetime.fromtimestamp(os.path.getmtime(file_name)) return file_time.date() != datetime.now().date() + def print_data(self, domain, is_available=False): + """ + 打印数据 + """ + if self.ouput: + print(f'{domain} is available: {is_available}') + + def is_domain_available(self, domain): + """ + 判断是否可注册 + """ + + if self.whois == 'nic': # nic.top + if '.top' in domain: + is_available = self.nic_top_available(domain) + self.print_data(domain, is_available) + return is_available + else: + return False + + time.sleep(self.delay) + + is_available = False + if self.whois == 'isp': + is_available = self.isp_available(domain) + elif self.whois == 'whois': + is_available = self.whois_available(domain) + else: + return True + + self.print_data(domain, is_available) + return is_available + + def nic_top_available(self, domain): + """ + 通过 nic.top 判断是否可注册 + """ + params = {'domainName': domain} + response = requests.post('https://www.nic.top/cn/whoischeck.asp', data=params) + return 'is available' in response.text + def whois_available(self, domain): """ 通过 Whois 判断是否可注册 @@ -142,10 +194,6 @@ def isp_available(self, domain): raise ValueError(f'status code {response.status_code}') resp = response.json() - if response.status_code != 200: - raise ValueError(f'status code is: {response.status_code}') - if 'code' not in resp or resp['code'] != 0: - raise ValueError(f'code not in resp: {resp}') if 'message' in resp and '未注册' in resp['message']: return True else: diff --git a/src/predeldomain/provider/provider_cn.py b/src/predeldomain/provider/provider_cn.py index 24d9160..e726a3d 100644 --- a/src/predeldomain/provider/provider_cn.py +++ b/src/predeldomain/provider/provider_cn.py @@ -22,18 +22,6 @@ def download_txt(self, url): f'Failed to download TXT file from {url}, status code: {response.status_code}' ) - def is_domain_available(self, domain): - """ - 判断是否可注册 - """ - full_domain = f'{domain}.cn' - if self.whois == 'isp': - return self.isp_available(full_domain) - elif self.whois == 'whois': - return self.whois_available(full_domain) - else: - return True - def _process_response(self, response, is_today=False): """ 处理响应数据并返回符合条件的域名列表 @@ -47,7 +35,7 @@ def _process_response(self, response, is_today=False): continue if len(domain) <= self.length: if is_today: - if self.is_domain_available(domain): + if self.is_domain_available(f'{domain}.cn'): domain_list.append(domain) else: domain_list.append(domain) @@ -59,11 +47,7 @@ def entry(self): """ # 下载并处理今天的数据 today_resp = self.download_txt(self.file_urls['today']) - data_list = self._process_response(today_resp, True) - - # 下载并处理明天和后天的数据 - data_tomorrow = [] - data_after_tomorrow = [] + data_today = self._process_response(today_resp, True) tomorrow_resp = self.download_txt(self.file_urls['tomorrow']) data_tomorrow = self._process_response(tomorrow_resp) @@ -71,8 +55,10 @@ def entry(self): after_tomorrow_resp = self.download_txt(self.file_urls['after_tomorrow']) data_after_tomorrow = self._process_response(after_tomorrow_resp) + data_early = [] + # 排序结果 - data_list.sort() + data_today.sort() data_tomorrow.sort() data_after_tomorrow.sort() - self.data = [data_list, data_tomorrow, data_after_tomorrow] + self.data = [data_early, data_today, data_tomorrow, data_after_tomorrow] diff --git a/src/predeldomain/provider/provider_top.py b/src/predeldomain/provider/provider_top.py index b148d9a..1029d5f 100644 --- a/src/predeldomain/provider/provider_top.py +++ b/src/predeldomain/provider/provider_top.py @@ -24,22 +24,6 @@ def download_csv(self): f'Failed to download CSV file, status code: {response.status_code}' ) - def is_domain_available(self, domain): - """ - 判断是否可注册 - """ - - if self.whois == 'isp': - params = {'domainName': domain} - response = requests.post( - 'https://www.nic.top/cn/whoischeck.asp', data=params - ) - return 'is available' in response.text - elif self.whois == 'whois': - return self.whois_available(f'{domain}.top') - else: - return True - def entry(self): """ 主函数 @@ -57,23 +41,44 @@ def entry(self): next(reader) # 跳过 CSV 文件的头部 - data_list = [] - data_next = [] + # 早期 + data_early = [] + data_today = [] + data_tomorrow = [] + data_future = [] for row in reader: - data = row[0].replace('.top', '') - if not self.match_mode(data): + domain = row[0].replace('.top', '') + if not self.match_mode(domain): continue - if len(data) <= self.length: - given_time = datetime.strptime(row[1], '%Y/%m/%d %H:%M') - if given_time < datetime.now(): - if self.is_domain_available(data): - data_list.append(data) - else: - data_list.append(data) - elif given_time < datetime.now() + timedelta(days=1): - data_next.append(data) + if len(domain) <= self.length: + given_time = datetime.strptime( + row[1], '%Y/%m/%d %H:%M' + ) # 解析时间字符串 + current_date = datetime.now().date() # 获取当前日期 + tomorrow_date = current_date + timedelta(days=1) # 计算明天的日期 + + # 检查日期 + if given_time.date() < current_date: # 如果日期是过去 + data_early.append(domain) # 过去的数据 + if given_time.date() == current_date: # 如果日期是今天 + if ( + given_time.time() > datetime.now().time() + ): # 如果时间大于当前时间 + continue + if not self.is_domain_available( + f'{domain}.top' + ): # 判断域名是否可用 + continue + + data_today.append(domain) # 今天过期且可用 + elif given_time.date() == tomorrow_date: # 如果日期是明天 + data_tomorrow.append(domain) # 明天过期的数据 + elif given_time.date() > tomorrow_date: # 如果日期是未来 + data_future.append(domain) # 添加到未来过期的数据列表 - data_list.sort() - data_next.sort() - self.data = [data_list, data_next] + data_early.sort() + data_today.sort() + data_tomorrow.sort() + data_future.sort() + self.data = [data_early, data_today, data_tomorrow, data_future]