forked from limoruirui/misaka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchina_telecom.py
415 lines (395 loc) · 15.9 KB
/
china_telecom.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
#!/usr/bin/python3
# -- coding: utf-8 --
# -------------------------------
# @Author : github@limoruirui https://github.com/limoruirui
# @Time : 2022/9/12 16:10
# cron "1 9,12 * * *" script-path=xxx.py,tag=匹配cron用
# const $ = new Env('电信签到');
# -------------------------------
"""
1. 电信签到 不需要抓包 脚本仅供学习交流使用, 请在下载后24h内删除
2. cron说明 12点必须执行一次(用于兑换) 然后12点之外还需要执行一次(用于执行每日任务) 一天共两次 可直接使用默认cron
2. 环境变量说明:
必须 TELECOM_PHONE : 电信手机号
选填 TELECOM_PASSWORD : 电信服务密码 填写后会执行更多任务
选填 TELECOM_FOOD : 给宠物喂食次数 默认为0 不喂食 根据用户在网时长 每天可以喂食5-10次
3. 必须登录过 电信营业厅 app的账号才能正常运行
"""
"""
update:
2022.10.25 参考大佬 github@QGCliveDavis https://github.com/QGCliveDavis 的 loginAuthCipherAsymmertric 参数解密 新增app登录获取token 完成星播客系列任务 感谢大佬
2022.11.11 增加分享任务
"""
from datetime import date, datetime
from random import shuffle, randint, choices
from time import sleep, strftime
from re import findall
from requests import get, post
from base64 import b64encode
from tools.aes_encrypt import AES_Ctypt
from tools.rsa_encrypt import RSA_Encrypt
from tools.tool import timestamp, get_environ, print_now
from tools.send_msg import push
from login.telecom_login import TelecomLogin
from string import ascii_letters, digits
class ChinaTelecom:
def __init__(self, account, pwd, checkin=True):
self.phone = account
self.ticket = ""
self.token = ""
if pwd != "" and checkin:
userLoginInfo = TelecomLogin(account, pwd).main()
self.ticket = userLoginInfo[0]
self.token = userLoginInfo[1]
def init(self):
self.msg = ""
self.ua = f"CtClient;9.6.1;Android;12;SM-G9860;{b64encode(self.phone[5:11].encode()).decode().strip('=+')}!#!{b64encode(self.phone[0:5].encode()).decode().strip('=+')}"
self.headers = {
"Host": "wapside.189.cn:9001",
"Referer": "https://wapside.189.cn:9001/resources/dist/signInActivity.html",
"User-Agent": self.ua
}
self.key = "-----BEGIN PUBLIC KEY-----\nMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC+ugG5A8cZ3FqUKDwM57GM4io6\nJGcStivT8UdGt67PEOihLZTw3P7371+N47PrmsCpnTRzbTgcupKtUv8ImZalYk65\ndU8rjC/ridwhw9ffW2LBwvkEnDkkKKRi2liWIItDftJVBiWOh17o6gfbPoNrWORc\nAdcbpk2L+udld5kZNwIDAQAB\n-----END PUBLIC KEY-----"
def req(self, url, method, data=None):
re_try = 3
while re_try > 0:
try:
if method == "GET":
data = get(url, headers=self.headers).json()
return data
elif method.upper() == "POST":
data = post(url, headers=self.headers, json=data).json()
return data
else:
print_now("您当前使用的请求方式有误,请检查")
break
except:
re_try -= 1
sleep(5)
continue
# 长明文分段rsa加密
def telecom_encrypt(self, text):
if len(text) <= 32:
return RSA_Encrypt(self.key).encrypt(text)
else:
encrypt_text = ""
for i in range(int(len(text) / 32) + 1):
split_text = text[(32 * i):(32 * (i + 1))]
encrypt_text += RSA_Encrypt(self.key).encrypt(split_text)
return encrypt_text
@staticmethod
def geneRandomToken():
randomList = choices(ascii_letters + digits, k=129)
token = f"V1.0{''.join(x for x in randomList)}"
return token
# 签到
def chech_in(self):
url = "https://wapside.189.cn:9001/jt-sign/api/home/sign"
data = {
"encode": AES_Ctypt("34d7cb0bcdf07523").encrypt(
f'{{"phone":{self.phone},"date":{timestamp()},"signSource":"smlprgrm"}}')
}
print_now(self.req(url, "post", data))
# 获取任务列表
def get_task(self):
url = "https://wapside.189.cn:9001/jt-sign/paradise/getTask"
data = {
"para": self.telecom_encrypt(f'{{"phone":{self.phone}}}')
}
msg = self.req(url, "post", data)
# print_now(dumps(msg, indent=2, ensure_ascii=False))
if msg["resoultCode"] == "0":
self.task_list = msg["data"]
else:
print_now("获取任务列表失败")
print_now(msg)
return
# 做每日任务
def do_task(self):
url = "https://wapside.189.cn:9001/jt-sign/paradise/polymerize"
for task in self.task_list:
if "翻牌抽好礼" in task["title"] or "查看我的订单" in task["title"] or "查看我的云盘" in task["title"]:
print_now(f'{task["title"]}----{task["taskId"]}')
decrept_para = f'{{"phone":"{self.phone}","jobId":"{task["taskId"]}"}}'
data = {
"para": self.telecom_encrypt(decrept_para)
}
data = self.req(url, "POST", data)
if data["data"]["code"] == 0:
# print(data["resoultMsg"])
print_now(data)
else:
print_now(f'聚合任务完成失败,原因是{data["resoultMsg"]}')
# 给宠物喂食
def food(self):
url = "https://wapside.189.cn:9001/jt-sign/paradise/food"
data = {
"para": self.telecom_encrypt(f'{{"phone":{self.phone}}}')
}
res_data = self.req(url, "POST", data)
if res_data["resoultCode"] == "0":
print_now(res_data["resoultMsg"])
else:
print_now(f'聚合任务完成失败,原因是{res_data["resoultMsg"]}')
# 查询宠物等级
def get_level(self):
url = "https://wapside.189.cn:9001/jt-sign/paradise/getParadiseInfo"
body = {
"para": self.telecom_encrypt(f'{{"phone":{self.phone}}}')
}
data = self.req(url, "POST", body)
self.level = int(data["userInfo"]["paradiseDressup"]["level"])
if self.level < 5:
print_now("当前等级小于5级 不领取等级权益")
return
url = "https://wapside.189.cn:9001/jt-sign/paradise/getLevelRightsList"
right_list = self.req(url, "POST", body)[f"V{self.level}"]
for data in right_list:
# print(dumps(data, indent=2, ensure_ascii=0))
if "00金豆" in data["righstName"] or "话费" in data["righstName"]:
rightsId = data["id"]
self.level_ex(rightsId)
continue
# print(self.rightsId)
# 每月领取等级金豆
def level_ex(self, rightsId):
# self.get_level()
url = "https://wapside.189.cn:9001/jt-sign/paradise/conversionRights"
data = {
"para": self.telecom_encrypt(f'{{"phone":{self.phone},"rightsId":"{rightsId}"}},"receiveCount":1')
}
print_now(self.req(url, "POST", data))
# 查询连续签到天数
def query_signinfo(self):
url = "https://wapside.189.cn:9001/jt-sign/reward/activityMsg"
body = {
"para": self.telecom_encrypt(f'{{"phone":{self.phone}}}')
}
data = self.req(url, "post", body)
# print(dumps(data, indent=2, ensure_ascii=0))
recordNum = data["recordNum"]
if recordNum != 0:
return data["date"]["id"]
return ""
# 若连续签到为7天 则兑换
def convert_reward(self):
url = "https://wapside.189.cn:9001/jt-sign/reward/convertReward"
try:
rewardId = self.query_signinfo() # "baadc927c6ed4d8a95e28fa3fc68cb9"
except:
rewardId = "baadc927c6ed4d8a95e28fa3fc68cb9"
if rewardId == "":
return
body = {
"para": self.telecom_encrypt(
f'{{"phone":"{self.phone}","rewardId":"{rewardId}","month":"{date.today().__format__("%Y%m")}"}}')
}
for i in range(10):
try:
data = self.req(url, "post", body)
except Exception as e:
print(f"请求发送失败: " + str(e))
sleep(6)
continue
print_now(data)
if data["code"] == "0":
break
sleep(6)
reward_status = self.get_coin_info()
if reward_status:
self.msg += f"账号{self.phone}连续签到7天兑换1元话费成功\n"
print_now(self.msg)
push("电信签到兑换", self.msg)
else:
self.msg += f"账号{self.phone}连续签到7天兑换1元话费失败 明天会继续尝试兑换\n"
print_now(self.msg)
push("电信签到兑换", self.msg)
# 查询金豆数量
def coin_info(self):
url = "https://wapside.189.cn:9001/jt-sign/api/home/userCoinInfo"
data = {
"para": self.telecom_encrypt(f'{{"phone":{self.phone}}}')
}
self.coin_count = self.req(url, "post", data)
print_now(self.coin_count)
def author(self):
"""
通过usercode 获取 authorization
:return:
"""
self.get_usercode()
url = "https://xbk.189.cn/xbkapi/api/auth/userinfo/codeToken"
data = {
"usercode": self.usercode
}
data = post(url, headers=self.headers_live, json=data).json()
self.authorization = f"Bearer {data['data']['token']}"
self.headers_live["Authorization"] = self.authorization
def get_usercode(self):
"""
授权星播客登录获取 usercode
:return:
"""
url = f"https://xbk.189.cn/xbkapi/api/auth/jump?userID={self.ticket}&version=9.3.3&type=room&l=renwu"
self.headers_live = {
"User-Agent": self.ua,
"Host": "xbk.189.cn",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh-Hans;q=0.9"
}
location = get(url, headers=self.headers_live, allow_redirects=False).headers["location"]
usercode = findall(r"usercode=(.*?)&", location)[0]
self.usercode = usercode
def watch_video(self):
"""
看视频 一天可完成6次
:return:
"""
url = "https://xbk.189.cn/xbkapi/lteration/liveTask/index/watchVideo"
data = {
"articleId": 3453
}
data = post(url, headers=self.headers_live, json=data).json()
if data["code"] == 0:
print("看小视频15s完成一次")
else:
print(f"完成看小视频15s任务失败, 失败原因为{data['msg']}")
def like(self):
"""
点赞直播间 可完成5次
:return:
"""
url = "https://xbk.189.cn/xbkapi/lteration/room/like"
liveId_list = [1820, 2032, 2466, 2565, 1094, 2422, 1858, 2346]
shuffle(liveId_list)
for liveId in liveId_list[:5]:
data = {
"account": self.phone,
"liveId": liveId
}
try:
data = post(url, headers=self.headers_live, json=data).json()
if data["code"] == 8888:
sleep(2)
print(data["msg"])
else:
print(f"完成点赞直播间任务失败,原因是{data['msg']}")
except Exception:
print(Exception)
def watch_live(self):
# 首先初始化任务,等待15秒倒计时后再完成 可完成10次
url = "https://xbk.189.cn/xbkapi/lteration/liveTask/index/watchLiveInit"
live_id = randint(1000, 2700)
data = {
"period": 1,
"liveId": live_id
}
data = post(url, headers=self.headers_live, json=data).json()
if data["code"] == 0:
taskcode = data["data"]
url = "https://xbk.189.cn/xbkapi/lteration/liveTask/index/watchLive"
data = {
"key": taskcode,
"period": 1,
"liveId": live_id
}
print("正在等待15秒")
sleep(15)
data = post(url, headers=self.headers_live, json=data).json()
if data["code"] == 0:
print("完成1次观看直播任务")
else:
print(f"完成观看直播任务失败,原因是{data['msg']}")
else:
print(f"初始化观看直播任务失败,失败原因为{data['msg']}")
def get_userid(self):
url = "https://wapside.189.cn:9001/jt-sign/api/home/homeInfo"
body = {
"para": self.telecom_encrypt(f'{{"phone":"{self.phone}","signDate":"{datetime.now().__format__("%Y-%m")}"}}')
}
userid = post(url, json=body, headers=self.headers).json()["data"]["userInfo"]["userThirdId"]
return userid
def share(self):
"""
50的分享任务 token不做校检 有值即可 若登录成功了 使用自己的token 否则生成随机的token
:return:
"""
url = "https://appfuwu.189.cn:9021/query/sharingGetGold"
body = {
"headerInfos": {
"code": "sharingGetGold",
"timestamp": datetime.now().__format__("%Y%m%d%H%M%S"),
"broadAccount": "",
"broadToken": "",
"clientType": "#9.6.1#channel50#iPhone 14 Pro Max#",
"shopId": "20002",
"source": "110003",
"sourcePassword": "Sid98s",
"token": self.token if self.token != "" else self.geneRandomToken(),
"userLoginName": self.phone
},
"content": {
"attach": "test",
"fieldData": {
"shareSource": "3",
"userId": self.get_userid(),
"account": TelecomLogin.get_phoneNum(self.phone)
}
}
}
headers = {
"user-agent": "iPhone 14 Pro Max/9.6.1"
}
data = post(url, headers=headers, json=body).json()
print_now(data)
def main(self):
self.init()
self.chech_in()
self.get_task()
self.do_task()
if foods != 0:
for i in range(foods):
self.food()
# self.convert_reward()
if datetime.now().day == 1:
self.get_level()
self.share()
if self.ticket != "":
self.author()
for i in range(6):
self.watch_video()
sleep(15)
# self.like()
for i in range(10):
try:
self.watch_live()
except:
continue
self.coin_info()
self.msg += f"你账号{self.phone} 当前有金豆{self.coin_count['totalCoin']}"
push("电信app签到", self.msg)
def get_coin_info(self):
url = "https://wapside.189.cn:9001/jt-sign/api/getCoinInfo"
decrept_para = f'{{"phone":"{self.phone}","pageNo":0,"pageSize":10,type:"1"}}'
data = {
"para": self.telecom_encrypt(decrept_para)
}
data = self.req(url, "POST", data)
if "skuName" in data["data"]["biz"]["results"][0] and "连续签到" in data["data"]["biz"]["results"][0]["skuName"]:
return True
return False
if __name__ == "__main__":
phone = get_environ("TELECOM_PHONE")
password = get_environ("TELECOM_PASSWORD")
foods = int(float(get_environ("TELECOM_FOOD", 0, False)))
if phone == "":
exit(0)
if password == "":
print_now("电信服务密码未提供 只执行部分任务")
if datetime.now().hour + (8 - int(strftime("%z")[2])) == 12:
telecom = ChinaTelecom(phone, password, False)
telecom.init()
telecom.convert_reward()
else:
telecom = ChinaTelecom(phone, password)
telecom.main()