diff --git a/backend/src/main/java/com/bakdata/conquery/Conquery.java b/backend/src/main/java/com/bakdata/conquery/Conquery.java index 313eab5f96..6051d23a9c 100644 --- a/backend/src/main/java/com/bakdata/conquery/Conquery.java +++ b/backend/src/main/java/com/bakdata/conquery/Conquery.java @@ -1,12 +1,9 @@ package com.bakdata.conquery; +import jakarta.validation.Validator; + import ch.qos.logback.classic.Level; -import com.bakdata.conquery.commands.DistributedStandaloneCommand; -import com.bakdata.conquery.commands.ManagerNode; -import com.bakdata.conquery.commands.MigrateCommand; -import com.bakdata.conquery.commands.PreprocessorCommand; -import com.bakdata.conquery.commands.RecodeStoreCommand; -import com.bakdata.conquery.commands.ShardNode; +import com.bakdata.conquery.commands.*; import com.bakdata.conquery.io.jackson.Jackson; import com.bakdata.conquery.io.jackson.MutableInjectableValues; import com.bakdata.conquery.metrics.prometheus.PrometheusBundle; @@ -22,7 +19,6 @@ import io.dropwizard.core.ConfiguredBundle; import io.dropwizard.core.setup.Bootstrap; import io.dropwizard.core.setup.Environment; -import jakarta.validation.Validator; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; @@ -51,7 +47,7 @@ public void initialize(Bootstrap bootstrap) { // main config file is json bootstrap.setConfigurationFactoryFactory(JsonConfigurationFactory::new); - bootstrap.addCommand(new ShardNode()); + bootstrap.addCommand(new ShardCommand()); bootstrap.addCommand(new PreprocessorCommand()); bootstrap.addCommand(new DistributedStandaloneCommand(this)); bootstrap.addCommand(new RecodeStoreCommand()); diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java b/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java index 52f3294e08..92d2b9b463 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java @@ -39,6 +39,7 @@ import com.bakdata.conquery.apiv1.query.concept.specific.CQConcept; import com.bakdata.conquery.apiv1.query.concept.specific.CQOr; import com.bakdata.conquery.apiv1.query.concept.specific.external.CQExternal; +import com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolver; import com.bakdata.conquery.io.result.ResultRender.ResultRendererProvider; import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.metrics.ExecutionMetrics; @@ -108,7 +109,6 @@ public class QueryProcessor { private Validator validator; - public Stream getAllQueries(Dataset dataset, HttpServletRequest req, Subject subject, boolean allProviders) { final Collection allQueries = storage.getAllExecutions(); @@ -294,14 +294,13 @@ public FullExecutionStatus getQueryFullStatus(ManagedExecution query, Subject su public ExternalUploadResult uploadEntities(Subject subject, Dataset dataset, ExternalUpload upload) { final Namespace namespace = datasetRegistry.get(dataset.getId()); - final CQExternal.ResolveStatistic statistic = CQExternal.resolveEntities( + final EntityResolver.ResolveStatistic statistic = namespace.getEntityResolver().resolveEntities( upload.getValues(), upload.getFormat(), namespace.getStorage().getIdMapping(), config.getIdColumns(), config.getLocale().getDateReader(), - upload.isOneRowPerEntity(), - true + upload.isOneRowPerEntity() ); // Resolving nothing is a problem thus we fail. 
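The statistic handed back by the namespace's EntityResolver has the same shape as the one the old static helper returned, so callers keep reacting to it the same way. A minimal sketch of such a check, using the getters of EntityResolver.ResolveStatistic; the error type is the one CQExternal#resolve throws, and the logger is assumed to be available on the class:

// Sketch only: act on the ResolveStatistic returned by the resolver above.
if (statistic.getResolved().isEmpty()) {
	// Not a single row could be matched to a known entity -> reject the upload.
	throw new ConqueryError.ExternalResolveEmptyError();
}

// Rows that could not be used are reported instead of failing the whole upload.
log.debug(
		"Resolved {} entities ({} rows with unreadable dates, {} rows with unresolvable ids)",
		statistic.getResolved().size(),
		statistic.getUnreadableDate().size(),
		statistic.getUnresolvedId().size()
);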
@@ -526,9 +525,8 @@ public Stream> resolveEntities(Subject subject, List ids = config.getIdColumns() .getIds().stream() - // We're only interested in returning printable AND resolvable ids + // We're only interested in returning printable ids .filter(ColumnConfig::isPrint) - .filter(ColumnConfig::isResolvable) .collect(Collectors.toList()); diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendSelect.java b/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendSelect.java index b698e7b26a..95f9e1c94b 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendSelect.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/frontend/FrontendSelect.java @@ -1,7 +1,6 @@ package com.bakdata.conquery.apiv1.frontend; import com.bakdata.conquery.models.identifiable.ids.specific.SelectId; -import com.bakdata.conquery.models.types.ResultType; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Builder; import lombok.Data; @@ -15,7 +14,7 @@ public class FrontendSelect { private SelectId id; private String label; private String description; - private ResultType resultType; + private String resultType; @JsonProperty("default") private Boolean isDefault; } diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/CQExternal.java b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/CQExternal.java index 20dbbdd034..2ba0d02a0b 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/CQExternal.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/CQExternal.java @@ -3,7 +3,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -11,7 +10,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.function.Function; import java.util.stream.Collectors; import com.bakdata.conquery.apiv1.query.CQElement; @@ -21,7 +19,6 @@ import com.bakdata.conquery.models.config.IdColumnConfig; import com.bakdata.conquery.models.error.ConqueryError; import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; -import com.bakdata.conquery.models.identifiable.mapping.EntityIdMap; import com.bakdata.conquery.models.query.QueryExecutionContext; import com.bakdata.conquery.models.query.QueryPlanContext; import com.bakdata.conquery.models.query.QueryResolveContext; @@ -33,16 +30,12 @@ import com.bakdata.conquery.models.query.resultinfo.ResultInfo; import com.bakdata.conquery.models.query.resultinfo.SimpleResultInfo; import com.bakdata.conquery.models.types.ResultType; -import com.bakdata.conquery.util.DateReader; -import com.bakdata.conquery.util.io.IdColumnUtil; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonView; import com.google.common.collect.Streams; import io.dropwizard.validation.ValidationMethod; import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.NotNull; import lombok.AccessLevel; -import lombok.Data; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -55,8 +48,6 @@ @NoArgsConstructor public class CQExternal extends CQElement { - private static final String FORMAT_EXTRA = "EXTRA"; - /** * Describes the format of {@code values}, how to extract data from each row: *

@@ -118,7 +109,7 @@ public QPNode createQueryPlan(QueryPlanContext context, ConceptQueryPlan plan) { final String[] extraHeaders = Streams.zip( Arrays.stream(headers), format.stream(), - (header, format) -> format.equals(FORMAT_EXTRA) ? header : null + (header, format) -> format.equals(EntityResolverUtil.FORMAT_EXTRA) ? header : null ) .filter(Objects::nonNull) .toArray(String[]::new); @@ -138,17 +129,17 @@ public void collectRequiredQueries(Set requiredQueries) { private ExternalNode createExternalNodeOnlySingle(QueryPlanContext context, ConceptQueryPlan plan, String[] extraHeaders) { // Remove zero element Lists and substitute one element Lists by containing String final Map> extraFlat = extra.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entityToRowMap -> entityToRowMap.getValue().entrySet().stream() - .filter(headerToValue -> !headerToValue.getValue() - .isEmpty()) - .collect(Collectors.toMap( - Map.Entry::getKey, - headerToValue -> headerToValue.getValue() - .get(0) - )) - )); + .collect(Collectors.toMap( + Map.Entry::getKey, + entityToRowMap -> entityToRowMap.getValue().entrySet().stream() + .filter(headerToValue -> !headerToValue.getValue() + .isEmpty()) + .collect(Collectors.toMap( + Map.Entry::getKey, + headerToValue -> headerToValue.getValue() + .get(0) + )) + )); final Map> extraAggregators = new HashMap<>(extraHeaders.length); for (String extraHeader : extraHeaders) { @@ -180,78 +171,18 @@ private ExternalNode> createExternalNodeForList(QueryPlanContext co ); } - /** - * For each row try and collect all dates. - * - * @return Row -> Dates - */ - private static CDateSet[] readDates(String[][] values, List format, DateReader dateReader) { - final CDateSet[] out = new CDateSet[values.length]; - - final List dateFormats = format.stream() - .map(CQExternal::resolveDateFormat) - // Don't use Stream#toList to preserve null-values - .collect(Collectors.toList()); - - - /* - If no format is provided, put empty dates into output. - This indicates that no date context was provided and - the entries are not restricted by any date restriction, - but can also don't contribute to any date aggregation. - */ - if (dateFormats.stream().allMatch(Objects::isNull)) { - // Initialize empty - for (int row = 0; row < values.length; row++) { - out[row] = CDateSet.createEmpty(); - } - return out; - } - - for (int row = 1; row < values.length; row++) { - try { - final CDateSet dates = CDateSet.createEmpty(); - - // Collect all specified dates into a single set. 
- for (int col = 0; col < dateFormats.size(); col++) { - final DateFormat dateFormat = dateFormats.get(col); - - if (dateFormat == null) { - continue; - } - dateFormat.readDates(values[row][col], dateReader, dates); - } - - if (dates.isEmpty()) { - continue; - } - - if (out[row] == null) { - out[row] = CDateSet.createEmpty(); - } - - out[row].addAll(dates); - } - catch (Exception e) { - log.warn("Failed to parse Date from {}", row, e); - } - } - - return out; - } - @Override public void resolve(QueryResolveContext context) { headers = values[0]; - final ResolveStatistic resolved = - resolveEntities(values, format, - context.getNamespace().getStorage().getIdMapping(), - context.getConfig().getIdColumns(), - context.getConfig().getLocale().getDateReader(), - onlySingles, - context.getConfig().getSqlConnectorConfig().isEnabled() - ); + final EntityResolver.ResolveStatistic resolved = context.getNamespace().getEntityResolver().resolveEntities( + values, + format, + context.getNamespace().getStorage().getIdMapping(), + context.getConfig().getIdColumns(), + context.getConfig().getLocale().getDateReader(), + onlySingles + ); if (resolved.getResolved().isEmpty()) { throw new ConqueryError.ExternalResolveEmptyError(); @@ -277,158 +208,6 @@ public void resolve(QueryResolveContext context) { extra = resolved.getExtra(); } - @Data - public static class ResolveStatistic { - - @JsonIgnore - private final Map resolved; - - /** - * Entity -> Column -> Values - */ - @JsonIgnore - private final Map>> extra; - - private final List unreadableDate; - private final List unresolvedId; - - } - - /** - * Helper method to try and resolve entities in values using the specified format. - */ - public static ResolveStatistic resolveEntities(@NotEmpty String[][] values, @NotEmpty List format, EntityIdMap mapping, IdColumnConfig idColumnConfig, @NotNull DateReader dateReader, boolean onlySingles, boolean isInSqlMode) { - final Map resolved = new HashMap<>(); - - final List unresolvedDate = new ArrayList<>(); - final List unresolvedId = new ArrayList<>(); - - // extract dates from rows - final CDateSet[] rowDates = readDates(values, format, dateReader); - - // Extract extra data from rows by Row, to be collected into by entities - // Row -> Column -> Value - final Map[] extraDataByRow = readExtras(values, format); - - final List> readers = IdColumnUtil.getIdReaders(format, idColumnConfig.getIdMappers()); - - // We will not be able to resolve anything... - if (readers.isEmpty()) { - return new ResolveStatistic(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList(), List.of(values)); - } - - // Entity -> Column -> Values - final Map>> extraDataByEntity = new HashMap<>(); - - // ignore the first row, because this is the header - for (int rowNum = 1; rowNum < values.length; rowNum++) { - - final String[] row = values[rowNum]; - - if (rowDates[rowNum] == null) { - unresolvedDate.add(row); - continue; - } - - // TODO proper implementation of EntityIdMap#resolve for SQL mode - String resolvedId = isInSqlMode - ? String.valueOf(row[0]) - : tryResolveId(row, readers, mapping); - - if (resolvedId == null) { - unresolvedId.add(row); - continue; - } - - //read the dates from the row - resolved.put(resolvedId, rowDates[rowNum]); - - // Entity was resolved for row so we collect the data. 
- if (extraDataByRow[rowNum] != null) { - - for (Map.Entry entry : extraDataByRow[rowNum].entrySet()) { - extraDataByEntity.computeIfAbsent(resolvedId, (ignored) -> new HashMap<>()) - .computeIfAbsent(entry.getKey(), (ignored) -> new ArrayList<>()) - .add(entry.getValue()); - } - } - } - - if (onlySingles) { - // Check that there is at most one value per entity and per column - final boolean alright = extraDataByEntity.values().stream() - .map(Map::values) - .flatMap(Collection::stream) - .allMatch(l -> l.size() <= 1); - if (!alright) { - throw new ConqueryError.ExternalResolveOnePerRowError(); - } - } - - return new ResolveStatistic(resolved, extraDataByEntity, unresolvedDate, unresolvedId); - } - - /** - * Try to extract a {@link com.bakdata.conquery.models.identifiable.mapping.EntityIdMap.ExternalId} from the row, - * then try to map it to an internal {@link com.bakdata.conquery.models.query.entity.Entity} - */ - private static String tryResolveId(String[] row, List> readers, EntityIdMap mapping) { - String resolvedId = null; - - for (Function reader : readers) { - final EntityIdMap.ExternalId externalId = reader.apply(row); - - if (externalId == null) { - continue; - } - - String innerResolved = mapping.resolve(externalId); - - if (innerResolved == null) { - continue; - } - - // Only if all resolvable ids agree on the same entity, do we return the id. - if (resolvedId != null && !innerResolved.equals(resolvedId)) { - log.error("`{}` maps to different Entities", (Object) row); - continue; - } - - resolvedId = innerResolved; - } - return resolvedId; - } - - /** - * Try and extract Extra data from input to be returned as extra-data in output. - *

- * Line -> ( Column -> Value ) - */ - private static Map[] readExtras(String[][] values, List format) { - final String[] names = values[0]; - final Map[] extrasByRow = new Map[values.length]; - - - for (int line = 1; line < values.length; line++) { - for (int col = 0; col < format.size(); col++) { - if (!format.get(col).equals(FORMAT_EXTRA)) { - continue; - } - - - if (extrasByRow[line] == null) { - extrasByRow[line] = new HashMap<>(names.length); - } - - extrasByRow[line].put(names[col], values[line][col]); - } - } - - - return extrasByRow; - } - - @Override public RequiredEntities collectRequiredEntities(QueryExecutionContext context) { return new RequiredEntities(valuesResolved.keySet()); @@ -441,7 +220,7 @@ public List getResultInfos() { } List resultInfos = new ArrayList<>(); for (int col = 0; col < format.size(); col++) { - if (!format.get(col).equals(FORMAT_EXTRA)) { + if (!format.get(col).equals(EntityResolverUtil.FORMAT_EXTRA)) { continue; } @@ -484,15 +263,4 @@ public boolean isHeadersUnique() { return false; } - /** - * Try to resolve a date format, return nothing if not possible. - */ - private static DateFormat resolveDateFormat(String name) { - try { - return DateFormat.valueOf(name); - } - catch (IllegalArgumentException e) { - return null; // Does not exist - } - } } diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/EntityResolver.java b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/EntityResolver.java new file mode 100644 index 0000000000..c93f1e3375 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/EntityResolver.java @@ -0,0 +1,51 @@ +package com.bakdata.conquery.apiv1.query.concept.specific.external; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import com.bakdata.conquery.models.common.CDateSet; +import com.bakdata.conquery.models.config.IdColumnConfig; +import com.bakdata.conquery.models.identifiable.mapping.EntityIdMap; +import com.bakdata.conquery.util.DateReader; +import com.fasterxml.jackson.annotation.JsonIgnore; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +public interface EntityResolver { + + /** + * Helper method to try and resolve entities in values using the specified format. 
+ */ + ResolveStatistic resolveEntities( + @NotEmpty String[][] values, + @NotEmpty List format, + EntityIdMap mapping, + IdColumnConfig idColumnConfig, + @NotNull DateReader dateReader, + boolean onlySingles + ); + + @Data + class ResolveStatistic { + + @JsonIgnore + private final Map resolved; + + /** + * Entity -> Column -> Values + */ + @JsonIgnore + private final Map>> extra; + + private final List unreadableDate; + private final List unresolvedId; + + public static ResolveStatistic forEmptyReaders(String[][] values) { + return new ResolveStatistic(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList(), List.of(values)); + } + + } + +} diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/EntityResolverUtil.java b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/EntityResolverUtil.java new file mode 100644 index 0000000000..3a9be46a25 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/EntityResolverUtil.java @@ -0,0 +1,179 @@ +package com.bakdata.conquery.apiv1.query.concept.specific.external; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.bakdata.conquery.models.common.CDateSet; +import com.bakdata.conquery.models.error.ConqueryError; +import com.bakdata.conquery.models.identifiable.mapping.EntityIdMap; +import com.bakdata.conquery.models.identifiable.mapping.ExternalId; +import com.bakdata.conquery.util.DateReader; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class EntityResolverUtil { + + public static final String FORMAT_EXTRA = "EXTRA"; + + /** + * For each row try and collect all dates. + * + * @return Row -> Dates + */ + public static CDateSet[] readDates(String[][] values, List format, DateReader dateReader) { + final CDateSet[] out = new CDateSet[values.length]; + + final List dateFormats = format.stream() + .map(EntityResolverUtil::resolveDateFormat) + // Don't use Stream#toList to preserve null-values + .collect(Collectors.toList()); + + /* + If no format is provided, put empty dates into output. + This indicates that no date context was provided and + the entries are not restricted by any date restriction, + but can also don't contribute to any date aggregation. + */ + if (dateFormats.stream().allMatch(Objects::isNull)) { + // Initialize empty + for (int row = 0; row < values.length; row++) { + out[row] = CDateSet.createEmpty(); + } + return out; + } + + for (int row = 1; row < values.length; row++) { + try { + final CDateSet dates = CDateSet.createEmpty(); + + // Collect all specified dates into a single set. 
+ for (int col = 0; col < dateFormats.size(); col++) { + final DateFormat dateFormat = dateFormats.get(col); + + if (dateFormat == null) { + continue; + } + dateFormat.readDates(values[row][col], dateReader, dates); + } + + if (dates.isEmpty()) { + continue; + } + + if (out[row] == null) { + out[row] = CDateSet.createEmpty(); + } + + out[row].addAll(dates); + } + catch (Exception e) { + log.warn("Failed to parse Date from {}", row, e); + } + } + + return out; + } + + public static void collectExtraData(Map[] extraDataByRow, int rowNum, Map>> extraDataByEntity, String resolvedId) { + if (extraDataByRow[rowNum] != null) { + for (Map.Entry entry : extraDataByRow[rowNum].entrySet()) { + extraDataByEntity.computeIfAbsent(resolvedId, (ignored) -> new HashMap<>()) + .computeIfAbsent(entry.getKey(), (ignored) -> new ArrayList<>()) + .add(entry.getValue()); + } + } + } + + public static void verifyOnlySingles(boolean onlySingles, Map>> extraDataByEntity) { + if (!onlySingles) { + return; + } + // Check that there is at most one value per entity and per column + final boolean alright = extraDataByEntity.values().stream() + .map(Map::values) + .flatMap(Collection::stream) + .allMatch(l -> l.size() <= 1); + if (!alright) { + throw new ConqueryError.ExternalResolveOnePerRowError(); + } + } + + /** + * Try to extract a {@link ExternalId} from the row, + * then try to map it to an internal {@link com.bakdata.conquery.models.query.entity.Entity} + */ + public static String tryResolveId(String[] row, List> readers, EntityIdMap mapping) { + String resolvedId = null; + + for (Function reader : readers) { + final ExternalId externalId = reader.apply(row); + + if (externalId == null) { + continue; + } + + String innerResolved = mapping.resolve(externalId); + + if (innerResolved == null) { + continue; + } + + // Only if all resolvable ids agree on the same entity, do we return the id. + if (resolvedId != null && !innerResolved.equals(resolvedId)) { + log.error("`{}` maps to different Entities", (Object) row); + continue; + } + + resolvedId = innerResolved; + } + return resolvedId; + } + + /** + * Try and extract Extra data from input to be returned as extra-data in output. + *

+ * Line -> ( Column -> Value ) + */ + public static Map[] readExtras(String[][] values, List format) { + final String[] names = values[0]; + final Map[] extrasByRow = new Map[values.length]; + + + for (int line = 1; line < values.length; line++) { + for (int col = 0; col < format.size(); col++) { + if (!format.get(col).equals(FORMAT_EXTRA)) { + continue; + } + + + if (extrasByRow[line] == null) { + extrasByRow[line] = new HashMap<>(names.length); + } + + extrasByRow[line].put(names[col], values[line][col]); + } + } + + + return extrasByRow; + } + + /** + * Try to resolve a date format, return nothing if not possible. + */ + private static DateFormat resolveDateFormat(String name) { + try { + return DateFormat.valueOf(name); + } + catch (IllegalArgumentException e) { + return null; // Does not exist + } + } + +} diff --git a/backend/src/main/java/com/bakdata/conquery/commands/CollectEntitiesCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/CollectEntitiesCommand.java deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java index 1a9a0ab165..8e368fc189 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java @@ -26,7 +26,7 @@ public class DistributedStandaloneCommand extends ServerCommand private final Conquery conquery; private ClusterManager manager; - private ManagerNode managerNode = new ManagerNode(); + private final ManagerNode managerNode = new ManagerNode(); private final List shardNodes = new Vector<>(); // TODO clean up the command structure, so we can use the Environment from EnvironmentCommand @@ -104,7 +104,7 @@ public void startStandalone(Environment environment, Namespace namespace, Conque clone = config.withStorage(((XodusStoreFactory) config.getStorage()).withDirectory(managerDir)); } - sc.run(environment, namespace, clone); + sc.run(clone, environment); return sc; })); } diff --git a/backend/src/main/java/com/bakdata/conquery/commands/ManagerNode.java b/backend/src/main/java/com/bakdata/conquery/commands/ManagerNode.java index 004048186d..d1c5686040 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/ManagerNode.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/ManagerNode.java @@ -46,7 +46,7 @@ import org.glassfish.jersey.internal.inject.AbstractBinder; /** - * Central node of Conquery. Hosts the frontend, api, meta data and takes care of query distribution to + * Central node of Conquery. Hosts the frontend, api, metadata and takes care of query distribution to * {@link ShardNode}s and respectively the {@link Worker}s hosted on them. The {@link ManagerNode} can also * forward queries or results to statistic backends. Finally, it collects the results of queries for access over the api. 
*/ @@ -113,7 +113,7 @@ public void run(Manager manager) throws InterruptedException { // Create AdminServlet first to make it available to the realms admin = new AdminServlet(this); - authController = new AuthorizationController(getStorage(), config, environment, admin); + authController = new AuthorizationController(getMetaStorage(), config, environment, admin); environment.lifecycle().manage(authController); // Register default components for the admin interface @@ -145,14 +145,14 @@ public void run(Manager manager) throws InterruptedException { private void registerTasks(Manager manager, Environment environment, ConqueryConfig config) { environment.admin().addTask(formScanner); environment.admin().addTask( - new QueryCleanupTask(getStorage(), Duration.of( + new QueryCleanupTask(getMetaStorage(), Duration.of( config.getQueries().getOldQueriesTime().getQuantity(), config.getQueries().getOldQueriesTime().getUnit().toChronoUnit() ))); - environment.admin().addTask(new PermissionCleanupTask(getStorage())); + environment.admin().addTask(new PermissionCleanupTask(getMetaStorage())); manager.getAdminTasks().forEach(environment.admin()::addTask); - environment.admin().addTask(new ReloadMetaStorageTask(getStorage())); + environment.admin().addTask(new ReloadMetaStorageTask(getMetaStorage())); final ShutdownTask shutdown = new ShutdownTask(); environment.admin().addTask(shutdown); @@ -164,7 +164,7 @@ private void configureApiServlet(ConqueryConfig config, DropwizardResourceConfig jerseyConfig.register(new AbstractBinder() { @Override protected void configure() { - bind(getStorage()).to(MetaStorage.class); + bind(getMetaStorage()).to(MetaStorage.class); bind(getDatasetRegistry()).to(DatasetRegistry.class); } }); @@ -203,7 +203,7 @@ public void customizeApiObjectMapper(ObjectMapper objectMapper) { injectableValues.add(Validator.class, getValidator()); getDatasetRegistry().injectInto(objectMapper); - getStorage().injectInto(objectMapper); + getMetaStorage().injectInto(objectMapper); getConfig().injectInto(objectMapper); } @@ -219,10 +219,10 @@ public ObjectMapper createInternalObjectMapper(Class viewClass) private void loadMetaStorage() { log.info("Opening MetaStorage"); - getStorage().openStores(getInternalObjectMapperCreator().createInternalObjectMapper(View.Persistence.Manager.class)); + getMetaStorage().openStores(getInternalObjectMapperCreator().createInternalObjectMapper(View.Persistence.Manager.class)); log.info("Loading MetaStorage"); - getStorage().loadData(); - log.info("MetaStorage loaded {}", getStorage()); + getMetaStorage().loadData(); + log.info("MetaStorage loaded {}", getMetaStorage()); } @SneakyThrows(InterruptedException.class) @@ -236,7 +236,7 @@ public void loadNamespaces() { final Collection namespaceStorages = getConfig().getStorage().discoverNamespaceStorages(); for (NamespaceStorage namespaceStorage : namespaceStorages) { loaders.submit(() -> { - registry.createNamespace(namespaceStorage); + registry.createNamespace(namespaceStorage, getMetaStorage()); }); } @@ -262,16 +262,16 @@ public void stop() throws Exception { provider.close(); } catch (Exception e) { - log.error(provider + " could not be closed", e); + log.error("{} could not be closed", provider, e); } } try { - getStorage().close(); + getMetaStorage().close(); } catch (Exception e) { - log.error("{} could not be closed", getStorage(), e); + log.error("{} could not be closed", getMetaStorage(), e); } } diff --git a/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java 
b/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java index ea5f91cd78..686e8200eb 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java @@ -44,6 +44,7 @@ import net.sourceforge.argparse4j.inf.ArgumentGroup; import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Subparser; +import org.jetbrains.annotations.NotNull; @Slf4j @FieldNameConstants @@ -52,7 +53,7 @@ public class PreprocessorCommand extends ConqueryCommand { private final List failed = Collections.synchronizedList(new ArrayList<>()); private final List success = Collections.synchronizedList(new ArrayList<>()); private ExecutorService pool; - private boolean isFailFast = false; + private boolean isFailFast; private boolean isStrict = true; public PreprocessorCommand() { @@ -71,14 +72,14 @@ public static boolean requiresProcessing(PreprocessingJob preprocessingJob) { log.info("EXISTS ALREADY"); - int currentHash = preprocessingJob.getDescriptor() - .calculateValidityHash(preprocessingJob.getCsvDirectory(), preprocessingJob.getTag()); + final int currentHash = preprocessingJob.getDescriptor() + .calculateValidityHash(preprocessingJob.getCsvDirectory(), preprocessingJob.getTag()); final ObjectMapper om = Jackson.BINARY_MAPPER.copy(); try (final PreprocessedReader parser = new PreprocessedReader(new GZIPInputStream(new FileInputStream(preprocessingJob.getPreprocessedFile())), om)) { - PreprocessedHeader header = parser.readHeader(); + final PreprocessedHeader header = parser.readHeader(); if (header.getValidityHash() == currentHash) { log.info("\tHASH STILL VALID"); @@ -133,13 +134,18 @@ public void configure(Subparser subparser) { group.addArgument("--fast-fail") .action(Arguments.storeTrue()) .setDefault(false) - .help("Stop preprocessing and exit with failure if an error occures that prevents the generation of a cqpp."); + .help("Stop preprocessing and exit with failure if an error occurs that prevents the generation of a cqpp."); group.addArgument("--strict") .type(new BooleanArgumentType()) .setDefault(true) .help("Escalate missing files to errors."); + group.addArgument("--buckets") + .type(Integer.class) + .setDefault(100) + .help("Number of buckets to use for id-hashing. 
This value is required to be a constant per-dataset."); + } @Override @@ -150,41 +156,49 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig // Tag if present is appended to input-file csvs, output-file cqpp and used as id of cqpps + // Seems to be a bug with dropwizard and boolean default-values isFailFast = Optional.ofNullable(namespace.getBoolean("fast-fail")).orElse(false); - isStrict = Optional.ofNullable(namespace.getBoolean("strict")).orElse(true); + isStrict = Optional.ofNullable(namespace.getBoolean("strict")).orElse(false); - final List tags = namespace.getList("tag"); + final List tags = namespace.getList("tag"); final File inDir = namespace.get("in"); final File outDir = namespace.get("out"); - final List descriptionFiles = namespace.getList("desc"); + final List descriptionFilesRoot = namespace.getList("desc"); + final int buckets = namespace.getInt("buckets"); log.info("Preprocessing from command line config."); - final Collection jobs = new ArrayList<>(); + final Collection jobs = collectJobs(descriptionFilesRoot, tags, inDir, outDir, environment); - if (tags == null || tags.isEmpty()) { - for (File desc : descriptionFiles) { - final List descriptions = - findPreprocessingDescriptions(desc, inDir, outDir, Optional.empty(), environment.getValidator()); - jobs.addAll(descriptions); - } + final List broken = validateJobs(jobs, environment); + + jobs.removeIf(Predicate.not(PreprocessorCommand::requiresProcessing)); + + preprocessJobs(jobs, buckets, config); + + + log.info("Successfully Preprocess {} Jobs:", success.size()); + success.forEach(desc -> log.info("\tSucceeded Preprocessing for {}", desc)); + + if (!broken.isEmpty()) { + log.warn("Did not find {} Files", broken.size()); + broken.forEach(desc -> log.warn("\tDid not find file for {}", desc)); } - else { - for (String tag : tags) { - for (File desc : descriptionFiles) { - final List jobDescriptions = - findPreprocessingDescriptions(desc, inDir, outDir, Optional.of(tag), environment.getValidator()); - jobs.addAll(jobDescriptions); - } - } + if (isFailed()) { + log.error("Failed {} Preprocessing Jobs:", failed.size()); + failed.forEach(desc -> log.error("\tFailed Preprocessing for {}", desc)); + doFail(); } + } - List broken = new ArrayList<>(); + @NotNull + private List validateJobs(Collection jobs, Environment environment) { + final List broken = new ArrayList<>(); - for (Iterator iterator = jobs.iterator(); iterator.hasNext(); ) { + for (final Iterator iterator = jobs.iterator(); iterator.hasNext(); ) { final PreprocessingJob job = iterator.next(); try { @@ -213,22 +227,48 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig log.error("FAILED Preprocessing, files are missing or invalid."); doFail(); } + return broken; + } - jobs.removeIf(Predicate.not(PreprocessorCommand::requiresProcessing)); + @NotNull + private Collection collectJobs(List descriptionFiles, List tags, File inDir, File outDir, Environment environment) + throws IOException { + final Collection jobs = new ArrayList<>(); + if (tags == null || tags.isEmpty()) { + for (File desc : descriptionFiles) { + final List descriptions = + findPreprocessingDescriptions(desc, inDir, outDir, Optional.empty(), environment.getValidator()); + jobs.addAll(descriptions); + } + } + else { + for (String tag : tags) { + for (File desc : descriptionFiles) { + final List jobDescriptions = + findPreprocessingDescriptions(desc, inDir, outDir, Optional.of(tag), environment.getValidator()); + + 
jobs.addAll(jobDescriptions); + } + } + } + return jobs; + } + + private void preprocessJobs(Collection jobs, int buckets, ConqueryConfig config) throws InterruptedException { final long totalSize = jobs.stream() .mapToLong(PreprocessingJob::estimateTotalCsvSizeBytes) .sum(); log.info("Required to preprocess {} in total", BinaryByteUnit.format(totalSize)); - ProgressBar totalProgress = new ProgressBar(totalSize, System.out); + final ProgressBar totalProgress = new ProgressBar(totalSize, System.out); for (PreprocessingJob job : jobs) { pool.submit(() -> { ConqueryMDC.setLocation(job.toString()); try { - Preprocessor.preprocess(job, totalProgress, config); + Preprocessor.preprocess(job, totalProgress, config, buckets); success.add(job.toString()); } catch (FileNotFoundException e) { @@ -246,23 +286,6 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig pool.awaitTermination(24, TimeUnit.HOURS); ConqueryMDC.clearLocation(); - - - if (!success.isEmpty()) { - log.info("Successfully Preprocess {} Jobs:", success.size()); - success.forEach(desc -> log.info("\tSucceeded Preprocessing for {}", desc)); - } - - if (!broken.isEmpty()) { - log.warn("Did not find {} Files", broken.size()); - broken.forEach(desc -> log.warn("\tDid not find file for {}", desc)); - } - - if (isFailed()) { - log.error("Failed {} Preprocessing Jobs:", failed.size()); - failed.forEach(desc -> log.error("\tFailed Preprocessing for {}", desc)); - doFail(); - } } private void addMissing(PreprocessingJob job) { @@ -281,7 +304,7 @@ private void addFailed(PreprocessingJob job) { public List findPreprocessingDescriptions(File descriptionFiles, File inDir, File outputDir, Optional tag, Validator validator) throws IOException { - List out = new ArrayList<>(); + final List out = new ArrayList<>(); final File[] files = descriptionFiles.isFile() ? new File[]{descriptionFiles} @@ -302,8 +325,7 @@ private boolean isFailed() { return !failed.isEmpty(); } - private Optional tryExtractDescriptor(Validator validator, Optional tag, File descriptionFile, File outputDir, File csvDir) - throws IOException { + private Optional tryExtractDescriptor(Validator validator, Optional tag, File descriptionFile, File outputDir, File csvDir) { try { final TableImportDescriptor descriptor = diff --git a/backend/src/main/java/com/bakdata/conquery/commands/ShardCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/ShardCommand.java new file mode 100644 index 0000000000..bf52085427 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/commands/ShardCommand.java @@ -0,0 +1,39 @@ +package com.bakdata.conquery.commands; + +import java.util.Collections; + +import com.bakdata.conquery.models.config.ConqueryConfig; +import com.bakdata.conquery.util.commands.NoOpConquery; +import io.dropwizard.core.cli.ServerCommand; +import io.dropwizard.core.server.DefaultServerFactory; +import io.dropwizard.core.setup.Bootstrap; +import io.dropwizard.core.setup.Environment; +import net.sourceforge.argparse4j.inf.Namespace; + +/** + * Command to run conquery as a shard node. 
+ */ +public class ShardCommand extends ServerCommand { + + public ShardCommand() { + super(new NoOpConquery(), "shard", "Connects this instance as a ShardNode to a running ManagerNode."); + } + + @Override + protected void run(Bootstrap bootstrap, Namespace namespace, ConqueryConfig configuration) throws Exception { + bootstrap.addBundle(new ShardNode()); + + super.run(bootstrap, namespace, configuration); + } + + @Override + protected void run(Environment environment, Namespace namespace, ConqueryConfig configuration) throws Exception { + /* + Clear application connectors for a shard, before building the server, + as we only expose the metrics through the admin connector. + */ + ((DefaultServerFactory)configuration.getServerFactory()).setApplicationConnectors(Collections.emptyList()); + + super.run(environment, namespace, configuration); + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java b/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java index 2293959fec..26e41c8945 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java @@ -7,6 +7,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import jakarta.validation.Validator; import com.bakdata.conquery.io.jackson.Jackson; import com.bakdata.conquery.io.jackson.MutableInjectableValues; @@ -37,14 +38,13 @@ import com.fasterxml.jackson.databind.DeserializationConfig; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationConfig; +import io.dropwizard.core.ConfiguredBundle; import io.dropwizard.core.setup.Environment; import io.dropwizard.lifecycle.Managed; import io.dropwizard.util.Duration; -import jakarta.validation.Validator; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import net.sourceforge.argparse4j.inf.Namespace; import org.apache.mina.core.RuntimeIoException; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.service.IoHandler; @@ -61,10 +61,12 @@ */ @Slf4j @Getter -public class ShardNode extends ConqueryCommand implements IoHandler, Managed { +public class ShardNode implements ConfiguredBundle, IoHandler, Managed { public static final String DEFAULT_NAME = "shard-node"; + private final String name; + private NioSocketConnector connector; private ConnectFuture future; private JobManager jobManager; @@ -82,12 +84,12 @@ public ShardNode() { } public ShardNode(String name) { - super(name, "Connects this instance as a ShardNode to a running ManagerNode."); + this.name = name; } @Override - protected void run(Environment environment, Namespace namespace, ConqueryConfig config) throws Exception { + public void run(ConqueryConfig config, Environment environment) throws Exception { this.environment = environment; this.config = config; @@ -343,7 +345,7 @@ private void connectToCluster() { future.cancel(); // Sleep thirty seconds then retry. 
- TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(config.getCluster().getConnectRetryTimeout().toSeconds()); } catch (RuntimeIoException e) { diff --git a/backend/src/main/java/com/bakdata/conquery/io/jackson/serializer/MetaIdReferenceDeserializer.java b/backend/src/main/java/com/bakdata/conquery/io/jackson/serializer/MetaIdReferenceDeserializer.java index 0023d97757..e08b026eea 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/jackson/serializer/MetaIdReferenceDeserializer.java +++ b/backend/src/main/java/com/bakdata/conquery/io/jackson/serializer/MetaIdReferenceDeserializer.java @@ -4,18 +4,14 @@ import java.util.InputMismatchException; import java.util.Optional; +import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.models.identifiable.CentralRegistry; import com.bakdata.conquery.models.identifiable.Identifiable; import com.bakdata.conquery.models.identifiable.ids.Id; import com.bakdata.conquery.models.identifiable.ids.IdUtil; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.BeanDescription; -import com.fasterxml.jackson.databind.BeanProperty; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.*; import com.fasterxml.jackson.databind.deser.ContextualDeserializer; import com.fasterxml.jackson.databind.deser.SettableBeanProperty; import com.fasterxml.jackson.databind.jsontype.TypeDeserializer; @@ -42,7 +38,7 @@ public T deserialize(JsonParser parser, DeserializationContext ctxt) throws IOEx ID id = ctxt.readValue(parser, idClass); try { - final CentralRegistry centralRegistry = CentralRegistry.get(ctxt); + final CentralRegistry centralRegistry = MetaStorage.get(ctxt).getCentralRegistry(); // Not all Components have registries, we leave it up to the validator to be angry. 
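MetaStorage.get(ctxt), added to MetaStorage further down in this patch, looks the storage up in the mapper's injectable values, so this deserializer only works against mappers that had a MetaStorage injected, the way ManagerNode#customizeApiObjectMapper does. A rough sketch of that wiring; the helper method name is made up, only injectInto and get(ctxt) come from the patch:

// Sketch: register the MetaStorage on any mapper that deserializes MetaId references.
void customizeObjectMapper(ObjectMapper objectMapper, MetaStorage metaStorage) {
	metaStorage.injectInto(objectMapper);
}

// Inside a deserializer the storage is then recovered from the context:
// MetaStorage storage = MetaStorage.get(ctxt);
// CentralRegistry registry = storage.getCentralRegistry();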
if (centralRegistry == null) { diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/MetaStorage.java b/backend/src/main/java/com/bakdata/conquery/io/storage/MetaStorage.java index d8c12a95b8..d0cf222cef 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/MetaStorage.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/MetaStorage.java @@ -11,13 +11,9 @@ import com.bakdata.conquery.models.execution.ManagedExecution; import com.bakdata.conquery.models.forms.configs.FormConfig; import com.bakdata.conquery.models.identifiable.CentralRegistry; -import com.bakdata.conquery.models.identifiable.ids.specific.FormConfigId; -import com.bakdata.conquery.models.identifiable.ids.specific.GroupId; -import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; -import com.bakdata.conquery.models.identifiable.ids.specific.RoleId; -import com.bakdata.conquery.models.identifiable.ids.specific.UserId; -import com.bakdata.conquery.models.worker.DatasetRegistry; -import com.bakdata.conquery.models.worker.Namespace; +import com.bakdata.conquery.models.identifiable.ids.specific.*; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -34,8 +30,6 @@ public class MetaStorage extends ConqueryStorage implements Injectable { protected final CentralRegistry centralRegistry = new CentralRegistry(); private final StoreFactory storageFactory; - @Getter - protected final DatasetRegistry datasetRegistry; private IdentifiableStore executions; private IdentifiableStore formConfigs; private IdentifiableStore authUser; @@ -47,8 +41,8 @@ public void openStores(ObjectMapper mapper) { authRole = storageFactory.createRoleStore(centralRegistry, "meta", this, mapper); authGroup = storageFactory.createGroupStore(centralRegistry, "meta", this, mapper); // Executions depend on users - executions = storageFactory.createExecutionsStore(centralRegistry, datasetRegistry, "meta", mapper); - formConfigs = storageFactory.createFormConfigStore(centralRegistry, datasetRegistry, "meta", mapper); + executions = storageFactory.createExecutionsStore(centralRegistry, "meta", mapper); + formConfigs = storageFactory.createFormConfigStore(centralRegistry, "meta", mapper); } @@ -196,4 +190,8 @@ public void addFormConfig(FormConfig formConfig) { public MutableInjectableValues inject(MutableInjectableValues values) { return values.add(MetaStorage.class, this); } + + public static MetaStorage get(DeserializationContext ctxt) throws JsonMappingException { + return (MetaStorage) ctxt.findInjectableValue(MetaStorage.class.getName(), null, null); + } } diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java b/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java index f84b4dc544..7de472e7b3 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java @@ -2,7 +2,6 @@ import java.util.Collection; import java.util.Objects; -import java.util.OptionalInt; import com.bakdata.conquery.io.storage.xodus.stores.CachedStore; import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore; @@ -17,7 +16,6 @@ import com.bakdata.conquery.models.worker.WorkerToBucketsMap; import com.fasterxml.jackson.databind.ObjectMapper; import 
com.google.common.collect.ImmutableList; -import jakarta.validation.Validator; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -32,7 +30,7 @@ public class NamespaceStorage extends NamespacedStorage { protected CachedStore entity2Bucket; - public NamespaceStorage(StoreFactory storageFactory, String pathName, Validator validator) { + public NamespaceStorage(StoreFactory storageFactory, String pathName) { super(storageFactory, pathName); } @@ -110,22 +108,13 @@ public int getNumberOfEntities() { return entity2Bucket.count(); } - public OptionalInt getEntityBucket(String entity) { - final Integer bucket = entity2Bucket.get(entity); - if(bucket == null){ - return OptionalInt.empty(); - } - - return OptionalInt.of(bucket); + public boolean containsEntity(String entity) { + return entity2Bucket.get(entity) != null; } - public int assignEntityBucket(String entity, int bucketSize) { - final int bucket = (int) Math.ceil((1d + getNumberOfEntities()) / (double) bucketSize); - - entity2Bucket.add(entity, bucket); - - return bucket; + public void registerEntity(String entity, int bucket) { + entity2Bucket.update(entity, bucket); } diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/NamespacedStorage.java b/backend/src/main/java/com/bakdata/conquery/io/storage/NamespacedStorage.java index a4a4c07d54..a9a7378760 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/NamespacedStorage.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/NamespacedStorage.java @@ -19,6 +19,7 @@ import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; import com.bakdata.conquery.models.identifiable.ids.specific.SecondaryIdDescriptionId; import com.bakdata.conquery.models.identifiable.ids.specific.TableId; +import com.bakdata.conquery.models.worker.SingletonNamespaceCollection; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import lombok.Getter; @@ -56,7 +57,8 @@ public NamespacedStorage(StoreFactory storageFactory, String pathName) { } public void openStores(ObjectMapper objectMapper) { - + // Before we start to parse the stores we need to replace the injected value for the IdResolveContext (from DatasetRegistry to this centralRegistry) + new SingletonNamespaceCollection(centralRegistry).injectInto(objectMapper); dataset = storageFactory.createDatasetStore(pathName, objectMapper); secondaryIds = storageFactory.createSecondaryIdDescriptionStore(centralRegistry, pathName, objectMapper); diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/WorkerStorage.java b/backend/src/main/java/com/bakdata/conquery/io/storage/WorkerStorage.java index adfe9b2841..84ff4e90d6 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/WorkerStorage.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/WorkerStorage.java @@ -72,7 +72,7 @@ private void decorateCBlockStore(IdentifiableStore baseStoreCreator) { public void addCBlock(CBlock cBlock) { - log.debug("Adding CBlock[{}]", cBlock.getId()); + log.trace("Adding CBlock[{}]", cBlock.getId()); cBlocks.add(cBlock); } @@ -81,7 +81,7 @@ public CBlock getCBlock(CBlockId id) { } public void removeCBlock(CBlockId id) { - log.debug("Removing CBlock[{}]", id); + log.trace("Removing CBlock[{}]", id); cBlocks.remove(id); } @@ -90,7 +90,7 @@ public Collection getAllCBlocks() { } public void addBucket(Bucket bucket) { - log.debug("Adding Bucket[{}]", bucket.getId()); + log.trace("Adding Bucket[{}]", bucket.getId()); buckets.add(bucket); } @@ -99,7 +99,7 @@ public Bucket 
getBucket(BucketId id) { } public void removeBucket(BucketId id) { - log.debug("Removing Bucket[{}]", id); + log.trace("Removing Bucket[{}]", id); buckets.remove(id); } diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/CachedStore.java b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/CachedStore.java index 0840ca471a..bf6588683f 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/CachedStore.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/CachedStore.java @@ -33,8 +33,7 @@ public void add(KEY key, VALUE value) { @Override public VALUE get(KEY key) { - // TODO: 08.01.2020 fk: This assumes that all values have been read at some point! - return cache.get(key); + return cache.computeIfAbsent(key, store::get); } @Override diff --git a/backend/src/main/java/com/bakdata/conquery/mode/DelegateManager.java b/backend/src/main/java/com/bakdata/conquery/mode/DelegateManager.java index f4a402adce..53ada11ccc 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/DelegateManager.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/DelegateManager.java @@ -24,6 +24,7 @@ public class DelegateManager implements Manager { ConqueryConfig config; Environment environment; DatasetRegistry datasetRegistry; + MetaStorage storage; ImportHandler importHandler; StorageListener storageListener; Supplier> nodeProvider; @@ -42,7 +43,7 @@ public void stop() throws Exception { } @Override - public MetaStorage getStorage() { - return datasetRegistry.getMetaStorage(); + public MetaStorage getMetaStorage() { + return storage; } } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/InternalObjectMapperCreator.java b/backend/src/main/java/com/bakdata/conquery/mode/InternalObjectMapperCreator.java index 64d255dad8..8fdcf63076 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/InternalObjectMapperCreator.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/InternalObjectMapperCreator.java @@ -23,13 +23,12 @@ @RequiredArgsConstructor public class InternalObjectMapperCreator { private final ConqueryConfig config; + private final MetaStorage storage; private final Validator validator; private DatasetRegistry datasetRegistry = null; - private MetaStorage storage = null; public void init(DatasetRegistry datasetRegistry) { this.datasetRegistry = datasetRegistry; - this.storage = datasetRegistry.getMetaStorage(); } public ObjectMapper createInternalObjectMapper(@Nullable Class viewClass) { diff --git a/backend/src/main/java/com/bakdata/conquery/mode/Manager.java b/backend/src/main/java/com/bakdata/conquery/mode/Manager.java index bef1a3c444..f8de4d3035 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/Manager.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/Manager.java @@ -10,9 +10,9 @@ import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.Namespace; import com.bakdata.conquery.models.worker.ShardNodeInformation; +import io.dropwizard.core.setup.Environment; import io.dropwizard.lifecycle.Managed; import io.dropwizard.servlets.tasks.Task; -import io.dropwizard.core.setup.Environment; /** * A manager provides the implementations that differ by running mode. 
@@ -27,5 +27,6 @@ public interface Manager extends Managed { List getAdminTasks(); InternalObjectMapperCreator getInternalObjectMapperCreator(); JobManager getJobManager(); - MetaStorage getStorage(); + + MetaStorage getMetaStorage(); } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/ManagerProvider.java b/backend/src/main/java/com/bakdata/conquery/mode/ManagerProvider.java index c25f63a08e..fe45f4ecbe 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/ManagerProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/ManagerProvider.java @@ -23,8 +23,8 @@ static JobManager newJobManager(ConqueryConfig config) { return new JobManager(JOB_MANAGER_NAME, config.isFailOnError()); } - static InternalObjectMapperCreator newInternalObjectMapperCreator(ConqueryConfig config, Validator validator) { - return new InternalObjectMapperCreator(config, validator); + static InternalObjectMapperCreator newInternalObjectMapperCreator(ConqueryConfig config, MetaStorage metaStorage, Validator validator) { + return new InternalObjectMapperCreator(config, metaStorage, validator); } static DatasetRegistry createDatasetRegistry( @@ -33,16 +33,13 @@ static DatasetRegistry createDatasetRegistry( InternalObjectMapperCreator creator ) { final IndexService indexService = new IndexService(config.getCsv().createCsvParserSettings(), config.getIndex().getEmptyLabel()); - DatasetRegistry datasetRegistry = new DatasetRegistry<>( + return new DatasetRegistry<>( config.getCluster().getEntityBucketSize(), config, creator, namespaceHandler, indexService ); - MetaStorage storage = new MetaStorage(config.getStorage(), datasetRegistry); - datasetRegistry.setMetaStorage(storage); - return datasetRegistry; } } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/NamespaceHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/NamespaceHandler.java index 8f3e4eb1e4..fef5334580 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/NamespaceHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/NamespaceHandler.java @@ -4,6 +4,7 @@ import java.util.List; import com.bakdata.conquery.io.jackson.Injectable; +import com.bakdata.conquery.io.jackson.Jackson; import com.bakdata.conquery.io.jackson.View; import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.io.storage.NamespaceStorage; @@ -32,6 +33,7 @@ public interface NamespaceHandler { static NamespaceSetupData createNamespaceSetup(NamespaceStorage storage, final ConqueryConfig config, final InternalObjectMapperCreator mapperCreator, IndexService indexService) { List injectables = new ArrayList<>(); injectables.add(indexService); + ObjectMapper persistenceMapper = mapperCreator.createInternalObjectMapper(View.Persistence.Manager.class); ObjectMapper communicationMapper = mapperCreator.createInternalObjectMapper(View.InternalCommunication.class); ObjectMapper preprocessMapper = mapperCreator.createInternalObjectMapper(null); @@ -40,8 +42,9 @@ static NamespaceSetupData createNamespaceSetup(NamespaceStorage storage, final C injectables.forEach(i -> i.injectInto(communicationMapper)); injectables.forEach(i -> i.injectInto(preprocessMapper)); - // Open and load the stores - storage.openStores(persistenceMapper); + + // Each store needs its own mapper because each injects its own registry + storage.openStores(Jackson.copyMapperAndInjectables(persistenceMapper)); storage.loadData(); JobManager jobManager = new JobManager(storage.getDataset().getName(), config.isFailOnError()); diff --git 
a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterEntityResolver.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterEntityResolver.java new file mode 100644 index 0000000000..8aa3d7c720 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterEntityResolver.java @@ -0,0 +1,83 @@ +package com.bakdata.conquery.mode.cluster; + +import static com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolverUtil.collectExtraData; +import static com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolverUtil.readDates; +import static com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolverUtil.tryResolveId; +import static com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolverUtil.verifyOnlySingles; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolver; +import com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolverUtil; +import com.bakdata.conquery.models.common.CDateSet; +import com.bakdata.conquery.models.config.IdColumnConfig; +import com.bakdata.conquery.models.identifiable.mapping.EntityIdMap; +import com.bakdata.conquery.models.identifiable.mapping.ExternalId; +import com.bakdata.conquery.util.DateReader; +import com.bakdata.conquery.util.io.IdColumnUtil; +import jakarta.validation.constraints.NotEmpty; + +public class ClusterEntityResolver implements EntityResolver { + + @Override + public ResolveStatistic resolveEntities( + @NotEmpty String[][] values, + List format, + EntityIdMap mapping, + IdColumnConfig idColumnConfig, + DateReader dateReader, + boolean onlySingles + ) { + final Map resolved = new HashMap<>(); + final List unresolvedDate = new ArrayList<>(); + final List unresolvedId = new ArrayList<>(); + + // extract dates from rows + final CDateSet[] rowDates = readDates(values, format, dateReader); + + // Extract extra data from rows by Row, to be collected into by entities + // Row -> Column -> Value + final Map[] extraDataByRow = EntityResolverUtil.readExtras(values, format); + + final List> readers = IdColumnUtil.getIdReaders(format, idColumnConfig.getIdMappers()); + + // We will not be able to resolve anything... + if (readers.isEmpty()) { + return EntityResolver.ResolveStatistic.forEmptyReaders(values); + } + + // Entity -> Column -> Values + final Map>> extraDataByEntity = new HashMap<>(); + + // ignore the first row, because this is the header + for (int rowNum = 1; rowNum < values.length; rowNum++) { + + final String[] row = values[rowNum]; + + if (rowDates[rowNum] == null) { + unresolvedDate.add(row); + continue; + } + + String resolvedId = tryResolveId(row, readers, mapping); + + if (resolvedId == null) { + unresolvedId.add(row); + continue; + } + + // read the dates from the row + resolved.put(resolvedId, rowDates[rowNum]); + + // Entity was resolved for row, so we collect the data. 
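The two helper calls that follow come from EntityResolverUtil: collectExtraData gathers the row's extra values per entity, and verifyOnlySingles enforces that, when onlySingles is requested, no entity ends up with more than one value per extra column. A small illustration of that check with made-up data:

// Made-up extra data: entity "a" carries two values for the extra column "NOTE".
Map<String, Map<String, List<String>>> extraDataByEntity = Map.of(
		"a", Map.of("NOTE", List.of("first", "second"))
);

EntityResolverUtil.verifyOnlySingles(false, extraDataByEntity); // flag not set -> no check is performed
EntityResolverUtil.verifyOnlySingles(true, extraDataByEntity);  // throws ConqueryError.ExternalResolveOnePerRowError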
+ collectExtraData(extraDataByRow, rowNum, extraDataByEntity, resolvedId); + } + + verifyOnlySingles(onlySingles, extraDataByEntity); + return new EntityResolver.ResolveStatistic(resolved, extraDataByEntity, unresolvedDate, unresolvedId); + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java index 991d5efd64..f5a1b5179b 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java @@ -1,49 +1,155 @@ package com.bakdata.conquery.mode.cluster; +import java.io.IOException; import java.io.InputStream; import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import com.bakdata.conquery.mode.ImportHandler; -import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.datasets.Import; import com.bakdata.conquery.models.datasets.Table; import com.bakdata.conquery.models.datasets.concepts.Concept; import com.bakdata.conquery.models.datasets.concepts.Connector; +import com.bakdata.conquery.models.events.Bucket; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; -import com.bakdata.conquery.models.jobs.ImportJob; +import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; +import com.bakdata.conquery.models.identifiable.ids.specific.TableId; +import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; +import com.bakdata.conquery.models.messages.namespaces.specific.AddImport; +import com.bakdata.conquery.models.messages.namespaces.specific.ImportBucket; import com.bakdata.conquery.models.messages.namespaces.specific.RemoveImportJob; +import com.bakdata.conquery.models.preproc.PreprocessedData; +import com.bakdata.conquery.models.preproc.PreprocessedHeader; +import com.bakdata.conquery.models.preproc.PreprocessedReader; import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.DistributedNamespace; import com.bakdata.conquery.models.worker.Namespace; +import com.bakdata.conquery.models.worker.WorkerInformation; +import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.core.Response; import lombok.AllArgsConstructor; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; /** * Handler of {@link Import} requests that realizes them both on the manager and the cluster's shards. 
*/ @AllArgsConstructor -public -class ClusterImportHandler implements ImportHandler { +@Slf4j +public class ClusterImportHandler implements ImportHandler { - private final ConqueryConfig config; private final DatasetRegistry datasetRegistry; @SneakyThrows @Override public void updateImport(Namespace namespace, InputStream inputStream) { - ImportJob job = ImportJob.createOrUpdate( - datasetRegistry.get(namespace.getDataset().getId()), - inputStream, - config.getCluster().getEntityBucketSize(), - true - ); + handleImport(namespace, inputStream, true); + } + + private static void handleImport(Namespace namespace, InputStream inputStream, boolean update) throws IOException { + try (PreprocessedReader parser = new PreprocessedReader(inputStream, namespace.getPreprocessMapper())) { + // We parse semi-manually as the incoming file consist of multiple documents we read progressively: + // 1) the header to check metadata + // 2...) The chunked Buckets + + final PreprocessedHeader header = parser.readHeader(); + + final Table table = validateImportable(((DistributedNamespace) namespace), header, update); + + readAndDistributeImport(((DistributedNamespace) namespace), table, header, parser); + + clearDependentConcepts(namespace.getStorage().getAllConcepts(), table); + } + } + + /** + * Handle validity and update logic. + */ + private static Table validateImportable(DistributedNamespace namespace, PreprocessedHeader header, boolean update) { + final TableId tableId = new TableId(namespace.getDataset().getId(), header.getTable()); + final ImportId importId = new ImportId(tableId, header.getName()); + + final Table table = namespace.getStorage().getTable(tableId); + + if (table == null) { + throw new BadRequestException("Table[%s] does not exist.".formatted(tableId)); + } + + // Ensure that Import and Table have the same schema + final List errors = header.assertMatch(table); + + if (!errors.isEmpty()) { + final String errorsMessage = String.join("\n - ", errors); + + log.error("Problems concerning Import `{}`:\n{}", importId, errorsMessage); + throw new BadRequestException("Headers[%s] do not match Table[%s]:\n%s".formatted(importId, table.getId(), errorsMessage)); + } + + final Import processedImport = namespace.getStorage().getImport(importId); + + if (update) { + if (processedImport == null) { + throw new NotFoundException("Import[%s] is not present.".formatted(importId)); + } + + // before updating the import, make sure that all workers removed the prior import + namespace.getWorkerHandler().sendToAll(new RemoveImportJob(processedImport)); + namespace.getStorage().removeImport(importId); + } + else if (processedImport != null) { + throw new WebApplicationException("Import[%s] is already present.".formatted(importId), Response.Status.CONFLICT); + } + + return table; + } - namespace.getJobManager().addSlowJob(job); + private static void readAndDistributeImport(DistributedNamespace namespace, Table table, PreprocessedHeader header, PreprocessedReader reader) { + final TableId tableId = new TableId(namespace.getDataset().getId(), header.getTable()); + final ImportId importId = new ImportId(tableId, header.getName()); + + log.info("BEGIN importing {} into {}", header.getName(), table); + + Import imp = null; + + final Map> collectedEntities = new HashMap<>(); + + for (PreprocessedData container : (Iterable) () -> reader) { + + if (imp == null) { + // We need a container to create a description. 
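A note on the for-each above: the (Iterable) () -> reader cast-lambda only adapts the reader to an Iterable so it can drive the loop. An equivalent, more explicit form (a sketch, assuming PreprocessedReader implements Iterator over PreprocessedData, which that lambda relies on):

    while (reader.hasNext()) {
        final PreprocessedData container = reader.next();
        // ... same body as below: create the Import from the first container, then build and send Buckets
    }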
+ imp = header.createImportDescription(table, container.getStores()); + + namespace.getWorkerHandler().sendToAll(new AddImport(imp)); + namespace.getStorage().updateImport(imp); + } + + + final Bucket bucket = Bucket.fromPreprocessed(table, container, imp); + + log.trace("DONE reading bucket `{}`, contains {} entities.", bucket.getId(), bucket.entities().size()); + + final WorkerInformation responsibleWorker = namespace.getWorkerHandler().assignResponsibleWorker(bucket.getId()); + + sendBucket(bucket, responsibleWorker); + + // NOTE: I want the bucket to be GC'd as early as possible, so I just store the part(s) I need later. + + collectedEntities.put(bucket.getBucket(), bucket.entities()); + } + + namespace.getJobManager().addSlowJob(new RegisterImportEntities(collectedEntities, namespace, importId)); + + log.debug("Successfully read {} Buckets, containing {} entities for `{}`", header.getNumberOfBuckets(), header.getNumberOfEntities(), importId); + + namespace.getWorkerHandler().sendUpdatedWorkerInformation(); - clearDependentConcepts(namespace.getStorage().getAllConcepts(), job.getTable()); } - private void clearDependentConcepts(Collection> allConcepts, Table table) { + private static void clearDependentConcepts(Collection> allConcepts, Table table) { for (Concept c : allConcepts) { for (Connector con : c.getConnectors()) { if (!con.getTable().equals(table)) { @@ -55,24 +161,29 @@ private void clearDependentConcepts(Collection> allConcepts, Table ta } } + /** + * select, then send buckets. + */ + public static WorkerId sendBucket(Bucket bucket, WorkerInformation responsibleWorker) { + + responsibleWorker.awaitFreeJobQueue(); + + log.trace("Sending Bucket[{}] to {}", bucket.getId(), responsibleWorker.getId()); + responsibleWorker.send(new ImportBucket(bucket.getId().toString(), bucket)); + + return responsibleWorker.getId(); + } + @SneakyThrows @Override public void addImport(Namespace namespace, InputStream inputStream) { - ImportJob job = ImportJob.createOrUpdate( - datasetRegistry.get(namespace.getDataset().getId()), - inputStream, - config.getCluster().getEntityBucketSize(), - false - ); - namespace.getJobManager().addSlowJob(job); - - clearDependentConcepts(namespace.getStorage().getAllConcepts(), job.getTable()); + handleImport(namespace, inputStream, false); } @Override public void deleteImport(Import imp) { - DatasetId id = imp.getTable().getDataset().getId(); + final DatasetId id = imp.getTable().getDataset().getId(); final DistributedNamespace namespace = datasetRegistry.get(id); clearDependentConcepts(namespace.getStorage().getAllConcepts(), imp.getTable()); @@ -83,4 +194,5 @@ public void deleteImport(Import imp) { // Remove bucket assignments for consistency report namespace.getWorkerHandler().removeBucketAssignmentsForImportFormWorkers(imp); } + } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java index 1579acf868..ed682e7112 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java @@ -4,6 +4,7 @@ import java.util.List; import java.util.function.Supplier; +import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.mode.*; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.jobs.JobManager; @@ -19,7 +20,8 @@ public class ClusterManagerProvider implements 
ManagerProvider { public ClusterManager provideManager(ConqueryConfig config, Environment environment) { final JobManager jobManager = ManagerProvider.newJobManager(config); - final InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, environment.getValidator()); + final MetaStorage storage = new MetaStorage(config.getStorage()); + final InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, storage, environment.getValidator()); final ClusterState clusterState = new ClusterState(); final NamespaceHandler namespaceHandler = new ClusterNamespaceHandler(clusterState, config, creator); final DatasetRegistry datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator); @@ -28,14 +30,14 @@ public ClusterManager provideManager(ConqueryConfig config, Environment environm final ClusterConnectionManager connectionManager = new ClusterConnectionManager(datasetRegistry, jobManager, environment.getValidator(), config, creator, clusterState); - final ImportHandler importHandler = new ClusterImportHandler(config, datasetRegistry); + final ImportHandler importHandler = new ClusterImportHandler(datasetRegistry); final StorageListener extension = new ClusterStorageListener(jobManager, datasetRegistry); final Supplier> nodeProvider = () -> clusterState.getShardNodes().values(); final List adminTasks = List.of(new ReportConsistencyTask(clusterState)); final DelegateManager delegate = - new DelegateManager<>(config, environment, datasetRegistry, importHandler, extension, nodeProvider, adminTasks, creator, jobManager); + new DelegateManager<>(config, environment, datasetRegistry, storage, importHandler, extension, nodeProvider, adminTasks, creator, jobManager); environment.healthChecks().register("cluster", new ClusterHealthCheck(clusterState)); diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java index acf04a20fe..617f166d72 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterNamespaceHandler.java @@ -37,6 +37,7 @@ public DistributedNamespace createNamespace(NamespaceStorage storage, final Meta namespaceData.getJobManager(), namespaceData.getFilterSearch(), namespaceData.getIndexService(), + new ClusterEntityResolver(), namespaceData.getInjectables(), workerHandler ); diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterState.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterState.java index 36356fa9fc..18e725f1d0 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterState.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterState.java @@ -31,5 +31,4 @@ public WorkerInformation getWorker(final WorkerId workerId, final DatasetId id) .flatMap(ns -> ns.getWorkers().getOptional(workerId)) .orElseThrow(() -> new NoSuchElementException("Unknown worker worker '%s' for dataset '%s'".formatted(workerId, id))); } - } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/RegisterImportEntities.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/RegisterImportEntities.java new file mode 100644 index 0000000000..e6c89725c1 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/RegisterImportEntities.java @@ -0,0 +1,42 @@ +package 
com.bakdata.conquery.mode.cluster; + +import java.util.Collection; +import java.util.Map; + +import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; +import com.bakdata.conquery.models.jobs.Job; +import com.bakdata.conquery.models.worker.DistributedNamespace; +import lombok.Data; + +/** + * This class handles registration of entities. Relevant for counting and resolving entities from external sources. + */ +@Data +class RegisterImportEntities extends Job { + + private final Map> collectedEntities; + + + private final DistributedNamespace namespace; + private final ImportId importId; + + @Override + public void execute() { + // This task is quite slow, so be delay it as far as possible. + for (Map.Entry> bucket2Entities : collectedEntities.entrySet()) { + for (String entity : bucket2Entities.getValue()) { + + if (namespace.getStorage().containsEntity(entity)) { + continue; + } + + namespace.getStorage().registerEntity(entity, bucket2Entities.getKey()); + } + } + } + + @Override + public String getLabel() { + return "Handle Bucket %s assignments.".formatted(importId); + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java index 762a754eb8..41d3895cf3 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalManagerProvider.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.function.Supplier; +import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.mode.DelegateManager; import com.bakdata.conquery.mode.InternalObjectMapperCreator; import com.bakdata.conquery.mode.ManagerProvider; @@ -32,15 +33,18 @@ public LocalManagerProvider(SqlDialectFactory dialectFactory) { public DelegateManager provideManager(ConqueryConfig config, Environment environment) { - InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, environment.getValidator()); - NamespaceHandler namespaceHandler = new LocalNamespaceHandler(config, creator, dialectFactory); - DatasetRegistry datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator); + final MetaStorage storage = new MetaStorage(config.getStorage()); + final InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, storage, environment.getValidator()); + final NamespaceHandler namespaceHandler = new LocalNamespaceHandler(config, creator, dialectFactory); + final DatasetRegistry datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator); + creator.init(datasetRegistry); return new DelegateManager<>( config, environment, datasetRegistry, + storage, new FailingImportHandler(), new LocalStorageListener(), EMPTY_NODE_PROVIDER, diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java index 52909c50ef..a0f7f23c2d 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/local/LocalNamespaceHandler.java @@ -7,6 +7,7 @@ import com.bakdata.conquery.mode.NamespaceSetupData; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.config.DatabaseConfig; +import com.bakdata.conquery.models.config.IdColumnConfig; import 
com.bakdata.conquery.models.config.SqlConnectorConfig; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; import com.bakdata.conquery.models.index.IndexService; @@ -15,9 +16,11 @@ import com.bakdata.conquery.sql.DSLContextWrapper; import com.bakdata.conquery.sql.DslContextFactory; import com.bakdata.conquery.sql.conquery.SqlExecutionManager; +import com.bakdata.conquery.sql.conversion.NodeConversions; import com.bakdata.conquery.sql.conversion.SqlConverter; import com.bakdata.conquery.sql.conversion.dialect.SqlDialect; import com.bakdata.conquery.sql.conversion.dialect.SqlDialectFactory; +import com.bakdata.conquery.sql.execution.ResultSetProcessor; import com.bakdata.conquery.sql.execution.ResultSetProcessorFactory; import com.bakdata.conquery.sql.execution.SqlExecutionResult; import com.bakdata.conquery.sql.execution.SqlExecutionService; @@ -38,6 +41,7 @@ public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaSto NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(namespaceStorage, config, mapperCreator, indexService); + IdColumnConfig idColumns = config.getIdColumns(); SqlConnectorConfig sqlConnectorConfig = config.getSqlConnectorConfig(); DatabaseConfig databaseConfig = sqlConnectorConfig.getDatabaseConfig(namespaceStorage.getDataset()); @@ -45,10 +49,13 @@ public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaSto DSLContext dslContext = dslContextWrapper.getDslContext(); SqlDialect sqlDialect = dialectFactory.createSqlDialect(databaseConfig.getDialect()); - SqlConverter sqlConverter = new SqlConverter(sqlDialect, dslContext, databaseConfig); - SqlExecutionService sqlExecutionService = new SqlExecutionService(dslContext, ResultSetProcessorFactory.create(sqlDialect)); + ResultSetProcessor resultSetProcessor = ResultSetProcessorFactory.create(config, sqlDialect); + SqlExecutionService sqlExecutionService = new SqlExecutionService(dslContext, resultSetProcessor); + NodeConversions nodeConversions = new NodeConversions(idColumns, sqlDialect, dslContext, databaseConfig, sqlExecutionService); + SqlConverter sqlConverter = new SqlConverter(nodeConversions); ExecutionManager executionManager = new SqlExecutionManager(sqlConverter, sqlExecutionService, metaStorage); SqlStorageHandler sqlStorageHandler = new SqlStorageHandler(sqlExecutionService); + SqlEntityResolver sqlEntityResolver = new SqlEntityResolver(idColumns, dslContext, sqlDialect, sqlExecutionService); return new LocalNamespace( namespaceData.getPreprocessMapper(), @@ -60,6 +67,7 @@ public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaSto namespaceData.getJobManager(), namespaceData.getFilterSearch(), namespaceData.getIndexService(), + sqlEntityResolver, namespaceData.getInjectables() ); } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/local/SqlEntityResolver.java b/backend/src/main/java/com/bakdata/conquery/mode/local/SqlEntityResolver.java new file mode 100644 index 0000000000..681889ec86 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/mode/local/SqlEntityResolver.java @@ -0,0 +1,206 @@ +package com.bakdata.conquery.mode.local; + +import static com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolverUtil.collectExtraData; +import static com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolverUtil.readDates; +import static com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolverUtil.verifyOnlySingles; +import static org.jooq.impl.DSL.field; 
+import static org.jooq.impl.DSL.name; +import static org.jooq.impl.DSL.table; +import static org.jooq.impl.DSL.val; +import static org.jooq.impl.DSL.when; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolver; +import com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolverUtil; +import com.bakdata.conquery.models.common.CDateSet; +import com.bakdata.conquery.models.config.IdColumnConfig; +import com.bakdata.conquery.models.identifiable.mapping.EntityIdMap; +import com.bakdata.conquery.models.identifiable.mapping.ExternalId; +import com.bakdata.conquery.sql.conversion.SharedAliases; +import com.bakdata.conquery.sql.conversion.dialect.SqlDialect; +import com.bakdata.conquery.sql.execution.SqlExecutionService; +import com.bakdata.conquery.util.DateReader; +import com.bakdata.conquery.util.io.IdColumnUtil; +import jakarta.validation.constraints.NotEmpty; +import lombok.RequiredArgsConstructor; +import org.jooq.CommonTableExpression; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.Name; +import org.jooq.Record; +import org.jooq.Record2; +import org.jooq.Record3; +import org.jooq.Select; +import org.jooq.SelectConditionStep; +import org.jooq.Table; + +@RequiredArgsConstructor +public class SqlEntityResolver implements EntityResolver { + + private static final Name IS_RESOLVED_ALIAS = name("is_resolved"); + private static final Name UNRESOLVED_CTE = name("ids_unresolved"); + public static final String ROW_INDEX = "rowIndex"; + + private final IdColumnConfig idColumns; + private final DSLContext context; + private final SqlDialect dialect; + private final SqlExecutionService executionService; + + @Override + public ResolveStatistic resolveEntities( + @NotEmpty String[][] values, + List format, + EntityIdMap mapping, + IdColumnConfig idColumnConfig, + DateReader dateReader, + boolean onlySingles + ) { + final Map resolved = new HashMap<>(); + final List unresolvedDate = new ArrayList<>(); + final List unresolvedId = new ArrayList<>(); + + // extract dates from rows + final CDateSet[] rowDates = readDates(values, format, dateReader); + + // Extract extra data from rows by Row, to be collected into by entities + // Row -> Column -> Value + final Map[] extraDataByRow = EntityResolverUtil.readExtras(values, format); + + final List> readers = IdColumnUtil.getIdReaders(format, idColumnConfig.getIdMappers()); + + // We will not be able to resolve anything... 
+ if (readers.isEmpty()) { + return EntityResolver.ResolveStatistic.forEmptyReaders(values); + } + + // Entity -> Column -> Values + final Map>> extraDataByEntity = new HashMap<>(); + + // all IDs of this map had at least a matching reader + final Map resolvedIdsMap = resolveIds(values, readers); + + // ignore the first row, because this is the header + for (int rowNum = 1; rowNum < values.length; rowNum++) { + + final String[] row = values[rowNum]; + + final IdResolveInfo idResolveInfo = resolvedIdsMap.get(rowNum); + if (idResolveInfo == null) { + // row had no matching reader + unresolvedId.add(row); + continue; + } + + // external ID could not be resolved internally + if (!idResolveInfo.isResolved()) { + unresolvedDate.add(row); + continue; + } + + final String resolvedId = idResolveInfo.externalId(); + + if (rowDates[rowNum] == null) { + unresolvedDate.add(row); + continue; + } + + // read the dates from the row + resolved.put(resolvedId, rowDates[rowNum]); + + // Entity was resolved for row, so we collect the data. + collectExtraData(extraDataByRow, rowNum, extraDataByEntity, resolvedId); + } + + verifyOnlySingles(onlySingles, extraDataByEntity); + return new EntityResolver.ResolveStatistic(resolved, extraDataByEntity, unresolvedDate, unresolvedId); + + } + + /** + * Create a SQL query like this + *
+	 *     {@code
+	 *      with "ids_unresolved" as (select 1   as "row",
+	 *                                      '1'  as "primary_id"
+	 *                                -- will select more ids here via union all)
+	 * 		select "row",
+	 * 		       "primary_id",
+	 * 		       case
+	 * 		           when "entities"."id" is not null then true
+	 * 		           else false
+	 * 		           end as "is_resolved"
+	 * 		from "entities"
+	 * 		         join "ids_unresolved"
+	 * 		              on "primary_id" = "entities"."id"
+	 * 		where "primary_id" = "pid"
+	 *     }
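To make the query's result concrete, a hedged illustration of the map this method produces for a small upload (header plus two data rows, values invented): row 1 carries an id that exists in the entities table, row 2 has no matching id reader and therefore yields no entry at all.

    Map<Integer, IdResolveInfo> byRowIndex = resolveIds(values, readers);
    // byRowIndex = { 1 = IdResolveInfo[externalId="123", isResolved=true] }
    // row 2 is absent: no reader matched it, so the caller reports it under unresolvedId instead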
+	 * 
+ *
+ * For each ID, that had a matching reader, it will return an entry in the map with row number -> IdResolveInfo. + */ + private Map resolveIds(String[][] values, List> readers) { + + CommonTableExpression unresolvedCte = createUnresolvedCte(values, readers); + + Field rowIndex = field(name(ROW_INDEX), Integer.class); + Field externalPrimaryColumn = field(name(SharedAliases.PRIMARY_COLUMN.getAlias()), String.class); + Field innerPrimaryColumn = field(name(idColumns.findPrimaryIdColumn().getField()), String.class); + Field isResolved = when(innerPrimaryColumn.isNotNull(), val(true)) + .otherwise(false) + .as(IS_RESOLVED_ALIAS); + + Table allIdsTable = table(name(idColumns.getTable())); + SelectConditionStep> resolveIdsQuery = + context.with(unresolvedCte) + .select(rowIndex, externalPrimaryColumn, isResolved) + .from(dialect.getFunctionProvider().innerJoin(allIdsTable, unresolvedCte, List.of(externalPrimaryColumn.eq(innerPrimaryColumn)))) + .where(externalPrimaryColumn.eq(innerPrimaryColumn)); + + return executionService.fetchStream(resolveIdsQuery) + .collect(Collectors.toMap( + record -> record.get(rowIndex), + record -> new IdResolveInfo(record.get(externalPrimaryColumn), record.get(isResolved)) + )); + } + + private CommonTableExpression createUnresolvedCte(String[][] values, List> readers) { + + List>> selects = new ArrayList<>(values.length); + for (int i = 1; i < values.length; i++) { + + final String[] row = values[i]; + + String resolvedId = null; + for (Function reader : readers) { + final ExternalId externalId = reader.apply(row); + resolvedId = externalId.getId(); + } + + // no matching reader found + if (resolvedId == null) { + continue; + } + + Field rowIndex = val(i).as(ROW_INDEX); + Field externalPrimaryColumn = val(resolvedId).as(SharedAliases.PRIMARY_COLUMN.getAlias()); + Select> externalIdSelect = context.select(rowIndex, externalPrimaryColumn) + // some dialects can't just select static values without FROM clause + .from(dialect.getFunctionProvider().getNoOpTable()); + + selects.add(externalIdSelect); + } + + return UNRESOLVED_CTE.as(selects.stream().reduce(Select::unionAll).orElseThrow(IllegalStateException::new)); + } + + + private record IdResolveInfo(String externalId, boolean isResolved) { + } + +} diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java index c98582ea00..c71c7fde1c 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java @@ -1,13 +1,13 @@ package com.bakdata.conquery.models.config; import java.net.InetAddress; +import jakarta.validation.Valid; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; import io.dropwizard.core.Configuration; import io.dropwizard.util.Duration; import io.dropwizard.validation.PortRange; -import jakarta.validation.Valid; -import jakarta.validation.constraints.Min; -import jakarta.validation.constraints.NotNull; import lombok.Getter; import lombok.Setter; @@ -26,6 +26,7 @@ public class ClusterConfig extends Configuration { private int entityBucketSize = 1000; private Duration idleTimeOut = Duration.minutes(5); + private Duration connectRetryTimeout = Duration.seconds(30); /** * Amount of backpressure before jobs can volunteer to block to send messages to their shards. 
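The new connectRetryTimeout joins the existing cluster tuning knobs. A minimal sketch of adjusting it programmatically (assuming the Lombok-generated setters implied by the Getter/Setter imports above; values are examples only):

    ClusterConfig cluster = new ClusterConfig();
    cluster.setConnectRetryTimeout(Duration.seconds(10)); // how long connection attempts keep retrying (assumption from the field name)
    cluster.setIdleTimeOut(Duration.minutes(5));
    cluster.setEntityBucketSize(1000);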
diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/ColumnConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/ColumnConfig.java index 80a98a320b..a708564a40 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/ColumnConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/ColumnConfig.java @@ -6,7 +6,10 @@ import com.bakdata.conquery.io.jackson.View; import com.bakdata.conquery.models.identifiable.mapping.EntityIdMap; +import com.bakdata.conquery.models.identifiable.mapping.ExternalId; import com.bakdata.conquery.resources.admin.rest.AdminDatasetProcessor; +import com.fasterxml.jackson.annotation.JsonAlias; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonView; import com.google.common.base.Strings; import jakarta.validation.constraints.NotEmpty; @@ -20,7 +23,7 @@ /** * Configuration class for QueryUpload and IdMapping. - * + *
* Describes how rows are mapped for {@link EntityIdMap}/{@link AdminDatasetProcessor#setIdMapping(java.io.InputStream, com.bakdata.conquery.models.worker.Namespace)}. */ @Builder @@ -29,24 +32,21 @@ @NoArgsConstructor @Setter @Getter +@JsonIgnoreProperties(value = {"resolvable"}) // for backwards compatibility public class ColumnConfig { - public EntityIdMap.ExternalId read(String value) { - if (!isResolvable()) { - return null; - } - + public ExternalId read(String value) { if (Strings.isNullOrEmpty(value)) { return null; } if (getLength() == -1 || getPad() == null) { - return new EntityIdMap.ExternalId(getName(), value); + return new ExternalId(getName(), value); } String padded = StringUtils.leftPad(value, getLength(), getPad()); - return new EntityIdMap.ExternalId(getName(), padded); + return new ExternalId(getName(), padded); } @@ -87,12 +87,6 @@ public EntityIdMap.ExternalId read(String value) { @Builder.Default private int length = -1; - /** - * Set to true, if the column should be resolvable in upload. This can be used to add supplemental information to an entity, for example it's data-source, which would not be unique among entities. - */ - @Builder.Default - private boolean resolvable = false; - /** * Set to true, if the Column should be printed to output. This can be used to have resolvable but not printable fields in mapping. */ @@ -100,10 +94,19 @@ public EntityIdMap.ExternalId read(String value) { @JsonView(View.Persistence.class) private boolean print = true; + /* + * Set to true, if the column should be resolvable in upload. This can be used to add supplemental information to an entity, for example it's data-source, which would not be unique among entities. + */ + @Builder.Default + private boolean resolvable = false; + /** - * Used in conjunction with {@link com.bakdata.conquery.models.identifiable.mapping.AutoIncrementingPseudomizer}: One column is required to have fillAnon true, which will be filled with pseudomized data. + * Used for CQYes to select all entities. And CQExternal as primaryId for decoding. And for IdMapping for outputting additional Ids. + *
+ * Additionally, used in conjunction with {@link com.bakdata.conquery.models.identifiable.mapping.AutoIncrementingPseudomizer}: One column is required to have fillAnon true, which will be filled with pseudomized data. */ @Builder.Default @JsonView(View.Persistence.class) - private boolean fillAnon = false; + @JsonAlias("fillAnon") + private boolean primaryId = false; } diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/DatabaseConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/DatabaseConfig.java index 86a6aaa039..db23de6553 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/DatabaseConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/DatabaseConfig.java @@ -1,12 +1,16 @@ package com.bakdata.conquery.models.config; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import lombok.extern.jackson.Jacksonized; @Data @Builder @Jacksonized +@NoArgsConstructor +@AllArgsConstructor public class DatabaseConfig { private static final String DEFAULT_PRIMARY_COLUMN = "pid"; diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/IdColumnConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/IdColumnConfig.java index 25d6c050e5..9c8d1202f6 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/IdColumnConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/IdColumnConfig.java @@ -10,9 +10,6 @@ import java.util.Set; import java.util.stream.Collectors; -import jakarta.validation.Valid; -import jakarta.validation.constraints.NotEmpty; - import com.bakdata.conquery.apiv1.query.concept.specific.external.DateFormat; import com.bakdata.conquery.models.identifiable.mapping.EntityIdMap; import com.bakdata.conquery.models.query.resultinfo.LocalizedDefaultResultInfo; @@ -23,6 +20,8 @@ import com.google.common.base.Functions; import com.google.common.collect.MoreCollectors; import io.dropwizard.validation.ValidationMethod; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotEmpty; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; @@ -42,6 +41,11 @@ @Slf4j public class IdColumnConfig { + /** + * Relevant in SQL-Mode, used as AllIdsTable for CQExternal and CQYes. + */ + private String table = "entities"; + /** * List of resolvable and printable ids. 
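The fillAnon-to-primaryId rename and the removal of resolvable stay readable for existing configs thanks to the @JsonAlias and @JsonIgnoreProperties annotations above. A small sketch of what that means for old JSON (plain mapper assumed, exception handling omitted):

    // Old-style JSON still deserializes: "fillAnon" feeds primaryId, "resolvable" is ignored.
    ColumnConfig column = new ObjectMapper().readValue(
            "{\"name\":\"ID\", \"field\":\"pid\", \"fillAnon\":true, \"resolvable\":true}",
            ColumnConfig.class);
    assert column.isPrimaryId();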
* @@ -52,10 +56,9 @@ public class IdColumnConfig { private List ids = List.of( ColumnConfig.builder() .name("ID") - .field("result") + .field("pid") .label(Map.of(Locale.ROOT, "result")) - .resolvable(true) - .fillAnon(true) + .primaryId(true) .print(true) .build() ); @@ -64,9 +67,15 @@ public class IdColumnConfig { @JsonIgnore @Setter(AccessLevel.NONE) @Getter(lazy = true, value = AccessLevel.PUBLIC) - private final Map idMappers = ids.stream().filter(ColumnConfig::isResolvable) - .collect(Collectors.toMap(ColumnConfig::getName, Functions.identity())); + private final Map idMappers = ids.stream().collect(Collectors.toMap(ColumnConfig::getName, Functions.identity())); + @JsonIgnore + public ColumnConfig findPrimaryIdColumn() { + return ids.stream() + .filter(ColumnConfig::isPrimaryId) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Requiring at least 1 primary key column in IdColumnConfig")); + } @ValidationMethod(message = "Duplicate Claims for Mapping Columns.") @JsonIgnore @@ -101,7 +110,7 @@ public boolean isIdColsHaveMapping() { public boolean isExactlyOnePseudo() { return ids.stream() .filter(conf -> conf.getField() != null) - .filter(ColumnConfig::isFillAnon) + .filter(ColumnConfig::isPrimaryId) .count() == 1; } diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/PluginConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/PluginConfig.java index 8629074ccd..07f13b1082 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/PluginConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/PluginConfig.java @@ -3,11 +3,10 @@ import com.bakdata.conquery.commands.ManagerNode; import com.bakdata.conquery.io.cps.CPSBase; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import freemarker.core.Environment; @JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, property = "type") @CPSBase public interface PluginConfig { - public default void initialize(ManagerNode managerNode){}; + default void initialize(ManagerNode managerNode){} } diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java index cfce88537b..c8140ef205 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/SqlConnectorConfig.java @@ -6,11 +6,9 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import io.dropwizard.validation.ValidationMethod; import jakarta.validation.Valid; -import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; -import lombok.Getter; import lombok.NoArgsConstructor; import lombok.extern.jackson.Jacksonized; @@ -21,7 +19,7 @@ @AllArgsConstructor public class SqlConnectorConfig { - boolean enabled; + private boolean enabled; /** * Determines if generated SQL should be formatted. @@ -31,7 +29,6 @@ public class SqlConnectorConfig { /** * Keys must match the name of existing {@link Dataset}s. 
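For completeness, a hedged sketch of an id column configured with the renamed flag together with the padding that ColumnConfig.read applies (the length and pad builder fields are assumed from the read() implementation shown earlier; names and values are invented):

    ColumnConfig patientId = ColumnConfig.builder()
            .name("PATIENT_ID")   // hypothetical external column name
            .field("pid")
            .primaryId(true)      // formerly fillAnon
            .print(true)
            .length(8)
            .pad("0")
            .build();

    ExternalId external = patientId.read("1234");
    // -> ExternalId("PATIENT_ID", "00001234"): short values are left-padded to length with pad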
*/ - @Getter(AccessLevel.PRIVATE) private Map databaseConfigs; public DatabaseConfig getDatabaseConfig(Dataset dataset) { diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/StoreFactory.java b/backend/src/main/java/com/bakdata/conquery/models/config/StoreFactory.java index f79907fa0a..78ec8aecea 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/StoreFactory.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/StoreFactory.java @@ -12,11 +12,7 @@ import com.bakdata.conquery.models.auth.entities.Group; import com.bakdata.conquery.models.auth.entities.Role; import com.bakdata.conquery.models.auth.entities.User; -import com.bakdata.conquery.models.datasets.Dataset; -import com.bakdata.conquery.models.datasets.Import; -import com.bakdata.conquery.models.datasets.PreviewConfig; -import com.bakdata.conquery.models.datasets.SecondaryIdDescription; -import com.bakdata.conquery.models.datasets.Table; +import com.bakdata.conquery.models.datasets.*; import com.bakdata.conquery.models.datasets.concepts.Concept; import com.bakdata.conquery.models.datasets.concepts.StructureNode; import com.bakdata.conquery.models.events.Bucket; @@ -27,7 +23,6 @@ import com.bakdata.conquery.models.identifiable.mapping.EntityIdMap; import com.bakdata.conquery.models.index.InternToExternMapper; import com.bakdata.conquery.models.index.search.SearchIndex; -import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.WorkerInformation; import com.bakdata.conquery.models.worker.WorkerToBucketsMap; import com.fasterxml.jackson.annotation.JsonTypeInfo; @@ -67,9 +62,9 @@ public interface StoreFactory { SingletonStore createStructureStore(String pathName, CentralRegistry centralRegistry, ObjectMapper objectMapper); // MetaStorage - IdentifiableStore createExecutionsStore(CentralRegistry centralRegistry, DatasetRegistry datasetRegistry, String pathName, ObjectMapper objectMapper); + IdentifiableStore createExecutionsStore(CentralRegistry centralRegistry, String pathName, ObjectMapper objectMapper); - IdentifiableStore createFormConfigStore(CentralRegistry centralRegistry, DatasetRegistry datasetRegistry, String pathName, ObjectMapper objectMapper); + IdentifiableStore createFormConfigStore(CentralRegistry centralRegistry, String pathName, ObjectMapper objectMapper); IdentifiableStore createUserStore(CentralRegistry centralRegistry, String pathName, MetaStorage storage, ObjectMapper objectMapper); diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java b/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java index ae74ba9f23..7529e92934 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java @@ -52,7 +52,6 @@ import com.bakdata.conquery.models.identifiable.mapping.EntityIdMap; import com.bakdata.conquery.models.index.InternToExternMapper; import com.bakdata.conquery.models.index.search.SearchIndex; -import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.WorkerInformation; import com.bakdata.conquery.models.worker.WorkerToBucketsMap; import com.bakdata.conquery.util.io.ConqueryMDC; @@ -191,7 +190,7 @@ public ExecutorService getReaderExecutorService() { @Override public Collection discoverNamespaceStorages() { - return loadNamespacedStores("dataset_", (storePath) -> new NamespaceStorage(this, storePath, 
getValidator()), NAMESPACE_STORES); + return loadNamespacedStores("dataset_", (storePath) -> new NamespaceStorage(this, storePath), NAMESPACE_STORES); } @Override @@ -258,22 +257,22 @@ public SingletonStore createDatasetStore(String pathName, ObjectMapper @Override public IdentifiableStore createSecondaryIdDescriptionStore(CentralRegistry centralRegistry, String pathName, ObjectMapper objectMapper) { - return StoreMappings.identifiable(createStore(findEnvironment(pathName), validator, SECONDARY_IDS, centralRegistry.injectIntoNew(objectMapper)), centralRegistry); + return StoreMappings.identifiable(createStore(findEnvironment(pathName), validator, SECONDARY_IDS, objectMapper), centralRegistry); } @Override public IdentifiableStore createInternToExternMappingStore(String pathName, CentralRegistry centralRegistry, ObjectMapper objectMapper) { - return StoreMappings.identifiable(createStore(findEnvironment(pathName), validator, INTERN_TO_EXTERN, centralRegistry.injectIntoNew(objectMapper)), centralRegistry); + return StoreMappings.identifiable(createStore(findEnvironment(pathName), validator, INTERN_TO_EXTERN, objectMapper), centralRegistry); } @Override public IdentifiableStore createSearchIndexStore(String pathName, CentralRegistry centralRegistry, ObjectMapper objectMapper) { - return StoreMappings.identifiable(createStore(findEnvironment(pathName), validator, SEARCH_INDEX, centralRegistry.injectIntoNew(objectMapper)), centralRegistry); + return StoreMappings.identifiable(createStore(findEnvironment(pathName), validator, SEARCH_INDEX, objectMapper), centralRegistry); } @Override public SingletonStore createPreviewStore(String pathName, CentralRegistry centralRegistry, ObjectMapper objectMapper) { - return StoreMappings.singleton(createStore(findEnvironment(pathName), validator, ENTITY_PREVIEW, centralRegistry.injectIntoNew(objectMapper))); + return StoreMappings.singleton(createStore(findEnvironment(pathName), validator, ENTITY_PREVIEW, objectMapper)); } @Override @@ -283,27 +282,27 @@ public CachedStore createEntity2BucketStore(String pathName, Ob @Override public IdentifiableStore createTableStore(CentralRegistry centralRegistry, String pathName, ObjectMapper objectMapper) { - return StoreMappings.identifiable(createStore(findEnvironment(pathName), validator, TABLES, centralRegistry.injectIntoNew(objectMapper)), centralRegistry); + return StoreMappings.identifiable(createStore(findEnvironment(pathName), validator, TABLES, objectMapper), centralRegistry); } @Override public IdentifiableStore> createConceptStore(CentralRegistry centralRegistry, String pathName, ObjectMapper objectMapper) { - return StoreMappings.identifiable(createStore(findEnvironment(pathName), validator, CONCEPTS, centralRegistry.injectIntoNew(objectMapper)), centralRegistry); + return StoreMappings.identifiable(createStore(findEnvironment(pathName), validator, CONCEPTS, objectMapper), centralRegistry); } @Override public IdentifiableStore createImportStore(CentralRegistry centralRegistry, String pathName, ObjectMapper objectMapper) { - return StoreMappings.identifiable(createStore(findEnvironment(pathName), validator, IMPORTS, centralRegistry.injectIntoNew(objectMapper)), centralRegistry); + return StoreMappings.identifiable(createStore(findEnvironment(pathName), validator, IMPORTS, objectMapper), centralRegistry); } @Override public IdentifiableStore createCBlockStore(CentralRegistry centralRegistry, String pathName, ObjectMapper objectMapper) { - return StoreMappings.identifiable(createStore(findEnvironment(pathName), 
validator, C_BLOCKS, centralRegistry.injectIntoNew(objectMapper)), centralRegistry); + return StoreMappings.identifiable(createStore(findEnvironment(pathName), validator, C_BLOCKS, objectMapper), centralRegistry); } @Override public IdentifiableStore createBucketStore(CentralRegistry centralRegistry, String pathName, ObjectMapper objectMapper) { - return StoreMappings.identifiable(createStore(findEnvironment(pathName), validator, BUCKETS, centralRegistry.injectIntoNew(objectMapper)), centralRegistry); + return StoreMappings.identifiable(createStore(findEnvironment(pathName), validator, BUCKETS, objectMapper), centralRegistry); } @Override @@ -332,17 +331,17 @@ public SingletonStore createWorkerToBucketsStore(String path @Override public SingletonStore createStructureStore(String pathName, CentralRegistry centralRegistry, ObjectMapper objectMapper) { - return StoreMappings.singleton(createStore(findEnvironment(pathName), validator, STRUCTURE, centralRegistry.injectIntoNew(objectMapper))); + return StoreMappings.singleton(createStore(findEnvironment(pathName), validator, STRUCTURE, objectMapper)); } @Override - public IdentifiableStore createExecutionsStore(CentralRegistry centralRegistry, DatasetRegistry datasetRegistry, String pathName, ObjectMapper objectMapper) { - return StoreMappings.identifiable(createStore(findEnvironment(resolveSubDir(pathName, "executions")), validator, EXECUTIONS, datasetRegistry.injectInto(centralRegistry.injectIntoNew(objectMapper))), centralRegistry); + public IdentifiableStore createExecutionsStore(CentralRegistry centralRegistry, String pathName, ObjectMapper objectMapper) { + return StoreMappings.identifiable(createStore(findEnvironment(resolveSubDir(pathName, "executions")), validator, EXECUTIONS, objectMapper), centralRegistry); } @Override - public IdentifiableStore createFormConfigStore(CentralRegistry centralRegistry, DatasetRegistry datasetRegistry, String pathName, ObjectMapper objectMapper) { - return StoreMappings.identifiable(createStore(findEnvironment(resolveSubDir(pathName, "formConfigs")), validator, FORM_CONFIG, datasetRegistry.injectInto(centralRegistry.injectIntoNew(objectMapper))), centralRegistry); + public IdentifiableStore createFormConfigStore(CentralRegistry centralRegistry, String pathName, ObjectMapper objectMapper) { + return StoreMappings.identifiable(createStore(findEnvironment(resolveSubDir(pathName, "formConfigs")), validator, FORM_CONFIG, objectMapper), centralRegistry); } @Override diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/PreviewConfig.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/PreviewConfig.java index 5d8fc1d451..e43fbd8d06 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/PreviewConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/PreviewConfig.java @@ -54,6 +54,7 @@ public class PreviewConfig { * @implSpec the order of selects is the order of the output fields. */ @Valid + @NotNull private List infoCardSelects = List.of(); @Valid @@ -62,11 +63,13 @@ public class PreviewConfig { /** * Columns that should not be displayed to users in entity preview. */ + @NotNull private Set hidden = Collections.emptySet(); /** * SecondaryIds where the columns should be grouped together. */ + @NotNull private Set grouping = Collections.emptySet(); /** @@ -74,11 +77,13 @@ public class PreviewConfig { * * @implNote This is purely for the frontend, the backend can theoretically be queried for all Connectors. 
*/ + @NotNull private Set allConnectors = Collections.emptySet(); /** * Connectors that shall be selected by default by the frontend. */ + @NotNull private Set defaultConnectors = Collections.emptySet(); /** @@ -88,6 +93,7 @@ public class PreviewConfig { *
* The Frontend will use the concepts filters to render a search for entity preview. */ + @NotNull private Set searchFilters = Collections.emptySet(); @JacksonInject(useInput = OptBoolean.FALSE) @@ -186,24 +192,19 @@ public List