Skip to content

Commit

Permalink
unique draft Task rules, db constraint, duplicate delete query and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hhund committed Dec 16, 2024
1 parent b434470 commit 56d6943
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
Expand All @@ -15,6 +16,7 @@
import org.hl7.fhir.r4.model.ActivityDefinition;
import org.hl7.fhir.r4.model.CanonicalType;
import org.hl7.fhir.r4.model.Coding;
import org.hl7.fhir.r4.model.Identifier;
import org.hl7.fhir.r4.model.Organization;
import org.hl7.fhir.r4.model.Resource;
import org.hl7.fhir.r4.model.StringType;
Expand All @@ -34,6 +36,10 @@
import dev.dsf.fhir.dao.TaskDao;
import dev.dsf.fhir.dao.provider.DaoProvider;
import dev.dsf.fhir.help.ParameterConverter;
import dev.dsf.fhir.search.PageAndCount;
import dev.dsf.fhir.search.PartialResult;
import dev.dsf.fhir.search.SearchQuery;
import dev.dsf.fhir.search.SearchQueryParameterError;
import dev.dsf.fhir.service.ReferenceResolver;
import dev.dsf.fhir.service.ResourceReference;

Expand Down Expand Up @@ -88,10 +94,20 @@ public Optional<String> reasonCreateAllowed(Connection connection, Identity iden
Optional<String> errors = draftTaskOk(connection, identity, newResource);
if (errors.isEmpty())
{
logger.info("Create of Task authorized for local organization identity '{}', Task.status draft",
identity.getName());
if (!draftTaskExists(connection, newResource))
{
logger.info(
"Create of Task authorized for local organization identity '{}', Task.status draft",
identity.getName());

return Optional.of("Local identity, Task.status draft");
}
else
{
logger.warn("Create of Task unauthorized, unique resource already exists");

return Optional.of("Local identity, Task.status draft");
return Optional.empty();
}
}
else
{
Expand Down Expand Up @@ -284,9 +300,16 @@ private Optional<String> draftTaskOk(Connection connection, Identity identity, T
{
List<String> errors = new ArrayList<>();

if (newResource.getIdentifier().stream().noneMatch(i -> NAMING_SYSTEM_TASK_IDENTIFIER.equals(i.getSystem())))
if (newResource.getIdentifier().stream().filter(i -> NAMING_SYSTEM_TASK_IDENTIFIER.equals(i.getSystem()))
.count() != 1)
{
errors.add("Task.identifier[" + NAMING_SYSTEM_TASK_IDENTIFIER + "] not defined or more than once");
}
else if (newResource.getIdentifier().stream().filter(i -> NAMING_SYSTEM_TASK_IDENTIFIER.equals(i.getSystem()))
.findFirst().filter(Identifier::hasValueElement).map(Identifier::getValueElement)
.filter(StringType::hasValue).isEmpty())
{
errors.add("Task.identifier[" + NAMING_SYSTEM_TASK_IDENTIFIER + "] missing");
errors.add("Task.identifier[" + NAMING_SYSTEM_TASK_IDENTIFIER + "] value not defined");
}

if (newResource.hasRequester())
Expand Down Expand Up @@ -399,6 +422,49 @@ private Stream<String> getMessageNames(Task newResource)
.filter(s -> !s.isBlank());
}

/**
* A draft {@link Task} with identifier (system {@link #NAMING_SYSTEM_TASK_IDENTIFIER}) may not exist
*
* @param connection
* not <code>null</code>
* @param newResource
* not <code>null</code>
* @return <code>true</code> if the given draft Task is unique
*/
private boolean draftTaskExists(Connection connection, Task newResource)
{
SearchQuery<Task> query = getDao().createSearchQueryWithoutUserFilter(PageAndCount.exists())
.configureParameters(Map.of("identifier",
List.of(NAMING_SYSTEM_TASK_IDENTIFIER + "|" + getDraftTaskIdentifierValue(newResource))));

List<SearchQueryParameterError> uQp = query.getUnsupportedQueryParameters();
if (!uQp.isEmpty())
{
logger.warn("Unable to search for Task: Unsupported query parameters: {}", uQp);

throw new IllegalStateException("Unable to search for Task: Unsupported query parameters");
}

try
{
PartialResult<Task> result = getDao().searchWithTransaction(connection, query);
return result.getTotal() >= 1;
}
catch (SQLException e)
{
logger.debug("Unable to search for Task", e);
logger.warn("Unable to search for Task: {} - {}", e.getClass().getName(), e.getMessage());

throw new RuntimeException("Unable to search for Task", e);
}
}

private String getDraftTaskIdentifierValue(Task newResource)
{
return newResource.getIdentifier().stream().filter(i -> NAMING_SYSTEM_TASK_IDENTIFIER.equals(i.getSystem()))
.findFirst().map(Identifier::getValue).get();
}

private boolean taskAllowedForRequesterAndRecipient(Connection connection, Identity requester, Task newResource)
{
Optional<Identity> recipientOpt = organizationProvider.getLocalOrganizationAsIdentity();
Expand Down Expand Up @@ -549,24 +615,24 @@ public Optional<String> reasonReadAllowed(Connection connection, Identity identi

if (identity.hasDsfRole(FhirServerRole.READ))
{
if (isCurrentIdentityPartOfReferencedOrganization(connection, identity, "Task.requester",
existingResource.getRequester()))
if (identity.isLocalIdentity() && isCurrentIdentityPartOfReferencedOrganization(connection, identity,
"Task.restriction.recipient", existingResource.getRestriction().getRecipientFirstRep()))
{
logger.info(
"Read of Task/{}/_history/{} authorized for identity '{}', Task.requester reference could be resolved and current identity part of referenced organization",
"Read of Task/{}/_history/{} authorized for identity '{}', Task.restriction.recipient reference could be resolved and current identity part of referenced organization",
resourceId, resourceVersion, identity.getName());

return Optional.of("Task.requester resolved and identity part of referenced organization");
return Optional
.of("Task.restriction.recipient resolved and local identity part of referenced organization");
}
else if (identity.isLocalIdentity() && isCurrentIdentityPartOfReferencedOrganization(connection, identity,
"Task.restriction.recipient", existingResource.getRestriction().getRecipientFirstRep()))
else if (isCurrentIdentityPartOfReferencedOrganization(connection, identity, "Task.requester",
existingResource.getRequester()))
{
logger.info(
"Read of Task/{}/_history/{} authorized for identity '{}', Task.restriction.recipient reference could be resolved and current identity part of referenced organization",
"Read of Task/{}/_history/{} authorized for identity '{}', Task.requester reference could be resolved and current identity part of referenced organization",
resourceId, resourceVersion, identity.getName());

return Optional
.of("Task.restriction.recipient resolved and local identity part of referenced organization");
return Optional.of("Task.requester resolved and identity part of referenced organization");
}
else
{
Expand Down Expand Up @@ -605,11 +671,23 @@ public Optional<String> reasonUpdateAllowed(Connection connection, Identity iden
Optional<String> errors = draftTaskOk(connection, identity, newResource);
if (errors.isEmpty())
{
logger.info("Update of Task/{}/_history/{} ({} -> {}) authorized for local identity '{}'",
oldResourceId, oldResourceVersion, TaskStatus.DRAFT.toCode(), TaskStatus.DRAFT.toCode(),
identity.getName());
if (draftTaskIdentifierSame(oldResource, newResource))
{
logger.info("Update of Task/{}/_history/{} ({} -> {}) authorized for local identity '{}'",
oldResourceId, oldResourceVersion, TaskStatus.DRAFT.toCode(),
TaskStatus.DRAFT.toCode(), identity.getName());

return Optional.of("Local identity, old Task.status draft, new Task.status draft");
return Optional.of("Local identity, old Task.status draft, new Task.status draft");
}
else
{
logger.warn(
"Update of Task/{}/_history/{} ({} -> {}) unauthorized for identity '{}' - identifier modified",
oldResourceId, oldResourceVersion, TaskStatus.DRAFT.toCode(),
TaskStatus.DRAFT.toCode(), identity.getName());

return Optional.empty();
}
}
else
{
Expand Down Expand Up @@ -790,6 +868,11 @@ else if (TaskStatus.INPROGRESS.equals(oldResource.getStatus())
}
}

private boolean draftTaskIdentifierSame(Task oldResource, Task newResource)
{
return Objects.equals(getDraftTaskIdentifierValue(oldResource), getDraftTaskIdentifierValue(newResource));
}

private Optional<String> reasonNotSame(Task oldResource, Task newResource)
{
List<String> errors = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
<changeSet author="hhund" id="db.constraint_trigger.changelog-1.6.1_subscriptions_unique" runOnChange="true">
<sqlFile dbms="postgresql" relativeToChangelogFile="true" path="unique_trigger_functions/subscriptions_unique.sql" splitStatements="false" />
</changeSet>
<changeSet author="hhund" id="db.constraint_trigger.changelog-1.6.1_tasks_unique" runOnChange="true">
<sqlFile dbms="postgresql" relativeToChangelogFile="true" path="unique_trigger_functions/tasks_unique.sql" splitStatements="false" />
</changeSet>
<changeSet author="hhund" id="db.constraint_trigger.changelog-1.6.1_value_sets_unique" runOnChange="true">
<sqlFile dbms="postgresql" relativeToChangelogFile="true" path="unique_trigger_functions/value_sets_unique.sql" splitStatements="false" />
</changeSet>
Expand All @@ -44,6 +47,7 @@
CREATE CONSTRAINT TRIGGER organization_affiliations_unique AFTER INSERT ON organization_affiliations FOR EACH ROW EXECUTE PROCEDURE organization_affiliations_unique();
CREATE CONSTRAINT TRIGGER structure_definitions_unique AFTER INSERT ON structure_definitions FOR EACH ROW EXECUTE PROCEDURE structure_definitions_unique();
CREATE CONSTRAINT TRIGGER subscriptions_unique AFTER INSERT ON subscriptions FOR EACH ROW EXECUTE PROCEDURE subscriptions_unique();
CREATE CONSTRAINT TRIGGER tasks_unique AFTER INSERT ON tasks FOR EACH ROW EXECUTE PROCEDURE tasks_unique();
CREATE CONSTRAINT TRIGGER value_sets_unique AFTER INSERT ON value_sets FOR EACH ROW EXECUTE PROCEDURE value_sets_unique();
</sql>
</changeSet>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,24 @@
)
</sql>
</changeSet>
<changeSet author="hhund" id="db.delete_duplicate_resources.changelog-1.6.1.tasks-draft">
<sql dbms="postgresql">
DELETE FROM tasks
WHERE task->>'status' = 'draft'
AND jsonb_path_exists(task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value')
AND task_id IN (
SELECT task_id FROM (
SELECT
row_number() OVER (
PARTITION BY jsonb_path_query_array(task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value')
ORDER BY jsonb_path_query_array(task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value'), (task->'meta'->>'lastUpdated')::timestamp DESC
) AS rn
, task_id
FROM current_tasks
) AS t WHERE rn > 1
)
</sql>
</changeSet>
<changeSet author="hhund" id="db.delete_duplicate_resources.changelog-1.6.1.value_sets">
<sql dbms="postgresql">
DELETE FROM value_sets
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
CREATE OR REPLACE FUNCTION tasks_unique() RETURNS TRIGGER AS $$
BEGIN
IF NEW.task->>'status' = 'draft' THEN
PERFORM pg_advisory_xact_lock(hashtext(jsonb_path_query_array(NEW.task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value')::text));
IF EXISTS (SELECT 1 FROM current_tasks WHERE task_id <> NEW.task_id
AND ((
jsonb_path_exists(NEW.task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value')
AND
jsonb_path_query_array(task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value') @>
jsonb_path_query_array(NEW.task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value')
) OR (
jsonb_path_exists(task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value')
AND
jsonb_path_query_array(NEW.task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value') @>
jsonb_path_query_array(task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value')
)
)) THEN
RAISE EXCEPTION 'Conflict: Not inserting Task with identifier.value %, resource already exists with given identifier.value',
jsonb_path_query_array(NEW.task, '$.identifier[*] ? (@.system == "http://dsf.dev/sid/task-identifier").value') USING ERRCODE = 'unique_violation';
ELSE
RETURN NEW;
END IF;
ELSE
RETURN NEW;
END IF;
END;
$$ LANGUAGE PLPGSQL
Loading

0 comments on commit 56d6943

Please sign in to comment.