Skip to content

Commit

Permalink
database constraint trigger to ensure resource unique criteria
Browse files Browse the repository at this point in the history
* Unique constraints implemented as constraint trigger run after insert
* Constraint trigger functions use postgres advisory transaction locks
to ensure uniqueness checks are not executed in parallel
* Transaction isolation level of insert/update operations changed from
repeatable read to read committed, enabling dirty reads needed to allow
constraint triggers to see inserts/updates executed by parallel running
transactions
* New integration test to validate parallel create operations via
transaction and batch bundles as well as direct POSTs
  • Loading branch information
hhund committed Nov 19, 2024
1 parent b8923ea commit 0cbf256
Show file tree
Hide file tree
Showing 19 changed files with 1,760 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent;
import org.hl7.fhir.r4.model.Bundle.BundleType;
import org.hl7.fhir.r4.model.IdType;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dev.dsf.fhir.event.EventHandler;
import dev.dsf.fhir.help.ExceptionHandler;
import dev.dsf.fhir.help.ResponseGenerator;
import dev.dsf.fhir.validation.SnapshotGenerator;
import jakarta.ws.rs.WebApplicationException;

Expand All @@ -31,15 +34,18 @@ public class BatchCommandList extends AbstractCommandList implements CommandList
private final ValidationHelper validationHelper;
private final SnapshotGenerator snapshotGenerator;
private final EventHandler eventHandler;
private final ResponseGenerator responseGenerator;

public BatchCommandList(DataSource dataSource, ExceptionHandler exceptionHandler, List<? extends Command> commands,
ValidationHelper validationHelper, SnapshotGenerator snapshotGenerator, EventHandler eventHandler)
ValidationHelper validationHelper, SnapshotGenerator snapshotGenerator, EventHandler eventHandler,
ResponseGenerator responseGenerator)
{
super(dataSource, exceptionHandler, commands);

this.validationHelper = validationHelper;
this.snapshotGenerator = snapshotGenerator;
this.eventHandler = eventHandler;
this.responseGenerator = responseGenerator;
}

@Override
Expand All @@ -50,23 +56,15 @@ public Bundle execute() throws WebApplicationException
boolean initialReadOnly = connection.isReadOnly();
boolean initialAutoCommit = connection.getAutoCommit();
int initialTransactionIsolationLevel = connection.getTransactionIsolation();
logger.debug(
"Running batch with DB connection setting: read-only {}, auto-commit {}, transaction-isolation-level {}",
initialReadOnly, initialAutoCommit,
getTransactionIsolationLevelString(initialTransactionIsolationLevel));

Map<Integer, Exception> caughtExceptions = new HashMap<>((int) (commands.size() / 0.75) + 1);
Map<String, IdType> idTranslationTable = new HashMap<>();

if (hasModifyingCommands)
{
logger.debug(
"Elevating DB connection setting to: read-only {}, auto-commit {}, transaction-isolation-level {}",
false, false, getTransactionIsolationLevelString(Connection.TRANSACTION_REPEATABLE_READ));

connection.setReadOnly(false);
connection.setAutoCommit(false);
connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
}

commands.forEach(preExecute(idTranslationTable, connection, caughtExceptions));
Expand All @@ -75,11 +73,6 @@ public Bundle execute() throws WebApplicationException

if (hasModifyingCommands)
{
logger.debug(
"Reseting DB connection setting to: read-only {}, auto-commit {}, transaction-isolation-level {}",
initialReadOnly, initialAutoCommit,
getTransactionIsolationLevelString(initialTransactionIsolationLevel));

connection.setReadOnly(initialReadOnly);
connection.setAutoCommit(initialAutoCommit);
connection.setTransactionIsolation(initialTransactionIsolationLevel);
Expand Down Expand Up @@ -110,20 +103,6 @@ public Bundle execute() throws WebApplicationException
}
}

private String getTransactionIsolationLevelString(int level)
{
return switch (level)
{
case Connection.TRANSACTION_NONE -> "NONE";
case Connection.TRANSACTION_READ_UNCOMMITTED -> "READ_UNCOMMITTED";
case Connection.TRANSACTION_READ_COMMITTED -> "READ_COMMITTED";
case Connection.TRANSACTION_REPEATABLE_READ -> "REPEATABLE_READ";
case Connection.TRANSACTION_SERIALIZABLE -> "SERIALIZABLE";

default -> "?";
};
}

