-
Notifications
You must be signed in to change notification settings - Fork 1
/
test.py
80 lines (68 loc) · 2.36 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from notion_caldav import *
from datetime import date, datetime
import logging
import asyncio
# setup logger
root_logger = logging.getLogger()
log_level = logging.DEBUG
root_logger.setLevel(log_level)
console = logging.StreamHandler()
console.setLevel(log_level)
tasks = load_cache()
notion = get_notion_client(log_level=logging.INFO)
async def main():
pages = await query_notion_db(notion)
# for p in pages:
# print(p.get('properties').get('Do Date'))
logging.info(f'Loaded {len(pages)} tasks from notion...')
for page in pages:
cached = next((t for t in tasks if t.notion_id == page.get('id')), None)
if cached is None:
task = Task.from_notion(page)
logging.debug(f'Created {task} from notion')
tasks.append(task)
elif utc_from_notion_stamp(page.get('last_edited_time')) > datetime.fromisoformat(cached.timestamp):
cached.update_with_notion(page)
logging.debug(f'Updated {cached} from notion')
tasks_to_notion = []
tasks_to_caldav = []
tasks_to_cache = []
for task in tasks:
logging.debug(f'Processing {task}')
page = next(
(page for page in pages if task.notion_id == page.get('id')),
None
)
todo = None
if page:
logging.debug(f'{task} found on notion')
if todo:
logging.debug(f'{task} found on caldav')
if (
task.cached and
task.source != 'notion' and
(not page or
datetime.fromisoformat(
task.timestamp
) > utc_from_notion_stamp(
page.get('last_edited_time')
))
):
tasks_to_notion.append(task)
tasks_to_cache.append(task)
logging.debug(f'{task} needs to be pushed to notion')
# elif (
# todo and
# todo
# ):
# pass
elif task.cached and (not page): # or not todo:
logging.debug(f'{task} is deleted, cleaning up')
# delete remotes
pass
else:
tasks_to_cache.append(task)
result = await asyncio.gather(*[tasks_to_notion[i].to_notion(notion) for i in range(len(tasks_to_notion))])
await notion.aclose()
dump_cache(tasks_to_cache)
asyncio.run(main())