Skip to content

Commit

Permalink
Add batched disk functionality to route choice again, and style
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Moss committed Aug 1, 2024
1 parent e413cb1 commit e69651f
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 266 deletions.
52 changes: 37 additions & 15 deletions aequilibrae/paths/cython/coo_demand.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ cdef class GeneralisedCOODemand:

new_dfs.append(df.select_dtypes(["float64", "float32"]))

self.df = pd.concat(new_dfs, axis=1).fillna(fill).sort_index()
self.df = pd.concat(new_dfs, axis=1).fillna(fill).sort_index(level=0)

def add_matrix(self, matrix: AequilibraeMatrix, shape=None, fill: float = 0.0):
"""
Expand All @@ -67,43 +67,65 @@ cdef class GeneralisedCOODemand:
self.add_df(dfs, shape=shape, fill=fill)
logging.info(f"There {len(self.df):,} are OD pairs with non-zero flows")

def _initalise_c_data(self):
self.ods = self.df.index # MultiIndex[int, int] -> vector[pair[long long, long long]]
def _initalise_col_names(self):
"""
This function is a bit of a hack to allow keeping the same LinkLoadingResults between batched result
writes. We just need to know how many columns of each type there are
"""
self.f64_names = []
self.f32_names = []
for col in self.df:
if self.df.dtypes[col] == "float64":
self.f64_names.append(col)
elif self.df.dtypes[col] == "float32":
self.f32_names.append(col)
else:
raise TypeError(f"non-floating point column ({col}) in df. Something has gone wrong")

def _initalise_c_data(self, df = None):
if df is None:
df = self.df
else:
assert all(self.df.columns == df.columns)

self.ods = df.index # MultiIndex[int, int] -> vector[pair[long long, long long]]

self.f64.clear()
self.f32.clear()
self.f64_names = []
self.f32_names = []

cdef:
double[::1] f64_array
float[::1] f32_array
vector[double] *f64_vec
vector[float] *f32_vec

for col in self.df:
if self.df.dtypes[col] == "float64":
f64_array = self.df[col].to_numpy()
self.f64_names.append(col)
for col in df:
if df.dtypes[col] == "float64":
f64_array = df[col].to_numpy()

# The unique pointer will take ownership of this allocation
f64_vec = new vector[double]()
f64_vec.insert(f64_vec.begin(), &f64_array[0], &f64_array[0] + len(f64_array))
self.f64.emplace_back(f64_vec) # From here f63_vec should not be accessed. It is owned by the unique pointer
# From here f63_vec should not be accessed. It is owned by the unique pointer
self.f64.emplace_back(f64_vec)

elif self.df.dtypes[col] == "float32":
f32_array = self.df[col].to_numpy()
self.f32_names.append(col)
elif df.dtypes[col] == "float32":
f32_array = df[col].to_numpy()

# The unique pointer will take ownership of this allocation
f32_vec = new vector[float]()
f32_vec.insert(f32_vec.begin(), &f32_array[0], &f32_array[0] + len(f32_array))
self.f32.emplace_back(f32_vec) # From here f32_vec should not be accessed. It is owned by the unique pointer
# From here f32_vec should not be accessed. It is owned by the unique pointer
self.f32.emplace_back(f32_vec)
else:
raise TypeError(f"non-floating point column ({col}) in self.df. Something has gone wrong")
raise TypeError(f"non-floating point column ({col}) in df. Something has gone wrong")

def no_demand(GeneralisedCOODemand self) -> bool:
return len(self.df.columns) == 0

def is_empty(self) -> bool:
return self.df.index.empty

def batches(self):
self.df = self.df.sort_index(level=0)
return self.df.groupby(level=0) # Group by the origin in the multi-index
120 changes: 81 additions & 39 deletions aequilibrae/paths/cython/route_choice_link_loading_results.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@ include 'parallel_numpy.pyx'

# See note in route_choice_set.pxd
cdef class LinkLoadingResults:
def __cinit__(self, demand: GeneralisedCOODemand, select_links: Dict[str, FrozenSet[FrozenSet[int]]], num_links: int, threads: int):
def __cinit__(
self,
demand: GeneralisedCOODemand,
select_links: Dict[str, FrozenSet[FrozenSet[int]]],
num_links: int,
threads: int
):
if threads <= 0:
raise ValueError(f"threads must be positive ({threads})")
elif num_links <= 0:
Expand All @@ -30,9 +36,9 @@ cdef class LinkLoadingResults:
self.f64_link_loading_threaded.reserve(threads)
for i in range(threads):
f64_demand_cols = new vector[unique_ptr[vector[double]]]()
f64_demand_cols.reserve(self.demand.f64.size())
f64_demand_cols.reserve(len(self.demand.f64_names))

for j in range(self.demand.f64.size()):
for j in range(len(self.demand.f64_names)):
f64_demand_cols.emplace_back(new vector[double](self.num_links))

self.f64_link_loading_threaded.emplace_back(f64_demand_cols)
Expand All @@ -41,9 +47,9 @@ cdef class LinkLoadingResults:
self.f32_link_loading_threaded.reserve(threads)
for i in range(threads):
f32_demand_cols = new vector[unique_ptr[vector[float]]]()
f32_demand_cols.reserve(self.demand.f32.size())
f32_demand_cols.reserve(len(self.demand.f32_names))

for j in range(self.demand.f32.size()):
for j in range(len(self.demand.f32_names)):
f32_demand_cols.emplace_back(new vector[float](self.num_links))

self.f32_link_loading_threaded.emplace_back(f32_demand_cols)
Expand Down Expand Up @@ -94,9 +100,9 @@ cdef class LinkLoadingResults:

for j in range(self.select_link_sets.size()):
f64_sl_demand_cols = new vector[unique_ptr[vector[double]]]()
f64_sl_demand_cols.reserve(self.demand.f64.size())
f64_sl_demand_cols.reserve(len(self.demand.f64_names))

for k in range(self.demand.f64.size()):
for k in range(len(self.demand.f64_names)):
f64_sl_demand_cols.emplace_back(new vector[double](self.num_links))

f64_sl_select_link_sets.emplace_back(f64_sl_demand_cols)
Expand All @@ -111,9 +117,9 @@ cdef class LinkLoadingResults:

for j in range(self.select_link_sets.size()):
f32_sl_demand_cols = new vector[unique_ptr[vector[float]]]()
f32_sl_demand_cols.reserve(self.demand.f32.size())
f32_sl_demand_cols.reserve(len(self.demand.f32_names))

for k in range(self.demand.f32.size()):
for k in range(len(self.demand.f32_names)):
f32_sl_demand_cols.emplace_back(new vector[float](self.num_links))

f32_sl_select_link_sets.emplace_back(f32_sl_demand_cols)
Expand All @@ -134,12 +140,12 @@ cdef class LinkLoadingResults:
self.f64_sl_od_matrix_threaded.reserve(threads)
for i in range(threads):
f64_sl_od_matrix_sets = new vector[unique_ptr[vector[COO_f64_struct]]]()
f64_sl_od_matrix_sets.reserve(self.demand.f64.size())
f64_sl_od_matrix_sets.reserve(len(self.demand.f64_names))

for j in range(self.select_link_sets.size()):
f64_sl_od_matrix_demand_cols = new vector[COO_f64_struct](self.demand.f64.size())
f64_sl_od_matrix_demand_cols = new vector[COO_f64_struct](len(self.demand.f64_names))

for k in range(self.demand.f64.size()):
for k in range(len(self.demand.f64_names)):
COO.init_f64_struct(d(f64_sl_od_matrix_demand_cols)[k])

f64_sl_od_matrix_sets.emplace_back(f64_sl_od_matrix_demand_cols)
Expand All @@ -150,12 +156,12 @@ cdef class LinkLoadingResults:
self.f32_sl_od_matrix_threaded.reserve(threads)
for i in range(threads):
f32_sl_od_matrix_sets = new vector[unique_ptr[vector[COO_f32_struct]]]()
f32_sl_od_matrix_sets.reserve(self.demand.f32.size())
f32_sl_od_matrix_sets.reserve(len(self.demand.f32_names))

for j in range(self.select_link_sets.size()):
f32_sl_od_matrix_demand_cols = new vector[COO_f32_struct](self.demand.f32.size())
f32_sl_od_matrix_demand_cols = new vector[COO_f32_struct](len(self.demand.f32_names))

for k in range(self.demand.f32.size()):
for k in range(len(self.demand.f32_names)):
COO.init_f32_struct(d(f32_sl_od_matrix_demand_cols)[k])

f32_sl_od_matrix_sets.emplace_back(f32_sl_od_matrix_demand_cols)
Expand All @@ -167,14 +173,18 @@ cdef class LinkLoadingResults:

cdef object link_loading_to_objects(self, long long[:] compressed_id_view, int cores):
if self.link_loading_objects is None:
self.link_loading_objects = dict(zip(*self.apply_generic_link_loading(self.f64_link_loading, self.f32_link_loading, compressed_id_view, cores)))
self.link_loading_objects = dict(zip(*self.apply_generic_link_loading(
self.f64_link_loading, self.f32_link_loading, compressed_id_view, cores
)))
return self.link_loading_objects

cdef object sl_link_loading_to_objects(self, long long[:] compressed_id_view, int cores):
if self.sl_link_loading_objects is None:
results = []
for i in range(self.select_link_sets.size()):
results.append(dict(zip(*self.apply_generic_link_loading(d(self.f64_sl_link_loading[i]), d(self.f32_sl_link_loading[i]), compressed_id_view, cores))))
results.append(dict(zip(*self.apply_generic_link_loading(
d(self.f64_sl_link_loading[i]), d(self.f32_sl_link_loading[i]), compressed_id_view, cores
))))
self.sl_link_loading_objects = dict(zip(self.select_link_set_names, results))
return self.sl_link_loading_objects

Expand All @@ -199,7 +209,7 @@ cdef class LinkLoadingResults:

double f64_demand, f64_load
float f32_demand, f32_load
size_t i, j, k
size_t i, j

# For each demand column
for i in range(self.demand.f64.size()):
Expand Down Expand Up @@ -237,7 +247,8 @@ cdef class LinkLoadingResults:
@cython.initializedcheck(False)
cdef void reduce_link_loading(LinkLoadingResults self):
"""
NOTE: doesn't require the GIL but should NOT be called in a multithreaded environment. Thus the function requires the GIL.
NOTE: doesn't require the GIL but should NOT be called in a multithreaded environment. Thus the function
requires the GIL.
"""
cdef:
vector[unique_ptr[vector[double]]] *f64_ll_cols
Expand Down Expand Up @@ -305,7 +316,6 @@ cdef class LinkLoadingResults:
double[:, :] f64_actual
float[:, :] f32_ll_view
float[:, :] f32_actual
size_t length

f64_ll_vectors = []
for i in range(f64_link_loading.size()):
Expand Down Expand Up @@ -345,7 +355,8 @@ cdef class LinkLoadingResults:

f32_ll_vectors.append(np.asarray(f32_actual).reshape(-1))

return itertools.chain(self.demand.f64_names, self.demand.f32_names), itertools.chain(f64_ll_vectors, f32_ll_vectors)
return itertools.chain(self.demand.f64_names, self.demand.f32_names), \
itertools.chain(f64_ll_vectors, f32_ll_vectors)

@cython.wraparound(False)
@cython.embedsignature(True)
Expand All @@ -364,11 +375,8 @@ cdef class LinkLoadingResults:
"""

