From 650243edfb3534e4b5ea02dfe21dc12bbe5c8210 Mon Sep 17 00:00:00 2001 From: Jonas Arnhold Date: Thu, 25 Jan 2024 12:25:53 +0100 Subject: [PATCH] Implement MatchingStats for SQL mode --- .../mode/cluster/WorkerMatchingStats.java | 114 ++++++++ .../mode/local/LocalManagerProvider.java | 6 +- .../mode/local/LocalNamespaceHandler.java | 3 + .../mode/local/LocalStorageListener.java | 21 +- .../conquery/mode/local/SqlMatchingStats.java | 29 ++ .../models/config/SqlConnectorConfig.java | 4 +- .../conquery/models/datasets/Table.java | 8 +- .../datasets/concepts/MatchingStats.java | 103 +------ .../concepts/conditions/PrefixCondition.java | 2 +- .../conditions/PrefixRangeCondition.java | 9 +- .../jobs/SqlUpdateMatchingStatsJob.java | 264 ++++++++++++++++++ .../jobs/WorkerUpdateMatchingStatsJob.java | 158 +++++++++++ .../specific/UpdateElementMatchingStats.java | 13 +- .../specific/UpdateMatchingStatsMessage.java | 151 +--------- .../models/worker/LocalNamespace.java | 4 + .../concept/AggregationFilterCte.java | 5 +- .../concept/AggregationSelectCte.java | 9 +- .../cqelement/concept/CQConceptConverter.java | 10 +- .../cqelement/concept/EventFilterCte.java | 6 +- .../cqelement/concept/FinalConceptCte.java | 5 +- .../dialect/HanaSqlFunctionProvider.java | 14 +- .../dialect/PostgreSqlFunctionProvider.java | 18 +- .../dialect/SqlFunctionProvider.java | 21 ++ .../sql/conversion/model/ColumnDateRange.java | 4 + .../conquery/util/TablePrimaryColumnUtil.java | 17 ++ .../integration/common/RequiredTable.java | 16 +- .../tests/MetadataCollectionTest.java | 57 ++-- ...sts.java => WorkerMatchingStatsTests.java} | 21 +- .../tests/matchingstats/icd.concept.json | 210 ++++++++++++++ .../tests/matchingstats/kh-content.csv | 11 + 30 files changed, 1002 insertions(+), 311 deletions(-) create mode 100644 backend/src/main/java/com/bakdata/conquery/mode/cluster/WorkerMatchingStats.java create mode 100644 backend/src/main/java/com/bakdata/conquery/mode/local/SqlMatchingStats.java create mode 
100644 backend/src/main/java/com/bakdata/conquery/models/jobs/SqlUpdateMatchingStatsJob.java create mode 100644 backend/src/main/java/com/bakdata/conquery/models/jobs/WorkerUpdateMatchingStatsJob.java create mode 100644 backend/src/main/java/com/bakdata/conquery/util/TablePrimaryColumnUtil.java rename backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/{MatchingStatsTests.java => WorkerMatchingStatsTests.java} (80%) create mode 100644 backend/src/test/resources/tests/matchingstats/icd.concept.json create mode 100644 backend/src/test/resources/tests/matchingstats/kh-content.csv diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/WorkerMatchingStats.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/WorkerMatchingStats.java new file mode 100644 index 0000000000..05470fee06 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/WorkerMatchingStats.java @@ -0,0 +1,114 @@ +package com.bakdata.conquery.mode.cluster; + +import java.util.HashMap; +import java.util.Map; + +import com.bakdata.conquery.models.common.daterange.CDateRange; +import com.bakdata.conquery.models.datasets.Column; +import com.bakdata.conquery.models.datasets.Table; +import com.bakdata.conquery.models.datasets.concepts.MatchingStats; +import com.bakdata.conquery.models.events.Bucket; +import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; +import com.fasterxml.jackson.annotation.JsonIgnore; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.ints.IntSet; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Setter +public class WorkerMatchingStats implements MatchingStats { + + private Map entries = new HashMap<>(); + + @JsonIgnore + private transient CDateRange span; + + @JsonIgnore + private transient long numberOfEvents = -1L; + + @JsonIgnore + private transient long numberOfEntities = -1L; + + @Override + public long 
countEvents() { + if (numberOfEvents == -1L) { + synchronized (this) { + if (numberOfEvents == -1L) { + numberOfEvents = entries.values().stream().mapToLong(Entry::getNumberOfEvents).sum(); + } + } + } + return numberOfEvents; + } + + + @Override + public long countEntities() { + if (numberOfEntities == -1L) { + synchronized (this) { + if (numberOfEntities == -1L) { + numberOfEntities = entries.values().stream().mapToLong(Entry::getNumberOfEntities).sum(); + } + } + } + return numberOfEntities; + } + + @Override + public CDateRange spanEvents() { + if (span == null) { + synchronized (this) { + if (span == null) { + span = entries.values().stream().map(Entry::getSpan).reduce(CDateRange.all(), CDateRange::spanClosed); + } + } + } + return span; + + } + + public void putEntry(WorkerId source, Entry entry) { + synchronized (this) { + entries.put(source, entry); + span = null; + numberOfEntities = -1L; + numberOfEvents = -1L; + } + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class Entry { + private long numberOfEvents; + + @JsonIgnore + private final IntSet foundEntities = new IntOpenHashSet(); + private long numberOfEntities; + private CDateRange span; + + + public void addEvent(Table table, Bucket bucket, int event, int entityForEvent) { + numberOfEvents++; + if (foundEntities.add(entityForEvent)) { + numberOfEntities++; + } + + for (Column c : table.getColumns()) { + if (!c.getType().isDateCompatible()) { + continue; + } + + if (!bucket.has(event, c)) { + continue; + } + + final CDateRange time = bucket.getAsDateRange(event, c); + span = time.spanClosed(span); + } + } + } + +} diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java index 6214b22374..e45fd64328 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java +++ 
b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java @@ -19,6 +19,7 @@ import com.bakdata.conquery.sql.conversion.dialect.HanaSqlDialect; import com.bakdata.conquery.sql.conversion.dialect.PostgreSqlDialect; import com.bakdata.conquery.sql.conversion.dialect.SqlDialect; +import com.bakdata.conquery.sql.conversion.dialect.SqlFunctionProvider; import com.bakdata.conquery.sql.execution.ResultSetProcessorFactory; import com.bakdata.conquery.sql.execution.SqlExecutionService; import io.dropwizard.setup.Environment; @@ -35,6 +36,7 @@ public DelegateManager provideManager(ConqueryConfig config, Env SqlConnectorConfig sqlConnectorConfig = config.getSqlConnectorConfig(); DSLContext dslContext = DslContextFactory.create(sqlConnectorConfig); SqlDialect sqlDialect = createSqlDialect(sqlConnectorConfig, dslContext); + SqlFunctionProvider functionProvider = sqlDialect.getFunctionProvider(); SqlContext sqlContext = new SqlContext(sqlConnectorConfig, sqlDialect); SqlExecutionService sqlExecutionService = new SqlExecutionService( @@ -42,7 +44,7 @@ public DelegateManager provideManager(ConqueryConfig config, Env ResultSetProcessorFactory.create(sqlDialect) ); - NamespaceHandler namespaceHandler = new LocalNamespaceHandler(config, creator, sqlContext, sqlExecutionService); + NamespaceHandler namespaceHandler = new LocalNamespaceHandler(config, creator, sqlContext, sqlExecutionService, functionProvider); DatasetRegistry datasetRegistry = ManagerProvider.createLocalDatasetRegistry(namespaceHandler, config, creator, sqlExecutionService); creator.init(datasetRegistry); @@ -51,7 +53,7 @@ public DelegateManager provideManager(ConqueryConfig config, Env environment, datasetRegistry, new FailingImportHandler(), - new LocalStorageListener(), + new LocalStorageListener(datasetRegistry), EMPTY_NODE_PROVIDER, List.of(), creator, diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java 
b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java index ce79a76c6c..5cb7ab5f97 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java @@ -12,6 +12,7 @@ import com.bakdata.conquery.models.worker.LocalNamespace; import com.bakdata.conquery.sql.SqlContext; import com.bakdata.conquery.sql.conquery.SqlExecutionManager; +import com.bakdata.conquery.sql.conversion.dialect.SqlFunctionProvider; import com.bakdata.conquery.sql.execution.SqlExecutionService; import lombok.RequiredArgsConstructor; @@ -22,6 +23,7 @@ public class LocalNamespaceHandler implements NamespaceHandler { private final InternalObjectMapperCreator mapperCreator; private final SqlContext sqlContext; private final SqlExecutionService sqlExecutionService; + private final SqlFunctionProvider functionProvider; @Override public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, IndexService indexService) { @@ -33,6 +35,7 @@ public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaSto namespaceStorage, executionManager, sqlExecutionService, + functionProvider, namespaceData.getJobManager(), namespaceData.getFilterSearch(), namespaceData.getIndexService(), diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalStorageListener.java b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalStorageListener.java index 20dcd9e25b..f3894bda81 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalStorageListener.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalStorageListener.java @@ -1,14 +1,21 @@ package com.bakdata.conquery.mode.local; +import java.util.Collection; + import com.bakdata.conquery.mode.StorageListener; import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.datasets.SecondaryIdDescription; import 
com.bakdata.conquery.models.datasets.Table; import com.bakdata.conquery.models.datasets.concepts.Concept; +import com.bakdata.conquery.models.jobs.SqlUpdateMatchingStatsJob; +import com.bakdata.conquery.models.worker.DatasetRegistry; +import com.bakdata.conquery.models.worker.LocalNamespace; +import lombok.RequiredArgsConstructor; +@RequiredArgsConstructor public class LocalStorageListener implements StorageListener { - // When running without shards, no further actions are required + private final DatasetRegistry datasetRegistry; @Override public void onAddSecondaryId(SecondaryIdDescription secondaryId) { @@ -36,5 +43,17 @@ public void onDeleteConcept(Concept concept) { @Override public void onUpdateMatchingStats(Dataset dataset) { + + final LocalNamespace namespace = datasetRegistry.get(dataset.getId()); + final Collection> concepts = namespace.getStorage().getAllConcepts(); + + SqlUpdateMatchingStatsJob matchingStatsJob = new SqlUpdateMatchingStatsJob( + datasetRegistry.getConfig().getSqlConnectorConfig(), + namespace.getSqlExecutionService(), + namespace.getFunctionProvider(), + concepts + ); + + datasetRegistry.get(dataset.getId()).getJobManager().addSlowJob(matchingStatsJob); } } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/SqlMatchingStats.java b/backend/src/main/java/com/bakdata/conquery/mode/local/SqlMatchingStats.java new file mode 100644 index 0000000000..a47199712b --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/mode/local/SqlMatchingStats.java @@ -0,0 +1,29 @@ +package com.bakdata.conquery.mode.local; + +import com.bakdata.conquery.models.common.daterange.CDateRange; +import com.bakdata.conquery.models.datasets.concepts.MatchingStats; +import lombok.Value; + +@Value +public class SqlMatchingStats implements MatchingStats { + + long numberOfEvents; + long numberOfEntities; + CDateRange span; + + @Override + public long countEvents() { + return numberOfEvents; + } + + @Override + public long countEntities() { + return 
numberOfEntities; + } + + @Override + public CDateRange spanEvents() { + return span; + } + +} diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java index e6e83b723b..7393a58f01 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java @@ -11,6 +11,8 @@ @AllArgsConstructor public class SqlConnectorConfig { + public static final String DEFAULT_PRIMARY_COLUMN = "pid"; + boolean enabled; private Dialect dialect; @@ -26,5 +28,5 @@ public class SqlConnectorConfig { private String jdbcConnectionUrl; - private String primaryColumn = "pid"; + private String primaryColumn = DEFAULT_PRIMARY_COLUMN; } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/Table.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/Table.java index 856fdd54b7..6ecfad057b 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/Table.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/Table.java @@ -5,6 +5,7 @@ import java.util.Set; import java.util.stream.Stream; +import javax.annotation.Nullable; import javax.validation.Valid; import javax.validation.constraints.NotNull; @@ -33,7 +34,12 @@ public class Table extends Labeled implements NamespacedIdentifiable entries = new HashMap<>(); - @JsonIgnore - private transient CDateRange span; + long countEvents(); - @JsonIgnore - private transient long numberOfEvents = -1L; + long countEntities(); - @JsonIgnore - private transient long numberOfEntities = -1L; - - public long countEvents() { - if (numberOfEvents == -1L) { - synchronized (this) { - if (numberOfEvents == -1L) { - numberOfEvents = entries.values().stream().mapToLong(Entry::getNumberOfEvents).sum(); - } - } - } - return numberOfEvents; - } - - - public long countEntities() { - if 
(numberOfEntities == -1L) { - synchronized (this) { - if (numberOfEntities == -1L) { - numberOfEntities = entries.values().stream().mapToLong(Entry::getNumberOfEntities).sum(); - } - } - } - return numberOfEntities; - } - - public CDateRange spanEvents() { - if (span == null) { - synchronized (this) { - if (span == null) { - span = entries.values().stream().map(Entry::getSpan).reduce(CDateRange.all(), CDateRange::spanClosed); - } - } - } - return span; - - } - - public void putEntry(WorkerId source, Entry entry) { - synchronized (this) { - entries.put(source, entry); - span = null; - numberOfEntities = -1L; - numberOfEvents = -1L; - } - } - - @Data - @NoArgsConstructor - @AllArgsConstructor - public static class Entry { - private long numberOfEvents; - - @JsonIgnore - private final IntSet foundEntities = new IntOpenHashSet(); - private long numberOfEntities; - private CDateRange span; - - - public void addEvent(Table table, Bucket bucket, int event, int entityForEvent) { - numberOfEvents++; - if (foundEntities.add(entityForEvent)) { - numberOfEntities++; - } - - for (Column c : table.getColumns()) { - if (!c.getType().isDateCompatible()) { - continue; - } - - if (!bucket.has(event, c)) { - continue; - } - - final CDateRange time = bucket.getAsDateRange(event, c); - span = time.spanClosed(span); - } - } - } + @Nullable + CDateRange spanEvents(); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixCondition.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixCondition.java index 07abf512a2..da9a839175 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixCondition.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixCondition.java @@ -44,7 +44,7 @@ public boolean matches(String value, CalculatedValue> rowMap @Override public WhereCondition convertToSqlCondition(CTConditionContext context) { 
Field field = DSL.field(DSL.name(context.getConnectorTable().getName(), context.getConnectorColumn().getName()), String.class); - String pattern = Arrays.stream(prefixes).collect(Collectors.joining("|", "", "%")); + String pattern = Arrays.stream(prefixes).collect(Collectors.joining("|", "", context.getFunctionProvider().getAnyCharRegex())); Condition condition = context.getFunctionProvider().likeRegex(field, pattern); return new WhereConditionWrapper(condition, ConditionType.PREPROCESSING); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixRangeCondition.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixRangeCondition.java index fef3ba818f..2652728cd3 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixRangeCondition.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/conditions/PrefixRangeCondition.java @@ -6,6 +6,7 @@ import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.sql.conversion.cqelement.concept.CTConditionContext; +import com.bakdata.conquery.sql.conversion.dialect.SqlFunctionProvider; import com.bakdata.conquery.sql.conversion.model.filter.ConditionType; import com.bakdata.conquery.sql.conversion.model.filter.WhereCondition; import com.bakdata.conquery.sql.conversion.model.filter.WhereConditionWrapper; @@ -24,8 +25,6 @@ @CPSType(id="PREFIX_RANGE", base=CTCondition.class) public class PrefixRangeCondition implements CTCondition { - private static final String ANY_CHAR_REGEX = ".*"; - @Getter @Setter @NotEmpty private String min; @Getter @Setter @NotEmpty @@ -51,12 +50,12 @@ public boolean matches(String value, CalculatedValue> rowMap @Override public WhereCondition convertToSqlCondition(CTConditionContext context) { Field field = DSL.field(DSL.name(context.getConnectorTable().getName(), context.getConnectorColumn().getName()), String.class); - String pattern = 
buildSqlRegexPattern(); + String pattern = buildSqlRegexPattern(context.getFunctionProvider()); Condition regexCondition = context.getFunctionProvider().likeRegex(field, pattern); return new WhereConditionWrapper(regexCondition, ConditionType.PREPROCESSING); } - private String buildSqlRegexPattern() { + private String buildSqlRegexPattern(SqlFunctionProvider functionProvider) { StringBuilder builder = new StringBuilder(); char[] minChars = min.toCharArray(); char[] maxChars = max.toCharArray(); @@ -70,6 +69,6 @@ private String buildSqlRegexPattern() { builder.append(minChar); } } - return builder.append(ANY_CHAR_REGEX).toString(); + return builder.append(functionProvider.getAnyCharRegex()).toString(); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/SqlUpdateMatchingStatsJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/SqlUpdateMatchingStatsJob.java new file mode 100644 index 0000000000..17ed7af621 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/SqlUpdateMatchingStatsJob.java @@ -0,0 +1,264 @@ +package com.bakdata.conquery.models.jobs; + +import java.math.BigDecimal; +import java.sql.Date; +import java.time.LocalDate; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.BinaryOperator; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.bakdata.conquery.mode.local.SqlMatchingStats; +import com.bakdata.conquery.models.common.daterange.CDateRange; +import com.bakdata.conquery.models.config.SqlConnectorConfig; +import com.bakdata.conquery.models.datasets.concepts.Concept; +import com.bakdata.conquery.models.datasets.concepts.Connector; +import com.bakdata.conquery.models.datasets.concepts.conditions.CTCondition; +import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeChild; +import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeNode; 
+import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept; +import com.bakdata.conquery.sql.conversion.cqelement.concept.CTConditionContext; +import com.bakdata.conquery.sql.conversion.dialect.SqlFunctionProvider; +import com.bakdata.conquery.sql.conversion.model.ColumnDateRange; +import com.bakdata.conquery.sql.execution.SqlExecutionService; +import com.bakdata.conquery.util.TablePrimaryColumnUtil; +import lombok.extern.slf4j.Slf4j; +import org.jooq.Condition; +import org.jooq.DSLContext; +import org.jooq.Record; +import org.jooq.Record1; +import org.jooq.Record2; +import org.jooq.Result; +import org.jooq.Select; +import org.jooq.SelectConditionStep; +import org.jooq.SelectJoinStep; +import org.jooq.impl.DSL; + +@Slf4j +public class SqlUpdateMatchingStatsJob extends Job { + + private static final String EVENTS_FIELD = "events"; + private static final String EVENTS_TABLE = "events_unioned"; + private static final String PRIMARY_COLUMN = "primary_column"; + private static final String ENTITIES_TABLE = "entities"; + private static final String VALIDITY_DATE_SELECT = "unioned"; + private static final String VALIDITY_DATES_TABLE = "validity_dates"; + private static final String MIN_VALIDITY_DATE_FIELD = "min_validity_date"; + private static final String MAX_VALIDITY_DATE_FIELD = "max_validity_date"; + + private final SqlConnectorConfig sqlConnectorConfig; + private final SqlExecutionService executionService; + private final DSLContext dslContext; + private final SqlFunctionProvider functionProvider; + private final Collection> concepts; + + public SqlUpdateMatchingStatsJob( + SqlConnectorConfig sqlConnectorConfig, + SqlExecutionService executionService, + SqlFunctionProvider functionProvider, + Collection> concepts + ) { + this.sqlConnectorConfig = sqlConnectorConfig; + this.executionService = executionService; + this.dslContext = executionService.getDslContext(); + this.functionProvider = functionProvider; + this.concepts = concepts; + } + + @Override + 
public String getLabel() { + return "Calculating Matching Stats for %s.".formatted(executionService); + } + + @Override + public void execute() throws Exception { + + log.debug("BEGIN update Matching stats for {} Concepts.", concepts.size()); + + for (Concept concept : concepts) { + if (!(concept instanceof TreeConcept tree)) { + log.error("Collecting MatchingStats is currently only supported for TreeConcepts."); + break; + } + SqlMatchingStats matchingStats = collectMatchingStats(concept.getConnectors(), tree); + concept.setMatchingStats(matchingStats); + } + + log.debug("DONE collecting matching stats."); + } + + private SqlMatchingStats collectMatchingStats(List connectors, ConceptTreeNode treeNode) { + + treeNode.getChildren().forEach(child -> { + SqlMatchingStats childStats = collectMatchingStats(connectors, child); + child.setMatchingStats(childStats); + }); + + Optional childCondition = treeNode instanceof ConceptTreeChild treeChild + ? Optional.of(treeChild.getCondition()) + : Optional.empty(); + + long events = collectEventCount(connectors, childCondition); + long entities = collectEntityCount(connectors, childCondition); + CDateRange span = collectDateSpan(connectors, childCondition); + + return new SqlMatchingStats(events, entities, span); + } + + /** + * Applies a count(*) on each connector's table, unions these tables and finally calculates the sum() of the count per connector + * to obtain the concept's total event count. 
+ */ + private long collectEventCount(List connectors, Optional childCondition) { + + org.jooq.Table> eventsUnioned = + union(connectors, connector -> createCountEventsQuery(connector, childCondition), Select::unionAll, EVENTS_TABLE); + + SelectJoinStep> eventsQuery = dslContext.select(DSL.sum(eventsUnioned.field(EVENTS_FIELD, BigDecimal.class)).as(EVENTS_FIELD)) + .from(eventsUnioned); + + Result result = executionService.fetch(eventsQuery); + try { + BigDecimal events = (BigDecimal) result.getValue(0, EVENTS_FIELD); + return Objects.requireNonNull(events).longValue(); + } + catch (Exception e) { + log.error("Expecting exactly 1 column of numeric type and 1 row in Result when querying for events of a concept node. Error: ", e); + return 0; + } + } + + private SelectConditionStep> createCountEventsQuery(Connector connector, Optional childCondition) { + return dslContext.select(DSL.count().as(EVENTS_FIELD)) + .from(DSL.table(DSL.name(connector.getTable().getName()))) + .where(toJooqCondition(connector, childCondition)); + } + + /** + * Selects the PIDs for each connector, unions these tables and does a countDistinct(pid) to obtain the concepts total entity count. 
+ */ + private long collectEntityCount(List connectors, Optional childCondition) { + + org.jooq.Table> entitiesUnioned = + union(connectors, connector -> createCountEntitiesQuery(connector, childCondition), Select::union, ENTITIES_TABLE); + + SelectJoinStep> entitiesQuery = dslContext.select(DSL.countDistinct(entitiesUnioned.field(PRIMARY_COLUMN)).as(PRIMARY_COLUMN)) + .from(entitiesUnioned); + + Result result = executionService.fetch(entitiesQuery); + try { + // we will get an Integer as SQL return type of SUM select, but MatchingStats expect a long + Integer value = (Integer) result.getValue(0, PRIMARY_COLUMN); + return Objects.requireNonNull(value).longValue(); + } + catch (Exception e) { + log.error("Expecting exactly 1 column of type Integer and 1 row in Result when querying for events of a concept node. Error: ", e); + return 0; + } + } + + private SelectConditionStep> createCountEntitiesQuery(Connector connector, Optional childCondition) { + return dslContext.select(TablePrimaryColumnUtil.findPrimaryColumn(connector.getTable(), sqlConnectorConfig)) + .from(DSL.table(DSL.name(connector.getTable().getName()))) + .where(toJooqCondition(connector, childCondition)); + } + + /** + * For each connector and each of its validity dates, we select the start and end date, union all these tables and select the min(start) and max(end) + * to obtain the concepts total date span. + * + * @return A {@link CDateRange} with the min and max validity date over all the given connectors. Null, if the given connectors have no validity date at all. 
+ */ + private CDateRange collectDateSpan(List connectors, Optional childCondition) { + + Map> validityDateMap = connectors.stream().collect( + // we create all validity dates with the same alias to union them later + Collectors.toMap(Function.identity(), connector -> createColumnDateRanges(connector, VALIDITY_DATE_SELECT)) + ); + if (validityDateMap.isEmpty()) { + return null; + } + + org.jooq.Table validityDatesUnioned = unionAllValidityDates(validityDateMap, childCondition); + // we just need any of the generated column date ranges to get the name of the unioned field(s) + ColumnDateRange anyOfTheUnionedDates = validityDateMap.get(connectors.get(0)).get(0); + // ensure we have a start and end field (and not a single-column range), because we need to get the min(start) and max(end) + ColumnDateRange dualColumn = functionProvider.toDualColumn(anyOfTheUnionedDates); + SelectJoinStep> dateSpanQuery = dslContext.select( + DSL.min(dualColumn.getStart()).as(MIN_VALIDITY_DATE_FIELD), + DSL.max(dualColumn.getEnd()).as(MAX_VALIDITY_DATE_FIELD) + ) + .from(validityDatesUnioned); + + Result result = executionService.fetch(dateSpanQuery); + try { + LocalDate minDate = getDateFromResult(result, MIN_VALIDITY_DATE_FIELD, LocalDate.MIN); + LocalDate maxDate = getDateFromResult(result, MAX_VALIDITY_DATE_FIELD, LocalDate.MAX); + if (maxDate != LocalDate.MAX) { + // we treat the end date as excluded internally when using ColumnDateRanges, but a CDateRange expects an inclusive range + maxDate = maxDate.minusDays(1); + } + return CDateRange.of(minDate, maxDate); + } + catch (Exception e) { + log.error("Expecting exactly 2 columns (start and end date) of type date when querying for the date span of a concept. 
Error: ", e); + return null; + } + } + + private List createColumnDateRanges(Connector connector, String alias) { + return connector.getValidityDates().stream() + .map(validityDate -> functionProvider.daterange(validityDate, connector.getTable().getName(), alias)) + .toList(); + } + + private org.jooq.Table unionAllValidityDates(Map> validityDateMap, Optional childCondition) { + return validityDateMap.entrySet().stream() + .flatMap(entry -> { + Connector connector = entry.getKey(); + List validityDates = entry.getValue(); + return validityDates.stream().map(columnDateRange -> createValidityDateQuery(columnDateRange, connector, childCondition)); + }) + .reduce((validityDate1, validityDate2) -> (SelectConditionStep) validityDate1.unionAll(validityDate2)) + .orElseThrow(() -> new RuntimeException("Expected at least 1 validity date to be present.")) + .asTable(DSL.name(VALIDITY_DATES_TABLE)); + } + + private SelectConditionStep createValidityDateQuery(ColumnDateRange columnDateRange, Connector connector, Optional childCondition) { + return dslContext.select(columnDateRange.toFields()) + .from(DSL.table(DSL.name(connector.getTable().getName()))) + .where(toJooqCondition(connector, childCondition)); + } + + private LocalDate getDateFromResult(Result result, String field, LocalDate defaultDate) { + return Optional.ofNullable(result.getValue(0, field)) + .map(Object::toString) + .map(LocalDate::parse) + .orElse(defaultDate); + } + + private static org.jooq.Table union( + Collection input, + Function> mapper, + BinaryOperator> operator, + String tableName + ) { + return input.stream() + .map(mapper) + .reduce(operator) + .orElseThrow(() -> new IllegalStateException("Expected at least one element to union")) + .asTable(DSL.name(tableName)); + } + + private Condition toJooqCondition(Connector connector, Optional childCondition) { + CTConditionContext context = new CTConditionContext(connector.getTable(), connector.getColumn(), functionProvider); + return childCondition.or(() 
-> Optional.ofNullable(connector.getCondition())) + .map(condition -> condition.convertToSqlCondition(context).condition()) + .orElse(DSL.noCondition()); + } + +} diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/WorkerUpdateMatchingStatsJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/WorkerUpdateMatchingStatsJob.java new file mode 100644 index 0000000000..35989a8e12 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/WorkerUpdateMatchingStatsJob.java @@ -0,0 +1,158 @@ +package com.bakdata.conquery.models.jobs; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import com.bakdata.conquery.mode.cluster.WorkerMatchingStats; +import com.bakdata.conquery.models.datasets.Table; +import com.bakdata.conquery.models.datasets.concepts.Concept; +import com.bakdata.conquery.models.datasets.concepts.ConceptElement; +import com.bakdata.conquery.models.datasets.concepts.Connector; +import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeNode; +import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept; +import com.bakdata.conquery.models.events.Bucket; +import com.bakdata.conquery.models.events.CBlock; +import com.bakdata.conquery.models.messages.namespaces.specific.UpdateElementMatchingStats; +import com.bakdata.conquery.models.worker.Worker; +import com.bakdata.conquery.util.progressreporter.ProgressReporter; +import com.google.common.base.Functions; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@RequiredArgsConstructor +public class WorkerUpdateMatchingStatsJob extends Job { + + private final Worker worker; + private final Collection> concepts; + + @Override + public String getLabel() { + return 
String.format("Calculate Matching Stats for %s", worker.getInfo().getDataset()); + } + + @Override + public void execute() throws Exception { + + if (worker.getStorage().getAllCBlocks().isEmpty()) { + log.debug("Worker {} is empty, skipping.", worker); + return; + } + + final ProgressReporter progressReporter = getProgressReporter(); + progressReporter.setMax(concepts.size()); + + log.info("BEGIN update Matching stats for {} Concepts", concepts.size()); + + final Map, CompletableFuture> + subJobs = + concepts.stream() + .collect(Collectors.toMap( + Functions.identity(), + concept -> CompletableFuture.runAsync(() -> { + final Map, WorkerMatchingStats.Entry> + matchingStats = + new HashMap<>(concept.countElements()); + + calculateConceptMatches(concept, matchingStats, worker); + + worker.send(new UpdateElementMatchingStats(worker.getInfo().getId(), matchingStats)); + + progressReporter.report(1); + }, worker.getJobsExecutorService()) + )); + + + log.debug("All jobs submitted. Waiting for completion."); + + + final CompletableFuture all = CompletableFuture.allOf(subJobs.values().toArray(CompletableFuture[]::new)); + + do { + try { + all.get(1, TimeUnit.MINUTES); + } + catch (TimeoutException exception) { + // Count unfinished matching stats jobs. + if (log.isDebugEnabled()) { + final long unfinished = subJobs.values().stream().filter(Predicate.not(CompletableFuture::isDone)).count(); + log.debug("{} still waiting for {} tasks", worker.getInfo().getDataset(), unfinished); + } + + // When trace, also log the unfinished jobs. 
+ if (log.isTraceEnabled()) { + subJobs.forEach((concept, future) -> { + if (future.isDone()) { + return; + } + + log.trace("Still waiting for `{}`", concept.getId()); + + }); + } + } + } while (!all.isDone()); + + log.debug("DONE collecting matching stats for {}", worker.getInfo().getDataset()); + + } + + private static void calculateConceptMatches(Concept concept, Map, WorkerMatchingStats.Entry> results, Worker worker) { + log.debug("BEGIN calculating for `{}`", concept.getId()); + + for (CBlock cBlock : worker.getStorage().getAllCBlocks()) { + + if (!cBlock.getConnector().getConcept().equals(concept)) { + continue; + } + + try { + final Bucket bucket = cBlock.getBucket(); + final Table table = bucket.getTable(); + + for (int entity : bucket.entities()) { + + final int entityEnd = bucket.getEntityEnd(entity); + + for (int event = bucket.getEntityStart(entity); event < entityEnd; event++) { + + final int[] localIds = cBlock.getPathToMostSpecificChild(event); + + + if (!(concept instanceof TreeConcept) || localIds == null) { + + results.computeIfAbsent(concept, (ignored) -> new WorkerMatchingStats.Entry()).addEvent(table, bucket, event, entity); + + continue; + } + + if (Connector.isNotContained(localIds)) { + continue; + } + + ConceptTreeNode element = ((TreeConcept) concept).getElementByLocalIdPath(localIds); + + while (element != null) { + results.computeIfAbsent(((ConceptElement) element), (ignored) -> new WorkerMatchingStats.Entry()) + .addEvent(table, bucket, event, entity); + element = element.getParent(); + } + } + } + + } + catch (Exception e) { + log.error("Failed to collect the matching stats for {}", cBlock, e); + } + } + + log.trace("DONE calculating for `{}`", concept.getId()); + } + +} diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateElementMatchingStats.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateElementMatchingStats.java index 5efc018639..79afee78a3 
100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateElementMatchingStats.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateElementMatchingStats.java @@ -5,8 +5,8 @@ import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.io.jackson.serializer.NsIdRefKeys; +import com.bakdata.conquery.mode.cluster.WorkerMatchingStats; import com.bakdata.conquery.models.datasets.concepts.ConceptElement; -import com.bakdata.conquery.models.datasets.concepts.MatchingStats; import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; import com.bakdata.conquery.models.messages.namespaces.NamespaceMessage; import com.bakdata.conquery.models.messages.namespaces.NamespacedMessage; @@ -23,22 +23,23 @@ @Getter @ToString public class UpdateElementMatchingStats extends NamespaceMessage { + private final WorkerId source; @ToString.Exclude @NsIdRefKeys - private final Map, MatchingStats.Entry> values; + private final Map, WorkerMatchingStats.Entry> values; @Override public void react(DistributedNamespace context) throws Exception { - for (Entry, MatchingStats.Entry> entry : values.entrySet()) { + for (Entry, WorkerMatchingStats.Entry> entry : values.entrySet()) { try { final ConceptElement target = entry.getKey(); - final MatchingStats.Entry value = entry.getValue(); + final WorkerMatchingStats.Entry value = entry.getValue(); - MatchingStats matchingStats = target.getMatchingStats(); + WorkerMatchingStats matchingStats = (WorkerMatchingStats) target.getMatchingStats(); if (matchingStats == null) { - matchingStats = new MatchingStats(); + matchingStats = new WorkerMatchingStats(); target.setMatchingStats(matchingStats); } matchingStats.putEntry(source, value); diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java 
b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java index b32c0ca9e0..3049bc9428 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java @@ -1,32 +1,15 @@ package com.bakdata.conquery.models.messages.namespaces.specific; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Predicate; -import java.util.stream.Collectors; import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.io.jackson.serializer.NsIdRefCollection; -import com.bakdata.conquery.models.datasets.Table; import com.bakdata.conquery.models.datasets.concepts.Concept; -import com.bakdata.conquery.models.datasets.concepts.ConceptElement; -import com.bakdata.conquery.models.datasets.concepts.Connector; -import com.bakdata.conquery.models.datasets.concepts.MatchingStats; -import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeNode; -import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept; -import com.bakdata.conquery.models.events.Bucket; -import com.bakdata.conquery.models.events.CBlock; -import com.bakdata.conquery.models.jobs.Job; +import com.bakdata.conquery.models.jobs.WorkerUpdateMatchingStatsJob; import com.bakdata.conquery.models.messages.namespaces.NamespacedMessage; import com.bakdata.conquery.models.messages.namespaces.WorkerMessage; import com.bakdata.conquery.models.worker.Worker; -import com.bakdata.conquery.util.progressreporter.ProgressReporter; import com.fasterxml.jackson.annotation.JsonCreator; -import com.google.common.base.Functions; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -44,137 
+27,7 @@ public class UpdateMatchingStatsMessage extends WorkerMessage { @Override public void react(Worker worker) throws Exception { - worker.getJobManager().addSlowJob(new UpdateMatchingStatsJob(worker, concepts)); + worker.getJobManager().addSlowJob(new WorkerUpdateMatchingStatsJob(worker, concepts)); } - @RequiredArgsConstructor - private static class UpdateMatchingStatsJob extends Job { - private final Worker worker; - private final Collection> concepts; - - @Override - public String getLabel() { - return String.format("Calculate Matching Stats for %s", worker.getInfo().getDataset()); - } - - @Override - public void execute() throws Exception { - if (worker.getStorage().getAllCBlocks().isEmpty()) { - log.debug("Worker {} is empty, skipping.", worker); - return; - } - - final ProgressReporter progressReporter = getProgressReporter(); - progressReporter.setMax(concepts.size()); - - log.info("BEGIN update Matching stats for {} Concepts", concepts.size()); - - final Map, CompletableFuture> - subJobs = - concepts.stream() - .collect(Collectors.toMap(Functions.identity(), - concept -> CompletableFuture.runAsync(() -> { - final Map, MatchingStats.Entry> - matchingStats = - new HashMap<>(concept.countElements()); - - calculateConceptMatches(concept, matchingStats, worker); - - worker.send(new UpdateElementMatchingStats(worker.getInfo().getId(), matchingStats)); - - progressReporter.report(1); - }, worker.getJobsExecutorService()) - )); - - - log.debug("All jobs submitted. Waiting for completion."); - - - final CompletableFuture all = CompletableFuture.allOf(subJobs.values().toArray(CompletableFuture[]::new)); - - do { - try { - all.get(1, TimeUnit.MINUTES); - } - catch (TimeoutException exception) { - // Count unfinished matching stats jobs. 
- if (log.isDebugEnabled()) { - final long unfinished = subJobs.values().stream().filter(Predicate.not(CompletableFuture::isDone)).count(); - log.debug("{} still waiting for {} tasks", worker.getInfo().getDataset(), unfinished); - } - - // When trace, also log the unfinished jobs. - if (log.isTraceEnabled()) { - subJobs.forEach((concept, future) -> { - if (future.isDone()) { - return; - } - - log.trace("Still waiting for `{}`", concept.getId()); - - }); - } - } - } while (!all.isDone()); - - log.debug("DONE collecting matching stats for {}", worker.getInfo().getDataset()); - - } - - - private static void calculateConceptMatches(Concept concept, Map, MatchingStats.Entry> results, Worker worker) { - log.debug("BEGIN calculating for `{}`", concept.getId()); - - for (CBlock cBlock : worker.getStorage().getAllCBlocks()) { - - if (!cBlock.getConnector().getConcept().equals(concept)) { - continue; - } - - try { - final Bucket bucket = cBlock.getBucket(); - final Table table = bucket.getTable(); - - for (int entity : bucket.entities()) { - - final int entityEnd = bucket.getEntityEnd(entity); - - for (int event = bucket.getEntityStart(entity); event < entityEnd; event++) { - - final int[] localIds = cBlock.getPathToMostSpecificChild(event); - - - if (!(concept instanceof TreeConcept) || localIds == null) { - - results.computeIfAbsent(concept, (ignored) -> new MatchingStats.Entry()).addEvent(table, bucket, event, entity); - - continue; - } - - if (Connector.isNotContained(localIds)) { - continue; - } - - ConceptTreeNode element = ((TreeConcept) concept).getElementByLocalIdPath(localIds); - - while (element != null) { - results.computeIfAbsent(((ConceptElement) element), (ignored) -> new MatchingStats.Entry()) - .addEvent(table, bucket, event, entity); - element = element.getParent(); - } - } - } - - } - catch (Exception e) { - log.error("Failed to collect the matching stats for {}", cBlock, e); - } - } - - log.trace("DONE calculating for `{}`", concept.getId()); - } - - } - 
- } diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java b/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java index 5bab2bad96..910977e626 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/LocalNamespace.java @@ -8,6 +8,7 @@ import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.query.ExecutionManager; import com.bakdata.conquery.models.query.FilterSearch; +import com.bakdata.conquery.sql.conversion.dialect.SqlFunctionProvider; import com.bakdata.conquery.sql.execution.SqlExecutionService; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Getter; @@ -16,6 +17,7 @@ public class LocalNamespace extends Namespace { private final SqlExecutionService sqlExecutionService; + private final SqlFunctionProvider functionProvider; public LocalNamespace( ObjectMapper preprocessMapper, @@ -23,6 +25,7 @@ public LocalNamespace( NamespaceStorage storage, ExecutionManager executionManager, SqlExecutionService sqlExecutionService, + SqlFunctionProvider functionProvider, JobManager jobManager, FilterSearch filterSearch, IndexService indexService, @@ -30,5 +33,6 @@ public LocalNamespace( ) { super(preprocessMapper, communicationMapper, storage, executionManager, jobManager, filterSearch, indexService, injectables); this.sqlExecutionService = sqlExecutionService; + this.functionProvider = functionProvider; } } diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/AggregationFilterCte.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/AggregationFilterCte.java index b6e35bb525..2c4b2066e3 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/AggregationFilterCte.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/AggregationFilterCte.java @@ 
-2,11 +2,13 @@ import java.util.List; +import com.bakdata.conquery.sql.conversion.model.QualifyingUtil; import com.bakdata.conquery.sql.conversion.model.QueryStep; import com.bakdata.conquery.sql.conversion.model.Selects; import com.bakdata.conquery.sql.conversion.model.filter.WhereCondition; import com.bakdata.conquery.sql.conversion.model.select.SqlSelect; import org.jooq.Condition; +import org.jooq.Field; class AggregationFilterCte extends ConceptCte { @@ -14,8 +16,9 @@ class AggregationFilterCte extends ConceptCte { public QueryStep.QueryStepBuilder convertStep(ConceptCteContext conceptCteContext) { String predecessorTableName = conceptCteContext.getConceptTables().getPredecessor(cteStep()); + Field primaryColumn = QualifyingUtil.qualify(conceptCteContext.getPrimaryColumn(), predecessorTableName); Selects aggregationFilterSelects = Selects.builder() - .primaryColumn(conceptCteContext.getPrimaryColumn()) + .primaryColumn(primaryColumn) .sqlSelects(getForAggregationFilterSelects(conceptCteContext)) .build() .qualify(predecessorTableName); diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/AggregationSelectCte.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/AggregationSelectCte.java index 9a07cecf03..e9f04d3f81 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/AggregationSelectCte.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/AggregationSelectCte.java @@ -2,28 +2,33 @@ import java.util.List; +import com.bakdata.conquery.sql.conversion.model.QualifyingUtil; import com.bakdata.conquery.sql.conversion.model.QueryStep; import com.bakdata.conquery.sql.conversion.model.Selects; import com.bakdata.conquery.sql.conversion.model.select.SqlSelect; +import org.jooq.Field; class AggregationSelectCte extends ConceptCte { @Override public QueryStep.QueryStepBuilder convertStep(ConceptCteContext conceptCteContext) { + String 
predecessor = conceptCteContext.getConceptTables().getPredecessor(ConceptCteStep.AGGREGATION_SELECT); + Field primaryColumn = QualifyingUtil.qualify(conceptCteContext.getPrimaryColumn(), predecessor); + List requiredInAggregationFilterStep = conceptCteContext.allConceptSelects() .flatMap(sqlSelects -> sqlSelects.getAggregationSelects().stream()) .distinct() .toList(); Selects aggregationSelectSelects = Selects.builder() - .primaryColumn(conceptCteContext.getPrimaryColumn()) + .primaryColumn(primaryColumn) .sqlSelects(requiredInAggregationFilterStep) .build(); return QueryStep.builder() .selects(aggregationSelectSelects) - .groupBy(List.of(conceptCteContext.getPrimaryColumn())); + .groupBy(List.of(primaryColumn)); } @Override diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/CQConceptConverter.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/CQConceptConverter.java index 687c9bec9c..06f814c08b 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/CQConceptConverter.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/CQConceptConverter.java @@ -11,6 +11,7 @@ import com.bakdata.conquery.apiv1.query.concept.filter.CQTable; import com.bakdata.conquery.apiv1.query.concept.specific.CQConcept; +import com.bakdata.conquery.models.datasets.Table; import com.bakdata.conquery.models.datasets.concepts.Connector; import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeChild; import com.bakdata.conquery.sql.conversion.NodeConverter; @@ -27,7 +28,9 @@ import com.bakdata.conquery.sql.conversion.model.select.FieldWrapper; import com.bakdata.conquery.sql.conversion.model.select.SqlSelect; import com.bakdata.conquery.sql.conversion.model.select.SqlSelects; +import com.bakdata.conquery.util.TablePrimaryColumnUtil; import org.jooq.Condition; +import org.jooq.Field; public class CQConceptConverter implements NodeConverter { @@ -75,9 
+78,12 @@ public ConversionContext convert(CQConcept cqConcept, ConversionContext context) private ConceptCteContext createConceptCteContext(CQConcept cqConcept, ConversionContext context) { - CQTable cqTable = cqConcept.getTables().get(0); - String tableName = cqTable.getConnector().getTable().getName(); String conceptLabel = context.getNameGenerator().conceptName(cqConcept); + + CQTable cqTable = cqConcept.getTables().get(0); + Table table = cqTable.getConnector().getTable(); + Field primaryColumn = TablePrimaryColumnUtil.findPrimaryColumn(table, context.getConfig()); + String tableName = table.getName(); Optional validityDateSelect = convertValidityDate(cqTable, tableName, conceptLabel); Set requiredSteps = getRequiredSteps(cqTable, context.dateRestrictionActive(), validityDateSelect); diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/EventFilterCte.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/EventFilterCte.java index c6abf98fe5..234106204e 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/EventFilterCte.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/EventFilterCte.java @@ -4,11 +4,13 @@ import java.util.Optional; import com.bakdata.conquery.sql.conversion.model.ColumnDateRange; +import com.bakdata.conquery.sql.conversion.model.QualifyingUtil; import com.bakdata.conquery.sql.conversion.model.QueryStep; import com.bakdata.conquery.sql.conversion.model.Selects; import com.bakdata.conquery.sql.conversion.model.filter.WhereCondition; import com.bakdata.conquery.sql.conversion.model.select.SqlSelect; import org.jooq.Condition; +import org.jooq.Field; class EventFilterCte extends ConceptCte { @@ -32,6 +34,8 @@ public ConceptCteStep cteStep() { private Selects getEventFilterSelects(ConceptCteContext conceptCteContext) { String predecessorTableName = conceptCteContext.getConceptTables().getPredecessor(cteStep()); + 
Field primaryColumn = QualifyingUtil.qualify(conceptCteContext.getPrimaryColumn(), predecessorTableName); + Optional validityDate = conceptCteContext.getValidityDate(); if (validityDate.isPresent()) { validityDate = Optional.of(validityDate.get().qualify(predecessorTableName)); @@ -43,7 +47,7 @@ private Selects getEventFilterSelects(ConceptCteContext conceptCteContext) { .toList(); return Selects.builder() - .primaryColumn(conceptCteContext.getPrimaryColumn()) + .primaryColumn(primaryColumn) .validityDate(validityDate) .sqlSelects(sqlSelects) .build(); diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/FinalConceptCte.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/FinalConceptCte.java index 098dc1897a..e9249d2e71 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/FinalConceptCte.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/FinalConceptCte.java @@ -7,6 +7,7 @@ import com.bakdata.conquery.sql.conversion.cqelement.intervalpacking.IntervalPackingTables; import com.bakdata.conquery.sql.conversion.model.ColumnDateRange; import com.bakdata.conquery.sql.conversion.model.LogicalOperation; +import com.bakdata.conquery.sql.conversion.model.QualifyingUtil; import com.bakdata.conquery.sql.conversion.model.QueryStep; import com.bakdata.conquery.sql.conversion.model.QueryStepJoiner; import com.bakdata.conquery.sql.conversion.model.Selects; @@ -26,8 +27,10 @@ protected QueryStep.QueryStepBuilder convertStep(ConceptCteContext conceptCteCon .toList(); if (conceptCteContext.getValidityDate().isEmpty() || conceptCteContext.isExcludedFromDateAggregation()) { + String predecessor = conceptCteContext.getConceptTables().getPredecessor(ConceptCteStep.FINAL); + Field primaryColumn = QualifyingUtil.qualify(conceptCteContext.getPrimaryColumn(), predecessor); Selects finalConceptSelects = Selects.builder() - 
.primaryColumn(conceptCteContext.getPrimaryColumn()) + .primaryColumn(primaryColumn) .sqlSelects(forFinalStep) .build(); return QueryStep.builder() diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/HanaSqlFunctionProvider.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/HanaSqlFunctionProvider.java index 08cec4cb31..2aab432fd1 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/HanaSqlFunctionProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/HanaSqlFunctionProvider.java @@ -18,9 +18,10 @@ class HanaSqlFunctionProvider implements SqlFunctionProvider { - public static final char DELIMITER = ','; + private static final char DELIMITER = ','; private static final String MAX_DATE_VALUE = "9999-12-31"; private static final String MIN_DATE_VALUE = "0001-01-01"; + private static final String ANY_CHAR_REGEX = ".*"; @Override public String getMinDateExpression() { @@ -41,6 +42,11 @@ public Field cast(Field field, DataType type) { ); } + @Override + public String getAnyCharRegex() { + return ANY_CHAR_REGEX; + } + @Override public Condition dateRestriction(ColumnDateRange dateRestriction, ColumnDateRange validityDate) { @@ -109,6 +115,12 @@ public ColumnDateRange aggregated(ColumnDateRange columnDateRange) { ); } + @Override + public ColumnDateRange toDualColumn(ColumnDateRange columnDateRange) { + // HANA does not support single column ranges + return ColumnDateRange.of(columnDateRange.getStart(), columnDateRange.getEnd()); + } + @Override public Field validityDateStringAggregation(ColumnDateRange columnDateRange) { diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/PostgreSqlFunctionProvider.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/PostgreSqlFunctionProvider.java index 388b617996..ff8b8534ec 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/PostgreSqlFunctionProvider.java 
+++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/PostgreSqlFunctionProvider.java @@ -26,6 +26,7 @@ class PostgreSqlFunctionProvider implements SqlFunctionProvider { private static final String INFINITY_DATE_VALUE = "infinity"; private static final String MINUS_INFINITY_DATE_VALUE = "-infinity"; + private static final String ANY_CHAR_REGEX = "%"; @Override public String getMaxDateExpression() { @@ -37,6 +38,11 @@ public Field cast(Field field, DataType type) { return DSL.cast(field, type); } + @Override + public String getAnyCharRegex() { + return ANY_CHAR_REGEX; + } + @Override public String getMinDateExpression() { return MINUS_INFINITY_DATE_VALUE; @@ -83,7 +89,7 @@ public ColumnDateRange daterange(CDateRange dateRestriction) { } @Override - public ColumnDateRange daterange(ValidityDate validityDate, String qualifier, String conceptLabel) { + public ColumnDateRange daterange(ValidityDate validityDate, String qualifier, String alias) { Field dateRange; @@ -119,7 +125,7 @@ public ColumnDateRange daterange(ValidityDate validityDate, String qualifier, St } return ColumnDateRange.of(dateRange) - .asValidityDateRange(conceptLabel); + .asValidityDateRange(alias); } @Override @@ -127,6 +133,14 @@ public ColumnDateRange aggregated(ColumnDateRange columnDateRange) { return ColumnDateRange.of(DSL.function("range_agg", Object.class, columnDateRange.getRange())); } + @Override + public ColumnDateRange toDualColumn(ColumnDateRange columnDateRange) { + Field daterange = columnDateRange.getRange(); + Field start = DSL.function("lower", Date.class, daterange); + Field end = DSL.function("upper", Date.class, daterange); + return ColumnDateRange.of(start, end); + } + @Override public Field validityDateStringAggregation(ColumnDateRange columnDateRange) { if (!columnDateRange.isSingleColumnRange()) { diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/SqlFunctionProvider.java 
b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/SqlFunctionProvider.java index 687802700a..9f37f758dd 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/SqlFunctionProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/SqlFunctionProvider.java @@ -32,6 +32,15 @@ public interface SqlFunctionProvider { Field cast(Field field, DataType type); + /** + * @return The regex that matches any char repeated any times (including 0), for example: + *
<ul> + *
<li>'%' for Postgres' POSIX-like regexes</li> + *
<li>'.*' for HANA's PERL-like regexes</li> + *
</ul> + */ + String getAnyCharRegex(); +
/** * dateRestrictionStart <= validityDateEnd and dateRestrictionEnd >= validityDateStart */ @@ -39,10 +48,22 @@ public interface SqlFunctionProvider { ColumnDateRange daterange(CDateRange dateRestriction); + /** + * Creates a {@link ColumnDateRange} for a given {@link ValidityDate}. The upper end of the created {@link ColumnDateRange} is treated as excluded, + * meaning that it sets the end date to one day after the upper bound of the provided {@link ValidityDate}. + */ ColumnDateRange daterange(ValidityDate validityDate, String qualifier, String conceptLabel); ColumnDateRange aggregated(ColumnDateRange columnDateRange); + /** + * Given a single-column {@link ColumnDateRange}, it will create a new {@link ColumnDateRange} with a start and end field. + * For dialects that don't support single-column ranges, it will create a copy of the given {@link ColumnDateRange}. + * + * @return A {@link ColumnDateRange} which has a start and end field. + */ + ColumnDateRange toDualColumn(ColumnDateRange columnDateRange); + /** * Aggregates the start and end columns of the validity date of entries into one compound string expression. *

diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/model/ColumnDateRange.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/model/ColumnDateRange.java index 55013f2efa..d355e08d1a 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/model/ColumnDateRange.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/model/ColumnDateRange.java @@ -9,6 +9,10 @@ import org.jooq.Field; import org.jooq.impl.DSL; +/** + * This class represents a date range in SQL. It supports both single-column and two-column representations of date ranges. + * It provides functionality to handle date ranges as either a single field (e.g., in Postgres) or as separate start and end fields (e.g., in HANA). + */ @Getter public class ColumnDateRange { diff --git a/backend/src/main/java/com/bakdata/conquery/util/TablePrimaryColumnUtil.java b/backend/src/main/java/com/bakdata/conquery/util/TablePrimaryColumnUtil.java new file mode 100644 index 0000000000..491bc04338 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/util/TablePrimaryColumnUtil.java @@ -0,0 +1,17 @@ +package com.bakdata.conquery.util; + +import com.bakdata.conquery.models.config.SqlConnectorConfig; +import com.bakdata.conquery.models.datasets.Table; +import org.jooq.Field; +import org.jooq.impl.DSL; + +public class TablePrimaryColumnUtil { + + public static Field findPrimaryColumn(Table table, SqlConnectorConfig sqlConfig) { + String primaryColumnName = table.getPrimaryColum() == null + ? 
sqlConfig.getPrimaryColumn() + : table.getPrimaryColum().getName(); + return DSL.field(DSL.name(table.getName(), primaryColumnName)); + } + +} diff --git a/backend/src/test/java/com/bakdata/conquery/integration/common/RequiredTable.java b/backend/src/test/java/com/bakdata/conquery/integration/common/RequiredTable.java index 074d7fcbfd..d969f218a0 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/common/RequiredTable.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/common/RequiredTable.java @@ -1,5 +1,13 @@ package com.bakdata.conquery.integration.common; +import java.io.IOException; +import java.util.Arrays; +import java.util.Objects; + +import javax.validation.Valid; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; + import com.bakdata.conquery.integration.IntegrationTest; import com.bakdata.conquery.io.jackson.Jackson; import com.bakdata.conquery.models.datasets.Column; @@ -13,13 +21,6 @@ import lombok.Setter; import net.minidev.json.annotate.JsonIgnore; -import javax.validation.Valid; -import javax.validation.constraints.NotEmpty; -import javax.validation.constraints.NotNull; -import java.io.IOException; -import java.util.Arrays; -import java.util.Objects; - @Getter @Setter public class RequiredTable { @@ -40,6 +41,7 @@ public class RequiredTable { public Table toTable(Dataset dataset, CentralRegistry centralRegistry) { Table table = new Table(); + table.setPrimaryColum(primaryColumn.toColumn(table, centralRegistry)); table.setDataset(dataset); table.setName(name); table.setColumns(Arrays.stream(columns) diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/MetadataCollectionTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/MetadataCollectionTest.java index fbaa0d43ef..05fbeca4f9 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/tests/MetadataCollectionTest.java +++ 
b/backend/src/test/java/com/bakdata/conquery/integration/tests/MetadataCollectionTest.java @@ -3,16 +3,18 @@ import static org.assertj.core.api.Assertions.assertThat; import java.time.LocalDate; +import java.util.Collection; +import java.util.Set; import com.bakdata.conquery.integration.IntegrationTest; import com.bakdata.conquery.integration.json.ConqueryTestSpec; import com.bakdata.conquery.integration.json.JsonIntegrationTest; import com.bakdata.conquery.models.common.daterange.CDateRange; +import com.bakdata.conquery.models.datasets.concepts.Concept; +import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeChild; import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept; import com.bakdata.conquery.models.exceptions.ValidatorHelper; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; -import com.bakdata.conquery.models.messages.namespaces.specific.UpdateMatchingStatsMessage; -import com.bakdata.conquery.models.worker.DistributedNamespace; import com.bakdata.conquery.util.support.StandaloneSupport; import com.github.powerlibraries.io.In; import lombok.extern.slf4j.Slf4j; @@ -20,38 +22,55 @@ @Slf4j public class MetadataCollectionTest extends IntegrationTest.Simple implements ProgrammaticIntegrationTest { + @Override + public Set forModes() { + return Set.of(StandaloneSupport.Mode.WORKER, StandaloneSupport.Mode.SQL); + } + @Override public void execute(StandaloneSupport conquery) throws Exception { //read test sepcification - String testJson = In.resource("/tests/query/SIMPLE_TREECONCEPT_QUERY/SIMPLE_TREECONCEPT_Query.test.json").withUTF8().readAll(); + String testJson = In.resource("/tests/matchingstats/icd.concept.json").withUTF8().readAll(); DatasetId dataset = conquery.getDataset().getId(); - ConqueryTestSpec test = JsonIntegrationTest.readJson(dataset, testJson); ValidatorHelper.failOnError(log, conquery.getValidator().validate(test)); - test.importRequiredData(conquery); //ensure the metadata is collected - 
DistributedNamespace namespace = (DistributedNamespace) conquery.getNamespace(); - namespace.getWorkerHandler().sendToAll(new UpdateMatchingStatsMessage(conquery.getNamespace().getStorage().getAllConcepts())); - + conquery.getDatasetsProcessor() + .getStorageListener() + .onUpdateMatchingStats(conquery.getDataset()); conquery.waitUntilWorkDone(); - TreeConcept concept = (TreeConcept) conquery.getNamespace().getStorage().getAllConcepts().iterator().next(); + Collection> allConcepts = conquery.getNamespace().getStorage().getAllConcepts(); + TreeConcept concept = (TreeConcept) allConcepts.iterator().next(); - //check the number of matched events - assertThat(concept.getMatchingStats().countEvents()).isEqualTo(4); - assertThat(concept.getChildren()).allSatisfy(c -> { - assertThat(c.getMatchingStats().countEvents()).isEqualTo(2); + //check the number of matched events from root node to the deepest child node + assertThat(concept.getMatchingStats().countEvents()).isEqualTo(10); + assertThat(concept.getMatchingStats().countEntities()).isEqualTo(3); + // concepts 1. child (F00-F99) + ConceptTreeChild f00_99 = concept.getChildren().get(0); + assertThat(f00_99.getMatchingStats().countEvents()).isEqualTo(8); + assertThat(f00_99.getMatchingStats().countEntities()).isEqualTo(3); + // 1. child's child (F20-29) + ConceptTreeChild f20_29 = f00_99.getChildren().get(0); + assertThat(f20_29.getMatchingStats().countEvents()).isEqualTo(7); + assertThat(f20_29.getMatchingStats().countEntities()).isEqualTo(2); + // 1. child's child's child (yeah it's getting wild) + ConceptTreeChild f20 = f20_29.getChildren().get(0); + assertThat(f20.getMatchingStats().countEvents()).isEqualTo(5); + assertThat(f20.getMatchingStats().countEntities()).isEqualTo(1); + // 1. 
child's child's child's children (I promise it won't get worse) + assertThat(f20.getChildren()).allSatisfy(child -> { + assertThat(child.getMatchingStats().countEvents()).isEqualTo(1); + assertThat(child.getMatchingStats().countEntities()).isEqualTo(1); }); - + //check the date ranges assertThat(concept.getMatchingStats().spanEvents()) - .isEqualTo(CDateRange.of(LocalDate.parse("2010-07-15"), LocalDate.parse("2013-11-10"))); - assertThat(concept.getChildren().get(0).getMatchingStats().spanEvents()) - .isEqualTo(CDateRange.of(LocalDate.parse("2012-01-01"), LocalDate.parse("2013-11-10"))); - assertThat(concept.getChildren().get(1).getMatchingStats().spanEvents()) - .isEqualTo(CDateRange.of(LocalDate.parse("2010-07-15"), LocalDate.parse("2012-11-11"))); + .isEqualTo(CDateRange.of(LocalDate.parse("2009-05-18"), LocalDate.parse("2023-08-20"))); + assertThat(f20.getMatchingStats().spanEvents()) + .isEqualTo(CDateRange.of(LocalDate.parse("2010-07-01"), LocalDate.parse("2023-02-18"))); } } diff --git a/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/MatchingStatsTests.java b/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/WorkerMatchingStatsTests.java similarity index 80% rename from backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/MatchingStatsTests.java rename to backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/WorkerMatchingStatsTests.java index 0caa9fc8e8..14a4a21259 100644 --- a/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/MatchingStatsTests.java +++ b/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/WorkerMatchingStatsTests.java @@ -2,18 +2,15 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.bakdata.conquery.mode.cluster.WorkerMatchingStats; import com.bakdata.conquery.models.common.daterange.CDateRange; import com.bakdata.conquery.models.datasets.Column; import 
com.bakdata.conquery.models.datasets.Table; -import com.bakdata.conquery.models.datasets.concepts.MatchingStats; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; import org.junit.jupiter.api.Test; -import java.util.HashMap; -import java.util.Map; - -public class MatchingStatsTests { +public class WorkerMatchingStatsTests { private final WorkerId workerId1 = new WorkerId(new DatasetId("sampleDataset"), "sampleWorker"); private final WorkerId workerId2 = new WorkerId(new DatasetId("sampleDataset2"), "sampleWorker2"); @@ -21,17 +18,17 @@ public class MatchingStatsTests { @Test public void entitiesCountTest() { - MatchingStats stats = new MatchingStats(); + WorkerMatchingStats stats = new WorkerMatchingStats(); assertThat(stats.countEntities()).isEqualTo(0); - stats.putEntry(workerId1, new MatchingStats.Entry(5, 5, CDateRange.of(10, 20))); + stats.putEntry(workerId1, new WorkerMatchingStats.Entry(5, 5, CDateRange.of(10, 20))); assertThat(stats.countEntities()).isEqualTo(5); - stats.putEntry(workerId1, new MatchingStats.Entry(5, 8, CDateRange.of(10, 20))); + stats.putEntry(workerId1, new WorkerMatchingStats.Entry(5, 8, CDateRange.of(10, 20))); assertThat(stats.countEntities()).isEqualTo(8); - stats.putEntry(workerId2, new MatchingStats.Entry(5, 2, CDateRange.of(10, 20))); + stats.putEntry(workerId2, new WorkerMatchingStats.Entry(5, 2, CDateRange.of(10, 20))); assertThat(stats.countEntities()).isEqualTo(10); @@ -39,7 +36,7 @@ public void entitiesCountTest() { @Test public void addEventTest(){ - MatchingStats stats = new MatchingStats(); + WorkerMatchingStats stats = new WorkerMatchingStats(); Table table = new Table(); table.setColumns(new Column[0]); @@ -47,7 +44,7 @@ public void addEventTest(){ assertThat(stats.countEntities()).isEqualTo(0); - MatchingStats.Entry entry1 = new MatchingStats.Entry(); + WorkerMatchingStats.Entry entry1 = new WorkerMatchingStats.Entry(); 
entry1.addEvent(table, null, 1, 1); entry1.addEvent(table, null, 2, 1); @@ -67,7 +64,7 @@ public void addEventTest(){ assertThat(stats.countEntities()).isEqualTo(4); - MatchingStats.Entry entry2 = new MatchingStats.Entry(); + WorkerMatchingStats.Entry entry2 = new WorkerMatchingStats.Entry(); entry2.addEvent(table, null, 1, 1); entry2.addEvent(table, null, 2, 2); diff --git a/backend/src/test/resources/tests/matchingstats/icd.concept.json b/backend/src/test/resources/tests/matchingstats/icd.concept.json new file mode 100644 index 0000000000..6f4ac312e3 --- /dev/null +++ b/backend/src/test/resources/tests/matchingstats/icd.concept.json @@ -0,0 +1,210 @@ +{ + "type": "QUERY_TEST", + "label": "COMMON_CONCEPT_ICD_QUERY Test", + "expectedCsv": "", + "query": { + "type": "CONCEPT_QUERY", + "root": { + "type": "AND", + "children": [ + { + "type": "DATE_RESTRICTION", + "dateRange": { + "min": "2017-01-01", + "max": "2017-12-31" + }, + "child": { + "type": "CONCEPT", + "ids": [ + "icd.f00$2df99.f20$2df29.f20" + ], + "label": "F20", + "tables": [ + { + "id": "icd.kh_diagnose_icd_code", + "filters": [] + } + ] + } + } + ] + } + }, + "concepts": [ + { + "label": "ICD", + "type": "TREE", + "additionalInfos": [ + { + "key": "ICD-Codes", + "value": "Historisierung bis einschließlich des Jahres 2018" + } + ], + "connectors": [ + { + "label": "KH-Diagnose", + "name": "kh_diagnose_icd_code", + "column": "kh_diagnose.icd_code", + "condition": { + "type": "PREFIX_RANGE", + "min": "E", + "max": "F" + }, + "validityDates": [ + { + "label": "Aufnahmedatum", + "column": "kh_diagnose.aufnahmedatum" + }, + { + "label": "Entlassungsdatum", + "column": "kh_diagnose.entlassungsdatum" + } + ], + "filters": [] + } + ], + "children": [ + { + "label": "F00-F99", + "description": "Psychische und Verhaltensstörungen", + "condition": { + "type": "PREFIX_RANGE", + "min": "F00", + "max": "F99" + }, + "children": [ + { + "label": "F20-F29", + "description": "Schizophrenie, schizotype und wahnhafte 
Störungen", + "condition": { + "type": "PREFIX_RANGE", + "min": "F20", + "max": "F29" + }, + "children": [ + { + "label": "F20", + "description": "Schizophrenie", + "condition": { + "type": "PREFIX_LIST", + "prefixes": [ + "F20" + ] + }, + "children": [ + { + "label": "F20.0", + "description": "Paranoide Schizophrenie", + "additionalInfos": [ + { + "key": "Stichworte", + "value": "Paranoide Schizophrenie -- Paranoid-halluzinatorische Schizophrenie -- Paranoide Schizophrenie mit Halluzination -- Paraphrenie -- Paranoid-schizophrene Psychose -- Akute Paraphrenie -- Paraphrene Schizophrenie -- Akute paranoide Schizophrenie" + } + ], + "condition": { + "type": "PREFIX_LIST", + "prefixes": [ + "F200" + ] + } + }, + { + "label": "F20.1", + "description": "Hebephrene Schizophrenie", + "additionalInfos": [ + { + "key": "Stichworte", + "value": "Hebephrenie -- Hebephrene Schizophrenie -- Akute Hebephrenie -- Hebephrene Demenz -- Hebephrene Dementia praecox -- Desintegrative Schizophrenie -- Desorganisierte Schizophrenie -- Jugendirresein" + } + ], + "condition": { + "type": "PREFIX_LIST", + "prefixes": [ + "F201" + ] + } + }, + { + "label": "F20.4", + "description": "Postschizophrene Depression", + "additionalInfos": [ + { + "key": "Stichworte", + "value": "Postschizophrene Depression" + } + ], + "condition": { + "type": "PREFIX_LIST", + "prefixes": [ + "F204" + ] + } + }, + { + "label": "F20.5", + "description": "Schizophrenes Residuum", + "additionalInfos": [ + { + "key": "Stichworte", + "value": "Schizophrenes Residuum -- Schizophrener Restzustand -- Chronischer Morbus Bleuler -- Schizophrener Defekt -- Chronische Schizophrenie a.n.k. 
-- Residuale Schizophrenie -- Schizophrener Residualzustand -- Chronische undifferenzierte Schizophrenie" + } + ], + "condition": { + "type": "PREFIX_LIST", + "prefixes": [ + "F205" + ] + } + }, + { + "label": "F20.6", + "description": "Schizophrenia simplex", + "additionalInfos": [ + { + "key": "Stichworte", + "value": "Schizophrenia simplex -- Akute primäre Schizophrenie -- Akute einfache Schizophrenie" + } + ], + "condition": { + "type": "PREFIX_LIST", + "prefixes": [ + "F206" + ] + } + } + ] + } + ] + } + ] + } + ] + } + ], + "content": { + "tables": [ + { + "csv": "tests/matchingstats/kh-content.csv", + "name": "kh_diagnose", + "primaryColumn": { + "name": "primary_column", + "type": "STRING" + }, + "columns": [ + { + "name": "icd_code", + "type": "STRING" + }, + { + "name": "aufnahmedatum", + "type": "DATE" + }, + { + "name": "entlassungsdatum", + "type": "DATE" + } + ] + } + ] + } +} diff --git a/backend/src/test/resources/tests/matchingstats/kh-content.csv b/backend/src/test/resources/tests/matchingstats/kh-content.csv new file mode 100644 index 0000000000..1838f3f3c6 --- /dev/null +++ b/backend/src/test/resources/tests/matchingstats/kh-content.csv @@ -0,0 +1,11 @@ +pid,icd_code,aufnahmedatum,entlassungsdatum +3,"F200",2022-11-28,2022-11-11 +3,"F201",2021-08-31,2021-12-15 +3,"F204",2010-07-01,2019-07-13 +3,"F205",2023-02-06,2023-02-18 +3,"F206",2021-10-22,2021-11-06 +10,"F21",2014-04-18,2022-06-29 +10,"F22",2016-12-15,2018-11-28 +15,"F3",2017-12-08,2019-09-23 +15,"F31",2022-03-22,2023-08-20 +15,"E66",2009-05-18,2021-11-06