-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpixiv.py
181 lines (161 loc) · 8.73 KB
/
pixiv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# -*- coding: utf-8 -*-
from prompt_toolkit import print_formatted_text as print
class dummy:
    """Empty attribute container.

    Instances start with no attributes of their own; code elsewhere in this
    file assigns attributes onto them (URL templates, imported modules).
    """

    def __init__(self) -> None:
        """Create the container without setting any attributes."""
        return None
class Pixiv():
def __init__(self, storage = None):
self.loop = 10
import importlib
self.debug = False
self.ajax_url = dummy()
if storage is None:
self.storage = None
self.modules = dummy()
else:
self.storage = storage
try:
self.logger = self.storage.logger
except:
pass
try:
self.logger_debug = self.storage.logger_debug
except:
pass
try:
self.modules = self.storage.modules
except:
self.modules = dummy()
self.modules.requests = importlib.import_module('requests')
self.modules.time = importlib.import_module('time')
self.ajax_url_setup()
pass
def logger_debug(self, data):
if self.debug:
print(self.modules.time.strftime("%Y-%m-%d %H:%M:%S ")+data)
def logger(self, data):
print(self.modules.time.strftime("%Y-%m-%d %H:%M:%S ")+data)
pass
def ajax_url_setup(self):
self.ajax_url.illust = "https://www.pixiv.net/ajax/illust/{}?lang=en"
self.ajax_url.uigora_url = "https://www.pixiv.net/ajax/illust/{}/ugoira_meta?lang=en"
self.ajax_url.novel = "https://www.pixiv.net/ajax/novel/{}?lang=en"
self.ajax_url.illust_small = "https://www.pixiv.net/ajax/user/0/illusts?lang=en"
def get_small_illust_data(self, range_start, range_end):
if range_end < range_start:
self.logger("ERROR:IF: range_end cannot be less than range_start")
return {"stauts": 400, "message": "range_end cannot be less than range_start", "data": {}}
if range_end - range_start > 100:
self.logger("ERROR:IF: range count cannot be greater than 100")
return {"stauts": 400, "message": "range count cannot be greater than 100", "data": {}}
url = self.ajax_url.illust_small
for i in range(range_start, range_end):
url += "&ids[]=" + str(i)
data = self.modules.requests.get(url)
if data.json()["error"]:
self.logger("ERROR:GET: {status_code} {message}".format(status_code=data.status_code, message=data.json()["message"]))
return {"stauts": data.status_code, "message": data.json()["message"]}
else:
return {"status": 200, "data": data.json()["body"], "message": data.json()["message"]}
def get_illust_data(self, illust_id):
loop = 0
while True:
if self.storage.exit: return
loop += 1
if loop > self.loop:
self.logger("ERROR: {id}: Too many loops".format(id=illust_id))
return {"stauts": 400, "message": "Too many loops"}
try:
data = self.modules.requests.get(self.ajax_url.illust.format(illust_id))
#Except windows ssl error
except self.modules.requests.exceptions.SSLError:
self.logger("WARNING:GET: {id}: SSL ERROR while downloading picture meta. Sleeping 10 seconds".format(id=illust_id))
self.modules.time.sleep(10)
continue
if data.status_code == 200:
if data.json()["error"]:
self.logger("ERROR:GET: {id}: {message}".format(id=illust_id, message=data.json()["message"]))
break
p = data.json()["body"]
del(p["zoneConfig"])
del(p["noLoginData"])
del(p["userIllusts"])
del(p["comicPromotion"])
del(p["fanboxPromotion"])
del(p["descriptionYoutubeId"])
del(p["descriptionBoothId"])
if p["illustType"] == 2:
while True:
if self.storage.exit: return
try:
ugoira_meta = self.modules.requests.get(self.ajax_url.uigora_url.format(illust_id))
except self.modules.requests.exceptions.SSLError:
self.logger("WARNING:GET: {id}: SSL ERROR while downloading uigora meta. Sleeping 10 seconds".format(id=illust_id))
self.modules.time.sleep(10)
continue
if ugoira_meta.status_code == 200:
if ugoira_meta.json()["error"]:
self.logger("ERROR:GET: {id}: {message}".format(id=illust_id, message=ugoira_meta.json()["message"]))
break
p.update({"ugoira_meta": ugoira_meta.json()["body"]}) # UPDATE META UGOIRA
break
elif ugoira_meta.status_code == 429:
self.logger("WARNING:GET: {id}: Too many requests. Sleeping 1 min".format(id=illust_id))
self.modules.time.sleep(60)
else: # on ERROR
try:
if ugoira_meta.json()["error"]:
self.logger("ERROR:GET: Ugoira {id}: {status_code} {message}".format(id=illust_id, status_code=ugoira_meta.status_code, message=ugoira_meta.json()["message"]))
break
else:
self.logger("ERROR:GET: Ugoira {id}: {status_code}".format(id=illust_id, status_code=ugoira_meta.status_code))
except:
self.logger("ERROR:GET: Ugoira {id}: {status_code}".format(id=illust_id, status_code=ugoira_meta.status_code))
break # Error handling end
self.logger_debug("DEBUG:GET: {id} - {status_code}".format(id=illust_id, status_code=data.status_code))
break
elif data.status_code == 429:
self.logger("WARNING:GET: {id}: Too many requests. Sleeping 1 min".format(id=illust_id))
self.modules.time.sleep(60)
else: # on ERROR
try:
if data.json()["error"]:
self.logger("ERROR:GET: {id}: {status_code} {message}".format(id=illust_id, status_code=data.status_code, message=data.json()["message"]))
return {"stauts": data.status_code, "message": data.json()["message"]}
else:
self.logger("ERROR:GET: {id}: {status_code}".format(id=illust_id, status_code=data.status_code))
return {"stauts": data.status_code, "message": "Unknown error"}
except:
self.logger("ERROR:GET: {id}: {status_code}".format(id=illust_id, status_code=data.status_code))
return {"stauts": data.status_code, "message": "Unknown error"}
# Error handling end
return {"stauts": 200, "data": p}
def get_novel_data(self, novel_id):
while True:
data = self.modules.requests.get(self.ajax_url.novel.format(novel_id))
if data.status_code == 200:
if data.json()["error"]:
self.logger("ERROR:GET: {id}: {message}".format(id=novel_id, message=data.json()["message"]))
break
p = data.json()["body"]
del(p["zoneConfig"])
del(p["noLoginData"])
del(p["userNovels"])
self.logger("DEBUG:GET: {id} - {status_code}".format(id=novel_id, status_code=data.status_code))
break
elif data.status_code == 429:
self.logger("WARNING: {id}: Too many requests. Sleeping 1 min".format(id=novel_id))
self.modules.time.sleep(60)
else: # on ERROR
try:
if data.json()["error"]:
self.logger("ERROR: {id}: {status_code} {message}".format(id=novel_id, status_code=data.status_code, message=data.json()["message"]))
return {"stauts": data.status_code, "message": data.json()["message"]}
else:
self.logger("ERROR: {id}: {status_code}".format(id=novel_id, status_code=data.status_code))
return {"stauts": data.status_code, "message": "Unknown error"}
except:
self.logger("ERROR: {id}: {status_code}".format(id=novel_id, status_code=data.status_code))
return {"stauts": data.status_code, "message": "Unknown error"}
# Error handling end
return {"stauts": 200, "data": p}