cdef:
bool set_present
vector[size_t] link_counts
size_t select_link_set_idx
long long link
unordered_set[long long] *and_set

# We count the number of links within the AND set. Assuming we only see unique links, then if that count goes to
# 0 we have seen all links present within the AND set. A shortest-path will contain only unique links in
Expand Down Expand Up @@ -404,23 +412,31 @@ cdef class LinkLoadingResults:
cdef:
# Cython doesn't allow declaring references to objects outside of function signatures. So we get the raw
# pointer instead. It is still owned by the unique_ptr.
vector[unique_ptr[vector[unique_ptr[vector[double]]]]] *f64_ll_sets_cols = self.f64_sl_link_loading_threaded[thread_id].get()
vector[unique_ptr[vector[unique_ptr[vector[float]]]]] *f32_ll_sets_cols = self.f32_sl_link_loading_threaded[thread_id].get()
vector[unique_ptr[vector[COO_f64_struct]]] *f64_od_sets_cols = self.f64_sl_od_matrix_threaded[thread_id].get()
vector[unique_ptr[vector[COO_f32_struct]]] *f32_od_sets_cols = self.f32_sl_od_matrix_threaded[thread_id].get()
vector[unique_ptr[vector[unique_ptr[vector[double]]]]] *f64_ll_sets_cols = \
self.f64_sl_link_loading_threaded[thread_id].get()
vector[unique_ptr[vector[unique_ptr[vector[float]]]]] *f32_ll_sets_cols = \
self.f32_sl_link_loading_threaded[thread_id].get()
vector[unique_ptr[vector[COO_f64_struct]]] *f64_od_sets_cols = \
self.f64_sl_od_matrix_threaded[thread_id].get()
vector[unique_ptr[vector[COO_f32_struct]]] *f32_od_sets_cols = \
self.f32_sl_od_matrix_threaded[thread_id].get()
vector[double] *f64_ll
vector[float] *f32_ll

