Skip to content

Commit

Permalink
Update collection of Breg Enhetsregister
Browse files Browse the repository at this point in the history
  • Loading branch information
oranheim committed Oct 4, 2024
1 parent d78930c commit 24edc61
Showing 1 changed file with 20 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.descoped.dc.samples.enhetsregisteret;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.descoped.config.DynamicConfiguration;
import io.descoped.config.StoreBasedDynamicConfiguration;
import io.descoped.dc.api.Builders;
Expand All @@ -16,9 +17,7 @@
import io.descoped.dc.api.util.JsonParser;
import io.descoped.dc.core.executor.Worker;
import io.descoped.dc.core.handler.Queries;
import io.descoped.rawdata.api.RawdataClient;
import io.descoped.rawdata.api.RawdataClientInitializer;
import io.descoped.rawdata.api.RawdataConsumer;
import io.descoped.rawdata.api.RawdataMessage;
import io.descoped.service.provider.api.ProviderConfigurator;
import org.junit.jupiter.api.Disabled;
Expand All @@ -30,6 +29,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.IntStream;

import static io.descoped.dc.api.Builders.addContent;
import static io.descoped.dc.api.Builders.console;
Expand Down Expand Up @@ -73,7 +74,7 @@ public class EnhetsregisteretUpdatesWorkerTest {
.header("accept", "application/json")
.variable("baseURL", "https://data.brreg.no/enhetsregisteret/api")
//.variable("offsetDate", "2020-10-05T00:00:00.000Z")
.variable("offsetDate", "2021-03-11T00:00:00.000Z")
.variable("offsetDate", "2024-10-01T12:00:00.000Z")
.variable("page", "0")
.variable("pageSize", "20")
)
Expand Down Expand Up @@ -122,7 +123,7 @@ public class EnhetsregisteretUpdatesWorkerTest {
)
.function(get("event-document")
.url("${eventURL}")
.validate(status().success(200).fail(400).fail(404).fail(500))
.validate(status().success(200).fail(400).fail(404).fail(410).fail(500))
.pipe(addContent("${position}", "event"))
);

Expand All @@ -131,7 +132,7 @@ public class EnhetsregisteretUpdatesWorkerTest {
public void thatWorkerCollectEnhetsregisteret() throws InterruptedException {
Worker.newBuilder()
.configuration(configuration.asMap())
.stopAtNumberOfIterations(5)
.stopAtNumberOfIterations(500)
.printConfiguration()
.specification(specificationBuilder)
.build()
Expand All @@ -140,16 +141,23 @@ public void thatWorkerCollectEnhetsregisteret() throws InterruptedException {

@Disabled
@Test
void consumeLocalRawdataStore() {
try (RawdataClient client = ProviderConfigurator.configure(configuration.asMap(), configuration.evaluateToString("rawdata.client.provider"), RawdataClientInitializer.class)) {
try (RawdataConsumer consumer = client.consumer(configuration.evaluateToString("rawdata.topic"))) {
void consumeLocalRawdataStore() throws Exception {
int pos = 0;
JsonParser jsonParser = JsonParser.createJsonParser();
Function<byte[], String> toJson = (bytes) -> jsonParser.toPrettyJSON(jsonParser.fromJson(new String(bytes), ObjectNode.class));

try (var client = ProviderConfigurator.configure(configuration.asMap(), configuration.evaluateToString("rawdata.client.provider"), RawdataClientInitializer.class)) {
try (var consumer = client.consumer(configuration.evaluateToString("rawdata.topic"))) {
RawdataMessage message;
while ((message = consumer.receive(1, TimeUnit.SECONDS)) != null) {
System.out.printf("position: %s%n--> %s%n", message.position(), new String(message.get("enhet")));
while ((message = consumer.receive(1, TimeUnit.SECONDS)) != null && pos++ < 5) {
var buf = new StringBuffer();
buf.append(IntStream.range(0, 80).mapToObj(i -> "-").reduce("", String::concat)).append("\n");
buf.append("position: ").append(message.position()).append("\n");
buf.append("feed-element:\n").append(toJson.apply(message.get("entry"))).append("\n");
buf.append("enhet-document (see feed-element -> href):\n").append(toJson.apply(message.get("event"))).append("\n");
System.out.print(buf);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

Expand Down

0 comments on commit 24edc61

Please sign in to comment.