Skip to content

Commit

Permalink
wip 671
Browse files Browse the repository at this point in the history
  • Loading branch information
RuslanLavrov committed Aug 2, 2023
1 parent b4aa386 commit 58564e8
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ private static PgConnectOptions getConnectOptions(String tenantId) {
.setPassword(postgresConfig.getString(PASSWORD))
.setIdleTimeout(postgresConfig.getInteger(IDLE_TIMEOUT, 60000))
.setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
.addProperty(DEFAULT_SCHEMA_PROPERTY, convertToPsqlStandard(tenantId));
.addProperty(DEFAULT_SCHEMA_PROPERTY, convertToPsqlStandard(tenantId))
.addProperty("application_name", "srs-pgpool");
}

private static DataSource getDataSource(String tenantId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.reactivex.pgclient.PgPool;
import io.vertx.reactivex.sqlclient.SqlConnection;
import io.vertx.sqlclient.Row;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.text.StrSubstitutor;
Expand Down Expand Up @@ -117,6 +118,7 @@
import static org.jooq.impl.DSL.condition;
import static org.jooq.impl.DSL.countDistinct;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.inline;
import static org.jooq.impl.DSL.max;
import static org.jooq.impl.DSL.name;
import static org.jooq.impl.DSL.primaryKey;
Expand Down Expand Up @@ -198,6 +200,9 @@ public class RecordDaoImpl implements RecordDao {

private final PostgresClientFactory postgresClientFactory;

@org.springframework.beans.factory.annotation.Value("${srs.record.matching.fallback-query.enable:false}")
private boolean enableFallbackQuery;

@Autowired
public RecordDaoImpl(final PostgresClientFactory postgresClientFactory) {
this.postgresClientFactory = postgresClientFactory;
Expand Down Expand Up @@ -360,8 +365,11 @@ public Flowable<Row> streamMarcRecordIds(ParseLeaderResult parseLeaderResult, Pa
.flatMapPublisher(conn -> conn.rxBegin()
.flatMapPublisher(tx -> conn.rxPrepare(sql)
.flatMapPublisher(pq -> pq.createStream(10000)
.toFlowable().map(this::toRow))
.doAfterTerminate(tx::commit)));
.toFlowable()
.filter(row -> !enableFallbackQuery || row.getInteger(COUNT) != 0)
.switchIfEmpty(streamMarcRecordIdsWithoutTracking(conn, parseLeaderResult, parseFieldsResult, searchParameters))
.map(this::toRow))
.doAfterTerminate(() -> tx.commit())));
}

private void appendJoin(SelectJoinStep selectJoinStep, ParseLeaderResult parseLeaderResult, ParseFieldsResult parseFieldsResult) {
Expand All @@ -382,6 +390,44 @@ private void appendJoin(SelectJoinStep selectJoinStep, ParseLeaderResult parseLe
}
}

private Flowable<io.vertx.reactivex.sqlclient.Row> streamMarcRecordIdsWithoutTracking(SqlConnection conn, ParseLeaderResult parseLeaderResult, ParseFieldsResult parseFieldsResult,
RecordSearchParameters searchParameters) {
return conn.rxPrepare(getAltQuery(parseLeaderResult, parseFieldsResult, searchParameters))
.flatMapPublisher(pq -> pq.createStream(10000).toFlowable());
}

private String getAltQuery(ParseLeaderResult parseLeaderResult, ParseFieldsResult parseFieldsResult, RecordSearchParameters searchParameters) {
/* Building a search query */
SelectJoinStep searchQuery = DSL.selectDistinct(RECORDS_LB.EXTERNAL_ID, inline("test").as("test_col")).from(RECORDS_LB);
appendJoinAlternative(searchQuery, parseLeaderResult, parseFieldsResult);
appendWhere(searchQuery, parseLeaderResult, parseFieldsResult, searchParameters);
if (searchParameters.getOffset() != null) {
searchQuery.offset(searchParameters.getOffset());
}
if (searchParameters.getLimit() != null) {
searchQuery.limit(searchParameters.getLimit());
}
/* Building a count query */
SelectJoinStep countQuery = DSL.select(countDistinct(RECORDS_LB.EXTERNAL_ID)).from(RECORDS_LB);
appendJoinAlternative(countQuery, parseLeaderResult, parseFieldsResult);
appendWhere(countQuery, parseLeaderResult, parseFieldsResult, searchParameters);

return DSL.select().from(searchQuery).rightJoin(countQuery).on(DSL.trueCondition()).getSQL(ParamType.INLINED);
}

private void appendJoinAlternative(SelectJoinStep selectJoinStep, ParseLeaderResult parseLeaderResult, ParseFieldsResult parseFieldsResult) {
if (parseLeaderResult.isEnabled()) {
Table marcIndexersLeader = table(name("marc_indexers_leader"));
selectJoinStep.innerJoin(marcIndexersLeader).on(RECORDS_LB.ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, marcIndexersLeader, name(MARC_ID))));
}
if (parseFieldsResult.isEnabled()) {
parseFieldsResult.getFieldsToJoin().forEach(fieldToJoin -> {
Table marcIndexers = table(name("marc_indexers_" + fieldToJoin)).as("i" + fieldToJoin);
selectJoinStep.innerJoin(marcIndexers).on(RECORDS_LB.ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, marcIndexers, name(MARC_ID))));
});
}
}