private Consumer<Command> preExecute(Map<String, IdType> idTranslationTable, Connection connection,
Map<Integer, Exception> caughtExceptions)
{
Expand Down Expand Up @@ -188,7 +167,12 @@ private Consumer<Command> execute(Map<String, IdType> idTranslationTable, Connec
logger.warn("Error while executing command {}, rolling back transaction for entry at index {}: {} - {}",
command.getClass().getName(), command.getIndex(), e.getClass().getName(), e.getMessage());

caughtExceptions.put(command.getIndex(), e);
if (e instanceof PSQLException s && PSQLState.UNIQUE_VIOLATION.getState().equals(s.getSQLState()))
caughtExceptions.put(command.getIndex(),
new WebApplicationException(responseGenerator.dupicateResourceExists()));
else
caughtExceptions.put(command.getIndex(), e);


try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,10 @@ public CommandList createCommands(Bundle bundle, Identity identity, PreferReturn
return switch (bundle.getType())
{
case BATCH -> new BatchCommandList(dataSource, exceptionHandler, commands, validationHelper,
snapshotGenerator, eventHandler);
snapshotGenerator, eventHandler, responseGenerator);

case TRANSACTION ->
new TransactionCommandList(dataSource, exceptionHandler, commands, transactionResourcesFactory);
case TRANSACTION -> new TransactionCommandList(dataSource, exceptionHandler, commands,
transactionResourcesFactory, responseGenerator);

default -> throw new BadBundleException("Unsupported bundle type " + bundle.getType());
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.dsf.fhir.dao.command;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -16,10 +17,13 @@
import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent;
import org.hl7.fhir.r4.model.Bundle.BundleType;
import org.hl7.fhir.r4.model.IdType;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import dev.dsf.fhir.help.ExceptionHandler;
import dev.dsf.fhir.help.ResponseGenerator;
import dev.dsf.fhir.validation.SnapshotGenerator;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Response;
Expand All @@ -30,13 +34,16 @@ public class TransactionCommandList extends AbstractCommandList implements Comma
private static final Logger logger = LoggerFactory.getLogger(TransactionCommandList.class);

private final Function<Connection, TransactionResources> transactionResourceFactory;
private final ResponseGenerator responseGenerator;

public TransactionCommandList(DataSource dataSource, ExceptionHandler exceptionHandler,
List<? extends Command> commands, Function<Connection, TransactionResources> transactionResourceFactory)
List<? extends Command> commands, Function<Connection, TransactionResources> transactionResourceFactory,
ResponseGenerator responseGenerator)
{
super(dataSource, exceptionHandler, commands);

this.transactionResourceFactory = transactionResourceFactory;
this.responseGenerator = responseGenerator;

Collections.sort(this.commands,
Comparator.comparing(Command::getTransactionPriority).thenComparing(Command::getIndex));
Expand All @@ -55,7 +62,7 @@ public Bundle execute() throws WebApplicationException
{
connection.setReadOnly(false);
connection.setAutoCommit(false);
connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
}

TransactionResources transactionResources = transactionResourceFactory.apply(connection);
Expand Down Expand Up @@ -131,7 +138,11 @@ public Bundle execute() throws WebApplicationException
e1.getMessage());
}

throw e;
if (e instanceof PSQLException s
&& PSQLState.UNIQUE_VIOLATION.getState().equals(s.getSQLState()))
throw new WebApplicationException(responseGenerator.dupicateResourceExists());
else
throw e;
}
}

Expand Down Expand Up @@ -177,8 +188,20 @@ public Bundle execute() throws WebApplicationException

