ANON-221 - Refactor aggregation
yumirkov committed Nov 8, 2024
1 parent 7f178af commit ac42e58
Showing 11 changed files with 52 additions and 218 deletions.
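The gist of the refactor, as visible in the diff below: the free-form node and metric string arguments threaded through Imported, Merged, UpdateTemp and Aggregated are replaced by a Metric enum, the per-node dimension (the "relay" string argument) is dropped, and NewAggregator becomes Aggregator. The Metric enum itself sits in one of the changed files that is not expanded on this page; a minimal sketch of what it presumably contains, judging from the constants referenced below:

    package org.torproject.metrics.onionoo.userstats;

    // Hypothetical sketch only -- the real enum is in a changed file not shown here.
    public enum Metric {
      BYTES,      // written dirreq bytes; rolled up into hh/nh
      RESPONSES,  // dirreq v3 responses; rolled up into rrx/nrx
      STATUS      // consensus entries with the Running flag; summed per day
    }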
@@ -1,6 +1,5 @@
package org.torproject.metrics.onionoo.updater;

import org.checkerframework.checker.units.qual.A;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.torproject.descriptor.*;
@@ -10,7 +9,6 @@
import org.torproject.metrics.onionoo.userstats.*;

import java.util.*;
import java.util.stream.Collectors;

public class UserStatsStatusUpdater implements DescriptorListener, StatusUpdater {

@@ -25,7 +23,7 @@ public class UserStatsStatusUpdater implements DescriptorListener, StatusUpdater

private List<Imported> imported = new ArrayList<>();

private NewAggregator newAggregator = new NewAggregator();
private Aggregator newAggregator = new Aggregator();

public UserStatsStatusUpdater() {
this.descriptorSource = DescriptorSourceFactory.getDescriptorSource();
@@ -106,7 +104,7 @@ private void parseRelayDirreqWriteHistory(String fingerprint,
} else if (i == 1) {
break;
}
insertIntoImported(fingerprint, nickname, "relay", "bytes", null,
insertIntoImported(fingerprint, nickname, Metric.BYTES, null,
null, null, fromMillis, toMillis, writtenBytes);
}
}
@@ -162,10 +160,10 @@ private void parseRelayDirreqV3Resp(String fingerprint,
for (Map.Entry<String, Double> e : requestsCopy.entrySet()) {
String country = e.getKey();
double val = resp * intervalFraction * e.getValue() / total;
insertIntoImported(fingerprint, nickname, "relay",
"responses", country, null, null, fromMillis, toMillis, val);
insertIntoImported(fingerprint, nickname,
Metric.RESPONSES, country, null, null, fromMillis, toMillis, val);
}
insertIntoImported(fingerprint, nickname, "relay", "responses",
insertIntoImported(fingerprint, nickname, Metric.RESPONSES,
null, null, null, fromMillis, toMillis, resp * intervalFraction);
}
}
@@ -180,22 +178,21 @@ private void parseRelayNetworkStatusConsensus(RelayNetworkStatusConsensus consen
.toUpperCase();
String nickname = statusEntry.getNickname();
if (statusEntry.getFlags().contains("Running")) {
insertIntoImported(fingerprint, nickname, "relay", "status",
insertIntoImported(fingerprint, nickname, Metric.STATUS,
null, null, null, fromMillis, toMillis, 0.0);
}
}
}

void insertIntoImported(String fingerprint, String nickname, String node,
String metric, String country, String transport, String version,
void insertIntoImported(String fingerprint, String nickname,
Metric metric, String country, String transport, String version,
long fromMillis, long toMillis, double val) {
if (fromMillis > toMillis) {
return;
}
imported.add(new Imported(
fingerprint,
nickname,
node,
metric,
country,
transport,
@@ -4,7 +4,6 @@

public class Aggregated {
private LocalDate date;
private String node;
private String country;
private String transport;
private String version;
@@ -17,7 +16,6 @@ public class Aggregated {
private double nrh;

public Aggregated(LocalDate date,
String node,
String country,
String transport,
String version,
@@ -29,7 +27,6 @@ public Aggregated(LocalDate date,
double nh,
double nrh) {
this.date = date;
this.node = node;
this.country = country;
this.transport = transport;
this.version = version;
@@ -50,14 +47,6 @@ public void setDate(LocalDate date) {
this.date = date;
}

public String getNode() {
return node;
}

public void setNode(String node) {
this.node = node;
}

public String getCountry() {
return country;
}
@@ -142,7 +131,6 @@ public void setNrh(double nrh) {
public String toString() {
return "Aggregated{" +
"date=" + date +
", node=" + node +
", country='" + country + '\'' +
", transport='" + transport + '\'' +
", version='" + version + '\'' +
@@ -6,7 +6,7 @@
import java.util.*;
import java.util.stream.Collectors;

public class NewAggregator {
public class Aggregator {

public List<Aggregated> aggregate(List<Merged> merged) {

@@ -20,7 +20,7 @@ public List<Aggregated> aggregate(List<Merged> merged) {
List<Aggregated> aggregatedList = new ArrayList<>();

Collection<Aggregated> first = updateTemp.values().stream()
.filter(ut -> "responses".equals(ut.getMetric()))
.filter(ut -> ut.getMetric() == Metric.RESPONSES)
.collect(
Collectors.groupingBy(UpdateTemp::key,
Collectors.collectingAndThen(Collectors.toList(), this::aggregateToAggregated)
@@ -34,7 +34,7 @@ public List<Aggregated> aggregate(List<Merged> merged) {
.collect(Collectors.toList());

Map<LocalDate, Aggregated> collect = noDimension.stream()
.filter(nd -> "bytes".equals(nd.getMetric()))
.filter(nd -> nd.getMetric() == Metric.BYTES)
.collect(Collectors.groupingBy(
UpdateTemp::getDate,
Collectors.collectingAndThen(Collectors.toList(), this::aggregateToBytes)
@@ -50,7 +50,7 @@ public List<Aggregated> aggregate(List<Merged> merged) {
});

Map<LocalDate, Double> status = noDimension.stream()
.filter(nd -> "status".equals(nd.getMetric()))
.filter(nd -> nd.getMetric() == Metric.STATUS)
.collect(Collectors.groupingBy(UpdateTemp::getDate, Collectors.summingDouble(UpdateTemp::getSeconds)));

aggregatedList.forEach(a -> {
@@ -92,7 +92,7 @@ public List<Aggregated> aggregate(List<Merged> merged) {
}

private String groupKey(Merged m) {
return m.getFingerprint() + "-" + m.getNickname() + "-" + m.getNode() + "-" + m.getMetric() + "-"
return m.getFingerprint() + "-" + m.getNickname() + "-" + m.getMetric() + "-"
+ m.getCountry() + "-" + m.getTransport() + "-" + m.getVersion() + "-" + epochToLocalDate(m.getStatsStart());
}

Expand All @@ -112,7 +112,7 @@ private UpdateTemp aggregateToUpdateTemp(List<Merged> list) {
long totalSeconds = list.stream().mapToLong(m -> (m.getStatsEnd() - m.getStatsStart()) / 1000).sum();
double totalVal = list.stream().mapToDouble(Merged::getVal).sum();

return new UpdateTemp(first.getFingerprint(), first.getNickname(), first.getNode(), first.getMetric(),
return new UpdateTemp(first.getFingerprint(), first.getNickname(), first.getMetric(),
first.getCountry(), first.getTransport(), first.getVersion(),
epochToLocalDate(first.getStatsStart()), totalVal, totalSeconds);
}
Expand All @@ -122,7 +122,7 @@ private Aggregated aggregateToAggregated(List<UpdateTemp> list) {
double rrx = list.stream().mapToDouble(UpdateTemp::getVal).sum();
long nrx = list.stream().mapToLong(UpdateTemp::getSeconds).sum();

return new Aggregated(first.getDate(), first.getNode(), first.getCountry(), first.getTransport(),
return new Aggregated(first.getDate(), first.getCountry(), first.getTransport(),
first.getVersion(), rrx, nrx, 0, 0, 0, 0, 0);
}

Expand All @@ -131,22 +131,22 @@ private Aggregated aggregateToBytes(List<UpdateTemp> list) {
double hh = list.stream().mapToDouble(UpdateTemp::getVal).sum();
long nh = list.stream().mapToLong(UpdateTemp::getSeconds).sum();

return new Aggregated(first.getDate(), first.getNode(), first.getCountry(), first.getTransport(),
return new Aggregated(first.getDate(), first.getCountry(), first.getTransport(),
first.getVersion(), 0, 0, hh, 0, 0, nh, 0);
}

private Aggregated filterBoth(List<UpdateTemp> list) {
UpdateTemp ut = list.get(0);
Optional<UpdateTemp> bytes = list.stream().filter(it -> "bytes".equals(it.getMetric()) && it.getSeconds() > 0).findFirst();
Optional<UpdateTemp> responses = list.stream().filter(it -> "responses".equals(it.getMetric())).findFirst();
Optional<UpdateTemp> bytes = list.stream().filter(it -> it.getMetric() == Metric.BYTES && it.getSeconds() > 0).findFirst();
Optional<UpdateTemp> responses = list.stream().filter(it -> it.getMetric() == Metric.RESPONSES).findFirst();

if (bytes.isPresent() && responses.isPresent()) {
UpdateTemp b = bytes.get();
UpdateTemp r = responses.get();

double hrh = (Math.min(b.getSeconds(), r.getSeconds()) * b.getVal()) / b.getSeconds();

return new Aggregated(ut.getDate(), ut.getNode(), ut.getCountry(), ut.getTransport(), ut.getVersion(),
return new Aggregated(ut.getDate(), ut.getCountry(), ut.getTransport(), ut.getVersion(),
0, 0, 0, 0, hrh, 0, 0);
} else {
return null;
@@ -155,9 +153,8 @@ private Aggregated filterBoth(List<UpdateTemp> list) {
}

private Aggregated filterBothLeft(List<UpdateTemp> list) {
UpdateTemp ut = list.get(0);
Optional<UpdateTemp> bytes = list.stream().filter(it -> "bytes".equals(it.getMetric())).findFirst();
Optional<UpdateTemp> responses = list.stream().filter(it -> "responses".equals(it.getMetric())).findFirst();
Optional<UpdateTemp> bytes = list.stream().filter(it -> it.getMetric() == Metric.BYTES).findFirst();
Optional<UpdateTemp> responses = list.stream().filter(it -> it.getMetric() == Metric.RESPONSES).findFirst();

if (responses.isPresent() && bytes.isPresent() && bytes.get().getSeconds() > 0) {
return null;
@@ -166,7 +165,7 @@
UpdateTemp r = responses.get();
long bytesSeconds = 0;
double nrh = Math.max(0, r.getSeconds() - bytesSeconds);
return new Aggregated(r.getDate(), r.getNode(), r.getCountry(), r.getTransport(), r.getVersion(),
return new Aggregated(r.getDate(), r.getCountry(), r.getTransport(), r.getVersion(),
0, 0, 0, 0, 0, 0, nrh);
} else {
return null;
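A side note on filterBoth above: hrh scales the written-bytes total by the fraction of the bytes interval that also has response history, capped at 100 percent. A small worked example with assumed figures:

    // Assumed numbers, for illustration only.
    long bytesSeconds = 86_400;      // b.getSeconds(): a full day of write history
    long responseSeconds = 43_200;   // r.getSeconds(): half a day of response history
    double bytesVal = 8.0e9;         // b.getVal(): 8 GB written
    double hrh = (Math.min(bytesSeconds, responseSeconds) * bytesVal) / bytesSeconds;
    // hrh = (43_200 * 8.0e9) / 86_400 = 4.0e9, i.e. half of the bytes are attributed.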
@@ -1,121 +1,13 @@
package org.torproject.metrics.onionoo.userstats;

import java.time.*;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DataProcessor {

// Method to merge imported data into merged data
public static List<Merged> merge(List<Imported> importedList) {
int idCounter = 1;
List<Merged> mergedList = new ArrayList<>();

// Step 1: Group by unique fields (fingerprint, nickname, node, metric, country, transport, version)
Map<String, List<Imported>> groupedImported = importedList.stream().collect(Collectors.groupingBy(
imported -> String.join("-", imported.getFingerprint(), imported.getNickname(),
imported.getNode(), imported.getMetric(),
imported.getCountry(), imported.getTransport(), imported.getVersion())
));

// Step 2: Process each group independently
for (List<Imported> group : groupedImported.values()) {
// Sort each group by startTime to ensure intervals are processed in sequence
group.sort(Comparator.comparing(Imported::getStatsStart));

// Initialize variables to track merging within the group
long lastStartTime = group.get(0).getStatsStart();
long lastEndTime = group.get(0).getStatsEnd();
double lastVal = group.get(0).getVal();

// Use first entry to initialize shared fields for Merged
String fingerprint = group.get(0).getFingerprint();
String nickname = group.get(0).getNickname();
String node = group.get(0).getNode();
String metric = group.get(0).getMetric();
String country = group.get(0).getCountry();
String transport = group.get(0).getTransport();
String version = group.get(0).getVersion();

// Merge intervals within the sorted group
for (int i = 1; i < group.size(); i++) {
Imported current = group.get(i);

if (current.getStatsStart() <= lastEndTime) {
// Overlapping or adjacent interval, extend the end time and accumulate the value
lastEndTime = Math.max(lastEndTime, current.getStatsEnd());
lastVal += current.getVal();
} else {
// No overlap, add the previous merged interval to mergedList
mergedList.add(new Merged(idCounter++, fingerprint, nickname, node, metric, country,
transport, version, lastStartTime, lastEndTime, lastVal));

// Start a new interval
lastStartTime = current.getStatsStart();
lastEndTime = current.getStatsEnd();
lastVal = current.getVal();
}
}

// Add the last merged interval of the group to mergedList
mergedList.add(new Merged(idCounter++, fingerprint, nickname, node, metric, country,
transport, version, lastStartTime, lastEndTime, lastVal));
}

return mergedList;
}

public static List<Aggregated> aggregate(List<Merged> mergedList) {
List<Aggregated> aggregatedList = new ArrayList<>();

// Group merged entries by date and unique key attributes
Map<String, List<Merged>> groupedMerges = mergedList.stream().collect(
Collectors.groupingBy(merge -> String.join("-",
Instant.ofEpochMilli(merge.getStatsStart())
.atZone(ZoneId.of("UTC"))
.toLocalDateTime()
.toLocalDate().toString(), merge.getNode(),
merge.getCountry(), merge.getTransport(), merge.getVersion()))
);

for (List<Merged> group : groupedMerges.values()) {
LocalDate date = Instant.ofEpochMilli(group.get(0).getStatsStart()).atZone(ZoneOffset.UTC).toLocalDate();
String node = group.get(0).getNode();
String country = group.get(0).getCountry();
String transport = group.get(0).getTransport();
String version = group.get(0).getVersion();

// Initialize aggregates
double rrx = 0, nrx = 0, hh = 0, nn = 0, hrh = 0, nh = 0, nrh = 0;

// Sum values for each metric type
for (Merged merged : group) {
if (merged.getMetric().equals("responses")) {
rrx += merged.getVal();
nrx += getDurationSeconds(merged.getStatsStart(), merged.getStatsEnd());
} else if (merged.getMetric().equals("bytes")) {
hh += merged.getVal();
nh += getDurationSeconds(merged.getStatsStart(), merged.getStatsEnd());
} else if (merged.getMetric().equals("status")) {
nn += getDurationSeconds(merged.getStatsStart(), merged.getStatsEnd());
}
if (merged.getMetric().equals("bytes") && rrx > 0) {
hrh += Math.min(hh, rrx);
}
if (merged.getMetric().equals("responses") && hh == 0) {
nrh += nrx;
}
}

Aggregated aggregatedEntry = new Aggregated(date, node, country, transport, version, rrx, nrx, hh, nn, hrh, nh, nrh);
aggregatedList.add(aggregatedEntry);
}
return aggregatedList;
}

public static List<Estimated> estimate(List<Aggregated> aggregatedList) {
List<Estimated> estimatedList = new ArrayList<>();

@@ -136,7 +28,6 @@ public static List<Estimated> estimate(List<Aggregated> aggregatedList) {
// if (agg.getDate().isBefore(LocalDate.now().minusDays(1))) {
Estimated estimated = new Estimated(
agg.getDate(),
agg.getNode(),
agg.getCountry(),
agg.getTransport(),
agg.getVersion(),
@@ -151,7 +42,6 @@ public static List<Estimated> estimate(List<Aggregated> aggregatedList) {

// Sort results by date, node, version, transport, and country (similar to SQL ORDER BY clause)
estimatedList.sort(Comparator.comparing(Estimated::getDate)
.thenComparing(Estimated::getNode)
.thenComparing(Estimated::getVersion)
.thenComparing(Estimated::getTransport)
.thenComparing(Estimated::getCountry));
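For orientation, the pieces appear to fit together like this: UserStatsStatusUpdater fills the imported list, an interval-merge step (removed from DataProcessor in this commit and presumably relocated to one of the changed files not shown) produces Merged rows, the new Aggregator rolls them up per day, and DataProcessor.estimate() turns the aggregates into Estimated rows. A hedged wiring sketch; mergeIntervals is a hypothetical stand-in for wherever the merge step now lives:

    // Sketch only -- the real call site is not visible in this excerpt.
    List<Merged> merged = mergeIntervals(imported);                    // hypothetical helper
    List<Aggregated> aggregated = new Aggregator().aggregate(merged);  // replaces DataProcessor.aggregate(...)
    List<Estimated> estimated = DataProcessor.estimate(aggregated);    // unchanged entry point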
(Remaining changed files not loaded in this view.)
