diff --git a/src/main/java/org/torproject/metrics/onionoo/updater/UserStatsStatusUpdater.java b/src/main/java/org/torproject/metrics/onionoo/updater/UserStatsStatusUpdater.java
index d48c5a5..ba3f1fe 100644
--- a/src/main/java/org/torproject/metrics/onionoo/updater/UserStatsStatusUpdater.java
+++ b/src/main/java/org/torproject/metrics/onionoo/updater/UserStatsStatusUpdater.java
@@ -1,5 +1,6 @@
 package org.torproject.metrics.onionoo.updater;
 
+import org.checkerframework.checker.units.qual.A;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.torproject.descriptor.*;
@@ -9,6 +10,7 @@ import org.torproject.metrics.onionoo.userstats.*;
 
 import java.util.*;
+import java.util.stream.Collectors;
 
 public class UserStatsStatusUpdater implements DescriptorListener, StatusUpdater {
 
@@ -23,6 +25,8 @@ public class UserStatsStatusUpdater implements DescriptorListener, StatusUpdater
 
   private List<Imported> imported = new ArrayList<>();
 
+  private NewAggregator newAggregator = new NewAggregator();
+
   public UserStatsStatusUpdater() {
     this.descriptorSource = DescriptorSourceFactory.getDescriptorSource();
     this.documentStore = DocumentStoreFactory.getDocumentStore();
@@ -102,8 +106,8 @@ private void parseRelayDirreqWriteHistory(String fingerprint,
      } else if (i == 1) {
        break;
      }
-      insertIntoImported(fingerprint, nickname, "relay", "bytes", "",
-          "", "", fromMillis, toMillis, writtenBytes);
+      insertIntoImported(fingerprint, nickname, "relay", "bytes", null,
+          null, null, fromMillis, toMillis, writtenBytes);
    }
  }
 }
@@ -159,10 +163,10 @@ private void parseRelayDirreqV3Resp(String fingerprint,
        String country = e.getKey();
        double val = resp * intervalFraction * e.getValue() / total;
        insertIntoImported(fingerprint, nickname, "relay",
-            "responses", country, "", "", fromMillis, toMillis, val);
+            "responses", country, null, null, fromMillis, toMillis, val);
      }
      insertIntoImported(fingerprint, nickname, "relay", "responses",
-          "", "", "", fromMillis, toMillis, resp * intervalFraction);
+          null, null, null, fromMillis, toMillis, resp * intervalFraction);
    }
  }
 }
@@ -177,7 +181,7 @@ private void parseRelayNetworkStatusConsensus(RelayNetworkStatusConsensus consen
      String nickname = statusEntry.getNickname();
      if (statusEntry.getFlags().contains("Running")) {
        insertIntoImported(fingerprint, nickname, "relay", "status",
-            "", "", "", fromMillis, toMillis, 0.0);
+            null, null, null, fromMillis, toMillis, 0.0);
      }
    }
  }
@@ -205,12 +209,17 @@ void insertIntoImported(String fingerprint, String nickname, String node,
  @Override
  public void updateStatuses() {
    logger.error("Imported size: {}", imported.size());
-    List<Merged> merge = DataProcessor.merge(imported);
-    logger.error("Merged size: {}", merge.size());
-    List<Aggregated> aggregated = DataProcessor.aggregate(merge);
-    logger.error("Aggregated size: {}", aggregated.size());
-    logger.error("Aggregated: {}", aggregated);
-    List<Estimated> estimated = DataProcessor.estimate(aggregated);
+//    List<Merged> merge = DataProcessor.merge(imported);
+    List<Merged> merge1 = Merger.mergeFirstTime(imported);
+//    logger.error("Merged size: {}", merge.size());
+    logger.error("Merged size1: {}", merge1.size());
+//    List<Aggregated> aggregated = DataProcessor.aggregate(merge);
+    List<Aggregated> aggregate = newAggregator.aggregate(merge1);
+//    logger.error("Aggregated size: {}", aggregated.size());
+    logger.error("Aggregated1 size: {}", aggregate.size());
+//    logger.error("Aggregated: {}", aggregated);
+    logger.error("Aggregated1: {}", aggregate);
+    List<Estimated> estimated = DataProcessor.estimate(aggregate);
    logger.error("Estimated size: {}", estimated.size());
logger.error("Estimated: {}", estimated); this.documentStore.store(new UserStatsStatus(estimated)); diff --git a/src/main/java/org/torproject/metrics/onionoo/userstats/Aggregated.java b/src/main/java/org/torproject/metrics/onionoo/userstats/Aggregated.java index 75864e5..be37c83 100644 --- a/src/main/java/org/torproject/metrics/onionoo/userstats/Aggregated.java +++ b/src/main/java/org/torproject/metrics/onionoo/userstats/Aggregated.java @@ -1,7 +1,9 @@ package org.torproject.metrics.onionoo.userstats; +import java.time.LocalDate; + public class Aggregated { - private long date; + private LocalDate date; private String node; private String country; private String transport; @@ -14,7 +16,7 @@ public class Aggregated { private double nh; private double nrh; - public Aggregated(long date, + public Aggregated(LocalDate date, String node, String country, String transport, @@ -40,11 +42,11 @@ public Aggregated(long date, this.nrh = nrh; } - public long getDate() { + public LocalDate getDate() { return date; } - public void setDate(long date) { + public void setDate(LocalDate date) { this.date = date; } diff --git a/src/main/java/org/torproject/metrics/onionoo/userstats/DataProcessor.java b/src/main/java/org/torproject/metrics/onionoo/userstats/DataProcessor.java index 5c52648..4df23d6 100644 --- a/src/main/java/org/torproject/metrics/onionoo/userstats/DataProcessor.java +++ b/src/main/java/org/torproject/metrics/onionoo/userstats/DataProcessor.java @@ -1,8 +1,6 @@ package org.torproject.metrics.onionoo.userstats; -import java.time.Duration; -import java.time.Instant; -import java.time.ZoneId; +import java.time.*; import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -13,35 +11,60 @@ public class DataProcessor { // Method to merge imported data into merged data public static List merge(List importedList) { + int idCounter = 1; List mergedList = new ArrayList<>(); - // Group Imported entries by unique keys and process each group - Map> groupedImports = importedList.stream().collect( - Collectors.groupingBy(imp -> String.join("-", - imp.getFingerprint(), imp.getNickname(), imp.getNode(), - imp.getMetric(), imp.getCountry(), imp.getTransport(), imp.getVersion())) - ); + // Step 1: Group by unique fields (fingerprint, nickname, node, metric, country, transport, version) + Map> groupedImported = importedList.stream().collect(Collectors.groupingBy( + imported -> String.join("-", imported.getFingerprint(), imported.getNickname(), + imported.getNode(), imported.getMetric(), + imported.getCountry(), imported.getTransport(), imported.getVersion()) + )); + + // Step 2: Process each group independently + for (List group : groupedImported.values()) { + // Sort each group by startTime to ensure intervals are processed in sequence + group.sort(Comparator.comparing(Imported::getStatsStart)); + + // Initialize variables to track merging within the group + long lastStartTime = group.get(0).getStatsStart(); + long lastEndTime = group.get(0).getStatsEnd(); + double lastVal = group.get(0).getVal(); + + // Use first entry to initialize shared fields for Merged + String fingerprint = group.get(0).getFingerprint(); + String nickname = group.get(0).getNickname(); + String node = group.get(0).getNode(); + String metric = group.get(0).getMetric(); + String country = group.get(0).getCountry(); + String transport = group.get(0).getTransport(); + String version = group.get(0).getVersion(); - int idCounter = 1; - for (List group : groupedImports.values()) { - Merged mergedEntry = new 
-                    group.get(0).getFingerprint(), group.get(0).getNickname(), group.get(0).getNode(),
-                    group.get(0).getMetric(), group.get(0).getCountry(), group.get(0).getTransport(),
-                    group.get(0).getVersion(), group.get(0).getStatsStart(), group.get(0).getStatsEnd(), 0);
-
-            // Merge intervals within each group
-            for (Imported imp : group) {
-                if (imp.getStatsStart() > mergedEntry.getStatsEnd()) {
-                    // If there’s a gap, create a new merged entry
-                    mergedEntry.setStatsEnd(imp.getStatsEnd());
-                    mergedEntry.setVal(mergedEntry.getVal() + imp.getVal());
+            // Merge intervals within the sorted group
+            for (int i = 1; i < group.size(); i++) {
+                Imported current = group.get(i);
+
+                if (current.getStatsStart() <= lastEndTime) {
+                    // Overlapping or adjacent interval, extend the end time and accumulate the value
+                    lastEndTime = Math.max(lastEndTime, current.getStatsEnd());
+                    lastVal += current.getVal();
                 } else {
-                    mergedEntry.setStatsEnd(Math.max(imp.getStatsEnd(), mergedEntry.getStatsEnd()));
-                    mergedEntry.setVal(mergedEntry.getVal() + imp.getVal());
+                    // No overlap, add the previous merged interval to mergedList
+                    mergedList.add(new Merged(idCounter++, fingerprint, nickname, node, metric, country,
+                            transport, version, lastStartTime, lastEndTime, lastVal));
+
+                    // Start a new interval
+                    lastStartTime = current.getStatsStart();
+                    lastEndTime = current.getStatsEnd();
+                    lastVal = current.getVal();
                 }
             }
-            mergedList.add(mergedEntry);
+
+            // Add the last merged interval of the group to mergedList
+            mergedList.add(new Merged(idCounter++, fingerprint, nickname, node, metric, country,
+                    transport, version, lastStartTime, lastEndTime, lastVal));
         }
+
         return mergedList;
     }
 
@@ -59,7 +82,7 @@ public static List<Aggregated> aggregate(List<Merged> mergedList) {
         );
 
         for (List<Merged> group : groupedMerges.values()) {
-            long date = group.get(0).getStatsStart();
+            LocalDate date = Instant.ofEpochMilli(group.get(0).getStatsStart()).atZone(ZoneOffset.UTC).toLocalDate();
             String node = group.get(0).getNode();
             String country = group.get(0).getCountry();
             String transport = group.get(0).getTransport();
diff --git a/src/main/java/org/torproject/metrics/onionoo/userstats/Estimated.java b/src/main/java/org/torproject/metrics/onionoo/userstats/Estimated.java
index ee4eb90..0de8629 100644
--- a/src/main/java/org/torproject/metrics/onionoo/userstats/Estimated.java
+++ b/src/main/java/org/torproject/metrics/onionoo/userstats/Estimated.java
@@ -1,8 +1,10 @@
 package org.torproject.metrics.onionoo.userstats;
 
+import java.time.LocalDate;
+
 public class Estimated {
 
-    private long date;
+    private LocalDate date;
     private String node;
     private String country;
     private String transport;
@@ -10,7 +12,7 @@ public class Estimated {
     private int frac;
     private int users;
 
-    public Estimated(long date, String node, String country, String transport, String version, int frac, int users) {
+    public Estimated(LocalDate date, String node, String country, String transport, String version, int frac, int users) {
         this.date = date;
         this.node = node;
         this.country = country;
@@ -20,11 +22,11 @@ public Estimated(long date, String node, String country, String transport, Strin
         this.users = users;
     }
 
-    public long getDate() {
+    public LocalDate getDate() {
         return date;
     }
 
-    public void setDate(long date) {
+    public void setDate(LocalDate date) {
         this.date = date;
     }
 
diff --git a/src/main/java/org/torproject/metrics/onionoo/userstats/Merger.java b/src/main/java/org/torproject/metrics/onionoo/userstats/Merger.java
new file mode 100644
index 0000000..3117523
--- /dev/null
+++ b/src/main/java/org/torproject/metrics/onionoo/userstats/Merger.java
@@ -0,0 +1,65 @@
+package org.torproject.metrics.onionoo.userstats;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class Merger {
+    private static int idCounter = 1;
+
+    public static List<Merged> mergeFirstTime(List<Imported> importedList) {
+        List<Merged> mergedList = new ArrayList<>();
+
+        // Step 1: Group by unique fields (fingerprint, nickname, node, metric, country, transport, version)
+        Map<String, List<Imported>> groupedImported = importedList.stream().collect(Collectors.groupingBy(
+                imported -> String.join("-", imported.getFingerprint(), imported.getNickname(),
+                        imported.getNode(), imported.getMetric(),
+                        imported.getCountry(), imported.getTransport(), imported.getVersion())
+        ));
+
+        // Step 2: Process each group independently
+        for (List<Imported> group : groupedImported.values()) {
+            // Sort each group by startTime to ensure intervals are processed in sequence
+            group.sort(Comparator.comparing(Imported::getStatsStart));
+
+            // Initialize variables to track merging within the group
+            long lastStartTime = group.get(0).getStatsStart();
+            long lastEndTime = group.get(0).getStatsEnd();
+            double lastVal = group.get(0).getVal();
+
+            // Use first entry to initialize shared fields for Merged
+            String fingerprint = group.get(0).getFingerprint();
+            String nickname = group.get(0).getNickname();
+            String node = group.get(0).getNode();
+            String metric = group.get(0).getMetric();
+            String country = group.get(0).getCountry();
+            String transport = group.get(0).getTransport();
+            String version = group.get(0).getVersion();
+
+            // Merge intervals within the sorted group
+            for (int i = 1; i < group.size(); i++) {
+                Imported current = group.get(i);
+
+                if (current.getStatsStart() <= lastEndTime) {
+                    // Overlapping or adjacent interval, extend the end time and accumulate the value
+                    lastEndTime = Math.max(lastEndTime, current.getStatsEnd());
+                    lastVal += current.getVal();
+                } else {
+                    // No overlap, add the previous merged interval to mergedList
+                    mergedList.add(new Merged(idCounter++, fingerprint, nickname, node, metric, country,
+                            transport, version, lastStartTime, lastEndTime, lastVal));
+
+                    // Start a new interval
+                    lastStartTime = current.getStatsStart();
+                    lastEndTime = current.getStatsEnd();
+                    lastVal = current.getVal();
+                }
+            }
+
+            // Add the last merged interval of the group to mergedList
+            mergedList.add(new Merged(idCounter++, fingerprint, nickname, node, metric, country,
+                    transport, version, lastStartTime, lastEndTime, lastVal));
+        }
+
+        return mergedList;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/torproject/metrics/onionoo/userstats/NewAggregator.java b/src/main/java/org/torproject/metrics/onionoo/userstats/NewAggregator.java
new file mode 100644
index 0000000..4faee18
--- /dev/null
+++ b/src/main/java/org/torproject/metrics/onionoo/userstats/NewAggregator.java
@@ -0,0 +1,179 @@
+package org.torproject.metrics.onionoo.userstats;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class NewAggregator {
+
+    public List<Aggregated> aggregate(List<Merged> merged) {
+
+        Map<String, UpdateTemp> updateTemp = merged.stream().collect(
+                Collectors.groupingBy(
+                        this::groupKey,
+                        Collectors.collectingAndThen(Collectors.toList(), this::aggregateToUpdateTemp)
+                )
+        );
+
+        List<Aggregated> aggregatedList = new ArrayList<>();
+
+        Collection<Aggregated> first = updateTemp.values().stream()
+                .filter(ut -> "responses".equals(ut.getMetric()))
+                .collect(
+                        Collectors.groupingBy(UpdateTemp::key,
+                                Collectors.collectingAndThen(Collectors.toList(), this::aggregateToAggregated)
+                        )
+                ).values();
+
+        aggregatedList.addAll(first);
+
+        List<UpdateTemp> noDimension = updateTemp.values().stream()
+                .filter(ut -> ut.getCountry() == null && ut.getTransport() == null && ut.getVersion() == null)
+                .collect(Collectors.toList());
+
+        Map<LocalDate, Aggregated> collect = noDimension.stream()
+                .filter(nd -> "bytes".equals(nd.getMetric()))
+                .collect(Collectors.groupingBy(
+                        UpdateTemp::getDate,
+                        Collectors.collectingAndThen(Collectors.toList(), this::aggregateToBytes)
+                        )
+                );
+
+        aggregatedList.forEach(a -> {
+            Aggregated bytes = collect.get(a.getDate());
+            if (bytes != null) {
+                a.setHh(bytes.getHh());
+                a.setNh(bytes.getNh());
+            }
+        });
+
+        Map<LocalDate, Double> status = noDimension.stream()
+                .filter(nd -> "status".equals(nd.getMetric()))
+                .collect(Collectors.groupingBy(UpdateTemp::getDate, Collectors.summingDouble(UpdateTemp::getSeconds)));
+
+        aggregatedList.forEach(a -> {
+            Double aDouble = status.get(a.getDate());
+            if (aDouble != null) {
+                a.setNn(aDouble);
+            }
+        });
+
+        Map<LocalDate, Double> hrh = noDimension.stream()
+                .collect(Collectors.groupingBy(UpdateTemp::anotherKey))
+                .entrySet().stream()
+                .map(entry -> filterBoth(entry.getValue()))
+                .filter(Objects::nonNull)
+                .collect(Collectors.groupingBy(Aggregated::getDate, Collectors.summingDouble(Aggregated::getHrh)));
+
+        aggregatedList.forEach(a -> {
+            Double aDouble = hrh.get(a.getDate());
+            if (aDouble != null) {
+                a.setHrh(aDouble);
+            }
+        });
+
+        Map<LocalDate, Double> nrh = noDimension.stream()
+                .collect(Collectors.groupingBy(UpdateTemp::anotherKey))
+                .entrySet().stream()
+                .map(entry -> filterBothLeft(entry.getValue()))
+                .filter(Objects::nonNull)
+                .collect(Collectors.groupingBy(Aggregated::getDate, Collectors.summingDouble(Aggregated::getNrh)));
+
+        aggregatedList.forEach(a -> {
+            Double aDouble = nrh.get(a.getDate());
+            if (aDouble != null) {
+                a.setNrh(aDouble);
+            }
+        });
+
+        return aggregatedList;
+    }
+
+    private String groupKey(Merged m) {
+        return m.getFingerprint() + "-" + m.getNickname() + "-" + m.getNode() + "-" + m.getMetric() + "-" +
+                m.getCountry() + "-" + m.getTransport() + "-" + m.getVersion() + "-" + epochToLocalDate(m.getStatsStart());
+    }
+
+    // Helper function to check if dates are the same based on `long` timestamps
+    private boolean sameDate(long startTime1, long startTime2) {
+        return epochToLocalDate(startTime1).equals(epochToLocalDate(startTime2));
+    }
+
+    // Helper function to convert epoch timestamp to LocalDate for date comparisons
+    private LocalDate epochToLocalDate(long epochMillis) {
+        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate();
+    }
+
+    // Helper function to aggregate a list of `Merged` entries into an `UpdateTemp`
+    private UpdateTemp aggregateToUpdateTemp(List<Merged> list) {
+        Merged first = list.get(0);
+        long totalSeconds = list.stream().mapToLong(m -> (m.getStatsEnd() - m.getStatsStart()) / 1000).sum();
+        double totalVal = list.stream().mapToDouble(Merged::getVal).sum();
+
+        return new UpdateTemp(first.getFingerprint(), first.getNickname(), first.getNode(), first.getMetric(),
+                first.getCountry(), first.getTransport(), first.getVersion(),
+                epochToLocalDate(first.getStatsStart()), totalVal, totalSeconds);
+    }
+
+    private Aggregated aggregateToAggregated(List<UpdateTemp> list) {
+        UpdateTemp first = list.get(0);
+        double rrx = list.stream().mapToDouble(UpdateTemp::getVal).sum();
+        long nrx = list.stream().mapToLong(UpdateTemp::getSeconds).sum();
+
+        return new Aggregated(first.getDate(), first.getNode(), first.getCountry(), first.getTransport(),
+                first.getVersion(), rrx, nrx, 0, 0, 0, 0, 0);
+    }
+
+    private Aggregated aggregateToBytes(List<UpdateTemp> list) {
+        UpdateTemp first = list.get(0);
+        double hh = list.stream().mapToDouble(UpdateTemp::getVal).sum();
+        long nh = list.stream().mapToLong(UpdateTemp::getSeconds).sum();
+
+        return new Aggregated(first.getDate(), first.getNode(), first.getCountry(), first.getTransport(),
+                first.getVersion(), 0, 0, hh, 0, 0, nh, 0);
+    }
+
+    private Aggregated filterBoth(List<UpdateTemp> list) {
+        UpdateTemp ut = list.get(0);
+        Optional<UpdateTemp> bytes = list.stream().filter(it -> "bytes".equals(it.getMetric()) && it.getSeconds() > 0).findFirst();
+        Optional<UpdateTemp> responses = list.stream().filter(it -> "responses".equals(it.getMetric())).findFirst();
+
+        if (bytes.isPresent() && responses.isPresent()) {
+            UpdateTemp b = bytes.get();
+            UpdateTemp r = responses.get();
+
+            double hrh = (Math.min(b.getSeconds(), r.getSeconds()) * b.getVal()) / b.getSeconds();
+
+            return new Aggregated(ut.getDate(), ut.getNode(), ut.getCountry(), ut.getTransport(), ut.getVersion(),
+                    0, 0, 0, 0, hrh, 0, 0);
+        } else {
+            return null;
+        }
+
+    }
+
+    private Aggregated filterBothLeft(List<UpdateTemp> list) {
+        UpdateTemp ut = list.get(0);
+        Optional<UpdateTemp> bytes = list.stream().filter(it -> "bytes".equals(it.getMetric())).findFirst();
+        Optional<UpdateTemp> responses = list.stream().filter(it -> "responses".equals(it.getMetric())).findFirst();
+
+        if (responses.isPresent() && bytes.isPresent() && bytes.get().getSeconds() > 0) {
+            return null;
+        } else {
+            if (responses.isPresent()) {
+                UpdateTemp r = responses.get();
+                long bytesSeconds = 0;
+                double nrh = Math.max(0, r.getSeconds() - bytesSeconds);
+                return new Aggregated(r.getDate(), r.getNode(), r.getCountry(), r.getTransport(), r.getVersion(),
+                        0, 0, 0, 0, 0, 0, nrh);
+            } else {
+                return null;
+            }
+        }
+
+    }
+
+
+}
diff --git a/src/main/java/org/torproject/metrics/onionoo/userstats/UpdateTemp.java b/src/main/java/org/torproject/metrics/onionoo/userstats/UpdateTemp.java
new file mode 100644
index 0000000..003b0ea
--- /dev/null
+++ b/src/main/java/org/torproject/metrics/onionoo/userstats/UpdateTemp.java
@@ -0,0 +1,122 @@
+package org.torproject.metrics.onionoo.userstats;
+
+import java.time.LocalDate;
+
+public class UpdateTemp {
+    private String fingerprint;
+    private String nickname;
+    private String node;
+    private String metric;
+    private String country;
+    private String transport;
+    private String version;
+    private LocalDate date;
+    private double val;
+    private long seconds;
+
+    public UpdateTemp(String fingerprint, String nickname, String node, String metric, String country,
+                      String transport, String version, LocalDate date, double val, long seconds) {
+        this.fingerprint = fingerprint;
+        this.nickname = nickname;
+        this.node = node;
+        this.metric = metric;
+        this.country = country;
+        this.transport = transport;
+        this.version = version;
+        this.date = date;
+        this.val = val;
+        this.seconds = seconds;
+    }
+
+    public String getFingerprint() {
+        return fingerprint;
+    }
+
+    public void setFingerprint(String fingerprint) {
+        this.fingerprint = fingerprint;
+    }
+
+    public String getNickname() {
+        return nickname;
+    }
+
+    public void setNickname(String nickname) {
+        this.nickname = nickname;
+    }
+
+    public String getNode() {
+        return node;
+    }
+
+    public void setNode(String node) {
+        this.node = node;
+    }
+
+    public String getMetric() {
+        return metric;
+    }
+
+    public void setMetric(String metric) {
+        this.metric = metric;
+    }
+
+    public String getCountry() {
+        return country;
+    }
+
+    public void setCountry(String country) {
+        this.country = country;
+    }
+
+    public String getTransport() {
+        return transport;
+    }
+
+    public void setTransport(String transport) {
+        this.transport = transport;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    public LocalDate getDate() {
+        return date;
+    }
+
+    public void setDate(LocalDate date) {
+        this.date = date;
+    }
+
+    public double getVal() {
+        return val;
+    }
+
+    public void setVal(double val) {
+        this.val = val;
+    }
+
+    public long getSeconds() {
+        return seconds;
+    }
+
+    public void setSeconds(long seconds) {
+        this.seconds = seconds;
+    }
+
+    public String getGroupKey() {
+        return fingerprint + "-" + nickname + "-" + node + "-" + metric + "-" + country + "-" + transport + "-" + version + "-" + date;
+    }
+
+    public String key() {
+        return date + "-" + node + "-" + country + "-" + transport + "-" + version;
+    }
+
+    public String anotherKey() {
+        return date + "-" + fingerprint + "-" + nickname + "-" + node;
+    }
+}
\ No newline at end of file
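
Reviewer note (not part of the patch): a minimal sketch of how the pieces touched by this diff fit together, following the call order now used in UserStatsStatusUpdater.updateStatuses(). The class name PipelineSketch is hypothetical and only for illustration; it calls nothing beyond the types introduced or already referenced in this diff.

package org.torproject.metrics.onionoo.userstats;

import java.util.List;

// Illustrative only: mirrors the new merge -> aggregate -> estimate pipeline.
public class PipelineSketch {

    // Takes rows already collected by the parse* methods (insertIntoImported)
    // and returns per-day user estimates, as updateStatuses() now does.
    public static List<Estimated> run(List<Imported> imported) {
        // 1. Collapse overlapping/adjacent stats intervals per
        //    (fingerprint, nickname, node, metric, country, transport, version).
        List<Merged> merged = Merger.mergeFirstTime(imported);

        // 2. Roll merged intervals up into daily totals per dimension
        //    (responses, bytes, status history).
        List<Aggregated> aggregated = new NewAggregator().aggregate(merged);

        // 3. Convert the daily totals into user estimates, unchanged from before.
        return DataProcessor.estimate(aggregated);
    }
}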