Skip to content

Commit

Permalink
Apply BSA diffs to the database
Browse files Browse the repository at this point in the history
Add a step in BsaDownloadAction to apply the diffs from the most recent
download. This includes:

- Add new labels to, and remove deleted labels from, the `BsaLabel` table.
- Add new entries to the `BsaDomainInUse` table if applicable. These are
  registered/reserved domains that match the new labels.
- Generate list of domains that are not blocked and save them in a GCS
  file. These include all domains from the previous step as well as
  domains derived from invalid labels.

More tests are needed but this PR successfully processed a real
download.
  • Loading branch information
weiminyu committed Nov 29, 2023
1 parent 73de4c5 commit 42eb420
Show file tree
Hide file tree
Showing 30 changed files with 1,250 additions and 94 deletions.
43 changes: 43 additions & 0 deletions common/src/main/java/google/registry/util/BatchedStreams.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2023 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.util;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterators.partition;
import static com.google.common.collect.Iterators.transform;
import static com.google.common.collect.Streams.stream;
import static java.lang.Math.min;

import com.google.common.collect.ImmutableList;
import java.util.stream.Stream;

/** Static helpers that regroup the elements of a {@link Stream} into batches. */
public final class BatchedStreams {

  /** Largest batch size ever produced; larger requested sizes are capped to this value. */
  static final int MAX_BATCH = 1024 * 1024;

  // Static utility holder, not meant to be instantiated.
  private BatchedStreams() {}

  /**
   * Regroups the elements of {@code stream} into consecutive batches.
   *
   * <p>Each batch holds up to {@code batchSize} elements, capped at {@link #MAX_BATCH}; the final
   * batch may hold fewer.
   *
   * <p>Closing the returned stream does not close the original stream.
   *
   * @param stream the flat stream to split up
   * @param batchSize the requested number of elements per batch; must be positive
   * @return a stream whose elements are the batches, each an {@link ImmutableList}
   * @throws IllegalArgumentException if {@code batchSize} is not positive
   */
  public static <T> Stream<ImmutableList<T>> batch(Stream<T> stream, int batchSize) {
    checkArgument(batchSize > 0, "batchSize must be a positive integer.");
    int effectiveBatchSize = min(MAX_BATCH, batchSize);
    return stream(
        transform(partition(stream.iterator(), effectiveBatchSize), ImmutableList::copyOf));
  }
}
59 changes: 59 additions & 0 deletions common/src/test/java/google/registry/util/BatchedStreamsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2023 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.util;

import static com.google.common.truth.Truth.assertThat;
import static google.registry.util.BatchedStreams.batch;
import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingBy;
import static org.junit.Assert.assertThrows;

import com.google.common.collect.ImmutableList;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;

/** Unit tests for {@link BatchedStreams}. */
public class BatchedStreamsTest {

  @Test
  void invalidBatchSize() {
    IllegalArgumentException thrown =
        assertThrows(IllegalArgumentException.class, () -> batch(Stream.of(), 0));
    assertThat(thrown).hasMessageThat().contains("must be a positive integer");
  }

  @Test
  void batch_success() {
    // 1,000,001 elements in batches of 1000: 1000 full batches plus one single-element batch.
    Stream<ImmutableList<Integer>> batches =
        batch(IntStream.rangeClosed(0, 1_000_000).boxed(), 1000);
    assertThat(batches.map(ImmutableList::size).collect(groupingBy(size -> size, counting())))
        .containsExactly(1000, 1000L, 1, 1L);
  }

  @Test
  void batch_partialBatch() {
    // Fewer elements than the batch size: a single batch holding everything.
    Stream<ImmutableList<Integer>> batches = batch(Stream.of(1, 2, 3), 1000);
    assertThat(batches.map(ImmutableList::size).collect(groupingBy(size -> size, counting())))
        .containsExactly(3, 1L);
  }

