Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply BSA diffs to the database #2229

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions common/src/main/java/google/registry/util/BatchedStreams.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2023 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.util;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterators.partition;
import static com.google.common.collect.Iterators.transform;
import static com.google.common.collect.Streams.stream;
import static java.lang.Math.min;

import com.google.common.collect.ImmutableList;
import java.util.stream.Stream;

/** Static helpers that regroup the elements of a {@link Stream} into fixed-size batches. */
public final class BatchedStreams {

  /** Upper bound applied to any requested batch size. */
  static final int MAX_BATCH = 1024 * 1024;

  // Static utility holder; never instantiated.
  private BatchedStreams() {}

  /**
   * Regroups the elements of {@code stream} into a {@code Stream} of immutable lists.
   *
   * <p>The requested {@code batchSize} is capped at {@link #MAX_BATCH}. The final batch may hold
   * fewer elements than the others when the input does not divide evenly.
   *
   * <p>Closing the returned stream does not close the original stream.
   *
   * @param stream the flat stream to regroup; it is consumed through its iterator
   * @param batchSize the desired number of elements per batch; must be positive
   * @return a stream whose elements are the batches, in encounter order
   * @throws IllegalArgumentException if {@code batchSize} is not positive
   */
  public static <T> Stream<ImmutableList<T>> batch(Stream<T> stream, int batchSize) {
    checkArgument(batchSize > 0, "batchSize must be a positive integer.");
    // Oversized requests are silently reduced to the cap rather than rejected.
    int effectiveBatchSize = min(MAX_BATCH, batchSize);
    return stream(
        transform(partition(stream.iterator(), effectiveBatchSize), ImmutableList::copyOf));
  }
}
59 changes: 59 additions & 0 deletions common/src/test/java/google/registry/util/BatchedStreamsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2023 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.util;

import static com.google.common.truth.Truth.assertThat;
import static google.registry.util.BatchedStreams.batch;
import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingBy;
import static org.junit.Assert.assertThrows;

import com.google.common.collect.ImmutableList;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;

/** Unit tests for {@link BatchedStreams}. */
public class BatchedStreamsTest {

  /** A non-positive batch size is rejected with a descriptive message. */
  @Test
  void invalidBatchSize() {
    IllegalArgumentException thrown =
        assertThrows(IllegalArgumentException.class, () -> batch(Stream.of(), 0));
    assertThat(thrown).hasMessageThat().contains("must be a positive integer");
  }

  /** 1,000,001 elements in batches of 1000 yield 1000 full batches plus one single-element batch. */
  @Test
  void batch_success() {
    Stream<Integer> input = IntStream.rangeClosed(0, 1_000_000).boxed();
    Stream<ImmutableList<Integer>> batches = batch(input, 1000);
    // Maps each observed batch size to the number of batches having that size.
    assertThat(batches.map(ImmutableList::size).collect(groupingBy(size -> size, counting())))
        .containsExactly(1000, 1000L, 1, 1L);
  }

  /** An input smaller than the batch size produces exactly one short batch. */
  @Test
  void batch_partialBatch() {
    Stream<Integer> input = Stream.of(1, 2, 3);
    Stream<ImmutableList<Integer>> batches = batch(input, 1000);
    assertThat(batches.map(ImmutableList::size).collect(groupingBy(size -> size, counting())))
        .containsExactly(3, 1L);
  }

  /** A requested batch size above the cap is reduced to 1024 * 1024. */
  @Test
  void batch_truncateBatchSize() {
    Stream<Integer> input = IntStream.range(0, 1024 * 2048).boxed();
    Stream<ImmutableList<Integer>> batches = batch(input, 2_000_000);
    assertThat(batches.map(ImmutableList::size).collect(groupingBy(size -> size, counting())))
        .containsExactly(1024 * 1024, 2L);
  }
}
130 changes: 130 additions & 0 deletions core/src/main/java/google/registry/bsa/BlockListFetcher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2023 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.bsa;

import static java.nio.charset.StandardCharsets.UTF_8;
import static javax.servlet.http.HttpServletResponse.SC_OK;

import com.google.api.client.http.HttpMethods;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import google.registry.bsa.api.BsaCredential;
import google.registry.bsa.api.BsaException;
import google.registry.config.RegistryConfig.Config;
import google.registry.request.UrlConnectionService;
import google.registry.util.Retrier;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.util.function.BiConsumer;
import javax.inject.Inject;
import javax.net.ssl.HttpsURLConnection;

/** Fetches data from the BSA API. */
public class BlockListFetcher {

  private final UrlConnectionService urlConnectionService;
  private final BsaCredential credential;

  // Download URLs, looked up by the block list's name().
  private final ImmutableMap<String, String> blockListUrls;
  private final Retrier retrier;

