-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
353 lines (300 loc) · 12.4 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
from prettytable import PrettyTable
import random
import heapq
import json
import csv
# ========================================
class PrioritySampler:
    """Pool of student ids that are handed out highest priority first.

    Priorities are stored negated, so the smallest stored key belongs to
    the students with the largest priority. When a priority group does not
    fit entirely, its members are drawn with ``random.sample``.
    """

    def __init__(self):
        # student_id -> negated priority
        self._students = {}

    def add(self, student_id, priority):
        """Register ``student_id`` with ``priority`` (overwrites an old entry)."""
        # Negate so ascending order of the stored key is descending priority.
        self._students[student_id] = -priority

    def sample(self, num):
        """Remove and return up to ``num`` student ids, best priority first.

        If the pool holds ``num`` students or fewer, the whole pool is
        drained and returned. Otherwise whole priority groups are taken in
        order, and the last group that does not fit is sampled at random.

        Raises:
            AssertionError: if ``num`` is not positive.
        """
        assert num > 0, "sample num must be positive int"
        if len(self._students) <= num:
            return self.popall()

        # Bucket ids by stored key; insertion order is kept inside a bucket.
        buckets = {}
        for sid, key in self._students.items():
            if key not in buckets:
                buckets[key] = []
            buckets[key].append(sid)

        chosen = []
        remaining = num
        for key in sorted(buckets):
            if remaining <= 0:
                break
            group = buckets[key]
            if len(group) > remaining:
                # Too many students tie for the last seats: draw lots.
                group = random.sample(group, remaining)
            chosen += group
            remaining -= len(group)

        for sid in chosen:
            self._students.pop(sid)
        return chosen

    def popall(self):
        """Empty the pool and return every student id it held."""
        drained = list(self._students)
        self._students = {}
        return drained
class Option:
    """One selectable option of a course, with a seat limit and priority rule.

    Attributes:
        _full: whether the option is saturated and accepts no more students.
        _students: ids of the students already placed in this option.
        _priority_sampler: students waiting to be placed in this option.
    """

    def __init__(self, name, limit, priority):
        """Create an empty option.

        Args:
            name: option name.
            limit: seat count; must be a positive ``int``.
            priority: a ``bool``, or one of the ints 1, 2, 3, 4.

        Raises:
            AssertionError: if ``limit`` or ``priority`` is invalid.
        """
        assert type(limit) is int and limit > 0, "limit must be positive int"
        is_grade_rule = type(priority) is int and priority in (1, 2, 3, 4)
        assert type(priority) is bool or is_grade_rule, \
            "priority must be bool or 1, 2, 3, 4"
        self._name = name
        self._limit = limit
        self._priority = priority
        self._full = False
        self._students = []
        self._priority_sampler = PrioritySampler()

    def add_student(self, student_id, student_data):
        """Queue a student for this option.

        Args:
            student_id: id of the student.
            student_data: mapping read for its "grade" and "num" entries.

        Returns:
            ``False`` if the option is already full, otherwise ``True``
            after the student has been added to the waiting pool.
        """
        if self._full:
            return False

        # Grades above 4 count as grade 4.
        grade = min(student_data["grade"], 4)

        # A larger priority is served first.
        bonus = 0
        if self._priority is True:
            bonus = grade
        elif type(self._priority) is int and grade == self._priority:
            bonus = 1

        # Students who already hold options are pushed back.
        penalty = 10 * student_data["num"]
        self._priority_sampler.add(student_id, bonus - penalty)
        return True

    def recover_from_full(self):
        """Move waiting students into free seats and return the rejected ones.

        Returns:
            ``[]`` if the option was already full; otherwise the ids left in
            the waiting pool after the free seats were filled. The pool is
            empty afterwards.
        """
        if self._full:
            return []
        vacancy = self._limit - len(self._students)
        self._students.extend(self._priority_sampler.sample(vacancy))
        if len(self._students) == self._limit:
            self._full = True
        return self._priority_sampler.popall()