double f64_demand, f64_load
float f32_demand, f32_load
double f64_load
float f32_load
size_t i, j, k

# For each select link set
for i in range(self.select_link_sets.size()):
# for each route in the route set
for j in range(route_set.size()):
# if this route satisfies this link set, then we link load
if not LinkLoadingResults.is_in_select_link_set(d(route_set[j]), d(self.select_link_sets[i]), d(self.select_link_set_lengths[i])):
if not LinkLoadingResults.is_in_select_link_set(
d(route_set[j]),
d(self.select_link_sets[i]),
d(self.select_link_set_lengths[i])
):
continue

# For each demand column
Expand Down Expand Up @@ -457,7 +473,8 @@ cdef class LinkLoadingResults:
@cython.initializedcheck(False)
cdef void reduce_sl_link_loading(LinkLoadingResults self):
"""
NOTE: doesn't require the GIL but should NOT be called in a multithreaded environment. Thus the function requires the GIL.
NOTE: doesn't require the GIL but should NOT be called in a multithreaded environment. Thus the function
requires the GIL.
"""
cdef:
vector[unique_ptr[vector[unique_ptr[vector[double]]]]] *f64_sl_ll_sets_cols
Expand Down Expand Up @@ -538,7 +555,8 @@ cdef class LinkLoadingResults:
@cython.initializedcheck(False)
cdef void reduce_sl_od_matrix(LinkLoadingResults self):
"""
NOTE: doesn't require the GIL but should NOT be called in a multithreaded environment. Thus the function requires the GIL.
NOTE: doesn't require the GIL but should NOT be called in a multithreaded environment. Thus the function
requires the GIL.
"""
cdef:
vector[unique_ptr[vector[COO_f64_struct]]] *f64_sl_od_sets_cols
Expand Down Expand Up @@ -587,9 +605,21 @@ cdef class LinkLoadingResults:
f64_sl_od_result = &d(self.f64_sl_od_matrix[i])[j]
f64_sl_od = &d(f64_sl_od_cols)[j]

