-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathqjs_drpy.py
171 lines (140 loc) · 5.62 KB
/
qjs_drpy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File : qjs_drpy.py
# Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
# Date : 2024/3/20
import os
import threading
import ujson
from pyquickjs import Context
from utils.quickjs_ctx import initContext
from utils.tools import get_md5
from t4.test.ad_remove import fixAdM3u8
from concurrent.futures import ThreadPoolExecutor, wait
class Drpy:
    """Python wrapper around a drpy JavaScript spider evaluated in a QuickJS context.

    Every interaction with the JS context is funnelled through a
    single-worker thread pool (see :meth:`submit`), so the context is created
    and used on one and the same worker thread.  Call :meth:`close` (or use the
    instance as a context manager) to release that worker thread when done.
    """

    # Directory of this file; relative module paths are resolved against it.
    base_path = os.path.dirname(__file__)

    def _(self, p):
        """Return path *p* joined onto :attr:`base_path`."""
        return os.path.join(self.base_path, p)

    def __init__(self, api='drpy', t4_js_api='', debug=0):
        """
        Initialise a drpy instance.

        @param api: api identifier, e.g. ``996影视.js``
        @param t4_js_api: local proxy url
        @param debug: debug switch, off by default; when on, logs printed by
                      the source can be shown
        """
        # The spider is stored on globalThis under a name derived from the api,
        # so the md5-based key has to exist before the context is built.
        key = '_' + get_md5(api)
        self.executor = ThreadPoolExecutor(max_workers=1)
        self._lock = threading.Lock()
        self._api = api
        self.key = key
        self.t4_js_api = t4_js_api
        self.debug = debug
        # Build the JS context on the worker thread, like every later call.
        self.ctx = self.submit(self.initCtx)

    def moduleloader(self, number):
        """Return the source text of the module file *number*.

        Registered with the JS context in :meth:`initCtx`.  *number* is the
        module path; it is resolved relative to :attr:`base_path` and read as
        UTF-8.

        @param number: module path handed over by the JS context
        @return: the file content as a string
        """
        print(f"module_ name: {number}")
        with open(self._(number), encoding='utf-8') as f:
            return f.read()

    def initCtx(self) -> "Context":
        """Create the QuickJS context and expose the drpy spider on ``globalThis``.

        @return: the initialised context
        """
        ctx = initContext(Context(), url='', prefix_code='true', env={'debug': self.debug}, getParams=lambda: {},
                          getCryptoJS=lambda: 'true')
        ctx.moduleloader(self.moduleloader)
        # Import the spider module and publish it under this instance's key.
        ctx.module(f'import spider from "../files/drpy_libs/drpy2.js";\n globalThis.{self.key}=spider;')
        return ctx

    def submit(self, function, *args):
        """Run ``function(*args)`` on the worker thread and return its result.

        Calls are serialised with a lock; an exception raised by *function* is
        re-raised in the caller.  After :meth:`close` the executor refuses new
        work and this raises ``RuntimeError``.

        @param function: callable to execute
        @param args: positional arguments for *function*
        @return: whatever *function* returns
        """
        with self._lock:
            # Future.result() blocks until completion, so no separate wait().
            return self.executor.submit(function, *args).result()

    def close(self):
        """Shut down the worker thread pool; safe to call more than once.

        Waits for a call that is currently in flight.  Must not be called from
        inside a function that is itself running through :meth:`submit`.
        """
        with self._lock:
            self.executor.shutdown(wait=True)

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the worker thread on leaving the ``with`` block."""
        self.close()

    def setDebug(self, debug):
        """Set ``_debug`` on the JS context to *debug* (executed on the worker thread)."""
        return self.submit(self.ctx.set, '_debug', debug)

    def getName(self):
        """Return the last ``/``-separated segment of the api passed to ``__init__``."""
        return self._api.split('/')[-1]

    def toJsObJect(self, any):
        """Convert a dict or list into a JS value via JSON; return anything else unchanged.

        @param any: value to convert
        @return: result of the context's ``parse_json`` for dict/list, else *any* itself
        """
        if isinstance(any, (dict, list)):
            return self.submit(self.ctx.parse_json, ujson.dumps(any, ensure_ascii=False))
        return any

    @staticmethod
    def toDict(_object):
        """Decode ``_object.json()`` into Python data.

        @param _object: object exposing a ``json()`` method that returns JSON
                        text (presumably a QuickJS object -- confirm)
        """
        return ujson.loads(_object.json())

    @staticmethod
    def setDict(_str):
        """Decode the JSON string *_str* into Python data."""
        return ujson.loads(_str)

    def call(self, func: str, *args):
        """Invoke spider function *func* with *args* on the worker thread.

        @param func: name of a member of the spider object
        @param args: arguments forwarded to the JS function
        @return: the value returned by the JS function
        """
        return self.submit(self._call, func, *args)

    def _call(self, func: str, *args, run_gc=True):
        """Look up ``globalThis.<key>.<func>`` and call it with converted *args*.

        Runs directly on the calling thread; use :meth:`call` to go through the
        worker thread.

        @param func: name of a member of the spider object
        @param args: arguments; None/str/bool/float/int pass through, anything
                     else is serialised to JSON and parsed inside the context
        @param run_gc: when true, run the context's gc after the call, even if
                       the call raised
        """

        def convert_arg(arg):
            if isinstance(arg, (type(None), str, bool, float, int)):
                return arg
            # More complex objects are passed through JSON.
            return self.ctx.parse_json(ujson.dumps(arg, ensure_ascii=False))

        try:
            return self.ctx.eval(f'globalThis.{self.key}.{func}')(*[convert_arg(a) for a in args])
        finally:
            if run_gc:
                self.ctx.gc()

    def init(self, extend=""):
        """Call the spider's ``init`` with *extend*; the JS result is discarded."""
        self.call('init', extend)

    def homeContent(self, filter=None):
        """Call the spider's ``home`` with *filter* and return the decoded JSON result."""
        return self.setDict(self.call('home', filter))

    def homeVideoContent(self):
        """Call the spider's ``homeVod`` and return the decoded JSON result."""
        return self.setDict(self.call('homeVod'))

    def categoryContent(self, tid, pg, filter, extend):
        """Call the spider's ``category`` with the given arguments and return the decoded JSON result."""
        return self.setDict(self.call('category', tid, pg, filter, extend))

    def detailContent(self, ids):
        """Call the spider's ``detail`` and return the decoded JSON result.

        @param ids: a single id, or a list of which only the first entry is used
        """
        if isinstance(ids, list):
            ids = ids[0]
        return self.setDict(self.call('detail', ids))

    def searchContent(self, key, quick, pg=1):
        """Call the spider's ``search`` with the given arguments and return the decoded JSON result."""
        return self.setDict(self.call('search', key, quick, pg))

    def playerContent(self, flag, id, vipFlags=None):
        """Call the spider's ``play`` with the given arguments and return the decoded JSON result."""
        return self.setDict(self.call('play', flag, id, vipFlags))

    def isVideo(self):
        """
        Return the matcher string used to decide whether something is a video.

        @return: None for empty; ``reg:`` regular expression; ``js:`` JS code operating on ``input``
        """
        # return 'js:input.includes(".m3u8)?true:false'
        return None

    @staticmethod
    def adRemove():
        """
        m3u8 ad-removal hook; the returned string is applied automatically by the local proxy.

        @return: None for empty; ``reg:`` regular expression; ``js:`` JS code operating on ``input``
        """
        # return 'reg:/video/adjump.*?ts'
        return None

    def getProxyUrl(self):
        """
        Return the local proxy address.

        @return: the ``t4_js_api`` value given to ``__init__``
        """
        return self.t4_js_api

    @staticmethod
    def fixAdM3u8(*args):
        """Forward *args* to the imported module-level ``fixAdM3u8`` and return its result."""
        return fixAdM3u8(*args)

    def localProxy(self, params):
        """Call the spider's ``proxy`` with *params* and decode its result via :meth:`toDict`."""
        return self.toDict(self.call('proxy', params))
