-
Notifications
You must be signed in to change notification settings - Fork 8
/
ku9.py
88 lines (76 loc) · 3.39 KB
/
ku9.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import requests
from bs4 import BeautifulSoup
import json
import html
import re
import os
import zipfile
import hashlib
def calculate_zip_hash_without_timestamp(zip_path):
"""计算 zip 文件内容的哈希值,忽略文件时间戳"""
hash_md5 = hashlib.md5()
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
for file_info in sorted(zip_ref.infolist(), key=lambda x: x.filename):
# 读取文件内容而不是时间戳
with zip_ref.open(file_info.filename) as file:
while chunk := file.read(4096):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def create_zip(zip_path, icon_dir, new_epg_data_path):
"""创建新的 ZIP 文件"""
with zipfile.ZipFile(zip_path, 'w') as icon_zip:
for icon_file in os.listdir(icon_dir):
icon_zip.write(os.path.join(icon_dir, icon_file), arcname=icon_file)
icon_zip.write(new_epg_data_path, arcname='epg_data.json')
def fetch_alias_data():
"""获取频道别名数据"""
response = requests.get('https://diyp.112114.xyz/alias')
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
alias_data = {}
for p in soup.find_all('p'):
content = html.unescape(p.decode_contents())
for line in content.split('\n'):
line = line.strip()
if '-->' in line:
alias, channel = map(str.strip, re.split(r'\s*-->\s*', line))
alias_data.setdefault(channel.upper(), []).append(alias.upper())
return {channel: ','.join(aliases) for channel, aliases in alias_data.items()}
def generate_epg_data(icon_data, formatted_alias):
"""生成 EPG 数据"""
epg_data = {"epgs": []}
for channel, logo in icon_data.items():
channel_upper = channel.upper()
aliases = formatted_alias.get(channel_upper, '').split(',')
name_field = ','.join([channel_upper] + aliases)
epg_data["epgs"].append({
"epgid": channel,
"logo": logo,
"name": name_field
})
return epg_data
# 主程序逻辑
os.makedirs('ku9', exist_ok=True)
formatted_alias = fetch_alias_data()
with open('ku9/alias.json', 'w', encoding='utf-8') as alias_file:
json.dump(formatted_alias, alias_file, ensure_ascii=False, indent=2)
with open('iconList_default.json', 'r', encoding='utf-8') as icon_file:
icon_data = json.load(icon_file)
epg_data = generate_epg_data(icon_data, formatted_alias)
with open('ku9/epg_data.json', 'w', encoding='utf-8') as epg_file:
json.dump(epg_data, epg_file, ensure_ascii=False, indent=2)
# 生成第二份 epg_data.json,logo 使用 epgid
new_epg_data = {"epgs": [{"epgid": channel["epgid"], "logo": channel["epgid"], "name": channel["name"]} for channel in epg_data["epgs"]]}
new_epg_data_path = 'ku9/new_epg_data.json'
with open(new_epg_data_path, 'w', encoding='utf-8') as new_epg_file:
json.dump(new_epg_data, new_epg_file, ensure_ascii=False, indent=2)
# 创建临时 ZIP 文件并检查更新
temp_zip_path = 'ku9/temp_icon.zip'
create_zip(temp_zip_path, 'icon', new_epg_data_path)
zip_path = 'ku9/icon.zip'
if os.path.exists(zip_path) and calculate_zip_hash_without_timestamp(zip_path) == calculate_zip_hash_without_timestamp(temp_zip_path):
print("icon.zip 文件没有变化,未更新。")
os.remove(temp_zip_path)
else:
print("icon.zip 文件已更新。")
os.replace(temp_zip_path, zip_path)