Skip to content

Commit

Permalink
NRTM Client Persist Deltas (#1577)
Browse files Browse the repository at this point in the history
* feat: nrtmv4 client setup

* feat: add whois-nrtm4-client everywhere

* feat: get common dependencies in whois-nrtm-client (aspectJ in them)

* feat: read unf and store main information

* feat: fix repository

* feat: clarify reader

* feat: remove unnecesary value injection

* feat: add schema creation changes

* feat: fix schema syntax error

* feat: hardcode for now nrtm baseUrl

* feat: hardcode RC

* feat: fix compilation issues

* feat: add logs and fix the name

* feat: change properties and put logs in the condition

* feat: add logs

* feat: fix log

* feat: create nrtm rest client

* feat: fix typo

* feat: run schedule task each minute

* feat: add nrtm4 client to api db endtoend modules

* feat: add source log

* feat: try catch duplicated key exception when the version already exist

* feat: refactor

* feat: remove the changes of rest client

* feat: remove unused import

* feat: change nrtm data soure configurations name

* feat: use a rowmapper in between sources call and jsonnode

* feat: fix compilation issue

* feat: add retrn statment

* feat: fix get resources

* feat: fix baseUrl

* feat: remove the slash from url

* feat: ignore unknown properties

* feat: filter by version and fix the mapper

* feat: handle when table is null

* feat: refactor for clarifications

* feat: fix sql

* feat: check not null

* feat: add logs

* feat: remove logs and add the group by to fix the query

* feat: refactor

* feat: put the dependency again in whois-api

* feat: do no restart if notification is the same

* feat: remove initializeNRTMClient module body

* feat: create nrtm dummy server

* feat: remove unused import and add the conditional

* feat: rename it by client

* feat: rename the condition

* feat: add dependsOn dependency and conditional just for the config

* feat: do not use dependsOn use conditional

* feat: add UNF client ITs

* feat: add snapshot support

* feat: add ITs of snapshot file creation

* feat: fix ITs

* feat: add hash validation

* feat: test fake hash

* feat: remove unrelated changes

* feat: unrelated changes

* feat: unrelated changes

* feat: move into confition packets and fix ITs

* feat: refactor

* feat: little refactor

* feat: catch all exceptions

* feat: add rollback

* feat: add deltas import in NRTM client, ITs remaining

* feat: refactor

* feat: refactor deltas and add ITs

* feat: rename

* feat: add stopwatch

* feat: log miliseconds

* feat: add log for loading snapshot

* feat: add log

* feat: refactor decompress and put a bigger buffer

* feat: add logs

* feat: refactor and use parallel stream

* feat: move the stopwatch

* feat: fix indentation

* feat: some performance improvements

* feat: fix logs and timer issues

* feat: save the update notification after creating the snapshot

* feat: redo previous change

* feat: add logs

* feat: refactor code

* feat: parallelise

* feat: do not process the decompression in parallel, use parallel are record level

* feat: use batches and reduce the threads

* feat: reduce threads usage

* feat: shudown the executor

* parallelise the batches

* feat: try with batches of 1k objects

* feat: use library

* feat: refactor and increase the size of the batches

* feat: remove unecessary code and logs

* feat: remove unecessary changes

* feat: update tests

* feat: fix tests

* feat: add content type as a header

* feat: add correct public key in main resources and move test public key to test resources

* feat: put the correct key

* feat: add data to serials and remaining tables

* feat: do not ignore error when inserting deltas and fix the Mocks instead

* feat: refactors

* feat: comments

* feat: process record directly without reading whole payload beforehand

* feat: change public key

* feat: rename

* feat: add ITs of reinitialisation

* feat: refactor

* feat: rename and remove unecessary import
  • Loading branch information
MiguelAHM authored Dec 5, 2024
1 parent 6d467a9 commit 46ad52f
Show file tree
Hide file tree
Showing 27 changed files with 703 additions and 118 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package net.ripe.db.nrtm4.client.client;

import com.fasterxml.jackson.annotation.JsonValue;
import net.ripe.db.whois.common.rpsl.ObjectType;
import net.ripe.db.whois.common.rpsl.RpslObject;

public class MirrorDeltaInfo extends MirrorObjectInfo {

public enum Action {
DELETE,
ADD_MODIFY;

@JsonValue
public String toLowerCaseName() {
return name().toLowerCase();
}
}

private final Action action;
private final ObjectType objectType;
private final String primaryKey;

public MirrorDeltaInfo() {
super(null);
this.objectType = null;
this.primaryKey = null;
this.action = null;
}

public MirrorDeltaInfo(final RpslObject rpslObject) {
super(rpslObject);
this.objectType = null;
this.primaryKey = null;
this.action = null;
}

public MirrorDeltaInfo(final RpslObject object,
final String action,
final String objectType,
final String primaryKey) {
super(object);
this.action = Action.valueOf(action.toUpperCase());
this.objectType = objectType != null ? ObjectType.valueOf(objectType) : null;
this.primaryKey = primaryKey;
}

public Action getAction() {
return action;
}

public ObjectType getObjectType() {
return objectType;
}

public String getPrimaryKey() {
return primaryKey;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package net.ripe.db.nrtm4.client.client;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import net.ripe.db.whois.common.rpsl.RpslObject;

public abstract class MirrorObjectInfo {

@JsonDeserialize(using = RpslObjectDeserializer.class)
private final RpslObject object;

public MirrorObjectInfo(RpslObject object) {
this.object = object;
}

public RpslObject getRpslObject() {
return object;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package net.ripe.db.nrtm4.client.client;


public class MirrorSnapshotInfo extends MirrorObjectInfo {

public MirrorSnapshotInfo() {
super(null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.jakarta.rs.json.JacksonJsonProvider;
import com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntrospector;
import com.google.common.net.HttpHeaders;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.core.MediaType;
Expand Down Expand Up @@ -82,7 +83,8 @@ public List<String> getNrtmAvailableSources(){
public String getNotificationFileSignature(final String source){
return client.target(String.format("%s/%s", baseUrl, source))
.path("update-notification-file.jose")
.request("application/jose+json")
.request()
.header(HttpHeaders.CONTENT_TYPE, "application/jose+json")
.get(String.class);
}

Expand All @@ -102,6 +104,21 @@ public byte[] getSnapshotFile(final String url){
}
}

@Nullable
public byte[] getDeltaFile(final String url){
try {
final Response response = client.target(baseUrl)
.path(url)
.request(MediaType.APPLICATION_JSON_TYPE)
.get(Response.class);

return response.readEntity(byte[].class);
} catch (Exception ex){
LOGGER.error("Unable to get the records from the snapshot", ex);
return null;
}
}

private static List<String> extractSources(final String html) {
final List<String> sources = com.google.common.collect.Lists.newArrayList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import com.google.common.collect.Lists;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;

@JsonIgnoreProperties(ignoreUnknown = true)
Expand All @@ -13,11 +14,14 @@ public class UpdateNotificationFileResponse {
private final String sessionID;
private final long version;
private final NrtmFileLink snapshot;
private final List<NrtmFileLink> deltas;


public UpdateNotificationFileResponse() {
this.sessionID = null;
this.version = 0L;
this.snapshot = null;
this.deltas = Lists.newArrayList();
}

@Nullable
Expand All @@ -34,6 +38,10 @@ public NrtmFileLink getSnapshot(){
return snapshot;
}

public List<NrtmFileLink> getDeltas() {
return deltas;
}

public static class NrtmFileLink {

private final long version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,21 @@
import net.ripe.db.nrtm4.client.condition.Nrtm4ClientCondition;
import net.ripe.db.whois.common.DateTimeProvider;
import net.ripe.db.whois.common.dao.jdbc.JdbcRpslObjectOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Conditional;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;

import javax.annotation.Nullable;
import javax.sql.DataSource;
import java.util.List;

@Repository
@Conditional(Nrtm4ClientCondition.class)
public class Nrtm4ClientInfoRepository {

private static final Logger LOGGER = LoggerFactory.getLogger(Nrtm4ClientInfoRepository.class);

private final JdbcTemplate jdbcMasterTemplate;
private final JdbcTemplate jdbcSlaveTemplate;
private final DateTimeProvider dateTimeProvider;
Expand All @@ -38,6 +37,12 @@ public void saveUpdateNotificationFileVersion(final String source,
saveVersionInfo(source, version, sessionID, serviceName, NrtmClientDocumentType.NOTIFICATION);
}

public void saveDeltaFileVersion(final String source,
final long version,
final String sessionID){
saveVersionInfo(source, version, sessionID, null, NrtmClientDocumentType.DELTA);
}

public void saveSnapshotFileVersion(final String source,
final long version,
final String sessionID){
Expand All @@ -52,15 +57,24 @@ SELECT id, source, MAX(version), session_id, type, hostname, created
GROUP BY source
""";
return jdbcSlaveTemplate.query(sql,
(rs, rn) -> new NrtmClientVersionInfo(
rs.getLong(1),
rs.getString(2),
rs.getLong(3),
rs.getString(4),
NrtmClientDocumentType.fromValue(rs.getString(5)),
rs.getString(6),
rs.getLong(7)
), NrtmClientDocumentType.NOTIFICATION.getFileNamePrefix());
nrtmClientVersionRowMapper(),
NrtmClientDocumentType.NOTIFICATION.getFileNamePrefix());
}

@Nullable
public NrtmClientVersionInfo getNrtmLastVersionInfoForDeltasPerSource(final String source){
try {
return jdbcMasterTemplate.queryForObject("""
SELECT id, source, MAX(version), session_id, type, hostname, created
FROM version_info
WHERE type = ?
AND source = ?
""",
nrtmClientVersionRowMapper(),
NrtmClientDocumentType.DELTA.getFileNamePrefix(), source);
} catch (EmptyResultDataAccessException e) {
return null;
}
}

public void truncateTables(){
Expand All @@ -83,4 +97,16 @@ private void saveVersionInfo(
serviceName,
now);
}

private static RowMapper<NrtmClientVersionInfo> nrtmClientVersionRowMapper() {
return (rs, rn) -> new NrtmClientVersionInfo(
rs.getLong(1),
rs.getString(2),
rs.getLong(3),
rs.getString(4),
NrtmClientDocumentType.fromValue(rs.getString(5)),
rs.getString(6),
rs.getLong(7)
);
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package net.ripe.db.nrtm4.client.dao;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.ripe.db.nrtm4.client.client.MirrorRpslObject;
import net.ripe.db.nrtm4.client.client.MirrorSnapshotInfo;
import net.ripe.db.nrtm4.client.condition.Nrtm4ClientCondition;
import net.ripe.db.whois.common.DateTimeProvider;
import net.ripe.db.whois.common.dao.RpslObjectUpdateInfo;
import net.ripe.db.whois.common.rpsl.ObjectType;
import net.ripe.db.whois.common.rpsl.RpslObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -22,8 +22,14 @@
import java.util.List;
import java.util.Map;

import static net.ripe.db.whois.common.dao.jdbc.JdbcRpslObjectOperations.copyToHistoryAndUpdateSerials;
import static net.ripe.db.whois.common.dao.jdbc.JdbcRpslObjectOperations.deleteFromLastAndUpdateSerials;
import static net.ripe.db.whois.common.dao.jdbc.JdbcRpslObjectOperations.deleteFromTables;
import static net.ripe.db.whois.common.dao.jdbc.JdbcRpslObjectOperations.insertIntoLastAndUpdateSerials;
import static net.ripe.db.whois.common.dao.jdbc.JdbcRpslObjectOperations.insertIntoTables;
import static net.ripe.db.whois.common.dao.jdbc.JdbcRpslObjectOperations.insertIntoTablesIgnoreMissing;
import static net.ripe.db.whois.common.dao.jdbc.JdbcRpslObjectOperations.lookupRpslObjectUpdateInfo;
import static net.ripe.db.whois.common.dao.jdbc.JdbcRpslObjectOperations.updateLastAndUpdateSerials;

@Repository
@Conditional(Nrtm4ClientCondition.class)
Expand Down Expand Up @@ -61,13 +67,20 @@ public void truncateTables(){
}

@Nullable
public Integer getMirroredObjectId(final String primaryKey){
// TODO: There can be two objects with same primaryKey, we don't have single identifier for it
public RpslObjectUpdateInfo getMirroredObjectId(final ObjectType type, final String primaryKey){
try {
return jdbcMasterTemplate.queryForObject("SELECT object_id FROM last WHERE pkey = ?",
Integer.class,
primaryKey);
} catch (EmptyResultDataAccessException e) {
return lookupRpslObjectUpdateInfo(jdbcSlaveTemplate, type, primaryKey);
} catch (EmptyResultDataAccessException ex){
return null;
}
}

@Nullable
public Integer getSerialByObjectId(final int objectId, final int sequenceId) {
final String query = "SELECT serial_id FROM serials WHERE object_id = ? AND sequence_id = ?";
try {
return jdbcSlaveTemplate.queryForObject(query, Integer.class, objectId, sequenceId);
} catch (EmptyResultDataAccessException ex){
return null;
}
}
Expand All @@ -82,8 +95,20 @@ public void createIndexes(final RpslObject rpslObject, final RpslObjectUpdateInf
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
public Map.Entry<RpslObject, RpslObjectUpdateInfo> processObject(final String record) throws JsonProcessingException {
final MirrorRpslObject mirrorRpslObject = new ObjectMapper().readValue(record, MirrorRpslObject.class);
return Map.entry(mirrorRpslObject.getObject(), persistRpslObject(mirrorRpslObject.getObject()));
public Map.Entry<RpslObject, RpslObjectUpdateInfo> processSnapshotRecord(final MirrorSnapshotInfo mirrorSnapshotInfo) throws JsonProcessingException {
return Map.entry(mirrorSnapshotInfo.getRpslObject(), persistRpslObject(mirrorSnapshotInfo.getRpslObject()));
}

public void removeMirroredObjectAndUpdateSerials(final RpslObjectUpdateInfo rpslObjectInfo){
deleteFromTables(jdbcMasterTemplate, rpslObjectInfo);
copyToHistoryAndUpdateSerials(jdbcMasterTemplate, rpslObjectInfo);
deleteFromLastAndUpdateSerials(dateTimeProvider, jdbcMasterTemplate, rpslObjectInfo);
}

public void updateMirroredObject(final RpslObject rpslObject, final RpslObjectUpdateInfo rpslObjectUpdateInfo){
deleteFromTables(jdbcMasterTemplate, rpslObjectUpdateInfo);
insertIntoTables(jdbcMasterTemplate, rpslObjectUpdateInfo, rpslObject);
copyToHistoryAndUpdateSerials(jdbcMasterTemplate, rpslObjectUpdateInfo);
updateLastAndUpdateSerials(dateTimeProvider, jdbcMasterTemplate, rpslObjectUpdateInfo, rpslObject);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package net.ripe.db.nrtm4.client.dao;

import io.netty.util.internal.StringUtil;

import javax.annotation.Nullable;

public enum NrtmClientDocumentType {
SNAPSHOT("nrtm-snapshot"),
DELTA("nrtm-delta"),
NOTIFICATION("update-notification-file");

private final String fileNamePrefix;
Expand All @@ -14,7 +19,12 @@ public String getFileNamePrefix() {
return fileNamePrefix;
}

@Nullable
public static NrtmClientDocumentType fromValue(String value) {
if (StringUtil.isNullOrEmpty(value)){
return null;
}

for (NrtmClientDocumentType enumConstant : NrtmClientDocumentType.values()) {
if (enumConstant.getFileNamePrefix().equals(value)) {
return enumConstant;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package net.ripe.db.nrtm4.client.importer;

import net.ripe.db.nrtm4.client.dao.Nrtm4ClientInfoRepository;
import net.ripe.db.nrtm4.client.dao.Nrtm4ClientRepository;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

import static org.apache.commons.codec.binary.Hex.encodeHexString;

public abstract class AbstractMirrorImporter {

final Nrtm4ClientInfoRepository nrtm4ClientInfoRepository;

final Nrtm4ClientRepository nrtm4ClientRepository;

public AbstractMirrorImporter(final Nrtm4ClientInfoRepository nrtm4ClientInfoRepository,
final Nrtm4ClientRepository nrtm4ClientRepository){
this.nrtm4ClientInfoRepository = nrtm4ClientInfoRepository;
this.nrtm4ClientRepository = nrtm4ClientRepository;
}

String calculateSha256(final byte[] bytes) {
try {
final MessageDigest digest = MessageDigest.getInstance("SHA-256");
final byte[] encodedSha256hex = digest.digest(bytes);
return encodeHexString(encodedSha256hex);
} catch (final NoSuchAlgorithmException e) {
throw new IllegalStateException(e);
}
}

public void truncateTables(){
nrtm4ClientInfoRepository.truncateTables();
nrtm4ClientRepository.truncateTables();
}
}
Loading

0 comments on commit 46ad52f

Please sign in to comment.