From 15efc0d50cb96e421b127cd0f0973e27d7594c09 Mon Sep 17 00:00:00 2001 From: Alex Almanza <115671044+earocorn@users.noreply.github.com> Date: Fri, 13 Sep 2024 16:09:09 -0500 Subject: [PATCH] Allow more flexible database purge policy configuration (#268) --- .../database/IObsSystemDbAutoPurgePolicy.java | 4 ++- .../system/HistoricalObsAutoPurgeConfig.java | 8 ++++- .../system/MaxAgeAutoPurgePolicy.java | 35 ++++++++++++++----- .../database/system/SystemDriverDatabase.java | 30 +++++++++------- .../system/SystemDriverDatabaseConfig.java | 4 ++- 5 files changed, 58 insertions(+), 23 deletions(-) diff --git a/sensorhub-core/src/main/java/org/sensorhub/api/database/IObsSystemDbAutoPurgePolicy.java b/sensorhub-core/src/main/java/org/sensorhub/api/database/IObsSystemDbAutoPurgePolicy.java index a98a20675..6e484727f 100644 --- a/sensorhub-core/src/main/java/org/sensorhub/api/database/IObsSystemDbAutoPurgePolicy.java +++ b/sensorhub-core/src/main/java/org/sensorhub/api/database/IObsSystemDbAutoPurgePolicy.java @@ -16,6 +16,7 @@ import org.slf4j.Logger; +import java.util.Collection; /** *

@@ -35,6 +36,7 @@ public interface IObsSystemDbAutoPurgePolicy * for this aging policy * @param db * @param log + * @param systemUIDs */ - public void trimStorage(IObsSystemDatabase db, Logger log); + public void trimStorage(IObsSystemDatabase db, Logger log, Collection systemUIDs); } diff --git a/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/HistoricalObsAutoPurgeConfig.java b/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/HistoricalObsAutoPurgeConfig.java index 160d40a76..1b58fdc5f 100644 --- a/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/HistoricalObsAutoPurgeConfig.java +++ b/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/HistoricalObsAutoPurgeConfig.java @@ -17,6 +17,8 @@ import org.sensorhub.api.config.DisplayInfo; import org.sensorhub.api.database.IObsSystemDbAutoPurgePolicy; +import java.util.ArrayList; +import java.util.List; /** *

@@ -36,6 +38,10 @@ public abstract class HistoricalObsAutoPurgeConfig @DisplayInfo(label="Purge Execution Period", desc="Execution period of the purge policy (in seconds)") public double purgePeriod = 3600.0; - + @DisplayInfo.Required + @DisplayInfo.FieldType(DisplayInfo.FieldType.Type.SYSTEM_UID) + @DisplayInfo(label="System UIDs", desc="Unique IDs of system drivers to purge") + public List systemUIDs = new ArrayList<>(List.of("*")); + public abstract IObsSystemDbAutoPurgePolicy getPolicy(); } diff --git a/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/MaxAgeAutoPurgePolicy.java b/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/MaxAgeAutoPurgePolicy.java index fb3bebe89..cdff13b3b 100644 --- a/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/MaxAgeAutoPurgePolicy.java +++ b/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/MaxAgeAutoPurgePolicy.java @@ -25,6 +25,7 @@ import org.sensorhub.api.datastore.system.SystemFilter; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Collection; import org.sensorhub.api.database.IObsSystemDatabase; import org.slf4j.Logger; import org.vast.util.DateTimeFormat; @@ -52,17 +53,19 @@ public class MaxAgeAutoPurgePolicy implements IObsSystemDbAutoPurgePolicy @Override - public void trimStorage(IObsSystemDatabase db, Logger log) + public void trimStorage(IObsSystemDatabase db, Logger log, Collection systemUIDs) { // remove all systems, datastreams, commandstreams and fois whose validity time period // ended before (now - max age) var oldestRecordTime = Instant.now().minusSeconds((long)config.maxRecordAge); - + long numProcRemoved = db.getSystemDescStore().removeEntries(new SystemFilter.Builder() .withValidTime(new TemporalFilter.Builder() .withOperator(RangeOp.CONTAINS) .withRange(Instant.MIN, oldestRecordTime) .build()) + .withUniqueIDs(systemUIDs) + .includeMembers(true) .build()); long numFoisRemoved = db.getFoiStore().removeEntries(new FoiFilter.Builder() @@ -70,6 +73,8 @@ public void trimStorage(IObsSystemDatabase db, Logger log) .withOperator(RangeOp.CONTAINS) .withRange(Instant.MIN, oldestRecordTime) .build()) + .withUniqueIDs(systemUIDs) + .includeMembers(true) .build()); long numDsRemoved = db.getDataStreamStore().removeEntries(new DataStreamFilter.Builder() @@ -77,6 +82,10 @@ public void trimStorage(IObsSystemDatabase db, Logger log) .withOperator(RangeOp.CONTAINS) .withRange(Instant.MIN, oldestRecordTime) .build()) + .withSystems(new SystemFilter.Builder() + .withUniqueIDs(systemUIDs) + .includeMembers(true) + .build()) .build()); long numCsRemoved = db.getCommandStreamStore().removeEntries(new CommandStreamFilter.Builder() @@ -84,15 +93,22 @@ public void trimStorage(IObsSystemDatabase db, Logger log) .withOperator(RangeOp.CONTAINS) .withRange(Instant.MIN, oldestRecordTime) .build()) + .withSystems(new SystemFilter.Builder() + .withUniqueIDs(systemUIDs) + .includeMembers(true) + .build()) .build()); // for each remaining datastream, remove all obs with a timestamp older than // the latest result time minus the max age long numObsRemoved = 0; - var allDataStreams = db.getDataStreamStore().selectEntries(db.getDataStreamStore().selectAllFilter()).iterator(); - while (allDataStreams.hasNext()) + var dataStreams = db.getDataStreamStore() + .selectEntries(new DataStreamFilter.Builder() + .withSystems(new SystemFilter.Builder() + .withUniqueIDs(systemUIDs).includeMembers(true).build()).build()).iterator(); + while (dataStreams.hasNext()) { - var dsEntry = allDataStreams.next(); + var dsEntry = dataStreams.next(); var dsID = dsEntry.getKey().getInternalID(); var resultTimeRange = dsEntry.getValue().getResultTimeRange(); @@ -109,10 +125,13 @@ public void trimStorage(IObsSystemDatabase db, Logger log) // for each remaining command stream, remove all commands and status with a timestamp older than // the latest issue time minus the max age long numCmdRemoved = 0; - var allCmdStreams = db.getCommandStreamStore().selectEntries(db.getCommandStreamStore().selectAllFilter()).iterator(); - while (allCmdStreams.hasNext()) + var cmdStreams = db.getCommandStreamStore().selectEntries( + new CommandStreamFilter.Builder() + .withSystems(new SystemFilter.Builder() + .withUniqueIDs(systemUIDs).includeMembers(true).build()).build()).iterator(); + while (cmdStreams.hasNext()) { - var dsEntry = allCmdStreams.next(); + var dsEntry = cmdStreams.next(); var dsID = dsEntry.getKey().getInternalID(); var issueTimeRange = dsEntry.getValue().getIssueTimeRange(); diff --git a/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/SystemDriverDatabase.java b/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/SystemDriverDatabase.java index 172a97648..9257b8ec3 100644 --- a/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/SystemDriverDatabase.java +++ b/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/SystemDriverDatabase.java @@ -80,21 +80,27 @@ protected void doStart() throws SensorHubException { throw new DataStoreException("Cannot instantiate underlying database " + config.dbConfig.moduleClass, e); } + + if(!config.autoPurgeConfig.isEmpty()) + autoPurgeTimer = new Timer(); // start auto-purge timer thread if policy is specified and enabled - if (config.autoPurgeConfig != null && config.autoPurgeConfig.enabled) + for(var autoPurgeConfig : config.autoPurgeConfig) { - final IObsSystemDbAutoPurgePolicy policy = config.autoPurgeConfig.getPolicy(); - autoPurgeTimer = new Timer(); - TimerTask task = new TimerTask() { - public void run() - { - if (!db.isReadOnly()) - policy.trimStorage(db, logger); - } - }; - - autoPurgeTimer.schedule(task, 0, (long)(config.autoPurgeConfig.purgePeriod*1000)); + if (autoPurgeConfig != null && autoPurgeConfig.enabled) + { + var uids = Collections.unmodifiableCollection(autoPurgeConfig.systemUIDs); + final IObsSystemDbAutoPurgePolicy policy = autoPurgeConfig.getPolicy(); + TimerTask task = new TimerTask() { + public void run() + { + if (!db.isReadOnly()) + policy.trimStorage(db, logger, uids); + } + }; + + autoPurgeTimer.schedule(task, 0, (long)(autoPurgeConfig.purgePeriod*1000)); + } } } diff --git a/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/SystemDriverDatabaseConfig.java b/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/SystemDriverDatabaseConfig.java index 9efe0d9fb..7669b92bc 100644 --- a/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/SystemDriverDatabaseConfig.java +++ b/sensorhub-core/src/main/java/org/sensorhub/impl/database/system/SystemDriverDatabaseConfig.java @@ -14,6 +14,8 @@ package org.sensorhub.impl.database.system; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.TreeSet; import org.sensorhub.api.config.DisplayInfo; @@ -46,7 +48,7 @@ public class SystemDriverDatabaseConfig extends DatabaseConfig @DisplayInfo(label="Automatic Purge Policy", desc="Policy for automatically purging historical data") - public HistoricalObsAutoPurgeConfig autoPurgeConfig; + public List autoPurgeConfig = new ArrayList<>(); @DisplayInfo(desc="Minimum period between database commits (in ms)")