diff --git a/src/main/java/org/torproject/metrics/onionoo/updater/OnionperfStatusUpdater.java b/src/main/java/org/torproject/metrics/onionoo/updater/OnionperfStatusUpdater.java index 1b4e266..2627d05 100644 --- a/src/main/java/org/torproject/metrics/onionoo/updater/OnionperfStatusUpdater.java +++ b/src/main/java/org/torproject/metrics/onionoo/updater/OnionperfStatusUpdater.java @@ -7,14 +7,15 @@ import org.torproject.metrics.onionoo.docs.DocumentStore; import org.torproject.metrics.onionoo.docs.DocumentStoreFactory; import org.torproject.metrics.onionoo.docs.OnionperfStatus; -import org.torproject.metrics.onionoo.docs.onionperf.*; import org.torproject.metrics.onionoo.onionperf.Measurement; import org.torproject.metrics.onionoo.onionperf.TorperfResultConverter; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; public class OnionperfStatusUpdater implements DescriptorListener, StatusUpdater { @@ -56,18 +57,25 @@ private void processTorPerfResult(TorperfResult descriptor) { @Override public void updateStatuses() { - logger.info("Updating Onionperf statuses. Size: {}", measurements.size()); -// OnionperfStatus status = new OnionperfStatus(measurements); -// documentStore.store(status); - documentStore.store(new CircuitDocument(measurements)); - documentStore.store(new DownloadDocument(measurements)); - documentStore.store(new FailureDocument(measurements)); - documentStore.store(new LatencyDocument(measurements)); - documentStore.store(new ThroughputDocument(measurements)); - logger.info("Onionperf statuses updated."); + logger.info("Updating Onionperf new measurements size: {}", measurements.size()); + OnionperfStatus statusOld = documentStore.retrieve(OnionperfStatus.class, true); + if (statusOld != null) { + logger.info("OnionperfStatus old measurements: {}", statusOld.getMeasurements().size()); + Long threshold = LocalDateTime.now().atOffset(ZoneOffset.UTC).minusDays(2).toEpochSecond(); + measurements.addAll(filter(statusOld.getMeasurements(), threshold)); + logger.info("OnionperfStatus merged measurements size: {}", measurements.size()); + } + OnionperfStatus status = new OnionperfStatus(measurements); + documentStore.store(status); + logger.info("Measurements clearing"); measurements.clear(); - logger.info("Measurements cleared."); + } + + private Collection filter(List measurements, Long threshold) { + return measurements.stream() + .filter(m -> m.getStart().toLocalDateTime().atZone(ZoneOffset.UTC).toEpochSecond() > threshold) + .collect(Collectors.toList()); } @Override diff --git a/src/main/java/org/torproject/metrics/onionoo/writer/OnionperfDocumentWriter.java b/src/main/java/org/torproject/metrics/onionoo/writer/OnionperfDocumentWriter.java index 5d96ccb..66911d0 100644 --- a/src/main/java/org/torproject/metrics/onionoo/writer/OnionperfDocumentWriter.java +++ b/src/main/java/org/torproject/metrics/onionoo/writer/OnionperfDocumentWriter.java @@ -19,14 +19,15 @@ public OnionperfDocumentWriter() { @Override public void writeDocuments(long mostRecentStatusMillis) { -// logger.info("Writing onionperf documents time: {}", mostRecentStatusMillis); -// OnionperfStatus status = documentStore.retrieve(OnionperfStatus.class, true); -// logger.info("OnionperfStatus: {}", status.getMeasurements().size()); -// documentStore.store(new CircuitDocument(status.getMeasurements())); -// documentStore.store(new DownloadDocument(status.getMeasurements())); -// documentStore.store(new FailureDocument(status.getMeasurements())); -// documentStore.store(new LatencyDocument(status.getMeasurements())); -// documentStore.store(new ThroughputDocument(status.getMeasurements())); + logger.info("Writing onionperf documents time: {}", mostRecentStatusMillis); + OnionperfStatus status = documentStore.retrieve(OnionperfStatus.class, true); + logger.info("OnionperfStatus: {}", status.getMeasurements().size()); + documentStore.store(new CircuitDocument(status.getMeasurements())); + documentStore.store(new DownloadDocument(status.getMeasurements())); + documentStore.store(new FailureDocument(status.getMeasurements())); + documentStore.store(new LatencyDocument(status.getMeasurements())); + documentStore.store(new ThroughputDocument(status.getMeasurements())); + logger.info("Performance documents saved"); } @Override