  @Test
  void batch_truncateBatchSize() {
    // The requested size exceeds the 1024 * 1024 cap, so the 1024 * 2048 elements end up in
    // exactly two batches of the capped size.
    Stream<ImmutableList<Integer>> batches =
        batch(IntStream.range(0, 1024 * 2048).boxed(), 2_000_000);
    assertThat(batches.map(ImmutableList::size).collect(groupingBy(size -> size, counting())))
        .containsExactly(1024 * 1024, 2L);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ LazyBlockList tryFetch(BlockList blockList) {
try (InputStream errorStream = connection.getErrorStream()) {
errorDetails = new String(ByteStreams.toByteArray(errorStream), UTF_8);
} catch (Exception e) {
// ignore
errorDetails = "Failed to retrieve error message: " + e.getMessage();
}
throw new BsaException(
String.format(
Expand Down
46 changes: 26 additions & 20 deletions core/src/main/java/google/registry/bsa/BsaDiffCreator.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package google.registry.bsa;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Maps.newHashMap;
Expand Down Expand Up @@ -198,25 +197,32 @@ Stream<Order> getOrders() {
}

/**
 * Returns the label changes in this diff as a single stream.
 *
 * <p>Labels are emitted in three consecutive groups:
 *
 * <ol>
 *   <li>{@code NEW_ORDER_ASSOCIATION}: labels in {@code newAndRemaining} whose order-id set
 *       contains {@code ORDER_ID_SENTINEL} together with at least one other id.
 *   <li>{@code CREATE}: labels in {@code newAndRemaining} whose order-id set does not contain
 *       {@code ORDER_ID_SENTINEL}.
 *   <li>{@code DELETE}: labels that appear in {@code deleted} but not in {@code newAndRemaining};
 *       these carry an empty IDN-table set.
 * </ol>
 *
 * <p>Labels whose order-id set holds nothing but {@code ORDER_ID_SENTINEL} are not emitted.
 */
Stream<Label> getLabels() {
  return Stream.of(
          newAndRemaining.asMap().entrySet().stream()
              // Equivalent to (size > 1 || !hasSentinel) && hasSentinel.
              .filter(
                  entry ->
                      entry.getValue().contains(ORDER_ID_SENTINEL) && entry.getValue().size() > 1)
              .map(
                  entry ->
                      Label.of(
                          entry.getKey(),
                          LabelType.NEW_ORDER_ASSOCIATION,
                          idnChecker.getAllValidIdns(entry.getKey()).stream()
                              .map(IdnTableEnum::name)
                              .collect(toImmutableSet()))),
          newAndRemaining.asMap().entrySet().stream()
              // Equivalent to (size > 1 || !hasSentinel) && !hasSentinel.
              .filter(entry -> !entry.getValue().contains(ORDER_ID_SENTINEL))
              .map(
                  entry ->
                      Label.of(
                          entry.getKey(),
                          LabelType.CREATE,
                          idnChecker.getAllValidIdns(entry.getKey()).stream()
                              .map(IdnTableEnum::name)
                              .collect(toImmutableSet()))),
          Sets.difference(deleted.keySet(), newAndRemaining.keySet()).stream()
              .map(label -> Label.of(label, LabelType.DELETE, ImmutableSet.of())))
      .flatMap(labels -> labels);
}
}

Expand Down
84 changes: 83 additions & 1 deletion core/src/main/java/google/registry/bsa/BsaDownloadAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,34 @@

import static google.registry.bsa.BlockList.BLOCK;
import static google.registry.bsa.BlockList.BLOCK_PLUS;
import static google.registry.bsa.api.JsonSerializations.toCompletedOrdersReport;
import static google.registry.bsa.api.JsonSerializations.toInProgressOrdersReport;
import static google.registry.bsa.api.JsonSerializations.toUnblockableDomainsReport;
import static google.registry.bsa.persistence.LabelDiffs.applyLabelDiff;
import static google.registry.request.Action.Method.POST;
import static google.registry.util.BatchedStreams.batch;
import static javax.servlet.http.HttpServletResponse.SC_OK;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.flogger.FluentLogger;
import dagger.Lazy;
import google.registry.bsa.BlockListFetcher.LazyBlockList;
import google.registry.bsa.BsaDiffCreator.BsaDiff;
import google.registry.bsa.api.BsaReportSender;
import google.registry.bsa.api.Label;
import google.registry.bsa.api.NonBlockedDomain;
import google.registry.bsa.api.Order;
import google.registry.bsa.persistence.DownloadSchedule;
import google.registry.bsa.persistence.DownloadScheduler;
import google.registry.config.RegistryConfig.Config;
import google.registry.request.Action;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import java.util.Optional;
import java.util.stream.Stream;
import javax.inject.Inject;

@Action(
Expand All @@ -44,29 +57,40 @@ public class BsaDownloadAction implements Runnable {

static final String PATH = "/_dr/task/bsaDownload";

private static final Splitter LINE_SPLITTER = Splitter.on('\n');

private final DownloadScheduler downloadScheduler;
private final BlockListFetcher blockListFetcher;
private final BsaDiffCreator diffCreator;
private final BsaReportSender bsaReportSender;
private final GcsClient gcsClient;
private final Lazy<IdnChecker> lazyIdnChecker;
private final BsaLock bsaLock;
private final Clock clock;
private final int labelTxnBatchSize;
private final Response response;

/**
 * Creates the action from its injected collaborators, storing each one in the field of the same
 * name.
 *
 * @param labelTxnBatchSize value bound to the {@code bsaLabelTxnBatchSize} config key; used as
 *     the batch size when label diffs are applied
 */
@Inject
BsaDownloadAction(
DownloadScheduler downloadScheduler,
BlockListFetcher blockListFetcher,
BsaDiffCreator diffCreator,
BsaReportSender bsaReportSender,
GcsClient gcsClient,
Lazy<IdnChecker> lazyIdnChecker,
BsaLock bsaLock,
Clock clock,
@Config("bsaLabelTxnBatchSize") int labelTxnBatchSize,
Response response) {
this.downloadScheduler = downloadScheduler;
this.blockListFetcher = blockListFetcher;
this.diffCreator = diffCreator;
this.bsaReportSender = bsaReportSender;
this.gcsClient = gcsClient;
this.lazyIdnChecker = lazyIdnChecker;
this.bsaLock = bsaLock;
this.clock = clock;
this.labelTxnBatchSize = labelTxnBatchSize;
this.response = response;
}

Expand All @@ -89,6 +113,7 @@ Void runWithinLock() {
logger.atInfo().log("Nothing to do.");
return null;
}
BsaDiff diff = null;
DownloadSchedule schedule = scheduleOptional.get();
switch (schedule.stage()) {
case DOWNLOAD:
Expand Down Expand Up @@ -121,15 +146,72 @@ Void runWithinLock() {
}
// Fall through
case MAKE_DIFF:
BsaDiff diff = diffCreator.createDiff(schedule, lazyIdnChecker.get());
diff = diffCreator.createDiff(schedule, lazyIdnChecker.get());
gcsClient.writeOrderDiffs(schedule.jobName(), diff.getOrders());
gcsClient.writeLabelDiffs(schedule.jobName(), diff.getLabels());
schedule.updateJobStage(DownloadStage.APPLY_DIFF);
// Fall through
case APPLY_DIFF:
try (Stream<Label> labels =
diff != null ? diff.getLabels() : gcsClient.readLabelDiffs(schedule.jobName())) {
Stream<ImmutableList<Label>> batches = batch(labels, labelTxnBatchSize);
gcsClient.writeUnblockableDomains(
schedule.jobName(),
batches
.map(
batch ->
applyLabelDiff(batch, lazyIdnChecker.get(), schedule, clock.nowUtc()))
.flatMap(ImmutableList::stream));
}
schedule.updateJobStage(DownloadStage.START_UPLOADING);
// Fall through
case START_UPLOADING:
try (Stream<Order> orders = gcsClient.readOrderDiffs(schedule.jobName())) {
// We expect that all order instances and the json string can fit in memory.
Optional<String> report = toInProgressOrdersReport(orders);
if (report.isPresent()) {
// Log report data
gcsClient.logInProgressOrderReport(
schedule.jobName(), LINE_SPLITTER.splitToStream(report.get()));
bsaReportSender.sendOrderStatusReport(report.get());
} else {
logger.atInfo().log("No new or deleted orders in this round.");
}
}
schedule.updateJobStage(DownloadStage.UPLOAD_DOMAINS_IN_USE);
// Fall through
case UPLOAD_DOMAINS_IN_USE:
try (Stream<NonBlockedDomain> unblockables =
gcsClient.readUnblockableDomains(schedule.jobName())) {
/* The number of unblockable domains may be huge in theory (label x ~50 tlds), but in
* practice should be relatively small (tens of thousands?). Batches can be introduced
* if size becomes a problem.
*/
Optional<String> report = toUnblockableDomainsReport(unblockables);
if (report.isPresent()) {
gcsClient.logUnblockableDomainsReport(
schedule.jobName(), LINE_SPLITTER.splitToStream(report.get()));
// During downloads, unblockable domains are only added, not removed.
bsaReportSender.addUnblockableDomainsUpdates(report.get());
} else {
logger.atInfo().log("No changes in the set of unblockable domains in this round.");
}
}
schedule.updateJobStage(DownloadStage.FINISH_UPLOADING);
// Fall through
case FINISH_UPLOADING:
try (Stream<Order> orders = gcsClient.readOrderDiffs(schedule.jobName())) {
// Orders are expected to be few, so the report can be kept in memory.
Optional<String> report = toCompletedOrdersReport(orders);
if (report.isPresent()) {
// Log report data
gcsClient.logCompletedOrderReport(
schedule.jobName(), LINE_SPLITTER.splitToStream(report.get()));
bsaReportSender.sendOrderStatusReport(report.get());
}
}
schedule.updateJobStage(DownloadStage.DONE);
return null;
case DONE:
case NOP:
case CHECKSUMS_NOT_MATCH:
Expand Down
Loading

0 comments on commit 42eb420

Please sign in to comment.