-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathslasher.py
177 lines (140 loc) · 6.76 KB
/
slasher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from dataclasses import dataclass
from datetime import timedelta
from functools import reduce
from math import floor
from operator import itemgetter
from typing import TextIO, List
from xml.dom import minidom
from chat_downloader import ChatDownloader
from .types import Chat, TimeDict
# Default timing values shared by Slasher: ZERO_SECONDS is the default
# `delay` (and the default `start` of `Slasher.clip`), TEN_SECONDS is the
# default interval `duration`.
ZERO_SECONDS = timedelta(seconds=0)
TEN_SECONDS = timedelta(seconds=10)
# Module-wide downloader instance; `Slasher.from_url` calls its `get_chat`.
chat_dl = ChatDownloader()
# DOM implementation that `Slasher.to_mlt` uses to create its XML document.
impl = minidom.getDOMImplementation()
@dataclass
class Slasher:
    """Chat-message counts per fixed-length interval, plus exporters.

    Attributes:
        url: The URL the chat was requested from.
        chat: The object returned by `chat_dl.get_chat(url)`.
        intervals: Maps the start of an interval (whole seconds) to the
            number of chat messages counted in that interval.
        duration: Length of every interval.
        delay: Offset of the interval grid; interval starts lie at
            `delay + k * duration`.
    """

    url: str
    chat: Chat
    intervals: TimeDict
    duration: timedelta = TEN_SECONDS
    delay: timedelta = ZERO_SECONDS

    @staticmethod
    def _whole_seconds(delta: timedelta) -> int:
        """Return `delta` as a whole number of seconds, rounded down.

        `timedelta.seconds` must not be used for this: it is only the seconds
        *component* (0-86399), so it drops the days part and wraps around for
        negative values (e.g. `timedelta.max.seconds == 86399`).
        """
        return delta // timedelta(seconds=1)

    @staticmethod
    def _mlt_clock(delta: timedelta) -> str:
        """Format `delta` as `H:MM:SS.mmm` for MLT `in`/`out` attributes.

        For whole-second values shorter than a day this yields exactly what
        `str(delta) + ".000"` does, but it stays well-formed when `delta` has
        a fractional part or spans one or more days (hours simply grow past
        23 instead of turning into a "N day(s), " prefix).
        """
        total_ms = delta // timedelta(milliseconds=1)
        sign = "-" if total_ms < 0 else ""
        seconds, millis = divmod(abs(total_ms), 1000)
        minutes, seconds = divmod(seconds, 60)
        hours, minutes = divmod(minutes, 60)
        return f"{sign}{hours}:{minutes:02d}:{seconds:02d}.{millis:03d}"

    @staticmethod
    def from_url(url: str,
                 duration: timedelta = TEN_SECONDS,
                 delay: timedelta = ZERO_SECONDS) -> 'Slasher':
        """Generate a Slasher from a `chat-downloader`-compatible URL.

        Every chat item that carries a `time_in_seconds` value is counted
        into the interval starting at `delay + k * duration` that contains
        it. Items without a timestamp, and items timestamped before `delay`,
        are skipped.

        Args:
            url: URL handed to `chat_dl.get_chat`.
            duration: Length of each interval (whole seconds are used).
            delay: Offset of the first interval (whole seconds are used).

        Returns:
            A Slasher holding the chat object and the per-interval counts.

        Raises:
            ZeroDivisionError: If `duration` is shorter than one second and
                at least one message has to be bucketed.
        """
        chat = chat_dl.get_chat(url)
        # Loop-invariant conversions, done once.
        duration_seconds = Slasher._whole_seconds(duration)
        delay_seconds = Slasher._whole_seconds(delay)

        intervals: TimeDict = {}
        for item in chat:
            timestamp = item.get('time_in_seconds')
            if timestamp is None:
                continue
            floored = floor(timestamp)
            offset = floored - delay_seconds
            # Only messages that end up *before* the delayed origin are
            # dropped. A message exactly on the origin (offset 0) belongs to
            # the first interval; dropping it would under-count that bucket.
            if offset < 0:
                continue
            # Snap down to the start of the enclosing interval, which makes
            # (start - delay) a multiple of the duration.
            start = floored - offset % duration_seconds
            intervals[start] = intervals.get(start, 0) + 1
        return Slasher(url, chat, intervals, duration, delay)

    def average(self) -> int:
        """Get the floored average number of messages in an interval.

        Returns 0 when there are no intervals at all instead of failing on
        the empty sum / division by zero.
        """
        if not self.intervals:
            return 0
        return sum(self.intervals.values()) // len(self.intervals)

    def filter(self, multiplier: float = 2.0) -> 'Slasher':
        """Remove intervals whose message count is below the average times {multiplier}.

        The threshold is `floor(average() * multiplier)`; intervals with at
        least that many messages are kept. Returns a new Slasher.
        """
        # Computed once rather than once per interval.
        threshold = floor(self.average() * multiplier)
        busy_intervals = {
            start: count
            for start, count in self.intervals.items()
            if count >= threshold
        }
        return Slasher(self.url, self.chat, busy_intervals, self.duration,
                       self.delay)

    def top(self, amount: int = 5) -> 'Slasher':
        """Keep the {amount} intervals with the most messages, in chronological order.

        Intervals are ranked by message count (ties keep their existing
        relative order), the first {amount} are kept, and the survivors are
        re-sorted by start time. Returns a new Slasher.
        """
        ranked = sorted(self.intervals.items(), key=itemgetter(1), reverse=True)
        chronological = sorted(ranked[:amount], key=itemgetter(0))
        return Slasher(self.url, self.chat, dict(chronological),
                       self.duration, self.delay)

    def clip(self, start: timedelta = ZERO_SECONDS,
             end: timedelta = timedelta.max) -> 'Slasher':
        """Remove intervals outside of a certain time period.

        Keeps intervals whose start lies in `[start, end]` (both inclusive,
        compared in whole seconds). Returns a new Slasher.
        """
        # Use real totals: `timedelta.max.seconds` is just 86399, which would
        # silently cut everything after the first day with the default `end`.
        first = self._whole_seconds(start)
        last = self._whole_seconds(end)
        clipped_intervals = {
            begin: count
            for begin, count in self.intervals.items()
            if first <= begin <= last
        }
        return Slasher(self.url, self.chat, clipped_intervals, self.duration,
                       self.delay)

    def to_ffmpeg_filter_complex(self, f: TextIO) -> None:
        """Write a FFmpeg filter complex: https://ffmpeg.org/ffmpeg-filters.html.

        One `trim`/`atrim` pair is written per interval, followed by a single
        `concat` that joins all of them into `[outv]` and `[outa]`.

        Use something like: `ffmpeg -hide_banner -i $IN -filter_complex_script $SCRIPT -map [outv] -map [outa] $OUT`.
        """
        length = self._whole_seconds(self.duration)
        pairs: List[str] = []
        for index, interval in enumerate(self.intervals):
            stop = interval + length
            f.write(f"[0:v]trim=start={interval}:end={stop},setpts=PTS-STARTPTS,format=yuv420p[{index}v];\n")
            f.write(f"[0:a]atrim=start={interval}:end={stop},asetpts=PTS-STARTPTS[{index}a];\n")
            # concat expects its inputs interleaved: video, audio, video, ...
            pairs.append(f"[{index}v][{index}a]")
        f.write(f"{''.join(pairs)}concat=n={len(self.intervals)}:v=1:a=1[outv][outa]\n")

    def to_ffsilencer(self, f: TextIO) -> None:
        """Write one "<start> <duration>" line (both in seconds) per interval.

        The output is a text format semi-compatible with FFsilencer:
        https://github.com/supersonichub1/ffsilencer.
        """
        length = self._whole_seconds(self.duration)
        for interval in self.intervals:
            f.write(f"{interval} {length}\n")

    def to_mlt(self, f: TextIO, resource: str = "vod.mp4") -> None:
        """Write an XML document compatible with the MLT Framework and
        associated editors: https://www.mltframework.org/docs/mltxml/.

        The document has a single `producer0` whose `resource` property is
        {resource}, and a `playlist0` with one `entry` per interval running
        from the interval start to start + duration.
        """
        document = impl.createDocument(None, "mlt", None)
        producer_id = "producer0"

        producer = document.createElement("producer")
        producer.setAttribute("id", producer_id)
        resource_property = document.createElement("property")
        resource_property.setAttribute("name", "resource")
        resource_property.appendChild(document.createTextNode(resource))
        producer.appendChild(resource_property)

        playlist = document.createElement("playlist")
        playlist.setAttribute("id", "playlist0")
        for interval in self.intervals:
            in_ = timedelta(seconds=interval)
            out = in_ + self.duration
            entry = document.createElement("entry")
            entry.setAttribute("in", self._mlt_clock(in_))
            entry.setAttribute("out", self._mlt_clock(out))
            entry.setAttribute("producer", producer_id)
            playlist.appendChild(entry)

        mlt = document.documentElement
        mlt.setAttribute("title", "VODinator verson 2021.07.14")
        mlt.appendChild(producer)
        mlt.appendChild(playlist)
        document.writexml(f)
# Extra guffins from when this was a script
"""
# Duration for Twitch extractor is in seconds
print(chat.duration)
messages = list(chat)
number_of_messages = len(messages)
message_rate = number_of_messages / (chat.duration or 1)
# About 1.7 messages a second! That's crazy!
print(f"Number of messages: {number_of_messages} || Rate: {message_rate} messages per second")
"""