class Course:
    """A course whose options are filled wish by wish through lotteries.

    Attributes:
        _id: course id, used as the key into each student's "selections".
        _name: course name.
        _options: option name -> Option.
        _max_select: maximum number of options one student can win.
    """

    def __init__(self, course):
        """Build the course and its options from one config entry.

        Args:
            course: mapping with keys "id", "name", "type" and "options";
                "options" maps an option name to a config holding "limit"
                (and "priority" when the type is "十選二").

        Raises:
            ValueError: if ``course["type"]`` is not a known type.
        """
        self._id = course["id"]
        self._name = course["name"]
        self._options = dict()
        self._max_select = 1  # a regular course grants at most one option
        for name, config in course["options"].items():
            if course["type"] == "必修":
                # Every option of this type uses the grade-based priority.
                option = Option(name, config["limit"], True)
            elif course["type"] == "電電實驗":
                # No option of this type uses a grade-based priority.
                option = Option(name, config["limit"], False)
            elif course["type"] == "十選二":
                # Each option carries its own priority in the config.
                option = Option(name, config["limit"], config["priority"])
                self._max_select = 2  # this type grants up to two options
            else:
                raise ValueError("Invalid course type!")
            self._options[name] = option

    def distribute(self, students, preselect):
        """Run the lottery; winners end up in each option's ``_students``.

        Args:
            students: iterable of student mappings with "userID", "grade"
                and "selections" (course id -> ordered list of option names).
            preselect: mapping student id -> option names won beforehand;
                only used when this course's id is "Ten-Select-Two". It is
                not modified.

        Raises:
            ValueError: if a student ends up with more than ``_max_select``
                options.
        """
        competitors = dict()
        for student in students:
            selection = student["selections"][self._id]
            if selection:
                competitors[student["userID"]] = {
                    "grade": student["grade"],
                    "selection": selection.copy(),
                    "num": 0,  # how many options this student has won so far
                }

        # Options drawn ahead of the main lottery are settled first.
        if self._id == "Ten-Select-Two":
            self.deal_with_preselect(preselect, competitors)

        wish_index = 0  # which wish rank is being processed in this round
        while competitors:
            # Queue every remaining student on their current wish.
            for student_id, student_data in competitors.items():
                wish = student_data["selection"][wish_index]
                # None marks a wish already granted by deal_with_preselect.
                if wish is None:
                    continue
                if self._options[wish].add_student(student_id, student_data):
                    student_data["num"] += 1

            # Over-subscribed options draw lots; the losers give the seat back.
            for option in self._options.values():
                reject = option.recover_from_full()
                for student_id in reject:
                    competitors[student_id]["num"] -= 1

            # Drop students who reached the maximum or ran out of wishes.
            to_be_delete = list()
            for student_id, student_data in competitors.items():
                if student_data["num"] > self._max_select:
                    raise ValueError("num exceeds max_select")
                if (student_data["num"] == self._max_select or
                        len(student_data["selection"]) <= wish_index + 1):
                    to_be_delete.append(student_id)
            for student_id in to_be_delete:
                del competitors[student_id]
            wish_index += 1

    def deal_with_preselect(self, preselect, competitors):
        """Place the students whose options were decided before the lottery.

        Students listed for "數電實驗" are appended directly to a dedicated
        option created here. Every other listed option name is queued on the
        matching option of this course, and the corresponding entry of the
        student's wish list is replaced by ``None``.

        Note: ``competitors`` will be modified. ``preselect`` is only read;
        the per-student lists are copied before "數電實驗" is stripped, so
        the caller's mapping is left exactly as it was passed in.

        Raises:
            KeyError: if a student with a remaining preselected option is
                not in ``competitors``.
            ValueError: if a preselected option is missing from the
                student's wish list, or a student exceeds ``_max_select``.
            AssertionError: if a preselected student is rejected.
        """
        lab_option = Option("數電實驗", 99999, False)
        self._options["數電實驗"] = lab_option

        # student id -> preselected option names other than "數電實驗"
        guaranteed = dict()
        for student_id, option_names in preselect.items():
            remaining = list(option_names)  # copy: never edit the input
            if "數電實驗" in remaining:
                lab_option._students.append(student_id)
                if student_id in competitors:
                    competitors[student_id]["num"] += 1
                remaining.remove("數電實驗")
            if remaining:
                guaranteed[student_id] = remaining

        # Queue the remaining preselected options.
        for student_id, option_names in guaranteed.items():
            student_data = competitors[student_id]
            for option_name in option_names:
                self._options[option_name].add_student(student_id, student_data)
                student_data["num"] += 1
                # Blank out that slot of the original wish list.
                idx = student_data["selection"].index(option_name)
                student_data["selection"][idx] = None
        for option in self._options.values():
            reject = option.recover_from_full()
            assert reject == [], "preselect should not be rejected"

        # Drop students who already hold the maximum number of options.
        to_be_delete = list()
        for student_id, student_data in competitors.items():
            if student_data["num"] > self._max_select:
                raise ValueError("num exceeds max_select")
            if student_data["num"] == self._max_select:
                to_be_delete.append(student_id)
        for student_id in to_be_delete:
            del competitors[student_id]
# ========================================
def read_courses():
    """Load ``./data/courses.json`` and wrap every entry in a ``Course``.

    Returns:
        A list with one ``Course`` per entry of the parsed JSON document.
    """
    # JSON text is UTF-8 and this file holds non-ASCII values; do not depend
    # on the platform's default encoding.
    with open('./data/courses.json', encoding='utf-8') as fin:
        courses = json.load(fin)
    return [Course(course) for course in courses]
