Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow more flexible database purge policy configuration #268

Merged
merged 3 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.slf4j.Logger;

import java.util.Collection;

/**
* <p>
Expand All @@ -35,6 +36,7 @@ public interface IObsSystemDbAutoPurgePolicy
* for this aging policy
* @param db
* @param log
* @param systemUIDs
*/
public void trimStorage(IObsSystemDatabase db, Logger log);
public void trimStorage(IObsSystemDatabase db, Logger log, Collection<String> systemUIDs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.sensorhub.api.config.DisplayInfo;
import org.sensorhub.api.database.IObsSystemDbAutoPurgePolicy;

import java.util.ArrayList;
import java.util.List;

/**
* <p>
Expand All @@ -36,6 +38,10 @@ public abstract class HistoricalObsAutoPurgeConfig
@DisplayInfo(label="Purge Execution Period", desc="Execution period of the purge policy (in seconds)")
public double purgePeriod = 3600.0;


@DisplayInfo.Required
@DisplayInfo.FieldType(DisplayInfo.FieldType.Type.SYSTEM_UID)
@DisplayInfo(label="System UIDs", desc="Unique IDs of system drivers to purge")
public List<String> systemUIDs = new ArrayList<>(List.of("*"));

public abstract IObsSystemDbAutoPurgePolicy getPolicy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.sensorhub.api.datastore.system.SystemFilter;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import org.sensorhub.api.database.IObsSystemDatabase;
import org.slf4j.Logger;
import org.vast.util.DateTimeFormat;
Expand Down Expand Up @@ -52,47 +53,62 @@ public class MaxAgeAutoPurgePolicy implements IObsSystemDbAutoPurgePolicy


@Override
public void trimStorage(IObsSystemDatabase db, Logger log)
public void trimStorage(IObsSystemDatabase db, Logger log, Collection<String> systemUIDs)
{
// remove all systems, datastreams, commandstreams and fois whose validity time period
// ended before (now - max age)
var oldestRecordTime = Instant.now().minusSeconds((long)config.maxRecordAge);

long numProcRemoved = db.getSystemDescStore().removeEntries(new SystemFilter.Builder()
.withValidTime(new TemporalFilter.Builder()
.withOperator(RangeOp.CONTAINS)
.withRange(Instant.MIN, oldestRecordTime)
.build())
.withUniqueIDs(systemUIDs)
.includeMembers(true)
.build());

long numFoisRemoved = db.getFoiStore().removeEntries(new FoiFilter.Builder()
.withValidTime(new TemporalFilter.Builder()
.withOperator(RangeOp.CONTAINS)
.withRange(Instant.MIN, oldestRecordTime)
.build())
.withUniqueIDs(systemUIDs)
.includeMembers(true)
.build());

long numDsRemoved = db.getDataStreamStore().removeEntries(new DataStreamFilter.Builder()
.withValidTime(new TemporalFilter.Builder()
.withOperator(RangeOp.CONTAINS)
.withRange(Instant.MIN, oldestRecordTime)
.build())
.withSystems(new SystemFilter.Builder()
.withUniqueIDs(systemUIDs)
.includeMembers(true)
.build())
.build());

long numCsRemoved = db.getCommandStreamStore().removeEntries(new CommandStreamFilter.Builder()
.withValidTime(new TemporalFilter.Builder()
.withOperator(RangeOp.CONTAINS)
.withRange(Instant.MIN, oldestRecordTime)
.build())
.withSystems(new SystemFilter.Builder()
.withUniqueIDs(systemUIDs)
.includeMembers(true)
.build())
.build());

// for each remaining datastream, remove all obs with a timestamp older than
// the latest result time minus the max age
long numObsRemoved = 0;
var allDataStreams = db.getDataStreamStore().selectEntries(db.getDataStreamStore().selectAllFilter()).iterator();
while (allDataStreams.hasNext())
var dataStreams = db.getDataStreamStore()
.selectEntries(new DataStreamFilter.Builder()
.withSystems(new SystemFilter.Builder()
.withUniqueIDs(systemUIDs).includeMembers(true).build()).build()).iterator();
while (dataStreams.hasNext())
{
var dsEntry = allDataStreams.next();
var dsEntry = dataStreams.next();
var dsID = dsEntry.getKey().getInternalID();
var resultTimeRange = dsEntry.getValue().getResultTimeRange();

Expand All @@ -109,10 +125,13 @@ public void trimStorage(IObsSystemDatabase db, Logger log)
// for each remaining command stream, remove all commands and status with a timestamp older than
// the latest issue time minus the max age
long numCmdRemoved = 0;
var allCmdStreams = db.getCommandStreamStore().selectEntries(db.getCommandStreamStore().selectAllFilter()).iterator();
while (allCmdStreams.hasNext())
var cmdStreams = db.getCommandStreamStore().selectEntries(
new CommandStreamFilter.Builder()
.withSystems(new SystemFilter.Builder()
.withUniqueIDs(systemUIDs).includeMembers(true).build()).build()).iterator();
while (cmdStreams.hasNext())
{
var dsEntry = allCmdStreams.next();
var dsEntry = cmdStreams.next();
var dsID = dsEntry.getKey().getInternalID();
var issueTimeRange = dsEntry.getValue().getIssueTimeRange();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,27 @@ protected void doStart() throws SensorHubException
{
throw new DataStoreException("Cannot instantiate underlying database " + config.dbConfig.moduleClass, e);
}

if(!config.autoPurgeConfig.isEmpty())
autoPurgeTimer = new Timer();

// start auto-purge timer thread if policy is specified and enabled
if (config.autoPurgeConfig != null && config.autoPurgeConfig.enabled)
for(var autoPurgeConfig : config.autoPurgeConfig)
{
final IObsSystemDbAutoPurgePolicy policy = config.autoPurgeConfig.getPolicy();
autoPurgeTimer = new Timer();
TimerTask task = new TimerTask() {
public void run()
{
if (!db.isReadOnly())
policy.trimStorage(db, logger);
}
};

autoPurgeTimer.schedule(task, 0, (long)(config.autoPurgeConfig.purgePeriod*1000));
if (autoPurgeConfig != null && autoPurgeConfig.enabled)
{
var uids = Collections.unmodifiableCollection(autoPurgeConfig.systemUIDs);
final IObsSystemDbAutoPurgePolicy policy = autoPurgeConfig.getPolicy();
TimerTask task = new TimerTask() {
public void run()
{
if (!db.isReadOnly())
policy.trimStorage(db, logger, uids);
}
};

autoPurgeTimer.schedule(task, 0, (long)(autoPurgeConfig.purgePeriod*1000));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package org.sensorhub.impl.database.system;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.sensorhub.api.config.DisplayInfo;
Expand Down Expand Up @@ -46,7 +48,7 @@ public class SystemDriverDatabaseConfig extends DatabaseConfig


@DisplayInfo(label="Automatic Purge Policy", desc="Policy for automatically purging historical data")
public HistoricalObsAutoPurgeConfig autoPurgeConfig;
public List<HistoricalObsAutoPurgeConfig> autoPurgeConfig = new ArrayList<>();


@DisplayInfo(desc="Minimum period between database commits (in ms)")
Expand Down
Loading