-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathmessage_downloader.py
281 lines (239 loc) · 10.6 KB
/
message_downloader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
import asyncio
import logging
import typing
import telethon
from telethon.tl import types as tl_types
from telethon.tl.custom.message import Message as TLMessage
from .. import settings
from ..dict_types.dialog import DialogMetadata
from ..dict_types.message import MessageAttributes, MessageType, PeerID
from ..utils import async_retry
# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)
class DialogReader(typing.Protocol):
    """
    Structural interface for objects that can look up dialog metadata by id.

    Any object exposing a compatible `read_dialog` method satisfies it.
    """

    def read_dialog(self, dialog_id: int) -> DialogMetadata:
        """
        Return the metadata of the dialog identified by `dialog_id`.

        Args:
            dialog_id (int): identifier of the dialog to look up
        Returns:
            DialogMetadata
        """
        ...
class MessageWriter(typing.Protocol):
    """
    Structural interface for objects that can persist a dialog's messages.

    Any object exposing a compatible `write_messages` method satisfies it.
    """

    def write_messages(
        self,
        dialog: DialogMetadata,
        messages: list[MessageAttributes],
    ) -> None:
        """
        Save `messages` as the messages belonging to `dialog`.

        Args:
            dialog (DialogMetadata): dialog the messages belong to
            messages (list[MessageAttributes]): reformatted messages to save
        """
        ...