  @Inject
  BlockListFetcher(
      UrlConnectionService urlConnectionService,
      BsaCredential credential,
      @Config("bsaDataUrls") ImmutableMap<String, String> blockListUrls,
      Retrier retrier) {
    this.urlConnectionService = urlConnectionService;
    this.credential = credential;
    this.blockListUrls = blockListUrls;
    this.retrier = retrier;
  }

  /**
   * Opens a download of {@code blockList}, retrying attempts that fail with a retriable {@link
   * BsaException}.
   *
   * @return a handle on the open connection; the caller is responsible for closing it
   */
  LazyBlockList fetch(BlockList blockList) {
    // TODO: use more informative exceptions to describe retriable errors
    return retrier.callWithRetry(
        () -> tryFetch(blockList),
        e -> e instanceof BsaException && ((BsaException) e).isRetriable());
  }

  /**
   * Makes a single attempt to open a download of {@code blockList}.
   *
   * <p>On success, ownership of the connection passes to the returned {@link LazyBlockList}. On
   * any failure the connection is disconnected before the exception propagates, so that repeated
   * attempts do not accumulate open connections.
   *
   * @throws BsaException non-retriable if no URL is configured for the block list or a {@link
   *     GeneralSecurityException} occurs; retriable for I/O errors and non-OK status codes
   */
  LazyBlockList tryFetch(BlockList blockList) {
    String url = blockListUrls.get(blockList.name());
    if (url == null) {
      // A missing configuration entry cannot be fixed by trying again.
      throw new BsaException(
          String.format("No download URL configured for block list [%s]", blockList.name()),
          /* retriable= */ false);
    }
    HttpsURLConnection connection = null;
    boolean handedOff = false;
    try {
      connection =
          (HttpsURLConnection) urlConnectionService.createConnection(new java.net.URL(url));
      connection.setRequestMethod(HttpMethods.GET);
      connection.setRequestProperty("Authorization", "Bearer " + credential.getAuthToken());
      int code = connection.getResponseCode();
      if (code != SC_OK) {
        String errorDetails = readErrorDetails(connection);
        throw new BsaException(
            String.format(
                "Status code: [%s], error: [%s], details: [%s]",
                code, connection.getResponseMessage(), errorDetails),
            /* retriable= */ true);
      }
      LazyBlockList result = new LazyBlockList(blockList, connection);
      handedOff = true;
      return result;
    } catch (IOException e) {
      throw new BsaException(e, /* retriable= */ true);
    } catch (GeneralSecurityException e) {
      throw new BsaException(e, /* retriable= */ false);
    } finally {
      // Release the connection unless the returned LazyBlockList now owns it.
      if (!handedOff && connection != null) {
        connection.disconnect();
      }
    }
  }

  /**
   * Reads the body of an error response as UTF-8 text on a best-effort basis.
   *
   * @return the error body, an empty string if the connection has no error stream, or a
   *     description of the failure if the stream could not be read
   */
  private static String readErrorDetails(HttpsURLConnection connection) {
    // try-with-resources skips close() for a null resource, so the null check inside is safe.
    try (InputStream errorStream = connection.getErrorStream()) {
      if (errorStream == null) {
        // No error message.
        return "";
      }
      return new String(ByteStreams.toByteArray(errorStream), UTF_8);
    } catch (Exception e) {
      // Deliberately broad: failing to read diagnostics must not mask the original HTTP error.
      return "Failed to retrieve error message: " + e.getMessage();
    }
  }

  /** A block list whose content is read from an open connection only when consumed. */
  static class LazyBlockList implements Closeable {

    private final BlockList blockList;

    private final HttpsURLConnection connection;

    LazyBlockList(BlockList blockList, HttpsURLConnection connection) {
      this.blockList = blockList;
      this.connection = connection;
    }

    /** Returns the {@link BlockList} this download corresponds to. */
    BlockList getName() {
      return blockList;
    }

    /** Placeholder: currently always returns the literal string {@code "TODO"}. */
    String peekChecksum() {
      return "TODO"; // Depends on BSA impl: header or first line of file
    }

    /**
     * Reads the connection's input stream to its end, handing each chunk to {@code consumer}.
     *
     * <p>The consumer receives the read buffer and the number of valid bytes at its start. The
     * same buffer array is reused for every call, so the consumer must copy any bytes it needs to
     * keep.
     *
     * @throws IOException if opening or reading the input stream fails
     */
    void consumeAll(BiConsumer<byte[], Integer> consumer) throws IOException {
      try (InputStream inputStream = connection.getInputStream()) {
        byte[] buffer = new byte[1024];
        int bytesRead;
        while ((bytesRead = inputStream.read(buffer)) != -1) {
          consumer.accept(buffer, bytesRead);
        }
      }
    }

    /** Disconnects the underlying connection. */
    @Override
    public void close() {
      connection.disconnect();
    }
  }
}
Loading