Skip to content

Commit

Permalink
feat: sparse horizon handler for vdf (#276)
Browse files Browse the repository at this point in the history
* feat: sparse horizon handler for vdf

* small fix

* feat: vdf sparse horizon handler - file writing and reading (#278)

* feat: Using VDFSparseHorizonHandler by default

* fix: file writing and reading

* feat: towards testing VDFSparseHorizonHandler VS VDFHorizonHandler

* chore: cleanup

* feat: comparing output events of VDFHorizonHandler and VDFSparseHorizonHandler

---------

Co-authored-by: tkchouaki <[email protected]>

* chore: comparing plans instead of events

---------

Co-authored-by: Tarek Chouaki <[email protected]>
Co-authored-by: tkchouaki <[email protected]>
  • Loading branch information
3 people authored Nov 26, 2024
1 parent 678fbb0 commit 7ed1950
Show file tree
Hide file tree
Showing 5 changed files with 323 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class VDFConfigGroup extends ReflectiveConfigGroup {
private int writeFlowInterval = 0;

public enum HandlerType {
Horizon, Interpolation
Horizon, Interpolation, SparseHorizon
}

private HandlerType handler = HandlerType.Horizon;
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/org/eqasim/core/simulation/vdf/VDFModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.eqasim.core.simulation.mode_choice.AbstractEqasimExtension;
import org.eqasim.core.simulation.vdf.handlers.VDFHorizonHandler;
import org.eqasim.core.simulation.vdf.handlers.VDFInterpolationHandler;
import org.eqasim.core.simulation.vdf.handlers.VDFSparseHorizonHandler;
import org.eqasim.core.simulation.vdf.handlers.VDFTrafficHandler;
import org.eqasim.core.simulation.vdf.travel_time.VDFTravelTime;
import org.eqasim.core.simulation.vdf.travel_time.function.BPRFunction;
Expand Down Expand Up @@ -41,6 +42,10 @@ protected void installEqasimExtension() {
bind(VDFTrafficHandler.class).to(VDFHorizonHandler.class);
addEventHandlerBinding().to(VDFHorizonHandler.class);
break;
case SparseHorizon:
bind(VDFTrafficHandler.class).to(VDFSparseHorizonHandler.class);
addEventHandlerBinding().to(VDFSparseHorizonHandler.class);
break;
case Interpolation:
bind(VDFTrafficHandler.class).to(VDFInterpolationHandler.class);
addEventHandlerBinding().to(VDFInterpolationHandler.class);
Expand Down Expand Up @@ -85,6 +90,12 @@ public VDFHorizonHandler provideVDFHorizonHandler(VDFConfigGroup config, Network
return new VDFHorizonHandler(network, scope, config.getHorizon(), getConfig().global().getNumberOfThreads());
}

@Provides
@Singleton
public VDFSparseHorizonHandler provideVDFSparseHorizonHandler(VDFConfigGroup config, Network network, VDFScope scope) {
return new VDFSparseHorizonHandler(network, scope, config.getHorizon(), getConfig().global().getNumberOfThreads());
}

@Provides
@Singleton
public VDFInterpolationHandler provideVDFInterpolationHandler(VDFConfigGroup config, Network network,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
package org.eqasim.core.simulation.vdf.handlers;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eqasim.core.simulation.vdf.VDFScope;
import org.eqasim.core.simulation.vdf.io.VDFReaderInterface;
import org.eqasim.core.simulation.vdf.io.VDFWriterInterface;
import org.matsim.api.core.v01.Id;
import org.matsim.api.core.v01.IdMap;
import org.matsim.api.core.v01.events.LinkEnterEvent;
import org.matsim.api.core.v01.events.handler.LinkEnterEventHandler;
import org.matsim.api.core.v01.network.Link;
import org.matsim.api.core.v01.network.Network;
import org.matsim.core.utils.io.IOUtils;

import com.google.common.base.Verify;

public class VDFSparseHorizonHandler implements VDFTrafficHandler, LinkEnterEventHandler {
private final VDFScope scope;

private final Network network;
private final int horizon;
private final int numberOfThreads;

private final IdMap<Link, List<Double>> counts = new IdMap<>(Link.class);

private final static Logger logger = LogManager.getLogger(VDFSparseHorizonHandler.class);

private record LinkState(List<Integer> time, List<Double> count) {
}

private List<IdMap<Link, LinkState>> state = new LinkedList<>();

public VDFSparseHorizonHandler(Network network, VDFScope scope, int horizon, int numberOfThreads) {
this.scope = scope;
this.network = network;
this.horizon = horizon;
this.numberOfThreads = numberOfThreads;

for (Id<Link> linkId : network.getLinks().keySet()) {
counts.put(linkId, new ArrayList<>(Collections.nCopies(scope.getIntervals(), 0.0)));
}
}

@Override
public synchronized void handleEvent(LinkEnterEvent event) {
processEnterLink(event.getTime(), event.getLinkId());
}

public void processEnterLink(double time, Id<Link> linkId) {
int i = scope.getIntervalIndex(time);
double currentValue = counts.get(linkId).get(i);
counts.get(linkId).set(i, currentValue + 1);
}

@Override
public IdMap<Link, List<Double>> aggregate(boolean ignoreIteration) {
while (state.size() > horizon) {
state.remove(0);
}

logger.info(String.format("Starting aggregation of %d slices", state.size()));

// Transform counts into state object
if (!ignoreIteration) {
IdMap<Link, LinkState> newState = new IdMap<>(Link.class);
state.add(newState);

for (Map.Entry<Id<Link>, List<Double>> entry : counts.entrySet()) {
double total = 0.0;

for (double value : entry.getValue()) {
total += value;
}

if (total > 0.0) {
LinkState linkState = new LinkState(new ArrayList<>(), new ArrayList<>());
newState.put(entry.getKey(), linkState);

int timeIndex = 0;
for (double count : entry.getValue()) {
if (count > 0.0) {
linkState.time.add(timeIndex);
linkState.count.add(count);
}

timeIndex++;
}
}
}
}

IdMap<Link, List<Double>> aggregated = new IdMap<>(Link.class);

for (Id<Link> linkId : network.getLinks().keySet()) {
// Reset current counts
counts.put(linkId, new ArrayList<>(Collections.nCopies(scope.getIntervals(), 0.0)));

// Initialize aggregated counts
aggregated.put(linkId, new ArrayList<>(Collections.nCopies(scope.getIntervals(), 0.0)));
}

// Aggregate
Iterator<Id<Link>> linkIterator = network.getLinks().keySet().iterator();

Runnable worker = () -> {
Id<Link> currentLinkId = null;

while (true) {
// Fetch new link in queue
synchronized (linkIterator) {
if (linkIterator.hasNext()) {
currentLinkId = linkIterator.next();
} else {
break; // Done
}
}

// Go through history for this link and aggregate by time slot
for (int k = 0; k < state.size(); k++) {
LinkState historyItem = state.get(k).get(currentLinkId);
List<Double> linkAggregator = aggregated.get(currentLinkId);

if (historyItem != null) {
for (int i = 0; i < historyItem.count.size(); i++) {
int timeIndex = historyItem.time.get(i);
linkAggregator.set(timeIndex,
linkAggregator.get(timeIndex) + historyItem.count.get(i) / (double) state.size());
}
}
}
}
};

if (numberOfThreads < 2) {
worker.run();
} else {
List<Thread> threads = new ArrayList<>(numberOfThreads);

for (int k = 0; k < numberOfThreads; k++) {
threads.add(new Thread(worker));
}

for (int k = 0; k < numberOfThreads; k++) {
threads.get(k).start();
}

try {
for (int k = 0; k < numberOfThreads; k++) {
threads.get(k).join();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

logger.info(String.format(" Finished aggregation"));

return aggregated;
}

@Override
public VDFReaderInterface getReader() {
return new Reader();
}

@Override
public VDFWriterInterface getWriter() {
return new Writer();
}

public class Reader implements VDFReaderInterface {
@Override
public void readFile(URL inputFile) {
state.clear();

try {
DataInputStream inputStream = new DataInputStream(IOUtils.getInputStream(inputFile));

Verify.verify(inputStream.readDouble() == scope.getStartTime());
Verify.verify(inputStream.readDouble() == scope.getEndTime());
Verify.verify(inputStream.readDouble() == scope.getIntervalTime());
Verify.verify(inputStream.readInt() == scope.getIntervals());
Verify.verify(inputStream.readInt() == horizon);

int slices = (int) inputStream.readInt();
int links = (int) inputStream.readInt();

List<Id<Link>> linkIds = new ArrayList<>(links);
for (int linkIndex = 0; linkIndex < links; linkIndex++) {
linkIds.add(Id.createLinkId(inputStream.readUTF()));
}

logger.info(String.format("Loading %d slices with %d links", slices, links));

for (int sliceIndex = 0; sliceIndex < slices; sliceIndex++) {
IdMap<Link, LinkState> slice = new IdMap<>(Link.class);
state.add(slice);

int sliceLinkCount = inputStream.readInt();

logger.info(String.format("Slice %d/%d, Reading %d link states", sliceIndex+1, slices, sliceLinkCount));

for (int sliceLinkIndex = 0; sliceLinkIndex < sliceLinkCount; sliceLinkIndex++) {
int linkIndex = inputStream.readInt();
int linkStateSize = inputStream.readInt();

LinkState linkState = new LinkState(new ArrayList<>(linkStateSize),
new ArrayList<>(linkStateSize));
slice.put(linkIds.get(linkIndex), linkState);

for (int i = 0; i < linkStateSize; i++) {
linkState.time.add(inputStream.readInt());
linkState.count.add(inputStream.readDouble());
}
}

logger.info(String.format(" Slice %d: %d obs", sliceIndex,
sliceLinkCount));
}

Verify.verify(inputStream.available() == 0);
inputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

public class Writer implements VDFWriterInterface {
@Override
public void writeFile(File outputFile) {
try {
DataOutputStream outputStream = new DataOutputStream(new FileOutputStream(outputFile.toString()));

outputStream.writeDouble(scope.getStartTime());
outputStream.writeDouble(scope.getEndTime());
outputStream.writeDouble(scope.getIntervalTime());
outputStream.writeInt(scope.getIntervals());
outputStream.writeInt(horizon);
outputStream.writeInt(state.size());
outputStream.writeInt(counts.size());

List<Id<Link>> linkIds = new ArrayList<>(counts.keySet());
for (int linkIndex = 0; linkIndex < linkIds.size(); linkIndex++) {
outputStream.writeUTF(linkIds.get(linkIndex).toString());
}

logger.info(String.format("About to write %d slices", state.size()));

for (int sliceIndex = 0; sliceIndex < state.size(); sliceIndex++) {
IdMap<Link, LinkState> slice = state.get(sliceIndex);
outputStream.writeInt(slice.size());

int sliceLinkIndex = 0;
for (Id<Link> linkId : linkIds) {
LinkState linkState = slice.get(linkId);
if(linkState == null) {
continue;
}
outputStream.writeInt(linkIds.indexOf(linkId));
outputStream.writeInt(linkState.count.size());

for (int i = 0; i < linkState.count.size(); i++) {
outputStream.writeInt(linkState.time.get(i));
outputStream.writeDouble(linkState.count.get(i));
}
sliceLinkIndex += 1;
}
assert sliceLinkIndex == slice.size();
}

outputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.eqasim.core.simulation.vdf.utils;

import org.eqasim.core.components.config.EqasimConfigGroup;
import org.eqasim.core.simulation.EqasimConfigurator;
import org.eqasim.core.simulation.vdf.VDFConfigGroup;
import org.eqasim.core.simulation.vdf.engine.VDFEngineConfigGroup;
Expand All @@ -25,9 +24,8 @@ public static void adaptConfigForVDF(Config config, boolean engine) {
VDFConfigGroup.getOrCreate(config).setWriteInterval(1);
VDFConfigGroup.getOrCreate(config).setWriteFlowInterval(1);

// VDF: Set capacity factor instead (We retrieve it form the Eqasim config group)
EqasimConfigGroup eqasimConfigGroup = (EqasimConfigGroup) config.getModules().get(EqasimConfigGroup.GROUP_NAME);
VDFConfigGroup.getOrCreate(config).setCapacityFactor(eqasimConfigGroup.getSampleSize());
VDFConfigGroup vdfConfigGroup = VDFConfigGroup.getOrCreate(config);
vdfConfigGroup.setHandler(VDFConfigGroup.HandlerType.SparseHorizon);

if(engine) {
// VDF Engine: Add config group
Expand Down
18 changes: 17 additions & 1 deletion core/src/test/java/org/eqasim/TestSimulationPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@

import com.google.inject.Inject;
import com.google.inject.Provider;
import org.matsim.utils.eventsfilecomparison.ComparisonResult;
import org.matsim.utils.eventsfilecomparison.EventsFileComparator;

public class TestSimulationPipeline {

Expand Down Expand Up @@ -377,6 +379,7 @@ public void testTransitWithAbstractAccess() throws CommandLine.ConfigurationExce
}

public void runVdf() throws CommandLine.ConfigurationException, IOException, InterruptedException {
// This one will use the SparseHorizon handler
AdaptConfigForVDF.main(new String[] {
"--input-config-path", "melun_test/input/config.xml",
"--output-config-path", "melun_test/input/config_vdf.xml",
Expand All @@ -387,6 +390,20 @@ public void runVdf() throws CommandLine.ConfigurationException, IOException, Int

runMelunSimulation("melun_test/input/config_vdf.xml", "melun_test/output_vdf");


// We force this one to use the legacy horizon handler
AdaptConfigForVDF.main(new String[] {
"--input-config-path", "melun_test/input/config.xml",
"--output-config-path", "melun_test/input/config_vdf_horizon.xml",
"--engine", "true",
"--config:eqasim:vdf_engine.generateNetworkEvents", "true",
"--config:eqasim:vdf.handler", "Horizon"
});

runMelunSimulation("melun_test/input/config_vdf_horizon.xml", "melun_test/output_vdf_horizon");

assert CRCChecksum.getCRCFromFile("melun_test/output_vdf_horizon/output_plans.xml.gz") == CRCChecksum.getCRCFromFile("melun_test/output_vdf/output_plans.xml.gz");

RunStandaloneModeChoice.main(new String[]{
"--config-path", "melun_test/input/config_vdf.xml",
"--config:standaloneModeChoice.outputDirectory", "melun_test/output_mode_choice_vdf",
Expand Down Expand Up @@ -425,7 +442,6 @@ public void testPipeline() throws Exception {

@Test
public void testBaseDeterminism() throws Exception {
Logger logger = LogManager.getLogger(TestSimulationPipeline.class);
Config config = ConfigUtils.loadConfig("melun_test/input/config.xml");
runMelunSimulation(config, "melun_test/output_determinism_1", null, 2);

Expand Down

0 comments on commit 7ed1950

Please sign in to comment.