d(f64_sl_od_result.row).insert(d(f64_sl_od_result.row).end(), d(f64_sl_od.row).cbegin(), d(f64_sl_od.row).cend())
d(f64_sl_od_result.col).insert(d(f64_sl_od_result.col).end(), d(f64_sl_od.col).cbegin(), d(f64_sl_od.col).cend())
d(f64_sl_od_result.f64_data).insert(d(f64_sl_od_result.f64_data).end(), d(f64_sl_od.f64_data).begin(), d(f64_sl_od.f64_data).end())
d(f64_sl_od_result.row).insert(
d(f64_sl_od_result.row).end(),
d(f64_sl_od.row).cbegin(),
d(f64_sl_od.row).cend()
)
d(f64_sl_od_result.col).insert(
d(f64_sl_od_result.col).end(),
d(f64_sl_od.col).cbegin(),
d(f64_sl_od.col).cend()
)
d(f64_sl_od_result.f64_data).insert(
d(f64_sl_od_result.f64_data).end(),
d(f64_sl_od.f64_data).begin(),
d(f64_sl_od.f64_data).end()
)

# Then we do it all over again for the floats.
for thread_id in range(self.f32_sl_od_matrix_threaded.size()):
Expand All @@ -605,9 +635,21 @@ cdef class LinkLoadingResults:
f32_sl_od_result = &d(self.f32_sl_od_matrix[i])[j]
f32_sl_od = &d(f32_sl_od_cols)[j]

d(f32_sl_od_result.row).insert(d(f32_sl_od_result.row).end(), d(f32_sl_od.row).cbegin(), d(f32_sl_od.row).cend())
d(f32_sl_od_result.col).insert(d(f32_sl_od_result.col).end(), d(f32_sl_od.col).cbegin(), d(f32_sl_od.col).cend())
d(f32_sl_od_result.f32_data).insert(d(f32_sl_od_result.f32_data).end(), d(f32_sl_od.f32_data).begin(), d(f32_sl_od.f32_data).end())
d(f32_sl_od_result.row).insert(
d(f32_sl_od_result.row).end(),
d(f32_sl_od.row).cbegin(),
d(f32_sl_od.row).cend()
)
d(f32_sl_od_result.col).insert(
d(f32_sl_od_result.col).end(),
d(f32_sl_od.col).cbegin(),
d(f32_sl_od.col).cend()
)
d(f32_sl_od_result.f32_data).insert(
d(f32_sl_od_result.f32_data).end(),
d(f32_sl_od.f32_data).begin(),
d(f32_sl_od.f32_data).end()
)

# Here we discard all the intermediate results
self.f64_sl_od_matrix_threaded.clear()
Expand Down
11 changes: 0 additions & 11 deletions aequilibrae/paths/cython/route_choice_set.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,6 @@ cdef class RouteChoiceSet:
# unsigned int cores
# ) noexcept nogil

# cdef vector[double] *apply_link_loading(RouteChoiceSet self, double[:, :] matrix_view) noexcept nogil
# cdef vector[double] *apply_link_loading_from_path_files(RouteChoiceSet self, double[:, :] matrix_view, vector[vector[double] *] &path_files) noexcept nogil
# cdef apply_link_loading_func(RouteChoiceSet self, vector[double] *ll, int cores)

# cdef vector[double] *apply_select_link_loading(
# RouteChoiceSet self,
# COO sparse_mat,
# double[:, :] matrix_view,
# unordered_set[long] &select_link_set
# ) noexcept nogil


cdef class Checkpoint:
cdef:
Expand Down
Loading

0 comments on commit e69651f

Please sign in to comment.