Skip to content

Commit

Permalink
Delete resources in the sink DB and FHIR server (#1183)
Browse files Browse the repository at this point in the history
* Delete resources in the sink DB and FHIR server

* Review comments
  • Loading branch information
chandrashekar-s authored Oct 24, 2024
1 parent 062cc3d commit 90fd8b7
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public void writeResource(HapiRowDescriptor element)
String jsonResource = element.jsonResource();
long startTime = System.currentTimeMillis();
Resource resource = null;
if (jsonResource == null || jsonResource.isBlank()) {
boolean isResourceDeleted = jsonResource == null || jsonResource.isBlank();
if (isResourceDeleted) {
// The jsonResource field will be empty in case of deleted records and are ignored during
// the initial batch run
if (!processDeletedRecords) {
Expand Down Expand Up @@ -149,15 +150,19 @@ public void writeResource(HapiRowDescriptor element)
}
if (!sinkPath.isEmpty()) {
startTime = System.currentTimeMillis();
// TODO : Remove the deleted resources from the sink fhir store
// https://github.com/google/fhir-data-pipes/issues/588
fhirStoreUtil.uploadResource(resource);
if (isResourceDeleted) {
fhirStoreUtil.deleteResourceById(resourceType, resource.getId());
} else {
fhirStoreUtil.uploadResource(resource);
}
totalPushTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime);
}
if (sinkDbConfig != null) {
// TODO : Remove the deleted resources from the sink database
// https://github.com/google/fhir-data-pipes/issues/588
jdbcWriter.writeResource(resource);
if (isResourceDeleted) {
jdbcWriter.deleteResourceById(resourceType, resource.getId());
} else {
jdbcWriter.writeResource(resource);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ public FetchRowsJdbcIo(
+ " FROM hfj_resource res JOIN"
+ " hfj_res_ver ver ON res.res_id = ver.res_id AND res.res_ver = ver.res_ver "
+ " LEFT JOIN hfj_forced_id hfi ON res.res_id = hfi.resource_pid "
+ " WHERE res.res_type = ? AND res.res_id % ? = ? AND"
+ " ver.res_encoding != 'DEL'");
+ " WHERE res.res_type = ? AND res.res_id % ? = ?");
// TODO do date sanity-checking on `since` (note this is partly done by HAPI client call).
if (since != null && !since.isEmpty()) {
builder.append(" AND res.res_updated > '").append(since).append("'");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ static void createTables(FhirEtlOptions options)
}
}

// TODO expose this such that we can properly handle deleted FHIR resources in the pipeline.
private static void deleteOldViewRows(DataSource dataSource, String tableName, String resId)
private static void deleteRowsById(DataSource dataSource, String tableName, String resId)
throws SQLException {
String sql = String.format("DELETE FROM %s WHERE %s=? ;", tableName, ID_COLUMN);
try (Connection connection = dataSource.getConnection();
Expand All @@ -192,6 +191,27 @@ private static void deleteOldViewRows(DataSource dataSource, String tableName, S
}
}

/**
* Deletes a resource based on resourceType and id
*
* @param resourceType the type of resource to be deleted
* @param id the id of the resource to be deleted
* @throws SQLException
*/
public void deleteResourceById(String resourceType, String id) throws SQLException {
if (viewManager == null) {
deleteRowsById(jdbcDataSource, resourceType, id);
} else {
ImmutableList<ViewDefinition> views = viewManager.getViewsForType(resourceType);
for (ViewDefinition vDef : views) {
if (Strings.isNullOrEmpty(vDef.getName())) {
throw new SQLException("Field `name` in ViewDefinition is not defined.");
}
deleteRowsById(jdbcDataSource, vDef.getName(), id);
}
}
}

public void writeResource(Resource resource) throws SQLException, ViewApplicationException {
// TODO merge deletions and insertions into atomic transactions.
if (viewManager == null) {
Expand Down Expand Up @@ -220,13 +240,12 @@ public void writeResource(Resource resource) throws SQLException, ViewApplicatio
ViewApplicator applicator = new ViewApplicator(vDef);
RowList rowList = applicator.apply(resource);
// We should first delete old rows produced from the same resource in a previous run:
deleteOldViewRows(
deleteRowsById(
jdbcDataSource, vDef.getName(), ViewApplicator.getIdString(resource.getIdElement()));
StringBuilder builder = new StringBuilder("INSERT INTO ");
builder.append(vDef.getName()).append(" (");
builder.append(String.join(",", rowList.getColumnInfos().keySet()));
builder.append(") VALUES(");
// TODO handle deleted resources: https://github.com/google/fhir-data-pipes/issues/588
builder.append(
String.join(
",",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,15 @@ public void testWriteResource() throws SQLException, ViewApplicationException {
assertThat(indexCaptor.getAllValues().get(2), equalTo(3));
assertThat(idCaptor.getAllValues().get(2), equalTo("my-patient-id"));
}

@Test
public void testDeleteResource() throws SQLException {
JdbcResourceWriter jdbcResourceWriter = new JdbcResourceWriter(dataSourceMock, "", fhirContext);
String resourceType = "Patient";
String id = "123456";
jdbcResourceWriter.deleteResourceById(resourceType, id);
verify(statementMock, times(1)).setString(indexCaptor.capture(), idCaptor.capture());
assertThat(indexCaptor.getAllValues().get(0), equalTo(1));
assertThat(idCaptor.getAllValues().get(0), equalTo("123456"));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Google LLC
* Copyright 2020-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -92,6 +92,25 @@ public MethodOutcome uploadResource(Resource resource) {
return updateFhirResource(sinkUrl, resource, interceptors);
}

/**
* Deletes a resource using the given resourceType and id
*
* @param resourceType the type of resource to be deleted
* @param id the id of the resource to be deleted
* @return the output result of delete operation
*/
public MethodOutcome deleteResourceById(String resourceType, String id) {
Collection<IClientInterceptor> interceptors = Collections.emptyList();
if (!isNullOrEmpty(sinkUsername) && !isNullOrEmpty(sinkPassword)) {
interceptors = Collections.singleton(new BasicAuthInterceptor(sinkUsername, sinkPassword));
}
IGenericClient client = createGenericClient(sinkUrl, interceptors);
// Initialize the client, which will be used to interact with the service.
MethodOutcome outcome = client.delete().resourceById(resourceType, id).execute();
log.debug("FHIR resource deleted at" + sinkUrl + "? " + outcome.getCreated());
return outcome;
}

public Collection<MethodOutcome> uploadBundle(Bundle bundle) {
Collection<IClientInterceptor> interceptors = Collections.emptyList();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Google LLC
* Copyright 2020-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import ca.uhn.fhir.rest.api.MethodOutcome;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.api.IRestfulClientFactory;
import ca.uhn.fhir.rest.gclient.IDeleteTyped;
import ca.uhn.fhir.rest.gclient.IUpdateTyped;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -50,6 +51,8 @@ public class FhirStoreUtilTest {

@Mock IUpdateTyped iexec;

@Mock IDeleteTyped iDeleteTyped;

private FhirStoreUtil fhirStoreUtil;

private Patient patient;
Expand Down Expand Up @@ -119,4 +122,16 @@ public void testUploadBundle() {
assertThat(result, not(Matchers.empty()));
assertThat(result.iterator().next().getCreated(), equalTo(true));
}

@Test
public void testDeleteResource() {
String resourceType = "Patient";
String id = "patient-id";
when(client.delete().resourceById(resourceType, id)).thenReturn(iDeleteTyped);
MethodOutcome outcome = new MethodOutcome();
outcome.setCreated(true);
doReturn(outcome).when(iDeleteTyped).execute();
MethodOutcome result = fhirStoreUtil.deleteResourceById(resourceType, id);
assertThat(result.getCreated(), equalTo(true));
}
}

0 comments on commit 90fd8b7

Please sign in to comment.