Skip to content

Commit

Permalink
Fix Rollback NRTMv4 Client (#1605)
Browse files Browse the repository at this point in the history
* feat: nrtmv4 client setup

* feat: add whois-nrtm4-client everywhere

* feat: get common dependencies in whois-nrtm-client (aspectJ in them)

* feat: read unf and store main information

* feat: fix repository

* feat: clarify reader

* feat: remove unnecesary value injection

* feat: add schema creation changes

* feat: fix schema syntax error

* feat: hardcode for now nrtm baseUrl

* feat: hardcode RC

* feat: fix compilation issues

* feat: add logs and fix the name

* feat: change properties and put logs in the condition

* feat: add logs

* feat: fix log

* feat: create nrtm rest client

* feat: fix typo

* feat: run schedule task each minute

* feat: add nrtm4 client to api db endtoend modules

* feat: add source log

* feat: try catch duplicated key exception when the version already exist

* feat: refactor

* feat: remove the changes of rest client

* feat: remove unused import

* feat: change nrtm data soure configurations name

* feat: use a rowmapper in between sources call and jsonnode

* feat: fix compilation issue

* feat: add retrn statment

* feat: fix get resources

* feat: fix baseUrl

* feat: remove the slash from url

* feat: ignore unknown properties

* feat: filter by version and fix the mapper

* feat: handle when table is null

* feat: refactor for clarifications

* feat: fix sql

* feat: check not null

* feat: add logs

* feat: remove logs and add the group by to fix the query

* feat: refactor

* feat: put the dependency again in whois-api

* feat: do no restart if notification is the same

* feat: remove initializeNRTMClient module body

* feat: create nrtm dummy server

* feat: remove unused import and add the conditional

* feat: rename it by client

* feat: rename the condition

* feat: add dependsOn dependency and conditional just for the config

* feat: do not use dependsOn use conditional

* feat: add UNF client ITs

* feat: add snapshot support

* feat: add ITs of snapshot file creation

* feat: fix ITs

* feat: add hash validation

* feat: test fake hash

* feat: remove unrelated changes

* feat: unrelated changes

* feat: unrelated changes

* feat: move into confition packets and fix ITs

* feat: refactor

* feat: little refactor

* feat: catch all exceptions

* feat: add rollback

* feat: add deltas import in NRTM client, ITs remaining

* feat: refactor

* feat: refactor deltas and add ITs

* feat: rename

* feat: add stopwatch

* feat: log miliseconds

* feat: add log for loading snapshot

* feat: add log

* feat: refactor decompress and put a bigger buffer

* feat: add logs

* feat: refactor and use parallel stream

* feat: move the stopwatch

* feat: fix indentation

* feat: some performance improvements

* feat: fix logs and timer issues

* feat: save the update notification after creating the snapshot

* feat: redo previous change

* feat: add logs

* feat: refactor code

* feat: parallelise

* feat: do not process the decompression in parallel, use parallel are record level

* feat: use batches and reduce the threads

* feat: reduce threads usage

* feat: shudown the executor

* parallelise the batches

* feat: try with batches of 1k objects

* feat: use library

* feat: refactor and increase the size of the batches

* feat: remove unecessary code and logs

* feat: remove unecessary changes

* feat: update tests

* feat: fix tests

* feat: add content type as a header

* feat: add correct public key in main resources and move test public key to test resources

* feat: put the correct key

* feat: add data to serials and remaining tables

* feat: do not ignore error when inserting deltas and fix the Mocks instead

* feat: refactors

* feat: comments

* feat: process record directly without reading whole payload beforehand

* feat: change public key

* feat: rename

* feat: add ITs of reinitialisation

* feat: add transactional to repository layer for nrtm4 client

* feat: add transactional annotation covering all nrtm4 changes

* feat: fix issue with dependencies

* feat: cange the transaction name, it is colisioning with another

* feat: add conditional

* feat: remove up transactiona

* feat: add transactions to upper methods

* feat: use the correct public key

* feat: fix rollback when version info

* feat: refactor

* feat: add logs for the transaction manager

* feat: remove requires new, it is committing each row

* feat: fix transaction, do it per record

* feat: transactions are processes by each record when Parallel

* feat: clean up tables in case of error

* feat: just delete tables when snap fails, relay on transaction when deltas

* feat: remove uneccessary changes

* feat: rename transaction

* feat: manually manage the transaction for snapshots

* feat: refactor

* feat: refactor using ExecutorService and a transaction per batch

* feat: refactor

* feat: little naming refactors

* feat: use future to handle errors

* feat: Do catch exception and send error if database unreachable for truncate

* feat: change log

* feat: use repeatable read

* feat: pr change
  • Loading branch information
MiguelAHM authored Dec 17, 2024
1 parent 61bf6ed commit 93f7301
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package net.ripe.db.nrtm4.client.config;

import net.ripe.db.nrtm4.client.condition.Nrtm4ClientCondition;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.AdviceMode;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;


@Configuration
@Conditional(Nrtm4ClientCondition.class)
@EnableTransactionManagement(mode = AdviceMode.ASPECTJ)
public class NrtmClientTransactionConfiguration {

public static final String NRTM_CLIENT_UPDATE_TRANSACTION = "nrtm-client-update-transaction-manager";
public static final String NRTM_CLIENT_INFO_TRANSACTION = "nrtm-client-info-transaction-manager";

@Bean(name = NRTM_CLIENT_UPDATE_TRANSACTION)
public PlatformTransactionManager transactionManagerNrtmClientUpdate(@Qualifier("nrtmClientMasterDataSource") final DataSource masterDataSource) {
return new DataSourceTransactionManager(masterDataSource);
}

@Bean(name = NRTM_CLIENT_INFO_TRANSACTION)
public PlatformTransactionManager transactionManagerNrtmClientInfo(@Qualifier("nrtmClientMasterInfoSource") final DataSource masterDataSource) {
return new DataSourceTransactionManager(masterDataSource);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package net.ripe.db.nrtm4.client.dao;

import net.ripe.db.nrtm4.client.condition.Nrtm4ClientCondition;
import net.ripe.db.nrtm4.client.config.NrtmClientTransactionConfiguration;
import net.ripe.db.whois.common.DateTimeProvider;
import net.ripe.db.whois.common.dao.jdbc.JdbcRpslObjectOperations;
import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -9,13 +10,15 @@
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Nullable;
import javax.sql.DataSource;
import java.util.List;

@Repository
@Conditional(Nrtm4ClientCondition.class)
@Transactional(transactionManager = NrtmClientTransactionConfiguration.NRTM_CLIENT_INFO_TRANSACTION)
public class Nrtm4ClientInfoRepository {

private final JdbcTemplate jdbcMasterTemplate;
Expand Down Expand Up @@ -56,6 +59,7 @@ SELECT id, source, MAX(version), session_id, type, hostname, created
WHERE type = ?
GROUP BY source
""";

return jdbcSlaveTemplate.query(sql,
nrtmClientVersionRowMapper(),
NrtmClientDocumentType.NOTIFICATION.getFileNamePrefix());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
package net.ripe.db.nrtm4.client.dao;

import com.fasterxml.jackson.core.JsonProcessingException;
import net.ripe.db.nrtm4.client.client.MirrorSnapshotInfo;
import net.ripe.db.nrtm4.client.condition.Nrtm4ClientCondition;
import net.ripe.db.nrtm4.client.config.NrtmClientTransactionConfiguration;
import net.ripe.db.whois.common.DateTimeProvider;
import net.ripe.db.whois.common.dao.RpslObjectUpdateInfo;
import net.ripe.db.whois.common.rpsl.ObjectType;
import net.ripe.db.whois.common.rpsl.RpslObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Conditional;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Nullable;
Expand All @@ -33,10 +30,9 @@

@Repository
@Conditional(Nrtm4ClientCondition.class)
@Transactional(transactionManager = NrtmClientTransactionConfiguration.NRTM_CLIENT_UPDATE_TRANSACTION)
public class Nrtm4ClientRepository {

private static final Logger LOGGER = LoggerFactory.getLogger(Nrtm4ClientInfoRepository.class);

private final JdbcTemplate jdbcMasterTemplate;
private final JdbcTemplate jdbcSlaveTemplate;
private final DateTimeProvider dateTimeProvider;
Expand Down Expand Up @@ -69,7 +65,7 @@ public void truncateTables(){
@Nullable
public RpslObjectUpdateInfo getMirroredObjectId(final ObjectType type, final String primaryKey){
try {
return lookupRpslObjectUpdateInfo(jdbcSlaveTemplate, type, primaryKey);
return lookupRpslObjectUpdateInfo(jdbcMasterTemplate, type, primaryKey);
} catch (EmptyResultDataAccessException ex){
return null;
}
Expand All @@ -94,8 +90,7 @@ public void createIndexes(final RpslObject rpslObject, final RpslObjectUpdateInf
insertIntoTablesIgnoreMissing(jdbcMasterTemplate, rpslObjectUpdateInfo, rpslObject);
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
public Map.Entry<RpslObject, RpslObjectUpdateInfo> processSnapshotRecord(final MirrorSnapshotInfo mirrorSnapshotInfo) throws JsonProcessingException {
public Map.Entry<RpslObject, RpslObjectUpdateInfo> processSnapshotRecord(final MirrorSnapshotInfo mirrorSnapshotInfo) {
return Map.entry(mirrorSnapshotInfo.getRpslObject(), persistRpslObject(mirrorSnapshotInfo.getRpslObject()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import net.ripe.db.nrtm4.client.client.NrtmRestClient;
import net.ripe.db.nrtm4.client.client.UpdateNotificationFileResponse;
import net.ripe.db.nrtm4.client.condition.Nrtm4ClientCondition;
import net.ripe.db.nrtm4.client.config.NrtmClientTransactionConfiguration;
import net.ripe.db.nrtm4.client.dao.Nrtm4ClientInfoRepository;
import net.ripe.db.nrtm4.client.dao.Nrtm4ClientRepository;
import net.ripe.db.whois.common.dao.RpslObjectUpdateInfo;
Expand All @@ -14,6 +15,8 @@
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Conditional;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -66,25 +69,43 @@ public void doImport(final String source,
}

private void processPayload(final byte[] deltaFilePayload, final String sessionId, final String source) {
final Metadata metadata = persistDeltas(deltaFilePayload, sessionId);
persistDeltaVersion(source, metadata.version, metadata.sessionId);
}

private Metadata persistDeltas(final byte[] deltaFilePayload, String sessionId) {
ByteBuffer buffer = ByteBuffer.wrap(deltaFilePayload);
InputStream inputStream = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.remaining());
Metadata metadata = null;
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
String record;
boolean isFirstRecord = true;
while ((record = reader.readLine()) != null) {
if (isFirstRecord){
processFirstDeltaRecord(record, sessionId, source);
metadata = extractMetadata(record);
validateSession(metadata.sessionId, sessionId);
isFirstRecord = false;
continue;
}
processDeltaRecord(record);
}

if (metadata == null){
LOGGER.error("Metadata is missing in some delta");
throw new IllegalArgumentException("Metadata is missing in some delta");
}
} catch (IOException ex){
LOGGER.error("Unable to process deltas");
throw new IllegalStateException(ex);
}
return metadata;
}

private void persistDeltaVersion(final String source, final int version, final String sessionId) throws IllegalArgumentException {
nrtm4ClientInfoRepository.saveDeltaFileVersion(source, version, sessionId);
}

@Transactional(transactionManager = NrtmClientTransactionConfiguration.NRTM_CLIENT_UPDATE_TRANSACTION, isolation = Isolation.REPEATABLE_READ)
private void applyDeltaRecord(final MirrorDeltaInfo deltaInfo){
if (deltaInfo.getAction().equals(MirrorDeltaInfo.Action.ADD_MODIFY)){

Expand Down Expand Up @@ -131,7 +152,6 @@ private boolean serialExist(final RpslObjectUpdateInfo rpslObjectUpdateInfo) {
return false;
}


private void processDeltaRecord(final String records) {
final JSONObject jsonObject = new JSONObject(records);
final String deltaAction = jsonObject.getString("action");
Expand All @@ -147,14 +167,15 @@ private void processDeltaRecord(final String records) {
deltaPrimaryKey));
}

private void processFirstDeltaRecord(final String firstRecord, final String sessionId, final String source){
final Metadata metadata = getMetadata(firstRecord);
if (!metadata.sessionId().equals(sessionId)){
LOGGER.error("The session {} is not the same in the UNF and snapshot {}", metadata.sessionId(), sessionId);
truncateTables();
throw new IllegalArgumentException("The session is not the same in the UNF and snapshot");
private Metadata extractMetadata(final String firstRecord){
return getMetadata(firstRecord);
}

private void validateSession(final String metadataSessionId, final String sessionId) throws IllegalArgumentException{
if (!metadataSessionId.equals(sessionId)){
LOGGER.error("The session {} is not the same in the UNF and delta {}", metadataSessionId, sessionId);
throw new IllegalArgumentException("The session is not the same in the UNF and delta");
}
nrtm4ClientInfoRepository.saveDeltaFileVersion(source, metadata.version, metadata.sessionId());
}

private static Metadata getMetadata(final String records) {
Expand Down
Loading

0 comments on commit 93f7301

Please sign in to comment.