private void appendWhere(SelectJoinStep step, ParseLeaderResult parseLeaderResult, ParseFieldsResult parseFieldsResult, RecordSearchParameters searchParameters) {
Condition recordTypeCondition = RecordDaoUtil.filterRecordByType(searchParameters.getRecordType().value());
Condition recordStateCondition = RecordDaoUtil.filterRecordByDeleted(searchParameters.isDeleted());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package org.folio.dao;

import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.folio.TestMocks;
import org.folio.dao.util.SnapshotDaoUtil;
import org.folio.rest.jaxrs.model.MarcRecordSearchRequest;
import org.folio.rest.jaxrs.model.Record;
import org.folio.rest.jaxrs.model.Snapshot;
import org.folio.services.AbstractLBServiceTest;
import org.folio.services.RecordSearchParameters;
import org.folio.services.RecordService;
import org.folio.services.RecordServiceImpl;
import org.folio.services.TenantDataProviderImpl;
import org.folio.services.util.parser.ParseFieldsResult;
import org.folio.services.util.parser.ParseLeaderResult;
import org.folio.services.util.parser.SearchExpressionParser;
import org.folio.verticle.MarcIndexersVersionDeletionVerticle;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;

import java.util.List;
import java.util.Set;
import java.util.UUID;

import static org.folio.rest.jaxrs.model.Record.State.ACTUAL;

@RunWith(VertxUnitRunner.class)
public class RecordDaoImplTest extends AbstractLBServiceTest {

private RecordDao recordDao;
private Record record;

private RecordService recordService;

@Before
public void setUp(TestContext context) {
Async async = context.async();
recordDao = new RecordDaoImpl(postgresClientFactory);
// recordService = new RecordServiceImpl(recordDao);

String recordId = UUID.randomUUID().toString();
Snapshot snapshot = TestMocks.getSnapshot(0);

this.record = new Record()
.withId(recordId)
.withState(ACTUAL)
.withMatchedId(recordId)
.withSnapshotId(snapshot.getJobExecutionId())
.withGeneration(0)
.withRecordType(Record.RecordType.MARC_BIB)
.withRawRecord(TestMocks.getRecord(0).getRawRecord().withId(recordId))
.withParsedRecord(TestMocks.getRecord(0).getParsedRecord().withId(recordId));

SnapshotDaoUtil.save(postgresClientFactory.getQueryExecutor(TENANT_ID), snapshot)
.compose(savedSnapshot -> recordService.saveRecord(record, TENANT_ID))
.onComplete(save -> {
if (save.failed()) {
context.fail(save.cause());
}
async.complete();
});
}

@After
public void cleanUp(TestContext context) {
Async async = context.async();
SnapshotDaoUtil.deleteAll(postgresClientFactory.getQueryExecutor(TENANT_ID)).onComplete(delete -> {
if (delete.failed()) {
context.fail(delete.cause());
}
async.complete();
});
}

public void t(TestContext context) {
// ParseLeaderResult parseLeaderResult = new ParseLeaderResult();
// ParseFieldsResult parseFieldsResult = new ParseFieldsResult()
// .withFieldsToJoin(Set.of("001"))
// .withBindingParams(List.of("393893"))
// .withWhereExpression("\"i001\".\"value\" = ?")
// .enable();
// RecordSearchParameters recordSearchParameters = new RecordSearchParameters();

MarcRecordSearchRequest searchRequest = new MarcRecordSearchRequest()
.withFieldsSearchExpression("001.value = '393893'");
RecordSearchParameters searchParams = RecordSearchParameters.from(searchRequest);
ParseLeaderResult parseLeaderResult = SearchExpressionParser.parseLeaderSearchExpression(searchParams.getLeaderSearchExpression());
ParseFieldsResult parseFieldsResult = SearchExpressionParser.parseFieldsSearchExpression(searchParams.getFieldsSearchExpression());
recordDao.streamMarcRecordIds(parseLeaderResult, parseFieldsResult, searchParams, TENANT_ID);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public abstract class AbstractRestVerticleTest {
private static String useExternalDatabase;
private static int okapiPort;

static final String TENANT_ID = "test";
static final String TENANT_ID = "test2";

static final String SOURCE_STORAGE_RECORDS_PATH = "/source-storage/records";
static final String SOURCE_STORAGE_SNAPSHOTS_PATH = "/source-storage/snapshots";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import javax.ws.rs.BadRequestException;

import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import org.folio.dao.util.IdType;
import org.folio.dao.util.RecordType;
import org.folio.rest.jooq.enums.RecordState;
Expand All @@ -16,6 +18,18 @@ public class QueryParamUtilTest {

@Test
public void shouldReturnRecordExternalIdType() {
Flowable<Integer> source = Flowable.just(0, 0, 0);
Flowable<Integer> fallback = Flowable.just(1, 2, 3);

Disposable subscribe = source
.switchMap(integer -> fallback)
// .switchIfEmpty(fallback)
// .filter(n -> n != null)
.subscribe(System.out::println);

System.out.println(subscribe);


assertEquals(IdType.RECORD, QueryParamUtil.toExternalIdType("RECORD"));
}

Expand Down

0 comments on commit 58564e8

Please sign in to comment.