Skip to content

Commit

Permalink
Parallelize per tree element
Browse files Browse the repository at this point in the history
  • Loading branch information
jnsrnhld committed Feb 22, 2024
1 parent 650243e commit b23898d
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.bakdata.conquery.models.config;

import javax.validation.constraints.Min;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand Down Expand Up @@ -29,4 +31,10 @@ public class SqlConnectorConfig {
private String jdbcConnectionUrl;

private String primaryColumn = DEFAULT_PRIMARY_COLUMN;

/**
* The number of threads used for background tasks such as calculating matching stats (see {@link com.bakdata.conquery.models.jobs.SqlUpdateMatchingStatsJob}).
*/
@Min(1)
private int backgroundThreads;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@ public interface ConceptTreeNode<ID extends ConceptElementId<? extends ConceptEl

MatchingStats getMatchingStats();

void setMatchingStats(MatchingStats matchingStats);

String getDescription();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.bakdata.conquery.mode.local.SqlMatchingStats;
import com.bakdata.conquery.models.common.daterange.CDateRange;
Expand All @@ -26,6 +30,7 @@
import com.bakdata.conquery.sql.conversion.model.ColumnDateRange;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import com.bakdata.conquery.util.TablePrimaryColumnUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jooq.Condition;
import org.jooq.DSLContext;
Expand Down Expand Up @@ -55,6 +60,7 @@ public class SqlUpdateMatchingStatsJob extends Job {
private final DSLContext dslContext;
private final SqlFunctionProvider functionProvider;
private final Collection<Concept<?>> concepts;
private final ExecutorService executors;

public SqlUpdateMatchingStatsJob(
SqlConnectorConfig sqlConnectorConfig,
Expand All @@ -67,6 +73,7 @@ public SqlUpdateMatchingStatsJob(
this.dslContext = executionService.getDslContext();
this.functionProvider = functionProvider;
this.concepts = concepts;
this.executors = Executors.newFixedThreadPool(sqlConnectorConfig.getBackgroundThreads());
}

@Override
Expand All @@ -79,34 +86,38 @@ public void execute() throws Exception {

log.debug("BEGIN update Matching stats for {} Concepts.", concepts.size());

for (Concept<?> concept : concepts) {
if (!(concept instanceof TreeConcept tree)) {
log.error("Collecting MatchingStats is currently only supported for TreeConcepts.");
break;
}
SqlMatchingStats matchingStats = collectMatchingStats(concept.getConnectors(), tree);
concept.setMatchingStats(matchingStats);
concepts.stream()
.filter(SqlUpdateMatchingStatsJob::isTreeConcept)
.flatMap(concept -> collectMatchingStats(concept.getConnectors(), (TreeConcept) concept))
.forEach(executors::submit);

executors.shutdown();
while (!executors.awaitTermination(1, TimeUnit.MINUTES)) {
log.debug("Waiting for executors to set matching stats for all concepts...");
}

log.debug("DONE collecting matching stats.");
}

private SqlMatchingStats collectMatchingStats(List<? extends Connector> connectors, ConceptTreeNode<?> treeNode) {

treeNode.getChildren().forEach(child -> {
SqlMatchingStats childStats = collectMatchingStats(connectors, child);
child.setMatchingStats(childStats);
});

Optional<CTCondition> childCondition = treeNode instanceof ConceptTreeChild treeChild
? Optional.of(treeChild.getCondition())
: Optional.empty();
/**
 * Cancels this job and stops its executor.
 * <p>
 * Delegates to {@code super.cancel()} first, then calls {@code shutdownNow()} on the executor,
 * which halts tasks that are still queued and attempts to stop the ones that are currently running.
 */
@Override
public void cancel() {
super.cancel();
executors.shutdownNow();
}

long events = collectEventCount(connectors, childCondition);
long entities = collectEntityCount(connectors, childCondition);
CDateRange span = collectDateSpan(connectors, childCondition);
/**
 * Tells whether the given concept is a {@link TreeConcept}, the only kind of concept this job handles.
 * An error is logged for every concept that is not.
 *
 * @param concept the concept to check
 * @return {@code true} if {@code concept} is a {@link TreeConcept}, {@code false} otherwise
 */
private static boolean isTreeConcept(Concept<?> concept) {
    final boolean supported = concept instanceof TreeConcept;
    if (!supported) {
        log.error("Collecting MatchingStats is currently only supported for TreeConcepts.");
    }
    return supported;
}

return new SqlMatchingStats(events, entities, span);
/**
 * Flattens the subtree rooted at {@code treeNode} into one task per tree element.
 * The tasks of all descendants come first in the returned stream; the task for {@code treeNode} itself comes last.
 *
 * @param connectors the connectors handed to every created task
 * @param treeNode   the root of the subtree to create tasks for
 * @return a lazily evaluated stream of tasks, one for each element of the subtree
 */
private Stream<Runnable> collectMatchingStats(List<? extends Connector> connectors, ConceptTreeNode<?> treeNode) {
    final Stream<Runnable> descendantTasks = treeNode.getChildren()
            .stream()
            .flatMap(child -> collectMatchingStats(connectors, child));
    final Stream<Runnable> ownTask = Stream.of(new SqlMatchingStatsTask(connectors, treeNode));
    return Stream.concat(descendantTasks, ownTask);
}

/**
Expand Down Expand Up @@ -261,4 +272,25 @@ private Condition toJooqCondition(Connector connector, Optional<CTCondition> chi
.orElse(DSL.noCondition());
}

@RequiredArgsConstructor
private class SqlMatchingStatsTask implements Runnable {

private final List<? extends Connector> connectors;
private final ConceptTreeNode<?> treeNode;

@Override
public void run() {
Optional<CTCondition> childCondition = treeNode instanceof ConceptTreeChild treeChild
? Optional.of(treeChild.getCondition())
: Optional.empty();

long events = collectEventCount(connectors, childCondition);
long entities = collectEntityCount(connectors, childCondition);
CDateRange span = collectDateSpan(connectors, childCondition);

SqlMatchingStats matchingStats = new SqlMatchingStats(events, entities, span);
treeNode.setMatchingStats(matchingStats);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public ColumnDateRange daterange(CDateRange dateRestriction) {
}

@Override
public ColumnDateRange daterange(ValidityDate validityDate, String qualifier, String conceptLabel) {
public ColumnDateRange daterange(ValidityDate validityDate, String qualifier, String label) {

Column startColumn;
Column endColumn;
Expand All @@ -104,7 +104,7 @@ public ColumnDateRange daterange(ValidityDate validityDate, String qualifier, St
);

return ColumnDateRange.of(rangeStart, rangeEnd)
.asValidityDateRange(conceptLabel);
.asValidityDateRange(label);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public HanaTestcontainerContextProvider() {
.databaseUsername(hanaContainer.getUsername())
.databasePassword(hanaContainer.getPassword())
.primaryColumn("pid")
.backgroundThreads(Runtime.getRuntime().availableProcessors())
.build();
this.dslContext = DslContextFactory.create(sqlConnectorConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ static void before() {
.databasePassword(PASSWORD)
.withPrettyPrinting(true)
.primaryColumn("pid")
.backgroundThreads(Runtime.getRuntime().availableProcessors())
.build();
dslContext = DslContextFactory.create(sqlConfig);
testSqlDialect = new TestPostgreSqlDialect(dslContext);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pid,icd_code,aufnahmedatum,entlassungsdatum
primary_column,icd_code,aufnahmedatum,entlassungsdatum
3,"F200",2022-11-28,2022-11-11
3,"F201",2021-08-31,2021-12-15
3,"F204",2010-07-01,2019-07-13
Expand Down

0 comments on commit b23898d

Please sign in to comment.