Skip to content

Commit

Permalink
ANON-221 - Fix aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
yumirkov committed Nov 7, 2024
1 parent 0e59443 commit 7f178af
Show file tree
Hide file tree
Showing 7 changed files with 447 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.torproject.metrics.onionoo.updater;

import org.checkerframework.checker.units.qual.A;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.torproject.descriptor.*;
Expand All @@ -9,6 +10,7 @@
import org.torproject.metrics.onionoo.userstats.*;

import java.util.*;
import java.util.stream.Collectors;

public class UserStatsStatusUpdater implements DescriptorListener, StatusUpdater {

Expand All @@ -23,6 +25,8 @@ public class UserStatsStatusUpdater implements DescriptorListener, StatusUpdater

private List<Imported> imported = new ArrayList<>();

private NewAggregator newAggregator = new NewAggregator();

public UserStatsStatusUpdater() {
this.descriptorSource = DescriptorSourceFactory.getDescriptorSource();
this.documentStore = DocumentStoreFactory.getDocumentStore();
Expand Down Expand Up @@ -102,8 +106,8 @@ private void parseRelayDirreqWriteHistory(String fingerprint,
} else if (i == 1) {
break;
}
insertIntoImported(fingerprint, nickname, "relay", "bytes", "",
"", "", fromMillis, toMillis, writtenBytes);
insertIntoImported(fingerprint, nickname, "relay", "bytes", null,
null, null, fromMillis, toMillis, writtenBytes);
}
}
}
Expand Down Expand Up @@ -159,10 +163,10 @@ private void parseRelayDirreqV3Resp(String fingerprint,
String country = e.getKey();
double val = resp * intervalFraction * e.getValue() / total;
insertIntoImported(fingerprint, nickname, "relay",
"responses", country, "", "", fromMillis, toMillis, val);
"responses", country, null, null, fromMillis, toMillis, val);
}
insertIntoImported(fingerprint, nickname, "relay", "responses",
"", "", "", fromMillis, toMillis, resp * intervalFraction);
null, null, null, fromMillis, toMillis, resp * intervalFraction);
}
}
}
Expand All @@ -177,7 +181,7 @@ private void parseRelayNetworkStatusConsensus(RelayNetworkStatusConsensus consen
String nickname = statusEntry.getNickname();
if (statusEntry.getFlags().contains("Running")) {
insertIntoImported(fingerprint, nickname, "relay", "status",
"", "", "", fromMillis, toMillis, 0.0);
null, null, null, fromMillis, toMillis, 0.0);
}
}
}
Expand Down Expand Up @@ -205,12 +209,17 @@ void insertIntoImported(String fingerprint, String nickname, String node,
@Override
public void updateStatuses() {
logger.error("Imported size: {}", imported.size());
List<Merged> merge = DataProcessor.merge(imported);
logger.error("Merged size: {}", merge.size());
List<Aggregated> aggregated = DataProcessor.aggregate(merge);
logger.error("Aggregated size: {}", aggregated.size());
logger.error("Aggregated: {}", aggregated);
List<Estimated> estimated = DataProcessor.estimate(aggregated);
// List<Merged> merge = DataProcessor.merge(imported);
List<Merged> merge1 = Merger.mergeFirstTime(imported);
// logger.error("Merged size: {}", merge.size());
logger.error("Merged size1: {}", merge1.size());
// List<Aggregated> aggregated = DataProcessor.aggregate(merge);
List<Aggregated> aggregate = newAggregator.aggregate(merge1);
// logger.error("Aggregated size: {}", aggregated.size());
logger.error("Aggregated1 size: {}", aggregate.size());
// logger.error("Aggregated: {}", aggregated);
logger.error("Aggregated1: {}", aggregate);
List<Estimated> estimated = DataProcessor.estimate(aggregate);
logger.error("Estimated size: {}", estimated.size());
logger.error("Estimated: {}", estimated);
this.documentStore.store(new UserStatsStatus(estimated));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.torproject.metrics.onionoo.userstats;

import java.time.LocalDate;

public class Aggregated {
private long date;
private LocalDate date;
private String node;
private String country;
private String transport;
Expand All @@ -14,7 +16,7 @@ public class Aggregated {
private double nh;
private double nrh;

public Aggregated(long date,
public Aggregated(LocalDate date,
String node,
String country,
String transport,
Expand All @@ -40,11 +42,11 @@ public Aggregated(long date,
this.nrh = nrh;
}

public long getDate() {
public LocalDate getDate() {
return date;
}

public void setDate(long date) {
public void setDate(LocalDate date) {
this.date = date;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.torproject.metrics.onionoo.userstats;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.*;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
Expand All @@ -13,35 +11,60 @@ public class DataProcessor {

// Method to merge imported data into merged data
public static List<Merged> merge(List<Imported> importedList) {
int idCounter = 1;
List<Merged> mergedList = new ArrayList<>();

// Group Imported entries by unique keys and process each group
Map<String, List<Imported>> groupedImports = importedList.stream().collect(
Collectors.groupingBy(imp -> String.join("-",
imp.getFingerprint(), imp.getNickname(), imp.getNode(),
imp.getMetric(), imp.getCountry(), imp.getTransport(), imp.getVersion()))
);
// Step 1: Group by unique fields (fingerprint, nickname, node, metric, country, transport, version)
Map<String, List<Imported>> groupedImported = importedList.stream().collect(Collectors.groupingBy(
imported -> String.join("-", imported.getFingerprint(), imported.getNickname(),
imported.getNode(), imported.getMetric(),
imported.getCountry(), imported.getTransport(), imported.getVersion())
));

// Step 2: Process each group independently
for (List<Imported> group : groupedImported.values()) {
// Sort each group by startTime to ensure intervals are processed in sequence
group.sort(Comparator.comparing(Imported::getStatsStart));

// Initialize variables to track merging within the group
long lastStartTime = group.get(0).getStatsStart();
long lastEndTime = group.get(0).getStatsEnd();
double lastVal = group.get(0).getVal();

// Use first entry to initialize shared fields for Merged
String fingerprint = group.get(0).getFingerprint();
String nickname = group.get(0).getNickname();
String node = group.get(0).getNode();
String metric = group.get(0).getMetric();
String country = group.get(0).getCountry();
String transport = group.get(0).getTransport();
String version = group.get(0).getVersion();

int idCounter = 1;
for (List<Imported> group : groupedImports.values()) {
Merged mergedEntry = new Merged(idCounter++,
group.get(0).getFingerprint(), group.get(0).getNickname(), group.get(0).getNode(),
group.get(0).getMetric(), group.get(0).getCountry(), group.get(0).getTransport(),
group.get(0).getVersion(), group.get(0).getStatsStart(), group.get(0).getStatsEnd(), 0);

// Merge intervals within each group
for (Imported imp : group) {
if (imp.getStatsStart() > mergedEntry.getStatsEnd()) {
// If there’s a gap, create a new merged entry
mergedEntry.setStatsEnd(imp.getStatsEnd());
mergedEntry.setVal(mergedEntry.getVal() + imp.getVal());
// Merge intervals within the sorted group
for (int i = 1; i < group.size(); i++) {
Imported current = group.get(i);

if (current.getStatsStart() <= lastEndTime) {
// Overlapping or adjacent interval, extend the end time and accumulate the value
lastEndTime = Math.max(lastEndTime, current.getStatsEnd());
lastVal += current.getVal();
} else {
mergedEntry.setStatsEnd(Math.max(imp.getStatsEnd(), mergedEntry.getStatsEnd()));
mergedEntry.setVal(mergedEntry.getVal() + imp.getVal());
// No overlap, add the previous merged interval to mergedList
mergedList.add(new Merged(idCounter++, fingerprint, nickname, node, metric, country,
transport, version, lastStartTime, lastEndTime, lastVal));

// Start a new interval
lastStartTime = current.getStatsStart();
lastEndTime = current.getStatsEnd();
lastVal = current.getVal();
}
}
mergedList.add(mergedEntry);

// Add the last merged interval of the group to mergedList
mergedList.add(new Merged(idCounter++, fingerprint, nickname, node, metric, country,
transport, version, lastStartTime, lastEndTime, lastVal));
}

return mergedList;
}

Expand All @@ -59,7 +82,7 @@ public static List<Aggregated> aggregate(List<Merged> mergedList) {
);

for (List<Merged> group : groupedMerges.values()) {
long date = group.get(0).getStatsStart();
LocalDate date = Instant.ofEpochMilli(group.get(0).getStatsStart()).atZone(ZoneOffset.UTC).toLocalDate();
String node = group.get(0).getNode();
String country = group.get(0).getCountry();
String transport = group.get(0).getTransport();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package org.torproject.metrics.onionoo.userstats;

import java.time.LocalDate;

public class Estimated {

private long date;
private LocalDate date;
private String node;
private String country;
private String transport;
private String version;
private int frac;
private int users;

public Estimated(long date, String node, String country, String transport, String version, int frac, int users) {
public Estimated(LocalDate date, String node, String country, String transport, String version, int frac, int users) {
this.date = date;
this.node = node;
this.country = country;
Expand All @@ -20,11 +22,11 @@ public Estimated(long date, String node, String country, String transport, Strin
this.users = users;
}

public long getDate() {
public LocalDate getDate() {
return date;
}

public void setDate(long date) {
public void setDate(LocalDate date) {
this.date = date;
}

Expand Down
65 changes: 65 additions & 0 deletions src/main/java/org/torproject/metrics/onionoo/userstats/Merger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.torproject.metrics.onionoo.userstats;

import java.util.*;
import java.util.stream.Collectors;

/**
 * Collapses {@link Imported} rows into {@link Merged} intervals.
 *
 * <p>Rows are grouped by (fingerprint, nickname, node, metric, country,
 * transport, version). Within each group, rows are ordered by start time and
 * every run of overlapping or touching intervals is folded into a single
 * {@link Merged} row whose value is the sum of the folded rows.
 *
 * <p>Ids are taken from a static counter that is shared by all calls and is
 * never reset, so ids keep increasing across invocations. The counter is not
 * synchronized.
 */
public class Merger {
    /** Next id to hand out; shared across calls and not synchronized. */
    private static int idCounter = 1;

    /**
     * Merges overlapping or touching intervals of rows that share the same
     * identifying fields.
     *
     * <p>Groups are emitted in the order in which their first row appears in
     * {@code importedList}; within a group, merged intervals are emitted in
     * ascending start-time order.
     *
     * @param importedList rows to merge; must not be {@code null}. The list
     *     itself is not modified.
     * @return a new list holding one {@link Merged} row per run of connected
     *     intervals; empty when {@code importedList} is empty
     */
    public static List<Merged> mergeFirstTime(List<Imported> importedList) {
        List<Merged> mergedList = new ArrayList<>();

        // Step 1: bucket rows by their identifying fields. The key is a list of
        // the individual fields rather than a delimiter-joined string, so a null
        // field cannot be confused with the text "null" and a "-" inside a field
        // cannot make two different field combinations collide. LinkedHashMap
        // keeps groups in first-appearance order, and the buckets are explicit
        // ArrayLists because they are sorted in place below.
        Map<List<String>, List<Imported>> groupedImported = new LinkedHashMap<>();
        for (Imported imported : importedList) {
            groupedImported
                    .computeIfAbsent(groupingKey(imported), key -> new ArrayList<>())
                    .add(imported);
        }

        // Step 2: process each group independently.
        for (List<Imported> group : groupedImported.values()) {
            // Intervals must be visited in start-time order for the sweep below.
            group.sort(Comparator.comparingLong(Imported::getStatsStart));

            Imported first = group.get(0);

            // State of the interval currently being built.
            long lastStartTime = first.getStatsStart();
            long lastEndTime = first.getStatsEnd();
            double lastVal = first.getVal();

            for (int i = 1; i < group.size(); i++) {
                Imported current = group.get(i);

                if (current.getStatsStart() <= lastEndTime) {
                    // Overlapping or touching: extend the end and accumulate the value.
                    // Math.max guards against an interval fully contained in the open one.
                    lastEndTime = Math.max(lastEndTime, current.getStatsEnd());
                    lastVal += current.getVal();
                } else {
                    // Gap: close the open interval and start a new one.
                    mergedList.add(toMerged(first, lastStartTime, lastEndTime, lastVal));
                    lastStartTime = current.getStatsStart();
                    lastEndTime = current.getStatsEnd();
                    lastVal = current.getVal();
                }
            }

            // Close the interval that is still open at the end of the group.
            mergedList.add(toMerged(first, lastStartTime, lastEndTime, lastVal));
        }

        return mergedList;
    }

    /** Builds the null-tolerant grouping key from the identifying fields of a row. */
    private static List<String> groupingKey(Imported imported) {
        return Arrays.asList(imported.getFingerprint(), imported.getNickname(),
                imported.getNode(), imported.getMetric(),
                imported.getCountry(), imported.getTransport(), imported.getVersion());
    }

    /**
     * Creates a merged row with the next id, copying the identifying fields
     * from {@code template} and using the given interval and accumulated value.
     */
    private static Merged toMerged(Imported template, long start, long end, double val) {
        return new Merged(idCounter++, template.getFingerprint(), template.getNickname(),
                template.getNode(), template.getMetric(), template.getCountry(),
                template.getTransport(), template.getVersion(), start, end, val);
    }
}
Loading

0 comments on commit 7f178af

Please sign in to comment.