diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/PostgresClientFactory.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/PostgresClientFactory.java index ef648fa78..7d11d01e4 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/PostgresClientFactory.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/PostgresClientFactory.java @@ -194,7 +194,8 @@ private static PgConnectOptions getConnectOptions(String tenantId) { .setPassword(postgresConfig.getString(PASSWORD)) .setIdleTimeout(postgresConfig.getInteger(IDLE_TIMEOUT, 60000)) .setIdleTimeoutUnit(TimeUnit.MILLISECONDS) - .addProperty(DEFAULT_SCHEMA_PROPERTY, convertToPsqlStandard(tenantId)); + .addProperty(DEFAULT_SCHEMA_PROPERTY, convertToPsqlStandard(tenantId)) + .addProperty("application_name", "srs-pgpool"); } private static DataSource getDataSource(String tenantId) { diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java index 2688e7208..df781f33e 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java @@ -9,6 +9,7 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.reactivex.pgclient.PgPool; +import io.vertx.reactivex.sqlclient.SqlConnection; import io.vertx.sqlclient.Row; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.text.StrSubstitutor; @@ -117,6 +118,7 @@ import static org.jooq.impl.DSL.condition; import static org.jooq.impl.DSL.countDistinct; import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.inline; import static org.jooq.impl.DSL.max; import static org.jooq.impl.DSL.name; import static org.jooq.impl.DSL.primaryKey; @@ -198,6 +200,9 @@ public class RecordDaoImpl implements RecordDao { private final PostgresClientFactory postgresClientFactory; + @org.springframework.beans.factory.annotation.Value("${srs.record.matching.fallback-query.enable:false}") + private boolean enableFallbackQuery; + @Autowired public RecordDaoImpl(final PostgresClientFactory postgresClientFactory) { this.postgresClientFactory = postgresClientFactory; @@ -360,8 +365,11 @@ public Flowable streamMarcRecordIds(ParseLeaderResult parseLeaderResult, Pa .flatMapPublisher(conn -> conn.rxBegin() .flatMapPublisher(tx -> conn.rxPrepare(sql) .flatMapPublisher(pq -> pq.createStream(10000) - .toFlowable().map(this::toRow)) - .doAfterTerminate(tx::commit))); + .toFlowable() + .filter(row -> !enableFallbackQuery || row.getInteger(COUNT) != 0) + .switchIfEmpty(streamMarcRecordIdsWithoutTracking(conn, parseLeaderResult, parseFieldsResult, searchParameters)) + .map(this::toRow)) + .doAfterTerminate(() -> tx.commit()))); } private void appendJoin(SelectJoinStep selectJoinStep, ParseLeaderResult parseLeaderResult, ParseFieldsResult parseFieldsResult) { @@ -382,6 +390,44 @@ private void appendJoin(SelectJoinStep selectJoinStep, ParseLeaderResult parseLe } } + private Flowable streamMarcRecordIdsWithoutTracking(SqlConnection conn, ParseLeaderResult parseLeaderResult, ParseFieldsResult parseFieldsResult, + RecordSearchParameters searchParameters) { + return conn.rxPrepare(getAltQuery(parseLeaderResult, parseFieldsResult, searchParameters)) + .flatMapPublisher(pq -> pq.createStream(10000).toFlowable()); + } + + private String getAltQuery(ParseLeaderResult parseLeaderResult, ParseFieldsResult parseFieldsResult, RecordSearchParameters searchParameters) { + /* Building a search query */ + SelectJoinStep searchQuery = DSL.selectDistinct(RECORDS_LB.EXTERNAL_ID, inline("test").as("test_col")).from(RECORDS_LB); + appendJoinAlternative(searchQuery, parseLeaderResult, parseFieldsResult); + appendWhere(searchQuery, parseLeaderResult, parseFieldsResult, searchParameters); + if (searchParameters.getOffset() != null) { + searchQuery.offset(searchParameters.getOffset()); + } + if (searchParameters.getLimit() != null) { + searchQuery.limit(searchParameters.getLimit()); + } + /* Building a count query */ + SelectJoinStep countQuery = DSL.select(countDistinct(RECORDS_LB.EXTERNAL_ID)).from(RECORDS_LB); + appendJoinAlternative(countQuery, parseLeaderResult, parseFieldsResult); + appendWhere(countQuery, parseLeaderResult, parseFieldsResult, searchParameters); + + return DSL.select().from(searchQuery).rightJoin(countQuery).on(DSL.trueCondition()).getSQL(ParamType.INLINED); + } + + private void appendJoinAlternative(SelectJoinStep selectJoinStep, ParseLeaderResult parseLeaderResult, ParseFieldsResult parseFieldsResult) { + if (parseLeaderResult.isEnabled()) { + Table marcIndexersLeader = table(name("marc_indexers_leader")); + selectJoinStep.innerJoin(marcIndexersLeader).on(RECORDS_LB.ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, marcIndexersLeader, name(MARC_ID)))); + } + if (parseFieldsResult.isEnabled()) { + parseFieldsResult.getFieldsToJoin().forEach(fieldToJoin -> { + Table marcIndexers = table(name("marc_indexers_" + fieldToJoin)).as("i" + fieldToJoin); + selectJoinStep.innerJoin(marcIndexers).on(RECORDS_LB.ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, marcIndexers, name(MARC_ID)))); + }); + } + } + private void appendWhere(SelectJoinStep step, ParseLeaderResult parseLeaderResult, ParseFieldsResult parseFieldsResult, RecordSearchParameters searchParameters) { Condition recordTypeCondition = RecordDaoUtil.filterRecordByType(searchParameters.getRecordType().value()); Condition recordStateCondition = RecordDaoUtil.filterRecordByDeleted(searchParameters.isDeleted()); diff --git a/mod-source-record-storage-server/src/test/java/org/folio/dao/RecordDaoImplTest.java b/mod-source-record-storage-server/src/test/java/org/folio/dao/RecordDaoImplTest.java new file mode 100644 index 000000000..4280454c8 --- /dev/null +++ b/mod-source-record-storage-server/src/test/java/org/folio/dao/RecordDaoImplTest.java @@ -0,0 +1,95 @@ +package org.folio.dao; + +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.folio.TestMocks; +import org.folio.dao.util.SnapshotDaoUtil; +import org.folio.rest.jaxrs.model.MarcRecordSearchRequest; +import org.folio.rest.jaxrs.model.Record; +import org.folio.rest.jaxrs.model.Snapshot; +import org.folio.services.AbstractLBServiceTest; +import org.folio.services.RecordSearchParameters; +import org.folio.services.RecordService; +import org.folio.services.RecordServiceImpl; +import org.folio.services.TenantDataProviderImpl; +import org.folio.services.util.parser.ParseFieldsResult; +import org.folio.services.util.parser.ParseLeaderResult; +import org.folio.services.util.parser.SearchExpressionParser; +import org.folio.verticle.MarcIndexersVersionDeletionVerticle; +import org.junit.After; +import org.junit.Before; +import org.junit.runner.RunWith; + +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import static org.folio.rest.jaxrs.model.Record.State.ACTUAL; + +@RunWith(VertxUnitRunner.class) +public class RecordDaoImplTest extends AbstractLBServiceTest { + + private RecordDao recordDao; + private Record record; + + private RecordService recordService; + + @Before + public void setUp(TestContext context) { + Async async = context.async(); + recordDao = new RecordDaoImpl(postgresClientFactory); +// recordService = new RecordServiceImpl(recordDao); + + String recordId = UUID.randomUUID().toString(); + Snapshot snapshot = TestMocks.getSnapshot(0); + + this.record = new Record() + .withId(recordId) + .withState(ACTUAL) + .withMatchedId(recordId) + .withSnapshotId(snapshot.getJobExecutionId()) + .withGeneration(0) + .withRecordType(Record.RecordType.MARC_BIB) + .withRawRecord(TestMocks.getRecord(0).getRawRecord().withId(recordId)) + .withParsedRecord(TestMocks.getRecord(0).getParsedRecord().withId(recordId)); + + SnapshotDaoUtil.save(postgresClientFactory.getQueryExecutor(TENANT_ID), snapshot) + .compose(savedSnapshot -> recordService.saveRecord(record, TENANT_ID)) + .onComplete(save -> { + if (save.failed()) { + context.fail(save.cause()); + } + async.complete(); + }); + } + + @After + public void cleanUp(TestContext context) { + Async async = context.async(); + SnapshotDaoUtil.deleteAll(postgresClientFactory.getQueryExecutor(TENANT_ID)).onComplete(delete -> { + if (delete.failed()) { + context.fail(delete.cause()); + } + async.complete(); + }); + } + + public void t(TestContext context) { +// ParseLeaderResult parseLeaderResult = new ParseLeaderResult(); +// ParseFieldsResult parseFieldsResult = new ParseFieldsResult() +// .withFieldsToJoin(Set.of("001")) +// .withBindingParams(List.of("393893")) +// .withWhereExpression("\"i001\".\"value\" = ?") +// .enable(); +// RecordSearchParameters recordSearchParameters = new RecordSearchParameters(); + + MarcRecordSearchRequest searchRequest = new MarcRecordSearchRequest() + .withFieldsSearchExpression("001.value = '393893'"); + RecordSearchParameters searchParams = RecordSearchParameters.from(searchRequest); + ParseLeaderResult parseLeaderResult = SearchExpressionParser.parseLeaderSearchExpression(searchParams.getLeaderSearchExpression()); + ParseFieldsResult parseFieldsResult = SearchExpressionParser.parseFieldsSearchExpression(searchParams.getFieldsSearchExpression()); + recordDao.streamMarcRecordIds(parseLeaderResult, parseFieldsResult, searchParams, TENANT_ID); + } + +} diff --git a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/AbstractRestVerticleTest.java b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/AbstractRestVerticleTest.java index 3bed93581..8068ccac0 100644 --- a/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/AbstractRestVerticleTest.java +++ b/mod-source-record-storage-server/src/test/java/org/folio/rest/impl/AbstractRestVerticleTest.java @@ -52,7 +52,7 @@ public abstract class AbstractRestVerticleTest { private static String useExternalDatabase; private static int okapiPort; - static final String TENANT_ID = "test"; + static final String TENANT_ID = "test2"; static final String SOURCE_STORAGE_RECORDS_PATH = "/source-storage/records"; static final String SOURCE_STORAGE_SNAPSHOTS_PATH = "/source-storage/snapshots"; diff --git a/mod-source-record-storage-server/src/test/java/org/folio/rest/util/QueryParamUtilTest.java b/mod-source-record-storage-server/src/test/java/org/folio/rest/util/QueryParamUtilTest.java index d3350f1c9..ebe2463ae 100644 --- a/mod-source-record-storage-server/src/test/java/org/folio/rest/util/QueryParamUtilTest.java +++ b/mod-source-record-storage-server/src/test/java/org/folio/rest/util/QueryParamUtilTest.java @@ -4,6 +4,8 @@ import javax.ws.rs.BadRequestException; +import io.reactivex.Flowable; +import io.reactivex.disposables.Disposable; import org.folio.dao.util.IdType; import org.folio.dao.util.RecordType; import org.folio.rest.jooq.enums.RecordState; @@ -16,6 +18,18 @@ public class QueryParamUtilTest { @Test public void shouldReturnRecordExternalIdType() { + Flowable source = Flowable.just(0, 0, 0); + Flowable fallback = Flowable.just(1, 2, 3); + + Disposable subscribe = source + .switchMap(integer -> fallback) +// .switchIfEmpty(fallback) +// .filter(n -> n != null) + .subscribe(System.out::println); + + System.out.println(subscribe); + + assertEquals(IdType.RECORD, QueryParamUtil.toExternalIdType("RECORD")); }