-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmonitor_cookies.py
executable file
·158 lines (127 loc) · 3.92 KB
/
monitor_cookies.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#!/usr/bin/env python3
from datetime import datetime
from datetime import timezone
from datetime import timedelta
import dataclasses
import typing as t
import os
from flareio import FlareApiClient
import time
import json
class Cookie(t.TypedDict):
    """Shape of one cookie record handled by the monitor.

    Records of this shape are read from the ``items`` list of a cookie
    search response and handed to the ``verify_cookie`` and
    ``invalidate_cookie`` callbacks.
    """

    domain: str
    name: str
    path: str
    value: str
class CursorData(t.TypedDict):
    """JSON document stored in ``cursor.txt`` between runs."""

    # Pagination cursor to resume from on the next run.
    cursor: str
@dataclasses.dataclass
class MonitorContext:
    """Dependencies injected into the cookie monitor.

    Bundles the API client with the callbacks used to persist the
    pagination cursor and to act on each cookie, so callers can swap in
    their own storage and cookie-handling logic.
    """

    # Client used to page through the cookie search endpoint.
    api_client: FlareApiClient

    ########################
    ## Cursor persistence ##
    ########################

    # Returns the stored cursor, or None when there is nothing to resume from.
    get_cursor: t.Callable[[], str | None]
    # Stores the cursor so that a later run can resume from it.
    save_cursor: t.Callable[[str], None]

    #####################
    ## Cookie handling ##
    #####################

    # Returns True when the cookie is still valid and must be invalidated.
    verify_cookie: t.Callable[[Cookie], bool]
    # Invalidates a cookie that was found to be valid.
    invalidate_cookie: t.Callable[[Cookie], None]
def get_cursor() -> str | None:
# TODO: Implement a method here that loads your cursor,
# perhaps from a database.
data: str = ""
if not os.path.exists("cursor.txt"):
return None
with open("cursor.txt", "r", encoding="utf-8") as f:
data = f.read().strip()
if not data:
return None
cursor_data: CursorData = json.loads(data)
return cursor_data["cursor"]
def save_cursor(cursor: str) -> None:
    """Persist *cursor* to ``cursor.txt`` as a one-line JSON document.

    The document is written to a temporary sibling file and then moved
    over ``cursor.txt``, so an interrupted write cannot leave a truncated
    file behind for the next read.

    Args:
        cursor: Pagination cursor to resume from on the next run.
    """
    # TODO: Implement a method here that saves the cursor.
    print(f"Would save {cursor=}")
    cursor_data: CursorData = {"cursor": cursor}
    tmp_path = "cursor.txt.tmp"
    # Explicit UTF-8 so the file is written with the same encoding it is
    # read with, regardless of the platform's default encoding.
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(json.dumps(cursor_data) + "\n")
    os.replace(tmp_path, "cursor.txt")
def verify_cookie(cookie: Cookie) -> bool:
    """Report whether *cookie* is still valid.

    Placeholder implementation: prints the cookie and reports every
    cookie as valid.
    """
    # TODO: Implement a method that would verify the cookie.
    print(f"Would verify {cookie=}...")
    still_valid = True
    return still_valid
def invalidate_cookie(cookie: Cookie) -> None:
    """Invalidate *cookie*.

    Placeholder implementation: only prints the cookie it would invalidate.
    """
    # TODO: Implement a method that would invalidate the cookie.
    print(f"Would invalidate {cookie=}...")
def run_monitor_cookies(
    *,
    context: MonitorContext,
    domain: str,
    cookie_name: str,
    include_expired: bool,
    time_since_imported: timedelta | None = None,
    page_size: int = 50,
    rate_limit_delay: float = 1.0,
) -> None:
    """Page through matching cookies and invalidate those still valid.

    Starts from the cursor returned by ``context.get_cursor()``, scrolls the
    cookie search endpoint, and for every returned cookie calls
    ``context.verify_cookie``; cookies reported as valid are passed to
    ``context.invalidate_cookie``.

    The cursor for the next page is saved only after the current page has
    been fully processed. If processing is interrupted, the next run
    resumes on the same page instead of skipping its remaining cookies.

    Args:
        context: API client plus cursor and cookie callbacks.
        domain: Value sent as the ``domain`` search filter.
        cookie_name: Cookie name sent as the single entry of ``names``.
        include_expired: When false, an ``expires_after`` filter set to the
            current UTC time is added to the query.
        time_since_imported: When given, an ``imported_after`` filter set to
            the current UTC time minus this duration is added.
        page_size: Value sent as ``size`` in the query.
        rate_limit_delay: Seconds to sleep after each response is received;
            zero or a negative value disables the pause.
    """
    now: datetime = datetime.now(tz=timezone.utc)

    query: dict = {
        "from": context.get_cursor(),
        "domain": domain,
        "names": [cookie_name],
        "size": page_size,
    }
    if not include_expired:
        query["expires_after"] = now.isoformat()
    if time_since_imported is not None:
        query["imported_after"] = (now - time_since_imported).isoformat()

    for resp in context.api_client.scroll(
        method="POST",
        url="/leaksdb/v2/cookies/_search",
        json=query,
    ):
        # Ratelimit
        if rate_limit_delay > 0:
            time.sleep(rate_limit_delay)

        resp_data: dict = resp.json()

        cookies: list[Cookie] = resp_data["items"]
        print(f"Fetched {len(cookies)} cookies...")

        for cookie in cookies:
            # If this cookie is no longer valid, continue.
            if not context.verify_cookie(cookie):
                continue

            # This cookie is valid, let's invalidate it.
            context.invalidate_cookie(cookie)

        # Save the cursor only once the whole page has been handled, so a
        # failure above does not cause this page to be skipped on resume.
        next_cursor: str | None = resp_data.get("next")
        if next_cursor:
            context.save_cursor(next_cursor)
def main() -> None:
    """Build the monitor context from the environment and run the monitor.

    Environment variables read:
        FLARE_API_KEY: Required; a missing value raises ``KeyError``.
        FLARE_TENANT_ID: Optional; converted with ``int`` when set.
        FLARE_COOKIE_DOMAIN: Optional; defaults to ``scatterholt.com``.
        FLARE_COOKIE_NAME: Optional; defaults to ``session``.
    """
    env = os.environ

    api_key: str = env["FLARE_API_KEY"]
    raw_tenant_id: str | None = env.get("FLARE_TENANT_ID")
    tenant_id: int | None = None
    if raw_tenant_id is not None:
        tenant_id = int(raw_tenant_id)

    client = FlareApiClient(api_key=api_key, tenant_id=tenant_id)
    context: MonitorContext = MonitorContext(
        api_client=client,
        # Cursors
        get_cursor=get_cursor,
        save_cursor=save_cursor,
        # Cookies
        verify_cookie=verify_cookie,
        invalidate_cookie=invalidate_cookie,
    )

    domain: str = env.get("FLARE_COOKIE_DOMAIN", "scatterholt.com")
    cookie_name: str = env.get("FLARE_COOKIE_NAME", "session")

    # Expired cookies are included; only cookies imported within the last
    # 90 days are requested.
    run_monitor_cookies(
        context=context,
        domain=domain,
        cookie_name=cookie_name,
        include_expired=True,
        time_since_imported=timedelta(days=90),
    )
# Run the monitor only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()