if __name__ == '__main__':
    # Manual smoke test: build a spider, load one remote source, print its home content.
    drpy = Drpy(debug=1)

    # Alternative sources that can be passed to init() instead of the one below:
    # drpy.init('https://ghproxy.liuzhicong.com/https://github.com/hjdhnx/dr_py/raw/main/js/荐片.js')
    # drpy.init('https://ghproxy.liuzhicong.com/https://github.com/hjdhnx/dr_py/raw/main/js/555影视[飞].js')
    # drpy.init('https://ghproxy.liuzhicong.com/https://github.com/hjdhnx/dr_py/raw/main/js/农民影视.js')
    # drpy.init('https://ghproxy.liuzhicong.com/https://github.com/hjdhnx/dr_py/raw/main/js/996影视.js')
    source_url = 'https://ghproxy.liuzhicong.com/https://github.com/hjdhnx/dr_py/raw/main/js/奇珍异兽.js'
    drpy.init(source_url)

    # A local source file can be loaded by passing its code instead of a URL:
    # with open('../files/drpy_js/freeok.js', encoding='utf-8') as f:
    #     code = f.read()
    # drpy.init(code)

    drpy.setDebug(1)
    home = drpy.homeContent()
    print(home)

    # f = quickjs.Function(
    #     "adder", """
    #     function adder(x, y) {
    #         return x + y;
    #     }
    #     """)

    # Further calls that can be enabled for manual checks:
    # print(drpy.categoryContent('3', 1, False, {}))
    # print(drpy.detailContent("3$/detail/790.html"))
    # print(drpy.playerContent("索尼", "https://www.cs1369.com/play/790-1-1.html", []))
    # print(drpy.searchContent("斗罗大陆", False, 1))