-
Notifications
You must be signed in to change notification settings - Fork 1
/
cqm_solution.py
78 lines (61 loc) · 2.36 KB
/
cqm_solution.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from dimod import cqm_to_bqm, Binary, Integer, ConstrainedQuadraticModel
from dwave.system import LeapHybridBQMSampler, LeapHybridCQMSampler
import numpy as np
from parse import parse
from pprint import pprint
import time
def cqm_solution(cost_df, jobs, runtimes, paths, deadline, debug=False):
    """Assign every job to exactly one machine with a hybrid CQM solver.

    A binary variable ``m{machine}_x{job}`` is created for every
    (machine, job) pair.  The objective is the total assignment cost, each
    job must be placed on exactly one machine, and for every path the summed
    runtime of the jobs on that path must not exceed ``deadline``.

    Args:
        cost_df: Cost table read positionally as ``cost[job, machine]``:
            one row per job (in the same order as ``jobs``) and one column
            per machine.  The column labels are used as machine names in
            the result.
        jobs: Sequence of objects exposing a ``name`` attribute.
        runtimes: Mapping whose values are per-machine sequences of job
            runtimes.  Only the values are used; they are assumed to be in
            the same machine order as ``cost_df.columns`` and the same job
            order as ``jobs`` -- TODO confirm against the caller.
        paths: Iterable of containers of job names; membership is tested
            with ``name in path``.
        deadline: Upper bound for the runtime of every path.
        debug: Currently unused; kept for interface compatibility.

    Returns:
        dict mapping job name to the label of the machine it was assigned to.
        An empty dict is returned when ``jobs`` is empty.

    Raises:
        ValueError: If there are jobs but no machines, or if the shape of
            ``cost_df`` is not ``(len(jobs), number of machines)``.
        IndexError: If the sampler returned no sample satisfying all
            constraints (same exception type as before, now with a message).
    """
    job_names = [job.name for job in jobs]
    job_count = len(job_names)
    machine_names = cost_df.columns
    machine_count = len(machine_names)

    if job_count == 0:
        # Nothing to schedule; avoid building an empty model.
        return {}
    if machine_count == 0:
        raise ValueError("cost_df has no machine columns to assign jobs to")

    costs = np.asarray(cost_df)
    if costs.shape != (job_count, machine_count):
        raise ValueError(
            f"cost_df has shape {costs.shape}, expected "
            f"({job_count}, {machine_count}) (jobs x machines)"
        )

    def total(terms):
        """Add up ``terms`` with ``+``; return None when there are none."""
        accumulated = None
        for term in terms:
            # Compare against None explicitly: the truthiness of a model
            # expression is not a reliable "has been set" test.
            accumulated = term if accumulated is None else accumulated + term
        return accumulated

    # variables[machine][job] <-> label 'm{machine}_x{job}'
    variables = [
        [Binary(f'm{machine}_x{job}') for job in range(job_count)]
        for machine in range(machine_count)
    ]

    cqm = ConstrainedQuadraticModel()

    # Objective: total cost.  The cost is looked up by (job, machine) so it
    # is paired with the matching variable; flattening the table row by row
    # would pair job-major costs with machine-major variables.
    cqm.set_objective(total(
        costs[job, machine] * variables[machine][job]
        for machine in range(machine_count)
        for job in range(job_count)
    ))

    # Every job must use exactly one machine.
    for job in range(job_count):
        machines_for_job = total(
            variables[machine][job] for machine in range(machine_count)
        )
        cqm.add_constraint(machines_for_job == 1)

    # All paths must finish before the deadline.
    for path in paths:
        path_runtime = total(
            runtime * var
            for machine_vars, machine_runtimes in zip(variables, runtimes.values())
            for var, runtime, name in zip(machine_vars, machine_runtimes, job_names)
            if name in path
        )
        if path_runtime is None:
            # No known job lies on this path, so there is nothing to bound.
            continue
        cqm.add_constraint(path_runtime <= deadline)

    sampleset = LeapHybridCQMSampler().sample_cqm(cqm, time_limit=5)

    # Take the first returned sample that violates no constraint.
    best_solution = next(
        (
            sample for sample in sampleset
            if not cqm.violations(sample, skip_satisfied=True)
        ),
        None,
    )
    if best_solution is None:
        raise IndexError("the sampler returned no feasible solution")

    # Translate the selected 'm{machine}_x{job}' variables back to names.
    actual_solution = {}
    for label, is_used in best_solution.items():
        machine, var = parse('m{}_x{}', label)
        if is_used:
            actual_solution[jobs[int(var)].name] = machine_names[int(machine)]
    return actual_solution