if (hasModifyingCommands)
{
logger.debug("Committing DB transaction");
connection.commit();
try
{
logger.debug("Committing DB transaction");
connection.commit();
}
catch (SQLException e)
{
connection.rollback();

if (PSQLState.UNIQUE_VIOLATION.getState().equals(e.getSQLState()))
throw new WebApplicationException(responseGenerator.dupicateResourceExists());
else
throw e;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public Connection newReadWriteTransaction() throws SQLException
{
Connection connection = dataSource.getConnection();
connection.setReadOnly(false);
connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setAutoCommit(false);

return connection;
Expand Down Expand Up @@ -565,12 +565,8 @@ public final R update(R resource, Long expectedVersion)
Objects.requireNonNull(resource, "resource");
// expectedVersion may be null

try (Connection connection = dataSource.getConnection())
try (Connection connection = newReadWriteTransaction())
{
connection.setReadOnly(false);
connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
connection.setAutoCommit(false);

try
{
R updatedResource = updateWithTransaction(connection, resource, expectedVersion);
Expand All @@ -596,9 +592,8 @@ public R updateWithTransaction(Connection connection, R resource, Long expectedV
// expectedVersion may be null
if (connection.isReadOnly())
throw new IllegalArgumentException("Connection is read-only");
if (connection.getTransactionIsolation() != Connection.TRANSACTION_REPEATABLE_READ
&& connection.getTransactionIsolation() != Connection.TRANSACTION_SERIALIZABLE)
throw new IllegalArgumentException("Connection transaction isolation not REPEATABLE_READ or SERIALIZABLE");
if (connection.getTransactionIsolation() != Connection.TRANSACTION_READ_COMMITTED)
throw new IllegalArgumentException("Connection transaction isolation not READ_COMMITTED");
if (connection.getAutoCommit())
throw new IllegalArgumentException("Connection transaction is in auto commit mode");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,23 @@ public Response multipleExists(String resourceTypeName, String ifNoneExistsHeade
return Response.status(Status.PRECONDITION_FAILED).entity(outcome).build();
}

public Response dupicateResourceExists()
{
logger.warn("Duplicate resources exists");

OperationOutcome outcome = createOutcome(IssueSeverity.ERROR, IssueType.DUPLICATE, "Duplicate resources exist");
return Response.status(Status.FORBIDDEN).entity(outcome).build();
}

public Response dupicateResourceExists(String resourceTypeName)
{
logger.warn("Duplicate {} resources exists", resourceTypeName);

OperationOutcome outcome = createOutcome(IssueSeverity.ERROR, IssueType.DUPLICATE,
"Duplicate " + resourceTypeName + " resources exist");
return Response.status(Status.FORBIDDEN).entity(outcome).build();
}

public Response badIfNoneExistHeaderValue(String logMessageReason, String ifNoneExistsHeaderValue)
{
logger.warn("Bad If-None-Exist header value: {}", logMessageReason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.hl7.fhir.r4.model.IdType;
import org.hl7.fhir.r4.model.OperationOutcome;
import org.hl7.fhir.r4.model.Resource;
import org.postgresql.util.PSQLState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
Expand Down Expand Up @@ -162,7 +163,16 @@ public Response create(R resource, UriInfo uri, HttpHeaders headers)

return created;
}
catch (SQLException | WebApplicationException e)
catch (SQLException e)
{
connection.rollback();

if (PSQLState.UNIQUE_VIOLATION.getState().equals(e.getSQLState()))
throw new WebApplicationException(responseGenerator.dupicateResourceExists(resourceTypeName));
else
throw e;
}
catch (WebApplicationException e)
{
connection.rollback();
throw e;
Expand Down Expand Up @@ -513,15 +523,24 @@ public Response update(String id, R resource, UriInfo uri, HttpHeaders headers)
{
resolveLogicalReferences(resource, connection);

R updated = dao.update(resource, ifMatch.orElse(null));
R updated = dao.updateWithTransaction(connection, resource, ifMatch.orElse(null));

checkReferences(resource, connection, ref -> checkReferenceAfterUpdate(updated, ref));

connection.commit();

return updated;
}
catch (SQLException | WebApplicationException e)
catch (SQLException e)
{
if (PSQLState.UNIQUE_VIOLATION.getState().equals(e.getSQLState()))
throw new WebApplicationException(
responseGenerator.dupicateResourceExists(resourceTypeName));

connection.rollback();
throw e;
}
catch (WebApplicationException e)
{
connection.rollback();
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@

<include file="db/db.read_access.changelog-1.6.0.xml" />

<include file="db/db.constraint_trigger.changelog-1.6.1.xml" />

</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.3.xsd">

<property name="json" value="JSONB" dbms="postgresql" />
<property name="json" value="varchar(5000)" dbms="h2" />

<changeSet author="hhund" id="db.constraint_trigger.changelog-1.6.1_activity_definitions_unique" runOnChange="true">
<sqlFile dbms="postgresql" relativeToChangelogFile="true" path="unique_trigger_functions/activity_definitions_unique.sql" splitStatements="false" />
</changeSet>
<changeSet author="hhund" id="db.constraint_trigger.changelog-1.6.1_code_systems_unique" runOnChange="true">
<sqlFile dbms="postgresql" relativeToChangelogFile="true" path="unique_trigger_functions/code_systems_unique.sql" splitStatements="false" />
</changeSet>
<changeSet author="hhund" id="db.constraint_trigger.changelog-1.6.1_endpoints_unique" runOnChange="true">
<sqlFile dbms="postgresql" relativeToChangelogFile="true" path="unique_trigger_functions/endpoints_unique.sql" splitStatements="false" />
</changeSet>
<changeSet author="hhund" id="db.constraint_trigger.changelog-1.6.1_naming_systems_unique" runOnChange="true">
<sqlFile dbms="postgresql" relativeToChangelogFile="true" path="unique_trigger_functions/naming_systems_unique.sql" splitStatements="false" />
</changeSet>
<changeSet author="hhund" id="db.constraint_trigger.changelog-1.6.1_organizations_unique" runOnChange="true">
<sqlFile dbms="postgresql" relativeToChangelogFile="true" path="unique_trigger_functions/organizations_unique.sql" splitStatements="false" />
</changeSet>
<changeSet author="hhund" id="db.constraint_trigger.changelog-1.6.1_organization_affiliations_unique" runOnChange="true">
<sqlFile dbms="postgresql" relativeToChangelogFile="true" path="unique_trigger_functions/organization_affiliations_unique.sql" splitStatements="false" />
</changeSet>
<changeSet author="hhund" id="db.constraint_trigger.changelog-1.6.1_structure_definitions_unique" runOnChange="true">
<sqlFile dbms="postgresql" relativeToChangelogFile="true" path="unique_trigger_functions/structure_definitions_unique.sql" splitStatements="false" />
</changeSet>
<changeSet author="hhund" id="db.constraint_trigger.changelog-1.6.1_subscriptions_unique" runOnChange="true">
<sqlFile dbms="postgresql" relativeToChangelogFile="true" path="unique_trigger_functions/subscriptions_unique.sql" splitStatements="false" />
</changeSet>
<changeSet author="hhund" id="db.constraint_trigger.changelog-1.6.1_value_sets_unique" runOnChange="true">
<sqlFile dbms="postgresql" relativeToChangelogFile="true" path="unique_trigger_functions/value_sets_unique.sql" splitStatements="false" />
</changeSet>

<changeSet author="hhund" id="db.constraint_trigger.changelog-1.6.1">
<sql dbms="postgresql">
CREATE CONSTRAINT TRIGGER activity_definitions_unique AFTER INSERT ON activity_definitions FOR EACH ROW EXECUTE PROCEDURE activity_definitions_unique();
CREATE CONSTRAINT TRIGGER code_systems_unique AFTER INSERT ON code_systems FOR EACH ROW EXECUTE PROCEDURE code_systems_unique();
CREATE CONSTRAINT TRIGGER endpoints_unique AFTER INSERT ON endpoints FOR EACH ROW EXECUTE PROCEDURE endpoints_unique();
CREATE CONSTRAINT TRIGGER naming_systems_unique AFTER INSERT ON naming_systems FOR EACH ROW EXECUTE PROCEDURE naming_systems_unique();
CREATE CONSTRAINT TRIGGER organizations_unique AFTER INSERT ON organizations FOR EACH ROW EXECUTE PROCEDURE organizations_unique();
CREATE CONSTRAINT TRIGGER organization_affiliations_unique AFTER INSERT ON organization_affiliations FOR EACH ROW EXECUTE PROCEDURE organization_affiliations_unique();
CREATE CONSTRAINT TRIGGER structure_definitions_unique AFTER INSERT ON structure_definitions FOR EACH ROW EXECUTE PROCEDURE structure_definitions_unique();
CREATE CONSTRAINT TRIGGER subscriptions_unique AFTER INSERT ON subscriptions FOR EACH ROW EXECUTE PROCEDURE subscriptions_unique();
CREATE CONSTRAINT TRIGGER value_sets_unique AFTER INSERT ON value_sets FOR EACH ROW EXECUTE PROCEDURE value_sets_unique();
</sql>
</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
CREATE OR REPLACE FUNCTION activity_definitions_unique() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_advisory_xact_lock(hashtext((NEW.activity_definition->>'url') || (NEW.activity_definition->>'version')));
IF EXISTS (SELECT 1 FROM current_activity_definitions WHERE activity_definition_id <> NEW.activity_definition_id
AND activity_definition->>'url' = NEW.activity_definition->>'url'
AND activity_definition->>'version' = NEW.activity_definition->>'version') THEN
RAISE EXCEPTION 'Conflict: Not inserting ActivityDefinition with url % and version %, resource already exists with given url and version',
NEW.activity_definition->>'url', NEW.activity_definition->>'version' USING ERRCODE = 'unique_violation';
ELSE
RETURN NEW;
END IF;
END;
$$ LANGUAGE PLPGSQL
Loading

0 comments on commit 0cbf256

Please sign in to comment.