-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathday07_dag.py
139 lines (106 loc) · 4.51 KB
/
day07_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from collections import defaultdict
from queue import PriorityQueue
import re
from typing import Any, Dict, List, NamedTuple, Set
STEP_PATTERN = r"Step (?P<curr>[A-Z]).*step (?P<next>[A-Z]).*."
TEST_INPUT = """Step C must be finished before step A can begin.
Step C must be finished before step F can begin.
Step A must be finished before step B can begin.
Step A must be finished before step D can begin.
Step B must be finished before step E can begin.
Step D must be finished before step E can begin.
Step F must be finished before step E can begin.""".split(
    "\n"
)


def load_input(lines: List[str]):
    """Parse dependency lines into ``(upstream, downstream)`` adjacency maps.

    Each line has the form
    ``"Step X must be finished before step Y can begin."``.

    Args:
        lines: raw input lines (surrounding whitespace is ignored).

    Returns:
        A pair of ``defaultdict(list)``:
        ``upstream`` maps each step to the steps that must finish before it;
        ``downstream`` maps each step to the steps it unblocks.
        Insertion order follows the input order.
    """
    downstream: Dict[str, List[str]] = defaultdict(list)
    upstream: Dict[str, List[str]] = defaultdict(list)
    p = re.compile(STEP_PATTERN)
    for line in lines:
        m = p.match(line.strip())
        # Robustness fix: skip blank/unparseable lines (e.g. a trailing
        # newline from readlines()) instead of raising AttributeError.
        if m is None:
            continue
        downstream[m.group("curr")].append(m.group("next"))
        upstream[m.group("next")].append(m.group("curr"))
    return upstream, downstream
def process_steps_synchronous(upstream, downstream) -> str:
    """Return the order in which all steps finish with a single worker.

    At each point the alphabetically first step whose prerequisites are
    all complete is executed next (ties broken via a priority queue keyed
    on the step letter's ordinal).
    """
    # Roots are steps nothing depends on: they appear as a prerequisite
    # (downstream key) but never as a dependent (upstream key).
    ready: PriorityQueue = PriorityQueue()
    for root in set(downstream) - set(upstream):
        ready.put((ord(root), root))

    done: Set[str] = set()
    enqueued: Set[str] = set()
    order: List[str] = []
    while not ready.empty():
        _, step = ready.get()
        done.add(step)
        order.append(step)
        # A dependent becomes ready once every one of its prerequisites
        # is done; guard against enqueueing the same step twice.
        for candidate in downstream[step]:
            if candidate in enqueued:
                continue
            if all(dep in done for dep in upstream[candidate]):
                enqueued.add(candidate)
                ready.put((ord(candidate), candidate))
    return "".join(order)
def time_to_complete(letter: str, task_constant) -> int:
    """Return the duration of a task.

    A task's duration is its 1-based position in the alphabet
    ("A" -> 1, "B" -> 2, ...) plus the fixed per-task constant.
    """
    alphabet_index = ord(letter) - ord("A") + 1
    return alphabet_index + task_constant


assert time_to_complete("A", task_constant=0) == 1
assert time_to_complete("B", task_constant=60) == 62
class Task(NamedTuple):
    """A scheduled job as stored in a worker queue."""

    # The tick on which the job finishes (priority key).
    end_time: int
    # The job payload (a step letter in this puzzle).
    item: Any


def peek(queue: PriorityQueue) -> Task:
    """Return the smallest queue entry as a Task without removing it.

    Relies on the heap invariant that ``queue.queue[0]`` holds the
    minimum element of a PriorityQueue.
    """
    end_time, item = queue.queue[0]
    return Task(end_time=end_time, item=item)
def process_time_concurrent(upstream, downstream, num_workers, const) -> int:
    """Simulate `num_workers` workers executing the DAG concurrently.

    Event-driven simulation: `workers` is a priority queue of in-flight
    tasks keyed by finish tick, `job_queue` holds ready-but-unassigned
    tasks keyed alphabetically.  Time jumps from one finish tick to the
    next rather than advancing one unit at a time.

    Args:
        upstream: step -> prerequisite steps (as built by load_input).
        downstream: step -> steps it unblocks.
        num_workers: maximum number of tasks in flight at once.
        const: fixed per-task duration added by time_to_complete.

    Returns:
        Total elapsed time until every task is complete.
    """
    job_queue: PriorityQueue = PriorityQueue()
    workers: PriorityQueue = PriorityQueue(maxsize=num_workers)
    # run initial set of tasks (-1 b/c we start at 0): finish ticks are
    # 0-indexed, so a task of duration d started at tick 0 ends on tick d-1
    kickoff_tasks = set(downstream.keys()) - set(upstream.keys())
    for task in kickoff_tasks:
        task_time = time_to_complete(task, const)
        workers.put((task_time - 1, task))
    # Jump straight to the first tick on which some task finishes.
    curr_time = peek(workers).end_time
    completed_tasks = set()
    # queued_tasks guards against enqueueing the same task twice.
    queued_tasks = set()
    all_tasks = set(list(downstream) + list(upstream))
    while completed_tasks != all_tasks:
        # process completed tasks: drain every worker whose task ends
        # exactly on the current tick
        while not workers.empty():
            if peek(workers).end_time != curr_time:
                break
            finished_task = workers.get()[1]
            completed_tasks.add(finished_task)
            # find tasks we can add to queue: a dependent is ready once
            # all of its prerequisites are complete
            for next_task in downstream[finished_task]:
                upstream_complete = True
                for dependency in upstream[next_task]:
                    if dependency not in completed_tasks:
                        upstream_complete = False
                        break
                if upstream_complete and next_task not in queued_tasks:
                    job_queue.put((ord(next_task), next_task))
                    queued_tasks.add(next_task)
        # assign the workers tasks (alphabetical order via the job queue)
        # until either the ready pool or the worker slots run out
        while not job_queue.empty() and not workers.full():
            next_task = job_queue.get()[1]
            task_time = time_to_complete(next_task, const)
            workers.put((curr_time + next_task_time_placeholder, next_task)) if False else workers.put((curr_time + task_time, next_task))
        # advance to time that next event is done
        if not workers.empty():
            curr_time = peek(workers).end_time
    # +1 converts the 0-indexed last finish tick into an elapsed duration.
    return curr_time + 1
# Smoke tests against the worked example from the puzzle statement
# (runs on import; see TEST_INPUT above).
test_up, test_down = load_input(TEST_INPUT)
assert process_steps_synchronous(test_up, test_down) == "CABDFE"
assert process_time_concurrent(test_up, test_down, num_workers=2, const=0) == 15
if __name__ == "__main__":
    # Part 1: single-worker step order; Part 2: elapsed time with
    # 5 workers and a 60-second base task cost.
    with open("data/day07_input.txt") as f:
        puzzle_lines = f.read().splitlines()
    upstream, downstream = load_input(puzzle_lines)
    print(process_steps_synchronous(upstream, downstream))
    print(process_time_concurrent(upstream, downstream, num_workers=5, const=60))