diff --git a/core/src/main/java/org/eqasim/core/simulation/vdf/VDFConfigGroup.java b/core/src/main/java/org/eqasim/core/simulation/vdf/VDFConfigGroup.java index caf36943a..d4a16499b 100644 --- a/core/src/main/java/org/eqasim/core/simulation/vdf/VDFConfigGroup.java +++ b/core/src/main/java/org/eqasim/core/simulation/vdf/VDFConfigGroup.java @@ -49,7 +49,7 @@ public class VDFConfigGroup extends ReflectiveConfigGroup { private int writeFlowInterval = 0; public enum HandlerType { - Horizon, Interpolation + Horizon, Interpolation, SparseHorizon } private HandlerType handler = HandlerType.Horizon; diff --git a/core/src/main/java/org/eqasim/core/simulation/vdf/VDFModule.java b/core/src/main/java/org/eqasim/core/simulation/vdf/VDFModule.java index 862d48de3..73035b482 100644 --- a/core/src/main/java/org/eqasim/core/simulation/vdf/VDFModule.java +++ b/core/src/main/java/org/eqasim/core/simulation/vdf/VDFModule.java @@ -11,6 +11,7 @@ import org.eqasim.core.simulation.mode_choice.AbstractEqasimExtension; import org.eqasim.core.simulation.vdf.handlers.VDFHorizonHandler; import org.eqasim.core.simulation.vdf.handlers.VDFInterpolationHandler; +import org.eqasim.core.simulation.vdf.handlers.VDFSparseHorizonHandler; import org.eqasim.core.simulation.vdf.handlers.VDFTrafficHandler; import org.eqasim.core.simulation.vdf.travel_time.VDFTravelTime; import org.eqasim.core.simulation.vdf.travel_time.function.BPRFunction; @@ -41,6 +42,10 @@ protected void installEqasimExtension() { bind(VDFTrafficHandler.class).to(VDFHorizonHandler.class); addEventHandlerBinding().to(VDFHorizonHandler.class); break; + case SparseHorizon: + bind(VDFTrafficHandler.class).to(VDFSparseHorizonHandler.class); + addEventHandlerBinding().to(VDFSparseHorizonHandler.class); + break; case Interpolation: bind(VDFTrafficHandler.class).to(VDFInterpolationHandler.class); addEventHandlerBinding().to(VDFInterpolationHandler.class); @@ -85,6 +90,12 @@ public VDFHorizonHandler provideVDFHorizonHandler(VDFConfigGroup config, Network return new VDFHorizonHandler(network, scope, config.getHorizon(), getConfig().global().getNumberOfThreads()); } + @Provides + @Singleton + public VDFSparseHorizonHandler provideVDFSparseHorizonHandler(VDFConfigGroup config, Network network, VDFScope scope) { + return new VDFSparseHorizonHandler(network, scope, config.getHorizon(), getConfig().global().getNumberOfThreads()); + } + @Provides @Singleton public VDFInterpolationHandler provideVDFInterpolationHandler(VDFConfigGroup config, Network network, diff --git a/core/src/main/java/org/eqasim/core/simulation/vdf/handlers/VDFSparseHorizonHandler.java b/core/src/main/java/org/eqasim/core/simulation/vdf/handlers/VDFSparseHorizonHandler.java new file mode 100644 index 000000000..2e8eb5d5d --- /dev/null +++ b/core/src/main/java/org/eqasim/core/simulation/vdf/handlers/VDFSparseHorizonHandler.java @@ -0,0 +1,292 @@ +package org.eqasim.core.simulation.vdf.handlers; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.eqasim.core.simulation.vdf.VDFScope; +import org.eqasim.core.simulation.vdf.io.VDFReaderInterface; +import org.eqasim.core.simulation.vdf.io.VDFWriterInterface; +import org.matsim.api.core.v01.Id; +import org.matsim.api.core.v01.IdMap; +import org.matsim.api.core.v01.events.LinkEnterEvent; +import org.matsim.api.core.v01.events.handler.LinkEnterEventHandler; +import org.matsim.api.core.v01.network.Link; +import org.matsim.api.core.v01.network.Network; +import org.matsim.core.utils.io.IOUtils; + +import com.google.common.base.Verify; + +public class VDFSparseHorizonHandler implements VDFTrafficHandler, LinkEnterEventHandler { + private final VDFScope scope; + + private final Network network; + private final int horizon; + private final int numberOfThreads; + + private final IdMap> counts = new IdMap<>(Link.class); + + private final static Logger logger = LogManager.getLogger(VDFSparseHorizonHandler.class); + + private record LinkState(List time, List count) { + } + + private List> state = new LinkedList<>(); + + public VDFSparseHorizonHandler(Network network, VDFScope scope, int horizon, int numberOfThreads) { + this.scope = scope; + this.network = network; + this.horizon = horizon; + this.numberOfThreads = numberOfThreads; + + for (Id linkId : network.getLinks().keySet()) { + counts.put(linkId, new ArrayList<>(Collections.nCopies(scope.getIntervals(), 0.0))); + } + } + + @Override + public synchronized void handleEvent(LinkEnterEvent event) { + processEnterLink(event.getTime(), event.getLinkId()); + } + + public void processEnterLink(double time, Id linkId) { + int i = scope.getIntervalIndex(time); + double currentValue = counts.get(linkId).get(i); + counts.get(linkId).set(i, currentValue + 1); + } + + @Override + public IdMap> aggregate(boolean ignoreIteration) { + while (state.size() > horizon) { + state.remove(0); + } + + logger.info(String.format("Starting aggregation of %d slices", state.size())); + + // Transform counts into state object + if (!ignoreIteration) { + IdMap newState = new IdMap<>(Link.class); + state.add(newState); + + for (Map.Entry, List> entry : counts.entrySet()) { + double total = 0.0; + + for (double value : entry.getValue()) { + total += value; + } + + if (total > 0.0) { + LinkState linkState = new LinkState(new ArrayList<>(), new ArrayList<>()); + newState.put(entry.getKey(), linkState); + + int timeIndex = 0; + for (double count : entry.getValue()) { + if (count > 0.0) { + linkState.time.add(timeIndex); + linkState.count.add(count); + } + + timeIndex++; + } + } + } + } + + IdMap> aggregated = new IdMap<>(Link.class); + + for (Id linkId : network.getLinks().keySet()) { + // Reset current counts + counts.put(linkId, new ArrayList<>(Collections.nCopies(scope.getIntervals(), 0.0))); + + // Initialize aggregated counts + aggregated.put(linkId, new ArrayList<>(Collections.nCopies(scope.getIntervals(), 0.0))); + } + + // Aggregate + Iterator> linkIterator = network.getLinks().keySet().iterator(); + + Runnable worker = () -> { + Id currentLinkId = null; + + while (true) { + // Fetch new link in queue + synchronized (linkIterator) { + if (linkIterator.hasNext()) { + currentLinkId = linkIterator.next(); + } else { + break; // Done + } + } + + // Go through history for this link and aggregate by time slot + for (int k = 0; k < state.size(); k++) { + LinkState historyItem = state.get(k).get(currentLinkId); + List linkAggregator = aggregated.get(currentLinkId); + + if (historyItem != null) { + for (int i = 0; i < historyItem.count.size(); i++) { + int timeIndex = historyItem.time.get(i); + linkAggregator.set(timeIndex, + linkAggregator.get(timeIndex) + historyItem.count.get(i) / (double) state.size()); + } + } + } + } + }; + + if (numberOfThreads < 2) { + worker.run(); + } else { + List threads = new ArrayList<>(numberOfThreads); + + for (int k = 0; k < numberOfThreads; k++) { + threads.add(new Thread(worker)); + } + + for (int k = 0; k < numberOfThreads; k++) { + threads.get(k).start(); + } + + try { + for (int k = 0; k < numberOfThreads; k++) { + threads.get(k).join(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + logger.info(String.format(" Finished aggregation")); + + return aggregated; + } + + @Override + public VDFReaderInterface getReader() { + return new Reader(); + } + + @Override + public VDFWriterInterface getWriter() { + return new Writer(); + } + + public class Reader implements VDFReaderInterface { + @Override + public void readFile(URL inputFile) { + state.clear(); + + try { + DataInputStream inputStream = new DataInputStream(IOUtils.getInputStream(inputFile)); + + Verify.verify(inputStream.readDouble() == scope.getStartTime()); + Verify.verify(inputStream.readDouble() == scope.getEndTime()); + Verify.verify(inputStream.readDouble() == scope.getIntervalTime()); + Verify.verify(inputStream.readInt() == scope.getIntervals()); + Verify.verify(inputStream.readInt() == horizon); + + int slices = (int) inputStream.readInt(); + int links = (int) inputStream.readInt(); + + List> linkIds = new ArrayList<>(links); + for (int linkIndex = 0; linkIndex < links; linkIndex++) { + linkIds.add(Id.createLinkId(inputStream.readUTF())); + } + + logger.info(String.format("Loading %d slices with %d links", slices, links)); + + for (int sliceIndex = 0; sliceIndex < slices; sliceIndex++) { + IdMap slice = new IdMap<>(Link.class); + state.add(slice); + + int sliceLinkCount = inputStream.readInt(); + + logger.info(String.format("Slice %d/%d, Reading %d link states", sliceIndex+1, slices, sliceLinkCount)); + + for (int sliceLinkIndex = 0; sliceLinkIndex < sliceLinkCount; sliceLinkIndex++) { + int linkIndex = inputStream.readInt(); + int linkStateSize = inputStream.readInt(); + + LinkState linkState = new LinkState(new ArrayList<>(linkStateSize), + new ArrayList<>(linkStateSize)); + slice.put(linkIds.get(linkIndex), linkState); + + for (int i = 0; i < linkStateSize; i++) { + linkState.time.add(inputStream.readInt()); + linkState.count.add(inputStream.readDouble()); + } + } + + logger.info(String.format(" Slice %d: %d obs", sliceIndex, + sliceLinkCount)); + } + + Verify.verify(inputStream.available() == 0); + inputStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + public class Writer implements VDFWriterInterface { + @Override + public void writeFile(File outputFile) { + try { + DataOutputStream outputStream = new DataOutputStream(new FileOutputStream(outputFile.toString())); + + outputStream.writeDouble(scope.getStartTime()); + outputStream.writeDouble(scope.getEndTime()); + outputStream.writeDouble(scope.getIntervalTime()); + outputStream.writeInt(scope.getIntervals()); + outputStream.writeInt(horizon); + outputStream.writeInt(state.size()); + outputStream.writeInt(counts.size()); + + List> linkIds = new ArrayList<>(counts.keySet()); + for (int linkIndex = 0; linkIndex < linkIds.size(); linkIndex++) { + outputStream.writeUTF(linkIds.get(linkIndex).toString()); + } + + logger.info(String.format("About to write %d slices", state.size())); + + for (int sliceIndex = 0; sliceIndex < state.size(); sliceIndex++) { + IdMap slice = state.get(sliceIndex); + outputStream.writeInt(slice.size()); + + int sliceLinkIndex = 0; + for (Id linkId : linkIds) { + LinkState linkState = slice.get(linkId); + if(linkState == null) { + continue; + } + outputStream.writeInt(linkIds.indexOf(linkId)); + outputStream.writeInt(linkState.count.size()); + + for (int i = 0; i < linkState.count.size(); i++) { + outputStream.writeInt(linkState.time.get(i)); + outputStream.writeDouble(linkState.count.get(i)); + } + sliceLinkIndex += 1; + } + assert sliceLinkIndex == slice.size(); + } + + outputStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/core/src/main/java/org/eqasim/core/simulation/vdf/utils/AdaptConfigForVDF.java b/core/src/main/java/org/eqasim/core/simulation/vdf/utils/AdaptConfigForVDF.java index dfb0ea555..ac63f6df3 100644 --- a/core/src/main/java/org/eqasim/core/simulation/vdf/utils/AdaptConfigForVDF.java +++ b/core/src/main/java/org/eqasim/core/simulation/vdf/utils/AdaptConfigForVDF.java @@ -1,6 +1,5 @@ package org.eqasim.core.simulation.vdf.utils; -import org.eqasim.core.components.config.EqasimConfigGroup; import org.eqasim.core.simulation.EqasimConfigurator; import org.eqasim.core.simulation.vdf.VDFConfigGroup; import org.eqasim.core.simulation.vdf.engine.VDFEngineConfigGroup; @@ -25,9 +24,8 @@ public static void adaptConfigForVDF(Config config, boolean engine) { VDFConfigGroup.getOrCreate(config).setWriteInterval(1); VDFConfigGroup.getOrCreate(config).setWriteFlowInterval(1); - // VDF: Set capacity factor instead (We retrieve it form the Eqasim config group) - EqasimConfigGroup eqasimConfigGroup = (EqasimConfigGroup) config.getModules().get(EqasimConfigGroup.GROUP_NAME); - VDFConfigGroup.getOrCreate(config).setCapacityFactor(eqasimConfigGroup.getSampleSize()); + VDFConfigGroup vdfConfigGroup = VDFConfigGroup.getOrCreate(config); + vdfConfigGroup.setHandler(VDFConfigGroup.HandlerType.SparseHorizon); if(engine) { // VDF Engine: Add config group diff --git a/core/src/test/java/org/eqasim/TestSimulationPipeline.java b/core/src/test/java/org/eqasim/TestSimulationPipeline.java index 8b8d5e798..e692fd53f 100644 --- a/core/src/test/java/org/eqasim/TestSimulationPipeline.java +++ b/core/src/test/java/org/eqasim/TestSimulationPipeline.java @@ -55,6 +55,8 @@ import com.google.inject.Inject; import com.google.inject.Provider; +import org.matsim.utils.eventsfilecomparison.ComparisonResult; +import org.matsim.utils.eventsfilecomparison.EventsFileComparator; public class TestSimulationPipeline { @@ -377,6 +379,7 @@ public void testTransitWithAbstractAccess() throws CommandLine.ConfigurationExce } public void runVdf() throws CommandLine.ConfigurationException, IOException, InterruptedException { + // This one will use the SparseHorizon handler AdaptConfigForVDF.main(new String[] { "--input-config-path", "melun_test/input/config.xml", "--output-config-path", "melun_test/input/config_vdf.xml", @@ -387,6 +390,20 @@ public void runVdf() throws CommandLine.ConfigurationException, IOException, Int runMelunSimulation("melun_test/input/config_vdf.xml", "melun_test/output_vdf"); + + // We force this one to use the legacy horizon handler + AdaptConfigForVDF.main(new String[] { + "--input-config-path", "melun_test/input/config.xml", + "--output-config-path", "melun_test/input/config_vdf_horizon.xml", + "--engine", "true", + "--config:eqasim:vdf_engine.generateNetworkEvents", "true", + "--config:eqasim:vdf.handler", "Horizon" + }); + + runMelunSimulation("melun_test/input/config_vdf_horizon.xml", "melun_test/output_vdf_horizon"); + + assert CRCChecksum.getCRCFromFile("melun_test/output_vdf_horizon/output_plans.xml.gz") == CRCChecksum.getCRCFromFile("melun_test/output_vdf/output_plans.xml.gz"); + RunStandaloneModeChoice.main(new String[]{ "--config-path", "melun_test/input/config_vdf.xml", "--config:standaloneModeChoice.outputDirectory", "melun_test/output_mode_choice_vdf", @@ -425,7 +442,6 @@ public void testPipeline() throws Exception { @Test public void testBaseDeterminism() throws Exception { - Logger logger = LogManager.getLogger(TestSimulationPipeline.class); Config config = ConfigUtils.loadConfig("melun_test/input/config.xml"); runMelunSimulation(config, "melun_test/output_determinism_1", null, 2);