class MessageDownloader:
    """
    Class for downloading and saving messages from user's dialogs.

    For detailed info on message data structure, see `MessageAttributes` class.

    Attributes:
        client (telethon.TelegramClient): Telegram client for fetching the messages
        dialog_reader (DialogReader): Dialog reader for reading the dialogs
        message_writer (MessageWriter): Message writer for saving the messages
        reactions_limit_per_message (int): maximum amount of reactions to fetch per message
        concurrent_dialog_downloads (int): maximum number of dialogs downloaded at once
    """

    # Number of dialogs processed concurrently until the caller overrides it
    # through the `concurrent_dialog_downloads` property.
    _DEFAULT_CONCURRENT_DIALOG_DOWNLOADS = 5

    def __init__(
        self,
        client: telethon.TelegramClient,
        dialog_reader: DialogReader,
        message_writer: MessageWriter,
        *,
        reactions_limit_per_message: int,
    ) -> None:
        """
        Store the collaborators and set up the default download concurrency.

        Args:
            client (telethon.TelegramClient): client used for all Telegram requests
            dialog_reader (DialogReader): source of dialog metadata
            message_writer (MessageWriter): sink for the downloaded messages
            reactions_limit_per_message (int): `limit` passed to each reactions request
        """
        self.client = client
        self.dialog_reader = dialog_reader
        self.message_writer = message_writer
        self.reactions_limit_per_message = reactions_limit_per_message
        # The configured limit is kept separately from the semaphore: the
        # semaphore's internal counter drops while downloads are running, so
        # it cannot be used to report the configured value.
        self._concurrent_dialog_downloads = self._DEFAULT_CONCURRENT_DIALOG_DOWNLOADS
        self._semaphore = asyncio.Semaphore(self._concurrent_dialog_downloads)

    @property
    def concurrent_dialog_downloads(self) -> int:
        """
        Configured number of dialogs, that will be processed concurrently during download.
        """
        return self._concurrent_dialog_downloads

    @concurrent_dialog_downloads.setter
    def concurrent_dialog_downloads(self, value: int) -> None:
        """
        Change the concurrency limit by installing a fresh semaphore.

        Downloads that already hold or wait on the previous semaphore keep
        using it; only downloads that start afterwards see the new limit.

        Raises:
            ValueError: if `value` is smaller than 1 (a limit of 0 would make
                every download wait forever)
        """
        if value < 1:
            raise ValueError("concurrent_dialog_downloads must be at least 1")
        self._concurrent_dialog_downloads = value
        self._semaphore = asyncio.Semaphore(value)

    def _reformat_message(self, message: TLMessage) -> MessageAttributes:
        """
        Reformat a single message to a more convenient data structure.

        The result starts as a text message with no duration and no reactions.
        Media then refines it: a sticker replaces the text with the sticker's
        `alt` value, a video or a voice message records its duration, and a
        photo only changes the type. Reactions are left empty here.

        Args:
            message (TLMessage): message as returned by Telethon
        Returns:
            MessageAttributes
        """
        fwd_from = (
            telethon.utils.get_peer_id(message.fwd_from.from_id)
            if message.fwd_from and message.fwd_from.from_id
            else None
        )
        from_id = (
            telethon.utils.get_peer_id(message.from_id) if message.from_id else None
        )

        msg_attributes: MessageAttributes = {
            "id": message.id,
            "date": message.date,
            "from_id": PeerID(from_id) if from_id else None,
            "fwd_from": PeerID(fwd_from) if fwd_from else None,
            "message": message.message or "",
            "type": MessageType.TEXT,
            "duration": None,
            "to_id": PeerID(telethon.utils.get_peer_id(message.to_id)),
            "reactions": {},
        }

        if media := message.media:
            # For stickers, videos, and voice messages
            if (
                isinstance(media, tl_types.MessageMediaDocument)
                and media.document
                and not isinstance(media.document, tl_types.DocumentEmpty)
            ):
                # The first attribute that identifies the document decides the
                # message type; the remaining attributes are not inspected.
                for attribute in media.document.attributes:
                    if isinstance(attribute, tl_types.DocumentAttributeSticker):
                        msg_attributes["message"] = attribute.alt
                        msg_attributes["type"] = MessageType.STICKER
                        break
                    if isinstance(attribute, tl_types.DocumentAttributeVideo):
                        msg_attributes["duration"] = attribute.duration
                        msg_attributes["type"] = MessageType.VIDEO
                        break
                    if (
                        isinstance(attribute, tl_types.DocumentAttributeAudio)
                        and attribute.voice
                    ):
                        msg_attributes["duration"] = attribute.duration
                        msg_attributes["type"] = MessageType.VOICE
                        break
            elif isinstance(media, tl_types.MessageMediaPhoto):
                msg_attributes["type"] = MessageType.PHOTO

        return msg_attributes

    @async_retry(
        telethon.errors.common.InvalidBufferError,
        base_sleep_time=settings.MESSAGE_REACTION_EXPONENTIAL_BACKOFF_SLEEP_TIME,
        max_tries=settings.MESSAGE_REACTION_EXPONENTIAL_BACKOFF_MAX_TRIES,
    )
    async def _get_message_reactions(
        self, message: TLMessage, dialog_peer: tl_types.TypeInputPeer
    ) -> dict[PeerID, tl_types.ReactionEmoji]:
        """
        Get reactions for a single message.

        Only reactions that are `ReactionEmoji` instances are returned; any
        other reaction kind is skipped. An empty dict is returned when
        Telegram reports the channel as broadcast or the message id as invalid.

        Args:
            message (TLMessage): message to get reactions for
            dialog_peer (tl_types.TypeInputPeer): dialog to get the reactions from
                This is required because message id is relative to the dialog.
        Returns:
            dict[PeerID, tl_types.ReactionEmoji]
        """
        try:
            result: tl_types.messages.MessageReactionsList = await self.client(
                telethon.functions.messages.GetMessageReactionsListRequest(
                    peer=dialog_peer,
                    id=message.id,
                    limit=self.reactions_limit_per_message,
                )
            )  # type: ignore
        except telethon.errors.BroadcastForbiddenError:
            logger.debug("channel is broadcast: cannot retrieve reactions from message")
            reactions = {}
        except telethon.errors.MsgIdInvalidError:
            # Deliberately silent: the message simply has no retrievable reactions.
            reactions = {}
        else:
            reactions = {
                PeerID(
                    telethon.utils.get_peer_id(reaction_object.peer_id)
                ): reaction_object.reaction
                for reaction_object in result.reactions
                if isinstance(reaction_object.reaction, tl_types.ReactionEmoji)
            }
        return reactions

    async def _get_message_iterator(
        self, dialog: DialogMetadata, msg_limit: int
    ) -> typing.AsyncIterator[TLMessage]:
        """
        Utility function to get an async iterator of messages from a dialog.

        We can't use plain `TelegramClient.iter_messages` method, because the
        entity lookup by dialog id can fail with `ValueError`. In that case the
        dialog metadata is read and, if it lists exactly one user with a
        username, the entity is resolved through that username instead.

        Nothing is yielded (the error is only logged) when the initial lookup
        fails with a non-`ValueError` exception, or when the metadata does not
        describe a single user with a `username` key.

        Args:
            dialog (DialogMetadata): dialog to iterate over
            msg_limit (int): maximum number of messages to request
        Raises:
            FileNotFoundError: re-raised from the dialog reader during the fallback
            ValueError: if the single user's username is empty
        """
        dialog_id = dialog["id"]
        logger.debug("dialog #%d: creating message iterator", dialog_id)
        try:
            tg_entity = await self.client.get_entity(dialog_id)
        except ValueError as e:
            logger.error("dialog #%d: %s", dialog_id, e)
            logger.info("init dialog %d through member username", dialog_id)

            try:
                dialog_metadata = self.dialog_reader.read_dialog(dialog_id)
            except FileNotFoundError:
                logger.error("dialog #%d: not found", dialog_id)
                raise

            if not (
                "users" in dialog_metadata
                and len(dialog_metadata["users"]) == 1
                and "username" in dialog_metadata["users"][0]
            ):
                logger.error("dialog #%d: not a private chat", dialog_id)
                return

            username = dialog_metadata["users"][0]["username"]
            if not username:
                # * user found, but username is empty
                logger.error(
                    "dialog #%d: single user found, but username is empty", dialog_id
                )
                raise ValueError("username is empty") from e

            # Errors raised by this lookup are not handled here and propagate.
            tg_entity = await self.client.get_input_entity(username)
        except Exception as e:  # pylint: disable=broad-except
            logger.error("dialog #%d: %s", dialog_id, e)
            return

        if isinstance(tg_entity, list):
            tg_entity = tg_entity[0]

        async for message in self.client.iter_messages(
            tg_entity, limit=msg_limit, wait_time=5
        ):
            yield message

    async def _download_dialog(self, dialog: DialogMetadata, msg_limit: int) -> None:
        """
        Download messages from a single dialog and save them.

        Every message is reformatted and, unless the dialog turns out to be a
        broadcast channel, enriched with its reactions (one request per
        message). The collected messages are handed to the message writer in a
        single call once the iteration ends, even if nothing was collected.

        Args:
            dialog (DialogMetadata): dialog to download
            msg_limit (int): maximum number of messages to download
        """
        dialog_id = dialog["id"]
        logger.info("dialog #%d: downloading messages...", dialog_id)
        dialog_messages: list[MessageAttributes] = []
        # None means "not determined yet"; it is resolved on the first message
        # whose peer is a channel and stays None for non-channel dialogs.
        is_broadcast_channel: bool | None = None
        # The peer is the same for every message, so it is built once, on first use.
        reactions_peer: tl_types.TypeInputPeer | None = None
        msg_count = 0

        async for m in self._get_message_iterator(dialog, msg_limit):
            msg_count += 1
            if msg_count % 1000 == 0:
                logger.debug(
                    "dialog #%d: processing message number %d", dialog_id, msg_count
                )

            msg_attrs = self._reformat_message(m)

            if is_broadcast_channel is None and isinstance(
                m.peer_id, tl_types.PeerChannel
            ):
                channel = await self.client.get_entity(m.peer_id)
                assert isinstance(channel, tl_types.Channel)
                is_broadcast_channel = channel.broadcast

            if not is_broadcast_channel:
                # * avoid getting reactions for broadcast channels
                if reactions_peer is None:
                    reactions_peer = typing.cast(
                        tl_types.TypeInputPeer, telethon.utils.get_peer(dialog_id)
                    )  # * cast because dialog is tl_types.TypeInputPeer
                reactions = await self._get_message_reactions(m, reactions_peer)
                msg_attrs["reactions"] = {
                    peer_id: reaction.emoticon
                    for peer_id, reaction in reactions.items()
                }

            dialog_messages.append(msg_attrs)

        self.message_writer.write_messages(dialog, dialog_messages)
        logger.info("dialog #%d: messages downloaded", dialog_id)

    async def _semaphored_download_dialog(self, *args, **kwargs) -> None:
        """
        A utility function to restrict throughput of `_download_dialog` method.

        It is necessary due to Telegram's request rate limits, which produces
        "429 Too Many Requests" errors. All arguments are forwarded unchanged
        to `_download_dialog`.
        """
        async with self._semaphore:
            await self._download_dialog(*args, **kwargs)

    async def download_dialogs(
        self, dialogs: list[DialogMetadata], msg_limit: int
    ) -> None:
        """
        Provided a `dialogs` list, download messages from each dialog and save them.

        Specify the maximum number of messages to download per dialog with `msg_limit`.
        All dialogs are scheduled together; the semaphore limits how many are
        actually downloading at the same time.

        Args:
            dialogs (list[DialogMetadata]): dialogs to download
            msg_limit (int): maximum number of messages to download per dialog
        """
        logger.info("downloading messages from %d dialogs...", len(dialogs))
        # TODO: up for debate: move semaphored download to a decorator
        await asyncio.gather(
            *(self._semaphored_download_dialog(dialog, msg_limit) for dialog in dialogs)
        )
        logger.info("all dialogs downloaded")