def read_selections():
    """Load the student selections and upper-case every ``userID``.

    Returns:
        The list parsed from ``./secret-data/selections.json``; each entry's
        "userID" value has been replaced by its upper-cased form.
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open('./secret-data/selections.json', encoding='utf-8') as fin:
        students = json.load(fin)
    for student in students:
        student["userID"] = student["userID"].upper()
    return students
def read_preselect():
    """Load and return the parsed content of ``./secret-data/preselect.json``."""
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open('./secret-data/preselect.json', encoding='utf-8') as fin:
        preselect = json.load(fin)
    return preselect
# ========================================
def _max_wish_rank(analysis, floor=1):
    """Return the largest wish rank stored in ``analysis``, at least ``floor``."""
    best = floor
    for wishes in analysis.values():
        if wishes:
            best = max(best, max(wishes))
    return best


def _build_header(max_wish):
    """Return the table header with one wish-rank column per rank up to ``max_wish``."""
    header = ["課程名稱", "中0個", "中1個", "中2個"]
    header.extend(f"第{i}志願" for i in range(1, max_wish + 1))
    return header


def _tally_row(course_name, analysis, width):
    """Count, for one course, how many students won 0/1/2 options and at which rank."""
    row = [0] * width
    row[0] = course_name
    for wishes in analysis.values():
        assert len(wishes) <= 2, "max wish is 2"
        # Columns 1..3 count students by number of options won.
        row[len(wishes) + 1] += 1
        # Columns 4.. count wins by wish rank (rank 1 is column 4).
        for wish in wishes:
            row[wish + 3] += 1
    return row


def analyze(courses):
    """Print statistics about which wish ranks the students were granted.

    The first table has one row per course and ignores options listed in
    the preselect file. If a course named "十選二實驗" exists, a second table
    is printed for it with the preselected options (other than "數電實驗")
    added.

    Args:
        courses: the ``Course`` objects after distribution; their options'
            ``_students`` lists are read.
    """
    students = read_selections()
    preselect = read_preselect()
    students_dict = {student["userID"]: student for student in students}

    # course name -> {student id -> set of granted wish ranks (1-based)}
    course_analyses = dict()
    for course in courses:
        course_analyses[course._name] = {
            student_id: set()
            for student_id, student in students_dict.items()
            if student["selections"][course._id]
        }

    for course in courses:
        for option in course._options.values():
            for student_id in option._students:
                # Preselected options are left out of the first table.
                if (student_id in preselect and
                        option._name in preselect[student_id]):
                    continue
                selection = students_dict[student_id]["selections"][course._id]
                wish_idx = selection.index(option._name)
                course_analyses[course._name][student_id].add(wish_idx + 1)

    # Highest wish rank anybody was granted, over all courses.
    max_wish = 1
    for per_student in course_analyses.values():
        max_wish = _max_wish_rank(per_student, max_wish)
    header = _build_header(max_wish)

    if "十選二實驗" in course_analyses:
        print("不包含數電、三保一:")
    rows = [header]
    for course_name, per_student in course_analyses.items():
        rows.append(_tally_row(course_name, per_student, len(header)))
    print_table(rows)

    if "十選二實驗" in course_analyses:
        print("包含數電、三保一之後的十選二(不含只報名數電沒選十選二的):")
        ten_select_two = course_analyses["十選二實驗"]
        for student_id in preselect:
            for option in preselect[student_id]:
                if option == "數電實驗":
                    continue
                wishes = students_dict[student_id]["selections"]["Ten-Select-Two"]
                wish_idx = wishes.index(option)
                assert (wish_idx + 1) not in ten_select_two[student_id]
                ten_select_two[student_id].add(wish_idx + 1)

        # The preselected ranks added above may exceed the earlier maximum,
        # so widen the table if needed (never narrower than the first one);
        # otherwise the row index below would run past the row's end.
        header = _build_header(_max_wish_rank(ten_select_two, max_wish))
        rows = [header, _tally_row("十選二實驗", ten_select_two, len(header))]
        print_table(rows)
def print_table(table):
    """Print ``table`` as a PrettyTable.

    Args:
        table: sequence of rows; the first row supplies the column names
            and every following row is added as a data row.
    """
    printer = PrettyTable()
    printer.field_names = table[0]
    data_rows = table[1:]
    for record in data_rows:
        printer.add_row(record)
    print(printer)
def main():
    """Distribute every course, export the result as CSV and print statistics.

    The CSV is written to ``secret-data/result.csv`` with the columns
    student_id, course_name and teacher. For the course whose id is
    "Ten-Select-Two" the option name is written as the course name and the
    teacher column is left empty; for every other course the option name
    goes into the teacher column.
    """
    courses = read_courses()
    students = read_selections()
    preselect = read_preselect()
    for course in courses:
        course.distribute(students, preselect)

    # export
    rows = [["student_id", "course_name", "teacher"]]
    for course in courses:
        option_is_course = course._id == "Ten-Select-Two"
        for option in course._options.values():
            for student_id in sorted(option._students):
                if option_is_course:
                    record = [student_id, option._name, ""]
                else:
                    record = [student_id, course._name, option._name]
                rows.append(record)

    with open("secret-data/result.csv", "w", newline="") as fout:
        csv.writer(fout).writerows(rows)

    analyze(courses)
# ========================================
# Run the distribution only when this file is executed as a script.
if "__main__" == __name__:
    main()