From bcad24d1b97dee82fd53c1cf24933a2e5c8ef69b Mon Sep 17 00:00:00 2001
From: awildturtok <1553491+awildturtok@users.noreply.github.com>
Date: Mon, 5 Aug 2024 16:00:51 +0200
Subject: [PATCH] Split imports on preprocess (#3389)

Implements splitting data into buckets at preprocess time: the prior implementation split data on import, which meant loading large amounts of data only to chunk it. This merge offloads that responsibility onto the preprocessor, using stable hashing of the entity ID. The hashing depends on the number of buckets, which is defined at preprocess time and must stay constant per dataset. This merge reduces the memory requirements of the manager node and improves upload time. It also lays the foundation for making preprocessing more memory-efficient by compressing buckets separately.

--- .../commands/PreprocessorCommand.java | 118 +++--- .../conquery/io/storage/NamespaceStorage.java | 18 +- .../conquery/io/storage/WorkerStorage.java | 8 +- .../io/storage/xodus/stores/CachedStore.java | 3 +- .../mode/cluster/ClusterImportHandler.java | 160 ++++++-- .../mode/cluster/ClusterManagerProvider.java | 2 +- .../mode/cluster/RegisterImportEntities.java | 42 ++ .../concepts/tree/ConceptTreeCache.java | 7 +- .../datasets/concepts/tree/TreeConcept.java | 4 +- .../conquery/models/events/Bucket.java | 25 +- .../conquery/models/events/BucketManager.java | 13 +- .../conquery/models/events/CBlock.java | 3 +- .../conquery/models/events/EmptyBucket.java | 2 +- .../InjectingCentralRegistry.java | 26 -- .../identifiable/mapping/EntityIdMap.java | 2 +- .../models/jobs/CalculateCBlocksJob.java | 103 +++-- .../conquery/models/jobs/ImportJob.java | 372 ------------------ .../namespaces/specific/AddImport.java | 2 +- .../namespaces/specific/ImportBucket.java | 6 +- .../conquery/models/preproc/Preprocessed.java | 79 +++- .../models/preproc/PreprocessedData.java | 3 +- .../models/preproc/PreprocessedHeader.java | 72 ++++ .../models/preproc/PreprocessedReader.java | 44 +-- .../conquery/models/preproc/Preprocessor.java | 4 +- .../models/worker/DistributedNamespace.java | 14 +- .../models/worker/ShardNodeInformation.java | 7 +- .../conquery/models/worker/WorkerHandler.java | 28 +- .../models/worker/WorkerInformation.java | 21 +- .../models/worker/WorkerToBucketsMap.java | 27 +- .../resources/api/ConceptsProcessor.java | 4 +- .../ProgressReporterImpl.java | 30 +- .../integration/common/LoadingUtil.java | 65 ++- .../integration/tests/ImportUpdateTest.java | 12 +- .../integration/tests/RestartTest.java | 2 + .../ConceptUpdateAndDeletionTest.java | 4 + .../tests/deletion/ImportDeletionTest.java | 8 +- .../conquery/models/SerializationTests.java | 4 +- .../concepts/tree/GroovyIndexedTest.java | 9 +- .../util/support/StandaloneSupport.java | 6 +- 39 files changed, 660 insertions(+), 699 deletions(-) create mode 100644 backend/src/main/java/com/bakdata/conquery/mode/cluster/RegisterImportEntities.java delete mode 100644 backend/src/main/java/com/bakdata/conquery/models/identifiable/InjectingCentralRegistry.java delete mode 100644 backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java diff --git a/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java index ea5f91cd78..686e8200eb 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java @@ -44,6 +44,7 @@ import net.sourceforge.argparse4j.inf.ArgumentGroup; import 
net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Subparser; +import org.jetbrains.annotations.NotNull; @Slf4j @FieldNameConstants @@ -52,7 +53,7 @@ public class PreprocessorCommand extends ConqueryCommand { private final List failed = Collections.synchronizedList(new ArrayList<>()); private final List success = Collections.synchronizedList(new ArrayList<>()); private ExecutorService pool; - private boolean isFailFast = false; + private boolean isFailFast; private boolean isStrict = true; public PreprocessorCommand() { @@ -71,14 +72,14 @@ public static boolean requiresProcessing(PreprocessingJob preprocessingJob) { log.info("EXISTS ALREADY"); - int currentHash = preprocessingJob.getDescriptor() - .calculateValidityHash(preprocessingJob.getCsvDirectory(), preprocessingJob.getTag()); + final int currentHash = preprocessingJob.getDescriptor() + .calculateValidityHash(preprocessingJob.getCsvDirectory(), preprocessingJob.getTag()); final ObjectMapper om = Jackson.BINARY_MAPPER.copy(); try (final PreprocessedReader parser = new PreprocessedReader(new GZIPInputStream(new FileInputStream(preprocessingJob.getPreprocessedFile())), om)) { - PreprocessedHeader header = parser.readHeader(); + final PreprocessedHeader header = parser.readHeader(); if (header.getValidityHash() == currentHash) { log.info("\tHASH STILL VALID"); @@ -133,13 +134,18 @@ public void configure(Subparser subparser) { group.addArgument("--fast-fail") .action(Arguments.storeTrue()) .setDefault(false) - .help("Stop preprocessing and exit with failure if an error occures that prevents the generation of a cqpp."); + .help("Stop preprocessing and exit with failure if an error occurs that prevents the generation of a cqpp."); group.addArgument("--strict") .type(new BooleanArgumentType()) .setDefault(true) .help("Escalate missing files to errors."); + group.addArgument("--buckets") + .type(Integer.class) + .setDefault(100) + .help("Number of buckets to use for id-hashing. 
This value is required to be a constant per-dataset."); + } @Override @@ -150,41 +156,49 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig // Tag if present is appended to input-file csvs, output-file cqpp and used as id of cqpps + // Seems to be a bug with dropwizard and boolean default-values isFailFast = Optional.ofNullable(namespace.getBoolean("fast-fail")).orElse(false); - isStrict = Optional.ofNullable(namespace.getBoolean("strict")).orElse(true); + isStrict = Optional.ofNullable(namespace.getBoolean("strict")).orElse(false); - final List tags = namespace.getList("tag"); + final List tags = namespace.getList("tag"); final File inDir = namespace.get("in"); final File outDir = namespace.get("out"); - final List descriptionFiles = namespace.getList("desc"); + final List descriptionFilesRoot = namespace.getList("desc"); + final int buckets = namespace.getInt("buckets"); log.info("Preprocessing from command line config."); - final Collection jobs = new ArrayList<>(); + final Collection jobs = collectJobs(descriptionFilesRoot, tags, inDir, outDir, environment); - if (tags == null || tags.isEmpty()) { - for (File desc : descriptionFiles) { - final List descriptions = - findPreprocessingDescriptions(desc, inDir, outDir, Optional.empty(), environment.getValidator()); - jobs.addAll(descriptions); - } + final List broken = validateJobs(jobs, environment); + + jobs.removeIf(Predicate.not(PreprocessorCommand::requiresProcessing)); + + preprocessJobs(jobs, buckets, config); + + + log.info("Successfully Preprocess {} Jobs:", success.size()); + success.forEach(desc -> log.info("\tSucceeded Preprocessing for {}", desc)); + + if (!broken.isEmpty()) { + log.warn("Did not find {} Files", broken.size()); + broken.forEach(desc -> log.warn("\tDid not find file for {}", desc)); } - else { - for (String tag : tags) { - for (File desc : descriptionFiles) { - final List jobDescriptions = - findPreprocessingDescriptions(desc, inDir, outDir, Optional.of(tag), environment.getValidator()); - jobs.addAll(jobDescriptions); - } - } + if (isFailed()) { + log.error("Failed {} Preprocessing Jobs:", failed.size()); + failed.forEach(desc -> log.error("\tFailed Preprocessing for {}", desc)); + doFail(); } + } - List broken = new ArrayList<>(); + @NotNull + private List validateJobs(Collection jobs, Environment environment) { + final List broken = new ArrayList<>(); - for (Iterator iterator = jobs.iterator(); iterator.hasNext(); ) { + for (final Iterator iterator = jobs.iterator(); iterator.hasNext(); ) { final PreprocessingJob job = iterator.next(); try { @@ -213,22 +227,48 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig log.error("FAILED Preprocessing, files are missing or invalid."); doFail(); } + return broken; + } - jobs.removeIf(Predicate.not(PreprocessorCommand::requiresProcessing)); + @NotNull + private Collection collectJobs(List descriptionFiles, List tags, File inDir, File outDir, Environment environment) + throws IOException { + final Collection jobs = new ArrayList<>(); + if (tags == null || tags.isEmpty()) { + for (File desc : descriptionFiles) { + final List descriptions = + findPreprocessingDescriptions(desc, inDir, outDir, Optional.empty(), environment.getValidator()); + jobs.addAll(descriptions); + } + } + else { + for (String tag : tags) { + for (File desc : descriptionFiles) { + final List jobDescriptions = + findPreprocessingDescriptions(desc, inDir, outDir, Optional.of(tag), environment.getValidator()); + + 
jobs.addAll(jobDescriptions); + } + } + } + return jobs; + } + + private void preprocessJobs(Collection jobs, int buckets, ConqueryConfig config) throws InterruptedException { final long totalSize = jobs.stream() .mapToLong(PreprocessingJob::estimateTotalCsvSizeBytes) .sum(); log.info("Required to preprocess {} in total", BinaryByteUnit.format(totalSize)); - ProgressBar totalProgress = new ProgressBar(totalSize, System.out); + final ProgressBar totalProgress = new ProgressBar(totalSize, System.out); for (PreprocessingJob job : jobs) { pool.submit(() -> { ConqueryMDC.setLocation(job.toString()); try { - Preprocessor.preprocess(job, totalProgress, config); + Preprocessor.preprocess(job, totalProgress, config, buckets); success.add(job.toString()); } catch (FileNotFoundException e) { @@ -246,23 +286,6 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig pool.awaitTermination(24, TimeUnit.HOURS); ConqueryMDC.clearLocation(); - - - if (!success.isEmpty()) { - log.info("Successfully Preprocess {} Jobs:", success.size()); - success.forEach(desc -> log.info("\tSucceeded Preprocessing for {}", desc)); - } - - if (!broken.isEmpty()) { - log.warn("Did not find {} Files", broken.size()); - broken.forEach(desc -> log.warn("\tDid not find file for {}", desc)); - } - - if (isFailed()) { - log.error("Failed {} Preprocessing Jobs:", failed.size()); - failed.forEach(desc -> log.error("\tFailed Preprocessing for {}", desc)); - doFail(); - } } private void addMissing(PreprocessingJob job) { @@ -281,7 +304,7 @@ private void addFailed(PreprocessingJob job) { public List findPreprocessingDescriptions(File descriptionFiles, File inDir, File outputDir, Optional tag, Validator validator) throws IOException { - List out = new ArrayList<>(); + final List out = new ArrayList<>(); final File[] files = descriptionFiles.isFile() ? 
new File[]{descriptionFiles} @@ -302,8 +325,7 @@ private boolean isFailed() { return !failed.isEmpty(); } - private Optional tryExtractDescriptor(Validator validator, Optional tag, File descriptionFile, File outputDir, File csvDir) - throws IOException { + private Optional tryExtractDescriptor(Validator validator, Optional tag, File descriptionFile, File outputDir, File csvDir) { try { final TableImportDescriptor descriptor = diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java b/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java index 7b29656e6d..7de472e7b3 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java @@ -2,7 +2,6 @@ import java.util.Collection; import java.util.Objects; -import java.util.OptionalInt; import com.bakdata.conquery.io.storage.xodus.stores.CachedStore; import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore; @@ -109,22 +108,13 @@ public int getNumberOfEntities() { return entity2Bucket.count(); } - public OptionalInt getEntityBucket(String entity) { - final Integer bucket = entity2Bucket.get(entity); - if(bucket == null){ - return OptionalInt.empty(); - } - - return OptionalInt.of(bucket); + public boolean containsEntity(String entity) { + return entity2Bucket.get(entity) != null; } - public int assignEntityBucket(String entity, int bucketSize) { - final int bucket = (int) Math.ceil((1d + getNumberOfEntities()) / (double) bucketSize); - - entity2Bucket.add(entity, bucket); - - return bucket; + public void registerEntity(String entity, int bucket) { + entity2Bucket.update(entity, bucket); } diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/WorkerStorage.java b/backend/src/main/java/com/bakdata/conquery/io/storage/WorkerStorage.java index adfe9b2841..84ff4e90d6 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/WorkerStorage.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/WorkerStorage.java @@ -72,7 +72,7 @@ private void decorateCBlockStore(IdentifiableStore baseStoreCreator) { public void addCBlock(CBlock cBlock) { - log.debug("Adding CBlock[{}]", cBlock.getId()); + log.trace("Adding CBlock[{}]", cBlock.getId()); cBlocks.add(cBlock); } @@ -81,7 +81,7 @@ public CBlock getCBlock(CBlockId id) { } public void removeCBlock(CBlockId id) { - log.debug("Removing CBlock[{}]", id); + log.trace("Removing CBlock[{}]", id); cBlocks.remove(id); } @@ -90,7 +90,7 @@ public Collection getAllCBlocks() { } public void addBucket(Bucket bucket) { - log.debug("Adding Bucket[{}]", bucket.getId()); + log.trace("Adding Bucket[{}]", bucket.getId()); buckets.add(bucket); } @@ -99,7 +99,7 @@ public Bucket getBucket(BucketId id) { } public void removeBucket(BucketId id) { - log.debug("Removing Bucket[{}]", id); + log.trace("Removing Bucket[{}]", id); buckets.remove(id); } diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/CachedStore.java b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/CachedStore.java index 0840ca471a..bf6588683f 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/CachedStore.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/CachedStore.java @@ -33,8 +33,7 @@ public void add(KEY key, VALUE value) { @Override public VALUE get(KEY key) { - // TODO: 08.01.2020 fk: This assumes that all values have been read at some point! 
- return cache.get(key); + return cache.computeIfAbsent(key, store::get); } @Override diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java index 991d5efd64..f5a1b5179b 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java @@ -1,49 +1,155 @@ package com.bakdata.conquery.mode.cluster; +import java.io.IOException; import java.io.InputStream; import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import com.bakdata.conquery.mode.ImportHandler; -import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.datasets.Import; import com.bakdata.conquery.models.datasets.Table; import com.bakdata.conquery.models.datasets.concepts.Concept; import com.bakdata.conquery.models.datasets.concepts.Connector; +import com.bakdata.conquery.models.events.Bucket; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; -import com.bakdata.conquery.models.jobs.ImportJob; +import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; +import com.bakdata.conquery.models.identifiable.ids.specific.TableId; +import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; +import com.bakdata.conquery.models.messages.namespaces.specific.AddImport; +import com.bakdata.conquery.models.messages.namespaces.specific.ImportBucket; import com.bakdata.conquery.models.messages.namespaces.specific.RemoveImportJob; +import com.bakdata.conquery.models.preproc.PreprocessedData; +import com.bakdata.conquery.models.preproc.PreprocessedHeader; +import com.bakdata.conquery.models.preproc.PreprocessedReader; import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.DistributedNamespace; import com.bakdata.conquery.models.worker.Namespace; +import com.bakdata.conquery.models.worker.WorkerInformation; +import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.core.Response; import lombok.AllArgsConstructor; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; /** * Handler of {@link Import} requests that realizes them both on the manager and the cluster's shards. */ @AllArgsConstructor -public -class ClusterImportHandler implements ImportHandler { +@Slf4j +public class ClusterImportHandler implements ImportHandler { - private final ConqueryConfig config; private final DatasetRegistry datasetRegistry; @SneakyThrows @Override public void updateImport(Namespace namespace, InputStream inputStream) { - ImportJob job = ImportJob.createOrUpdate( - datasetRegistry.get(namespace.getDataset().getId()), - inputStream, - config.getCluster().getEntityBucketSize(), - true - ); + handleImport(namespace, inputStream, true); + } + + private static void handleImport(Namespace namespace, InputStream inputStream, boolean update) throws IOException { + try (PreprocessedReader parser = new PreprocessedReader(inputStream, namespace.getPreprocessMapper())) { + // We parse semi-manually as the incoming file consist of multiple documents we read progressively: + // 1) the header to check metadata + // 2...) 
The chunked Buckets + + final PreprocessedHeader header = parser.readHeader(); + + final Table table = validateImportable(((DistributedNamespace) namespace), header, update); + + readAndDistributeImport(((DistributedNamespace) namespace), table, header, parser); + + clearDependentConcepts(namespace.getStorage().getAllConcepts(), table); + } + } + + /** + * Handle validity and update logic. + */ + private static Table validateImportable(DistributedNamespace namespace, PreprocessedHeader header, boolean update) { + final TableId tableId = new TableId(namespace.getDataset().getId(), header.getTable()); + final ImportId importId = new ImportId(tableId, header.getName()); + + final Table table = namespace.getStorage().getTable(tableId); + + if (table == null) { + throw new BadRequestException("Table[%s] does not exist.".formatted(tableId)); + } + + // Ensure that Import and Table have the same schema + final List errors = header.assertMatch(table); + + if (!errors.isEmpty()) { + final String errorsMessage = String.join("\n - ", errors); + + log.error("Problems concerning Import `{}`:\n{}", importId, errorsMessage); + throw new BadRequestException("Headers[%s] do not match Table[%s]:\n%s".formatted(importId, table.getId(), errorsMessage)); + } + + final Import processedImport = namespace.getStorage().getImport(importId); + + if (update) { + if (processedImport == null) { + throw new NotFoundException("Import[%s] is not present.".formatted(importId)); + } + + // before updating the import, make sure that all workers removed the prior import + namespace.getWorkerHandler().sendToAll(new RemoveImportJob(processedImport)); + namespace.getStorage().removeImport(importId); + } + else if (processedImport != null) { + throw new WebApplicationException("Import[%s] is already present.".formatted(importId), Response.Status.CONFLICT); + } + + return table; + } - namespace.getJobManager().addSlowJob(job); + private static void readAndDistributeImport(DistributedNamespace namespace, Table table, PreprocessedHeader header, PreprocessedReader reader) { + final TableId tableId = new TableId(namespace.getDataset().getId(), header.getTable()); + final ImportId importId = new ImportId(tableId, header.getName()); + + log.info("BEGIN importing {} into {}", header.getName(), table); + + Import imp = null; + + final Map> collectedEntities = new HashMap<>(); + + for (PreprocessedData container : (Iterable) () -> reader) { + + if (imp == null) { + // We need a container to create a description. + imp = header.createImportDescription(table, container.getStores()); + + namespace.getWorkerHandler().sendToAll(new AddImport(imp)); + namespace.getStorage().updateImport(imp); + } + + + final Bucket bucket = Bucket.fromPreprocessed(table, container, imp); + + log.trace("DONE reading bucket `{}`, contains {} entities.", bucket.getId(), bucket.entities().size()); + + final WorkerInformation responsibleWorker = namespace.getWorkerHandler().assignResponsibleWorker(bucket.getId()); + + sendBucket(bucket, responsibleWorker); + + // NOTE: I want the bucket to be GC'd as early as possible, so I just store the part(s) I need later. 
+ + collectedEntities.put(bucket.getBucket(), bucket.entities()); + } + + namespace.getJobManager().addSlowJob(new RegisterImportEntities(collectedEntities, namespace, importId)); + + log.debug("Successfully read {} Buckets, containing {} entities for `{}`", header.getNumberOfBuckets(), header.getNumberOfEntities(), importId); + + namespace.getWorkerHandler().sendUpdatedWorkerInformation(); - clearDependentConcepts(namespace.getStorage().getAllConcepts(), job.getTable()); } - private void clearDependentConcepts(Collection> allConcepts, Table table) { + private static void clearDependentConcepts(Collection> allConcepts, Table table) { for (Concept c : allConcepts) { for (Connector con : c.getConnectors()) { if (!con.getTable().equals(table)) { @@ -55,24 +161,29 @@ private void clearDependentConcepts(Collection> allConcepts, Table ta } } + /** + * select, then send buckets. + */ + public static WorkerId sendBucket(Bucket bucket, WorkerInformation responsibleWorker) { + + responsibleWorker.awaitFreeJobQueue(); + + log.trace("Sending Bucket[{}] to {}", bucket.getId(), responsibleWorker.getId()); + responsibleWorker.send(new ImportBucket(bucket.getId().toString(), bucket)); + + return responsibleWorker.getId(); + } + @SneakyThrows @Override public void addImport(Namespace namespace, InputStream inputStream) { - ImportJob job = ImportJob.createOrUpdate( - datasetRegistry.get(namespace.getDataset().getId()), - inputStream, - config.getCluster().getEntityBucketSize(), - false - ); - namespace.getJobManager().addSlowJob(job); - - clearDependentConcepts(namespace.getStorage().getAllConcepts(), job.getTable()); + handleImport(namespace, inputStream, false); } @Override public void deleteImport(Import imp) { - DatasetId id = imp.getTable().getDataset().getId(); + final DatasetId id = imp.getTable().getDataset().getId(); final DistributedNamespace namespace = datasetRegistry.get(id); clearDependentConcepts(namespace.getStorage().getAllConcepts(), imp.getTable()); @@ -83,4 +194,5 @@ public void deleteImport(Import imp) { // Remove bucket assignments for consistency report namespace.getWorkerHandler().removeBucketAssignmentsForImportFormWorkers(imp); } + } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java index cafc855713..ed682e7112 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java @@ -30,7 +30,7 @@ public ClusterManager provideManager(ConqueryConfig config, Environment environm final ClusterConnectionManager connectionManager = new ClusterConnectionManager(datasetRegistry, jobManager, environment.getValidator(), config, creator, clusterState); - final ImportHandler importHandler = new ClusterImportHandler(config, datasetRegistry); + final ImportHandler importHandler = new ClusterImportHandler(datasetRegistry); final StorageListener extension = new ClusterStorageListener(jobManager, datasetRegistry); final Supplier> nodeProvider = () -> clusterState.getShardNodes().values(); final List adminTasks = List.of(new ReportConsistencyTask(clusterState)); diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/RegisterImportEntities.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/RegisterImportEntities.java new file mode 100644 index 0000000000..e6c89725c1 --- /dev/null +++ 
b/backend/src/main/java/com/bakdata/conquery/mode/cluster/RegisterImportEntities.java @@ -0,0 +1,42 @@ +package com.bakdata.conquery.mode.cluster; + +import java.util.Collection; +import java.util.Map; + +import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; +import com.bakdata.conquery.models.jobs.Job; +import com.bakdata.conquery.models.worker.DistributedNamespace; +import lombok.Data; + +/** + * This class handles the registration of entities, which is relevant for counting entities and for resolving entities from external sources. + */ +@Data +class RegisterImportEntities extends Job { + + private final Map> collectedEntities; + + + private final DistributedNamespace namespace; + private final ImportId importId; + + @Override + public void execute() { + // This task is quite slow, so we delay it as far as possible. + for (Map.Entry> bucket2Entities : collectedEntities.entrySet()) { + for (String entity : bucket2Entities.getValue()) { + + if (namespace.getStorage().containsEntity(entity)) { + continue; + } + + namespace.getStorage().registerEntity(entity, bucket2Entities.getKey()); + } + } + } + + @Override + public String getLabel() { + return "Handle Bucket %s assignments.".formatted(importId); + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/ConceptTreeCache.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/ConceptTreeCache.java index 5875e89264..128dcf61bd 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/ConceptTreeCache.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/ConceptTreeCache.java @@ -4,6 +4,7 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import com.bakdata.conquery.models.datasets.concepts.ConceptElement; import com.bakdata.conquery.models.exceptions.ConceptConfigurationException; import com.bakdata.conquery.util.CalculatedValue; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -35,7 +36,7 @@ public class ConceptTreeCache { * @implNote ConcurrentHashMap does not allow null values, but we want to have null values in the map. So we wrap the values in Optional. 
*/ @JsonIgnore - private final Map> cached = new ConcurrentHashMap<>();; + private final Map>> cached = new ConcurrentHashMap<>();; /** @@ -43,7 +44,7 @@ public class ConceptTreeCache { * * @param value */ - public ConceptTreeChild findMostSpecificChild(String value, CalculatedValue> rowMap) throws ConceptConfigurationException { + public ConceptElement findMostSpecificChild(String value, CalculatedValue> rowMap) throws ConceptConfigurationException { if(cached.containsKey(value)) { hits++; @@ -52,7 +53,7 @@ public ConceptTreeChild findMostSpecificChild(String value, CalculatedValue child = treeConcept.findMostSpecificChild(value, rowMap); if(!rowMap.isCalculated()) { cached.put(value, Optional.ofNullable(child)); diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java index ab302384e9..0c2c90d8fd 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java @@ -126,11 +126,11 @@ public void initElements() throws ConfigurationException, JSONException { } } - public ConceptTreeChild findMostSpecificChild(String stringValue, CalculatedValue> rowMap) throws ConceptConfigurationException { + public ConceptElement findMostSpecificChild(String stringValue, CalculatedValue> rowMap) throws ConceptConfigurationException { return findMostSpecificChild(stringValue, rowMap, null, getChildren()); } - private ConceptTreeChild findMostSpecificChild(String stringValue, CalculatedValue> rowMap, ConceptTreeChild best, List currentList) + private ConceptElement findMostSpecificChild(String stringValue, CalculatedValue> rowMap, ConceptElement best, List currentList) throws ConceptConfigurationException { while (currentList != null && !currentList.isEmpty()) { diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java b/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java index 2257ab8b2c..530aa94187 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java +++ b/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java @@ -5,6 +5,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import com.bakdata.conquery.io.jackson.serializer.NsIdRef; import com.bakdata.conquery.models.common.CDateSet; @@ -26,12 +27,14 @@ import com.bakdata.conquery.models.identifiable.IdentifiableImpl; import com.bakdata.conquery.models.identifiable.ids.NamespacedIdentifiable; import com.bakdata.conquery.models.identifiable.ids.specific.BucketId; +import com.bakdata.conquery.models.preproc.PreprocessedData; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonManagedReference; import com.google.common.collect.ImmutableSet; import io.dropwizard.validation.ValidationMethod; import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; import lombok.AccessLevel; @@ -50,16 +53,16 @@ @FieldNameConstants @Getter @Setter -@ToString(of = {"numberOfEvents", "stores"}, callSuper = true) +@ToString(onlyExplicitlyIncluded = true, callSuper = true) @AllArgsConstructor @RequiredArgsConstructor(onConstructor_ = 
{@JsonCreator}, access = AccessLevel.PROTECTED) + public class Bucket extends IdentifiableImpl implements NamespacedIdentifiable { @Min(0) private final int bucket; - @Min(0) - private final int numberOfEvents; + @ToString.Include @JsonManagedReference @Setter(AccessLevel.PROTECTED) private ColumnStore[] stores; @@ -74,9 +77,25 @@ public class Bucket extends IdentifiableImpl implements NamespacedIden */ private final Object2IntMap ends; + private final int numberOfEvents; + @NsIdRef private final Import imp; + private static ColumnStore[] sortColumns(Table table, Map stores) { + return Arrays.stream(table.getColumns()) + .map(Column::getName) + .map(stores::get) + .map(Objects::requireNonNull) + .toArray(ColumnStore[]::new); + } + + public static Bucket fromPreprocessed(Table table, PreprocessedData container, Import imp) { + final ColumnStore[] storesSorted = sortColumns(table, container.getStores()); + final int numberOfEvents = container.getEnds().values().stream().mapToInt(i -> i).max().orElse(0); + + return new Bucket(container.getBucketId(), storesSorted, new Object2IntOpenHashMap<>(container.getStarts()), new Object2IntOpenHashMap<>(container.getEnds()),numberOfEvents, imp); + } @JsonIgnore @ValidationMethod(message = "Number of events does not match the length of some stores.") diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java b/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java index fe91ec2ba8..d7f2a99f4a 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java +++ b/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java @@ -124,7 +124,11 @@ private static void registerCBlock(CBlock cBlock, Map c : storage.getAllConcepts()) { + final Collection> allConcepts = storage.getAllConcepts(); + + log.info("BEGIN full update for {} concepts.", allConcepts.size()); + + for (Concept c : allConcepts) { if (!(c instanceof TreeConcept)) { continue; } @@ -142,7 +146,7 @@ public void fullUpdate() { continue; } - log.warn("CBlock[{}] missing in Storage. Queuing recalculation", cBlockId); + log.trace("CBlock[{}] missing in Storage. Queuing recalculation", cBlockId); job.addCBlock(bucket, con); } } @@ -365,6 +369,11 @@ public void addConcept(Concept concept) { job.addCBlock(bucket, connector); } } + + if(job.isEmpty()){ + return; + } + jobManager.addSlowJob(job); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java b/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java index 424fe60dc1..cfc5e0182b 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java +++ b/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java @@ -12,6 +12,7 @@ import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.datasets.Table; import com.bakdata.conquery.models.datasets.concepts.Concept; +import com.bakdata.conquery.models.datasets.concepts.ConceptElement; import com.bakdata.conquery.models.datasets.concepts.Connector; import com.bakdata.conquery.models.datasets.concepts.conditions.CTCondition; import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeCache; @@ -164,7 +165,7 @@ else if (treeConcept.countElements() == 1) { continue; } - final ConceptTreeChild child = cache == null + final ConceptElement child = cache == null ? 
treeConcept.findMostSpecificChild(stringValue, rowMap) : cache.findMostSpecificChild(stringValue, rowMap); diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/EmptyBucket.java b/backend/src/main/java/com/bakdata/conquery/models/events/EmptyBucket.java index f0b6f64951..5d1551cb8d 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/events/EmptyBucket.java +++ b/backend/src/main/java/com/bakdata/conquery/models/events/EmptyBucket.java @@ -20,7 +20,7 @@ public class EmptyBucket extends Bucket { private static final EmptyBucket Instance = new EmptyBucket(); public EmptyBucket() { - super(0, 0, Object2IntMaps.emptyMap(), Object2IntMaps.emptyMap(), null); + super(0, Object2IntMaps.emptyMap(), Object2IntMaps.emptyMap(), 0, null); this.setStores(new ColumnStore[0]); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/identifiable/InjectingCentralRegistry.java b/backend/src/main/java/com/bakdata/conquery/models/identifiable/InjectingCentralRegistry.java deleted file mode 100644 index 05d4395dde..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/models/identifiable/InjectingCentralRegistry.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.bakdata.conquery.models.identifiable; - -import java.util.Map; - -import com.bakdata.conquery.models.identifiable.ids.Id; -import lombok.Data; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; - -/** - * Central Registry used to wire up incoming ids with already established ids. - */ -@RequiredArgsConstructor -@Data -public class InjectingCentralRegistry extends CentralRegistry{ - /** - * This map is intentionally NOT an IdMap as it allows wiring up mismatched ids. - */ - @NonNull - private final Map, Identifiable> injections; - - @Override - protected > T get(Id name) { - return (T) injections.get(name); - } -} diff --git a/backend/src/main/java/com/bakdata/conquery/models/identifiable/mapping/EntityIdMap.java b/backend/src/main/java/com/bakdata/conquery/models/identifiable/mapping/EntityIdMap.java index 52b56dd00c..6bd986ed3d 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/identifiable/mapping/EntityIdMap.java +++ b/backend/src/main/java/com/bakdata/conquery/models/identifiable/mapping/EntityIdMap.java @@ -147,7 +147,7 @@ public String resolve(ExternalId key) { } // Maybe we can find them directly in the dictionary? 
- if (storage.getEntityBucket(key.getId()).isPresent()) { + if (storage.containsEntity(key.getId())) { return key.getId(); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java index afad884d47..91287384b6 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java @@ -3,6 +3,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import com.bakdata.conquery.io.storage.WorkerStorage; @@ -15,9 +18,9 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import lombok.Getter; +import lombok.Data; import lombok.RequiredArgsConstructor; -import lombok.Setter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; /** @@ -27,44 +30,65 @@ */ @RequiredArgsConstructor @Slf4j - +@Data +@ToString(onlyExplicitlyIncluded = true) public class CalculateCBlocksJob extends Job { - private final List infos = new ArrayList<>(); + private final List tasks = new ArrayList<>(); private final WorkerStorage storage; private final BucketManager bucketManager; private final ExecutorService executorService; + @ToString.Include @Override public String getLabel() { - return "Calculate CBlocks[" + infos.size() + "]"; + return "Calculate CBlocks[" + tasks.size() + "]"; } public void addCBlock(Bucket bucket, ConceptTreeConnector connector) { - infos.add(new CalculationInformation(connector, bucket)); + tasks.add(createInformationProcessor(connector, bucket)); + } + + private CalculationInformationProcessor createInformationProcessor(ConceptTreeConnector connector, Bucket bucket) { + return new CalculationInformationProcessor(connector, bucket, bucketManager, storage); } @Override public void execute() throws Exception { - if (infos.isEmpty()) { + if (tasks.isEmpty()) { return; } - getProgressReporter().setMax(infos.size()); + log.info("BEGIN calculate CBlocks for {} entries.", tasks.size()); - final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(this.executorService); + getProgressReporter().setMax(tasks.size()); - final List> futures = infos.stream() - .map(this::createInformationProcessor) - .map(executorService::submit) - .peek(f -> f.addListener(this::incrementProgressReporter, MoreExecutors.directExecutor())) - .collect(Collectors.toList()); + final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(getExecutorService()); - Futures.allAsList(futures).get(); - } + final List> futures = + tasks.stream() + .map(executorService::submit) + .peek(fut -> fut.addListener(this::incrementProgressReporter, MoreExecutors.directExecutor())) + .collect(Collectors.toList()); + + + final ListenableFuture all = Futures.allAsList(futures); + + while (!all.isDone()) { + try { + all.get(1, TimeUnit.MINUTES); + } + catch (TimeoutException exception) { + log.debug("submitted={}, pool={}", tasks.size(), getExecutorService()); + + if (log.isTraceEnabled() && getExecutorService() instanceof ThreadPoolExecutor) { + log.trace("Waiting for {}", ((ThreadPoolExecutor) getExecutorService()).getQueue()); + } + } 
+ } + + log.debug("DONE CalculateCBlocks for {} entries.", tasks.size()); - private CalculationInformationProcessor createInformationProcessor(CalculationInformation info) { - return new CalculationInformationProcessor(info, bucketManager, storage); } private void incrementProgressReporter() { @@ -72,52 +96,45 @@ private void incrementProgressReporter() { } public boolean isEmpty() { - return infos.isEmpty(); + return tasks.isEmpty(); } - @RequiredArgsConstructor - @Getter - @Setter - private static class CalculationInformation { + + @Data + @ToString(onlyExplicitlyIncluded = true) + private static class CalculationInformationProcessor implements Runnable { private final ConceptTreeConnector connector; private final Bucket bucket; - public CBlockId getCBlockId() { - return new CBlockId(getBucket().getId(), getConnector().getId()); - } - } - - - @RequiredArgsConstructor - private static class CalculationInformationProcessor implements Runnable { - private final CalculationInformation info; private final BucketManager bucketManager; private final WorkerStorage storage; @Override public void run() { try { - if (bucketManager.hasCBlock(info.getCBlockId())) { - log.trace("Skipping calculation of CBlock[{}] because its already present in the BucketManager.", info.getCBlockId()); + if (bucketManager.hasCBlock(getCBlockId())) { + log.trace("Skipping calculation of CBlock[{}] because its already present in the BucketManager.", getCBlockId()); return; } - CBlock cBlock = CBlock.createCBlock(info.getConnector(), info.getBucket(), bucketManager.getEntityBucketSize()); + log.trace("BEGIN calculating CBlock for {}", getCBlockId()); + + final CBlock cBlock = CBlock.createCBlock(getConnector(), getBucket(), bucketManager.getEntityBucketSize()); + + log.trace("DONE calculating CBlock for {}", getCBlockId()); bucketManager.addCalculatedCBlock(cBlock); storage.addCBlock(cBlock); } catch (Exception e) { - throw new RuntimeException( - String.format( - "Exception in CalculateCBlocksJob (CBlock=%s, connector=%s)", - info.getCBlockId(), - info.getConnector() - ), - e - ); + throw new RuntimeException("Exception in CalculateCBlocksJob %s".formatted(getCBlockId()), e); } } + @ToString.Include + public CBlockId getCBlockId() { + return new CBlockId(getBucket().getId(), getConnector().getId()); + } + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java deleted file mode 100644 index 92fbd6ee47..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java +++ /dev/null @@ -1,372 +0,0 @@ -package com.bakdata.conquery.models.jobs; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; - -import com.bakdata.conquery.models.datasets.Column; -import com.bakdata.conquery.models.datasets.Dataset; -import com.bakdata.conquery.models.datasets.Import; -import com.bakdata.conquery.models.datasets.ImportColumn; -import com.bakdata.conquery.models.datasets.Table; -import com.bakdata.conquery.models.events.Bucket; -import com.bakdata.conquery.models.events.MajorTypeId; -import com.bakdata.conquery.models.events.stores.root.ColumnStore; -import com.bakdata.conquery.models.exceptions.JSONException; -import 
com.bakdata.conquery.models.identifiable.ids.specific.BucketId; -import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; -import com.bakdata.conquery.models.identifiable.ids.specific.TableId; -import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; -import com.bakdata.conquery.models.messages.namespaces.specific.AddImport; -import com.bakdata.conquery.models.messages.namespaces.specific.ImportBucket; -import com.bakdata.conquery.models.messages.namespaces.specific.RemoveImportJob; -import com.bakdata.conquery.models.preproc.PPColumn; -import com.bakdata.conquery.models.preproc.PreprocessedData; -import com.bakdata.conquery.models.preproc.PreprocessedHeader; -import com.bakdata.conquery.models.preproc.PreprocessedReader; -import com.bakdata.conquery.models.worker.DistributedNamespace; -import com.bakdata.conquery.models.worker.WorkerHandler; -import com.bakdata.conquery.models.worker.WorkerInformation; -import com.bakdata.conquery.util.progressreporter.ProgressReporter; -import com.google.common.base.Functions; -import it.unimi.dsi.fastutil.ints.IntArrayList; -import it.unimi.dsi.fastutil.ints.IntList; -import it.unimi.dsi.fastutil.objects.Object2IntMap; -import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; -import jakarta.ws.rs.BadRequestException; -import jakarta.ws.rs.WebApplicationException; -import jakarta.ws.rs.core.Response; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -/** - * This is the main routine to load data into Conquery. - */ -@RequiredArgsConstructor -@Slf4j -public class ImportJob extends Job { - - private static final int NUMBER_OF_STEPS = /* directly in execute = */4; - private final DistributedNamespace namespace; - @Getter - private final Table table; - private final int bucketSize; - private final PreprocessedHeader header; - private final PreprocessedData container; - - public static ImportJob createOrUpdate(DistributedNamespace namespace, InputStream inputStream, int entityBucketSize, boolean update) - throws IOException { - - try (PreprocessedReader parser = new PreprocessedReader(inputStream, namespace.getPreprocessMapper())) { - - final Dataset ds = namespace.getDataset(); - - // We parse semi-manually as the incoming file consist of multiple documents we only read progressively: - // 1) the header to check metadata - // 2) The Dictionaries to be imported and transformed - // 3) The ColumnStores themselves which contain references to the previously imported dictionaries. 
- - - final PreprocessedHeader header = parser.readHeader(); - - final TableId tableId = new TableId(ds.getId(), header.getTable()); - final Table table = namespace.getStorage().getTable(tableId); - - if (table == null) { - throw new BadRequestException(String.format("Table[%s] does not exist.", tableId)); - } - - // Ensure that Import and Table have the same schema - final List validationErrors = ensureHeadersMatch(table, header); - - if(!validationErrors.isEmpty()){ - final String errorMessage = String.join("\n -", validationErrors); - - log.error("Problems concerning Import `{}`:{}", header.getName(), errorMessage); - throw new BadRequestException(String.format("Import[%s.%s] does not match Table[%s]:%s", header.getTable(), header.getName(), table.getId(), errorMessage)); - } - - final ImportId importId = new ImportId(table.getId(), header.getName()); - final Import processedImport = namespace.getStorage().getImport(importId); - - if (update) { - if (processedImport == null) { - throw new WebApplicationException(String.format("Import[%s] is not present.", importId), Response.Status.NOT_FOUND); - } - // before updating the import, make sure that all workers removed the last import - namespace.getWorkerHandler().sendToAll(new RemoveImportJob(processedImport)); - namespace.getStorage().removeImport(importId); - } - else if (processedImport != null) { - throw new WebApplicationException(String.format("Import[%s] is already present.", importId), Response.Status.CONFLICT); - } - - - log.trace("Begin reading data."); - - final PreprocessedData container = parser.readData(); - - log.debug("Done reading data. Contains {} Entities.", container.size()); - - log.info("Importing {} into {}", header.getName(), tableId); - - return new ImportJob( - namespace, - table, - entityBucketSize, - header, - container - ); - } - } - - /** - * Verify that the supplied table matches the preprocessed data in shape. - */ - public static List ensureHeadersMatch(Table table, PreprocessedHeader importHeaders) { -// final StringJoiner errors = new StringJoiner("\n - ", "\n - ", ""); - - final List errors = new ArrayList<>(); - - if (table.getColumns().length != importHeaders.getColumns().length) { - errors.add(String.format("Import column count=%d does not match table column count=%d", importHeaders.getColumns().length, table.getColumns().length)); - } - - final Map typesByName = Arrays.stream(importHeaders.getColumns()).collect(Collectors.toMap(PPColumn::getName, PPColumn::getType)); - - for (PPColumn column : importHeaders.getColumns()) { - if (!typesByName.containsKey(column.getName())) { - errors.add("Column[%s] is missing." 
- .formatted(column.getName())); - } - else if (!typesByName.get(column.getName()).equals(column.getType())) { - errors.add("Column[%s] Types do not match %s != %s" - .formatted(column.getName(), typesByName.get(column.getName()), column.getType())); - } - } - - return errors; - } - - - @Override - public void execute() throws JSONException, InterruptedException, IOException { - - getProgressReporter().setMax(NUMBER_OF_STEPS); - - log.trace("Updating primary dictionary"); - - getProgressReporter().report(1); - - // Distribute the new IDs among workers - distributeWorkerResponsibilities(container.entities()); - - getProgressReporter().report(1); - - final Import imp = createImport(header, container.getStores(), table.getColumns(), container.size()); - - namespace.getStorage().updateImport(imp); - - final Map> buckets2LocalEntities = groupEntitiesByBucket(container.entities(), bucketSize); - - - final ColumnStore[] storesSorted = Arrays.stream(table.getColumns()) - .map(Column::getName) - .map(container.getStores()::get) - .map(Objects::requireNonNull) - .toArray(ColumnStore[]::new); - - - log.info("Start sending {} Buckets", buckets2LocalEntities.size()); - - // we use this to track assignment to workers. - final Map> workerAssignments = - sendBuckets(container.getStarts(), container.getLengths(), imp, buckets2LocalEntities, storesSorted); - - final WorkerHandler handler = namespace.getWorkerHandler(); - workerAssignments.forEach(handler::addBucketsToWorker); - - } - - private void distributeWorkerResponsibilities(Set entities) { - log.debug("Updating bucket assignments."); - - synchronized (namespace) { - for (String entity : entities) { - final int bucket = namespace.getBucket(entity, bucketSize); - - if (namespace.getWorkerHandler().getResponsibleWorkerForBucket(bucket) != null) { - continue; - } - - namespace.getWorkerHandler().addResponsibility(bucket); - } - } - } - - private Import createImport(PreprocessedHeader header, Map stores, Column[] columns, int size) { - final Import imp = new Import(table); - - imp.setName(header.getName()); - imp.setNumberOfEntries(header.getRows()); - imp.setNumberOfEntities(size); - - final ImportColumn[] importColumns = new ImportColumn[columns.length]; - - for (int i = 0; i < columns.length; i++) { - final ColumnStore store = stores.get(columns[i].getName()); - - final ImportColumn col = new ImportColumn(imp, store.createDescription(), store.getLines(), store.estimateMemoryConsumptionBytes()); - - col.setName(columns[i].getName()); - - importColumns[i] = col; - } - - imp.setColumns(importColumns); - - namespace.getWorkerHandler().sendToAll(new AddImport(imp)); - return imp; - } - - /** - * Group entities by their global bucket id. - */ - private Map> groupEntitiesByBucket(Set entities, int bucketSize) { - return entities.stream() - .collect(Collectors.groupingBy(entity -> namespace.getBucket(entity, bucketSize))); - - } - - /** - * select, then send buckets. 
- */ - private Map> sendBuckets(Map starts, Map lengths, Import imp, Map> buckets2LocalEntities, ColumnStore[] storesSorted) { - - final Map> newWorkerAssignments = new HashMap<>(); - - final ProgressReporter subJob = getProgressReporter().subJob(buckets2LocalEntities.size()); - - for (Map.Entry> bucket2entities : buckets2LocalEntities.entrySet()) { - - - final int bucketId = bucket2entities.getKey(); - final List entities = bucket2entities.getValue(); - - final WorkerInformation responsibleWorker = Objects.requireNonNull( - namespace - .getWorkerHandler() - .getResponsibleWorkerForBucket(bucketId), - () -> "No responsible worker for Bucket#" + bucketId - ); - - awaitFreeJobQueue(responsibleWorker); - - final Map bucketStarts = entities.stream() - .filter(starts::containsKey) - .collect(Collectors.toMap(Functions.identity(), starts::get)); - - final Map bucketLengths = entities.stream() - .filter(lengths::containsKey) - .collect(Collectors.toMap(Functions.identity(), lengths::get)); - - - assert !Collections.disjoint(bucketStarts.keySet(), bucketLengths.keySet()); - - - final Bucket bucket = - selectBucket(bucketStarts, bucketLengths, storesSorted, imp, bucketId); - - newWorkerAssignments.computeIfAbsent(responsibleWorker.getId(), (ignored) -> new HashSet<>()) - .add(bucket.getId()); - - log.trace("Sending Bucket[{}] to {}", bucket.getId(), responsibleWorker.getId()); - responsibleWorker.send(ImportBucket.forBucket(bucket)); - - subJob.report(1); - } - - return newWorkerAssignments; - } - - private void awaitFreeJobQueue(WorkerInformation responsibleWorker) { - try { - responsibleWorker.getConnectedShardNode().waitForFreeJobQueue(); - } - catch (InterruptedException e) { - log.error("Interrupted while waiting for worker[{}] to have free space in queue", responsibleWorker, e); - } - } - - /** - * - remap Entity-Ids to global - * - calculate per-Entity regions of Bucklet (start/end) - * - split stores - */ - private Bucket selectBucket(Map localStarts, Map localLengths, ColumnStore[] stores, Import imp, int bucketId) { - - - final IntList selectionStart = new IntArrayList(); - final IntList selectionLength = new IntArrayList(); - - - // First entity of Bucket starts at 0, the following are appended. 
- final Object2IntMap entityStarts = new Object2IntOpenHashMap<>(); - final Object2IntMap entityEnds = new Object2IntOpenHashMap<>(); - - - int currentStart = 0; - - for (Map.Entry entity2Start : localStarts.entrySet()) { - final String entity = entity2Start.getKey(); - final int start = entity2Start.getValue(); - - final int length = localLengths.get(entity); - - selectionStart.add(start); - - selectionLength.add(length); - - entityStarts.put(entity, currentStart); - entityEnds.put(entity, currentStart + length); - - currentStart += length; - } - - // copy only the parts of the bucket we need - final ColumnStore[] bucketStores = - Arrays.stream(stores) - .map(store -> store.select(selectionStart.toIntArray(), selectionLength.toIntArray())) - .toArray(ColumnStore[]::new); - - return new Bucket( - bucketId, - selectionLength.intStream().sum(), - bucketStores, - entityStarts, - entityEnds, - imp - ); - } - - private Dataset getDataset() { - return namespace.getDataset(); - } - - - @Override - public String getLabel() { - return "Importing into " + table + " from " + header.getName(); - } - -} diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/AddImport.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/AddImport.java index 5470d205c9..659f2a2493 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/AddImport.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/AddImport.java @@ -20,7 +20,7 @@ public class AddImport extends WorkerMessage { @Override public void react(Worker context) throws Exception { - log.info("Received Import[{}], containing {} entries.", imp.getId(), imp.getNumberOfEntries()); + log.trace("Received Import[{}], containing {} entries.", imp.getId(), imp.getNumberOfEntries()); context.addImport(imp); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/ImportBucket.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/ImportBucket.java index b2b02666d8..def021735d 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/ImportBucket.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/ImportBucket.java @@ -22,13 +22,9 @@ public class ImportBucket extends WorkerMessage { private final Bucket bucket; - public static ImportBucket forBucket(Bucket bucket) { - return new ImportBucket(bucket.getId().toString(),bucket); - } - @Override public void react(Worker context) throws Exception { - log.trace("Received {}", bucket.getId()); + log.debug("Received {}, containing {} entities", bucket.getId(), bucket.entities().size()); context.addBucket(bucket); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessed.java b/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessed.java index 3bb69761f2..e32f51b359 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessed.java +++ b/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessed.java @@ -6,7 +6,9 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.IntSummaryStatistics; import java.util.List; import java.util.Map; @@ -22,11 +24,14 @@ import com.bakdata.conquery.models.preproc.parser.Parser; import 
com.bakdata.conquery.models.preproc.parser.specific.StringParser; import com.fasterxml.jackson.core.JsonGenerator; +import com.google.common.collect.Maps; +import com.google.common.hash.Hashing; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntList; import it.unimi.dsi.fastutil.ints.IntLists; import it.unimi.dsi.fastutil.objects.Object2IntAVLTreeMap; import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -79,7 +84,7 @@ public Preprocessed(ConqueryConfig config, PreprocessingJob preprocessingJob) th } - public void write(File file) throws IOException { + public void write(File file, int buckets) throws IOException { final Object2IntMap entityStart = new Object2IntAVLTreeMap<>(); final Object2IntMap entityLength = new Object2IntAVLTreeMap<>(); @@ -95,15 +100,23 @@ public void write(File file) throws IOException { log.debug("Writing Headers"); - final int hash = descriptor.calculateValidityHash(job.getCsvDirectory(), job.getTag()); + //TODO this could actually be done at read-time, avoiding large allocations entirely. But in a different smaller PR. + final Map> bucket2Entity = entityStart.keySet().stream() + .collect(Collectors.groupingBy(id -> getEntityBucket(buckets, id))) + .entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> new HashSet<>(entry.getValue()))); - final PreprocessedHeader header = new PreprocessedHeader(descriptor.getName(), descriptor.getTable(), rows, columns, hash); + final int hash = descriptor.calculateValidityHash(job.getCsvDirectory(), job.getTag()); - final PreprocessedData data = new PreprocessedData(entityStart, entityLength, columnStores); + final PreprocessedHeader header = + new PreprocessedHeader(descriptor.getName(), descriptor.getTable(), rows, entityStart.size(), bucket2Entity.size(), columns, hash); + writePreprocessed(file, header, entityStart, entityLength, columnStores, bucket2Entity); + } - writePreprocessed(file, header, data); + public static int getEntityBucket(int buckets, String id) { + return Hashing.consistentHash(id.hashCode(), buckets); } /** @@ -172,7 +185,7 @@ private Map combineStores(Object2IntMap entityStart return columnStores; } - private static void writePreprocessed(File file, PreprocessedHeader header, PreprocessedData data) throws IOException { + private static void writePreprocessed(File file, PreprocessedHeader header, Map globalStarts, Map globalLengths, Map data, Map> bucket2Entities) throws IOException { final OutputStream out = new GZIPOutputStream(new FileOutputStream(file)); try (JsonGenerator generator = Jackson.BINARY_MAPPER.copy().enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET).getFactory().createGenerator(out)) { @@ -180,11 +193,61 @@ private static void writePreprocessed(File file, PreprocessedHeader header, Prep generator.writeObject(header); - log.debug("Writing data"); - generator.writeObject(data); + for (Map.Entry> bucketIds : bucket2Entities.entrySet()) { + final Collection entities = bucketIds.getValue(); + + final Map starts = Maps.filterKeys(globalStarts, entities::contains); + final Map lengths = Maps.filterKeys(globalLengths, entities::contains); + + final PreprocessedData preprocessedData = selectBucket(bucketIds.getKey(), starts, lengths, data); + + generator.writeObject(preprocessedData); + } + } + } + + private static PreprocessedData selectBucket(int bucket, Map localStarts, Map localLengths, Map stores) { + + + final IntList 
selectionStart = new IntArrayList(); + final IntList selectionLength = new IntArrayList(); + + + // First entity of Bucket starts at 0, the following are appended. + final Object2IntMap entityStarts = new Object2IntOpenHashMap<>(); + final Object2IntMap entityEnds = new Object2IntOpenHashMap<>(); + + + int currentStart = 0; + + for (Map.Entry entity2Start : localStarts.entrySet()) { + final String entity = entity2Start.getKey(); + final int start = entity2Start.getValue(); + + final int length = localLengths.get(entity); + + selectionStart.add(start); + + selectionLength.add(length); + + entityStarts.put(entity, currentStart); + entityEnds.put(entity, currentStart + length); + + currentStart += length; } + + final Map selected = new HashMap<>(); + + for (Map.Entry entry : stores.entrySet()) { + final String name = entry.getKey(); + final ColumnStore store = entry.getValue(); + + selected.put(name, store.select(selectionStart.toIntArray(), selectionLength.toIntArray())); + } + + return new PreprocessedData(bucket, entityStarts, entityEnds, selected); } public synchronized String addPrimary(String primary) { diff --git a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedData.java b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedData.java index 313d44ee09..0df0e7ccc7 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedData.java +++ b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedData.java @@ -12,9 +12,10 @@ @Data @AllArgsConstructor(onConstructor_ = @JsonCreator) public class PreprocessedData { + private final int bucketId; private final Map starts; - private final Map lengths; + private final Map ends; private final Map stores; diff --git a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java index b032dbb8a8..6b33c7abd8 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java +++ b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java @@ -1,5 +1,17 @@ package com.bakdata.conquery.models.preproc; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.bakdata.conquery.models.datasets.Column; +import com.bakdata.conquery.models.datasets.Import; +import com.bakdata.conquery.models.datasets.ImportColumn; +import com.bakdata.conquery.models.datasets.Table; +import com.bakdata.conquery.models.events.MajorTypeId; +import com.bakdata.conquery.models.events.stores.root.ColumnStore; import com.fasterxml.jackson.annotation.JsonCreator; import lombok.AllArgsConstructor; import lombok.Data; @@ -33,6 +45,10 @@ public class PreprocessedHeader { * Number of rows in the Preprocessed file. */ private long rows; + private long numberOfEntities; + + //TODO use Set to track actually included buckets,to split phase bucket assignment. + private int numberOfBuckets; /** * The specific columns and their associated MajorType for validation. 
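As an illustration (not part of the patch), a minimal sketch of the stable entity-to-bucket assignment used above: Preprocessed.getEntityBucket derives the bucket purely from the entity id and the bucket count passed to write(), so re-running the preprocessor with the same bucket count reproduces the same split. Class and variable names here are made up for the example; the hashing mirrors getEntityBucket and the grouping mirrors the bucket2Entity map built in Preprocessed.write.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.google.common.hash.Hashing;

public class BucketAssignmentSketch {

	// Same scheme as Preprocessed.getEntityBucket: consistent hash of the id's hashCode over a fixed bucket count.
	static int getEntityBucket(int buckets, String id) {
		return Hashing.consistentHash(id.hashCode(), buckets);
	}

	public static void main(String[] args) {
		final int buckets = 100; // example value; the bucket count is fixed at preprocess time

		final List<String> entityIds = List.of("entity-a", "entity-b", "entity-c", "entity-d");

		// Group entities by bucket, analogous to the bucket2Entity map that drives the per-bucket blobs in the cqpp.
		final Map<Integer, List<String>> bucket2Entity =
				entityIds.stream().collect(Collectors.groupingBy(id -> getEntityBucket(buckets, id)));

		// Deterministic: the same id with the same bucket count always lands in the same bucket.
		bucket2Entity.forEach((bucket, ids) -> System.out.printf("bucket %d -> %s%n", bucket, ids));
	}
}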
@@ -44,4 +60,60 @@ public class PreprocessedHeader { */ private int validityHash; + public Import createImportDescription(Table table, Map stores) { + final Import imp = new Import(table); + + imp.setName(getName()); + imp.setNumberOfEntries(getRows()); + imp.setNumberOfEntities(getNumberOfEntities()); + + final ImportColumn[] importColumns = new ImportColumn[columns.length]; + + for (int i = 0; i < columns.length; i++) { + final ColumnStore store = stores.get(columns[i].getName()); + + final ImportColumn col = new ImportColumn(imp, store.createDescription(), store.getLines(), numberOfBuckets * store.estimateMemoryConsumptionBytes()); + + col.setName(columns[i].getName()); + + importColumns[i] = col; + } + + imp.setColumns(importColumns); + + return imp; + } + + + /** + * Verify that the supplied table matches the preprocessed data in shape. + * + * @return a list of human-readable mismatch descriptions; empty if the table and the preprocessed data match. + */ + public List assertMatch(Table table) { + final List errors = new ArrayList<>(); + + if (table.getColumns().length != getColumns().length) { + errors.add("Import column count=`%d` does not match table column count=`%d`".formatted(getColumns().length, table.getColumns().length)); + } + + final Map typesByName = Arrays.stream(getColumns()).collect(Collectors.toMap(PPColumn::getName, PPColumn::getType)); + + for (int i = 0; i < Math.min(table.getColumns().length, getColumns().length); i++) { + final Column column = table.getColumns()[i]; + + if (!typesByName.containsKey(column.getName())) { + errors.add("Column[%s] is missing.".formatted(column.getName())); + continue; + } + + if (!typesByName.get(column.getName()).equals(column.getType())) { + errors.add("Column[%s] Types do not match %s != %s".formatted(column.getName(), typesByName.get(column.getName()), column.getType())); + } + } + + return errors; + } + + } diff --git a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedReader.java b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedReader.java index abdd8574d1..340433ae6c 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedReader.java +++ b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedReader.java @@ -2,13 +2,8 @@ import java.io.IOException; import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; +import java.util.Iterator; -import com.bakdata.conquery.models.identifiable.Identifiable; -import com.bakdata.conquery.models.identifiable.InjectingCentralRegistry; -import com.bakdata.conquery.models.identifiable.ids.Id; -import com.bakdata.conquery.models.worker.SingletonNamespaceCollection; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.ObjectMapper; @@ -16,6 +11,7 @@ import lombok.AccessLevel; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.experimental.Accessors; /** @@ -23,7 +19,7 @@ * Header then Dictionaries then Data. Only this order is possible.
*/ @RequiredArgsConstructor(access = AccessLevel.PACKAGE) -public class PreprocessedReader implements AutoCloseable { +public class PreprocessedReader implements AutoCloseable, Iterator { @Override public void close() throws IOException { parser.close(); @@ -40,45 +36,39 @@ public enum LastRead { @Getter private LastRead lastRead = LastRead.BEGIN; + private int bucketsRemaining; private final JsonParser parser; - private final Map, Identifiable> replacements = new HashMap<>(); public PreprocessedReader(InputStream inputStream, ObjectMapper objectMapper) throws IOException { - final InjectingCentralRegistry injectingCentralRegistry = new InjectingCentralRegistry(replacements); - final SingletonNamespaceCollection namespaceCollection = new SingletonNamespaceCollection(injectingCentralRegistry); - parser = namespaceCollection.injectIntoNew(objectMapper) - .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET) - .getFactory() - .createParser(inputStream); + parser = objectMapper.copy().enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET) + .getFactory() + .createParser(inputStream); } - public void addReplacement(Id id, Identifiable replacement) { - this.replacements.put(id, replacement); - } - - public , V extends Identifiable> void addAllReplacements(Map replacements) { - this.replacements.putAll(replacements); - } public PreprocessedHeader readHeader() throws IOException { Preconditions.checkState(lastRead.equals(LastRead.BEGIN)); final PreprocessedHeader header = parser.readValueAs(PreprocessedHeader.class); + bucketsRemaining = header.getNumberOfBuckets(); lastRead = lastRead.next(); return header; } + @Override + public boolean hasNext() { + return bucketsRemaining > 0; + } - public PreprocessedData readData() throws IOException { - Preconditions.checkState(lastRead.equals(LastRead.HEADER)); - - final PreprocessedData dictionaries = parser.readValueAs(PreprocessedData.class); + @SneakyThrows + @Override + public PreprocessedData next() { + bucketsRemaining--; - lastRead = lastRead.next(); - return dictionaries; + return parser.readValueAs(PreprocessedData.class); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessor.java b/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessor.java index a7908aa324..f3e31e6078 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessor.java +++ b/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessor.java @@ -54,7 +54,7 @@ public static File getTaggedVersion(File file, String tag, String extension) { *

* Reads CSV file, per row extracts the primary key, then applies other transformations on each row, then compresses the data with {@link ColumnStore}. */ - public static void preprocess(PreprocessingJob preprocessingJob, ProgressBar totalProgress, ConqueryConfig config) throws IOException { + public static void preprocess(PreprocessingJob preprocessingJob, ProgressBar totalProgress, ConqueryConfig config, int buckets) throws IOException { final File preprocessedFile = preprocessingJob.getPreprocessedFile(); TableImportDescriptor descriptor = preprocessingJob.getDescriptor(); @@ -209,7 +209,7 @@ else if (errors == config.getPreprocessor().getMaximumPrintedErrors()) { exceptions.forEach((clazz, count) -> log.warn("Got {} `{}`", count, clazz.getSimpleName())); } - result.write(tmp); + result.write(tmp, buckets); if (errors > 0) { log.warn("Had {}% faulty lines ({} of ~{} lines)", String.format("%.2f", 100d * (double) errors / (double) lineId), errors, lineId); diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java b/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java index 76a5d4664f..dda525ee97 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java @@ -54,18 +54,12 @@ public DistributedNamespace( this.workerHandler = workerHandler; } - public int getBucket(String entity, int bucketSize) { - final NamespaceStorage storage = getStorage(); - return storage.getEntityBucket(entity) - .orElseGet(() -> storage.assignEntityBucket(entity, bucketSize)); - } - @Override void updateMatchingStats() { - final Collection> concepts = this.getStorage().getAllConcepts() - .stream() - .filter(concept -> concept.getMatchingStats() == null) - .collect(Collectors.toSet()); + final Collection> concepts = getStorage().getAllConcepts() + .stream() + .filter(concept -> concept.getMatchingStats() == null) + .collect(Collectors.toSet()); getWorkerHandler().sendToAll(new UpdateMatchingStatsMessage(concepts)); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java index fe15ed6524..cc6c780ddf 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java @@ -12,6 +12,7 @@ import com.bakdata.conquery.models.messages.network.MessageToShardNode; import com.codahale.metrics.SharedMetricRegistries; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.base.Stopwatch; import lombok.Getter; import lombok.ToString; import lombok.extern.slf4j.Slf4j; @@ -102,8 +103,12 @@ public void waitForFreeJobQueue() throws InterruptedException { } synchronized (jobManagerSync) { - log.trace("Have to wait for free JobQueue"); + final Stopwatch waiting = Stopwatch.createStarted(); + log.trace("Shard {}, have to wait for free JobQueue (backpressure={})", session, backpressure); + jobManagerSync.wait(); + + log.debug("Shard {}, Waited {} for free JobQueue", session, waiting.stop()); } } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java index 070e546974..c6c65dae41 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java +++ 
b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java @@ -103,24 +103,24 @@ public synchronized void removeBucketAssignmentsForImportFormWorkers(@NonNull Im sendUpdatedWorkerInformation(); } - private synchronized void sendUpdatedWorkerInformation() { + public synchronized void sendUpdatedWorkerInformation() { for (WorkerInformation w : workers.values()) { w.send(new UpdateWorkerBucket(w)); } } - public synchronized void addBucketsToWorker(@NonNull WorkerId id, @NonNull Set bucketIds) { + public synchronized void registerBucketForWorker(@NonNull WorkerId id, @NonNull BucketId bucketId) { // Ensure that add and remove are not executed at the same time. // We don't make assumptions about the underlying implementation regarding thread safety WorkerToBucketsMap workerBuckets = storage.getWorkerBuckets(); + if (workerBuckets == null) { workerBuckets = createWorkerBucketsMap(); } - workerBuckets.addBucketForWorker(id, bucketIds); - storage.setWorkerToBucketsMap(workerBuckets); + workerBuckets.addBucketForWorker(id, bucketId); - sendUpdatedWorkerInformation(); + storage.setWorkerToBucketsMap(workerBuckets); } private synchronized WorkerToBucketsMap createWorkerBucketsMap() { @@ -138,11 +138,12 @@ public synchronized WorkerInformation getResponsibleWorkerForBucket(int bucket) } /** + * @return * @implNote Currently the least occupied Worker receives a new Bucket, this can change in later implementations. (For example for * dedicated Workers, or entity weightings) */ - public synchronized void addResponsibility(int bucket) { + public synchronized WorkerInformation addResponsibility(int bucket) { final WorkerInformation smallest = workers .stream() .min(Comparator.comparing(si -> si.getIncludedBuckets().size())) @@ -153,6 +154,8 @@ public synchronized void addResponsibility(int bucket) { bucket2WorkerMap.put(bucket, smallest); smallest.getIncludedBuckets().add(bucket); + + return smallest; } public void register(ShardNodeInformation node, WorkerInformation info) { @@ -192,6 +195,19 @@ public Set getBucketsForWorker(WorkerId workerId) { return workerBuckets.getBucketsForWorker(workerId); } + public synchronized WorkerInformation assignResponsibleWorker(BucketId bucket) { + + WorkerInformation responsibleWorkerForBucket = getResponsibleWorkerForBucket(bucket.getBucket()); + + if (responsibleWorkerForBucket == null) { + responsibleWorkerForBucket = addResponsibility(bucket.getBucket()); + } + + registerBucketForWorker(responsibleWorkerForBucket.getId(), bucket); + + return responsibleWorkerForBucket; + } + private record PendingReaction(UUID callerId, Set pendingWorkers, ActionReactionMessage parent) { /** diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java index def028d002..8eb023e427 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java @@ -12,13 +12,13 @@ import it.unimi.dsi.fastutil.ints.IntArraySet; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; -@Getter -@Setter -@EqualsAndHashCode(callSuper = true) +@Data +@NoArgsConstructor +@Slf4j public class WorkerInformation extends NamedImpl implements MessageSender.Transforming { 
@NotNull private DatasetId dataset; @@ -32,6 +32,15 @@ public class WorkerInformation extends NamedImpl implements MessageSen @Min(0) private int entityBucketSize; + public void awaitFreeJobQueue() { + try { + getConnectedShardNode().waitForFreeJobQueue(); + } + catch (InterruptedException e) { + log.error("Interrupted while waiting for worker[{}] to have free space in queue", this, e); + } + } + @Override public WorkerId createId() { diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerToBucketsMap.java b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerToBucketsMap.java index 4f7b03072f..70b98cd832 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerToBucketsMap.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerToBucketsMap.java @@ -9,7 +9,6 @@ import com.bakdata.conquery.models.identifiable.ids.specific.BucketId; import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; -import com.google.common.collect.ImmutableSet; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; @@ -21,23 +20,23 @@ @Getter @Setter public class WorkerToBucketsMap { - private Map> map = new ConcurrentHashMap<>(); + private Map> map = new ConcurrentHashMap<>(); - public Set getBucketsForWorker(WorkerId workerId) { - // Don't modify the underlying map here - Set buckets = map.get(workerId); - if (buckets != null) { + public Set getBucketsForWorker(WorkerId workerId) { + // Don't modify the underlying map here + Set buckets = map.get(workerId); + if (buckets != null) { // Don't allow modification return Collections.unmodifiableSet(buckets); } - return Collections.emptySet(); - } + return Collections.emptySet(); + } - public void addBucketForWorker(@NonNull WorkerId id, @NonNull Set bucketIds) { - map.computeIfAbsent(id, k -> new HashSet<>()).addAll(bucketIds); - } + public void addBucketForWorker(@NonNull WorkerId id, @NonNull BucketId bucketId) { + map.computeIfAbsent(id, k -> new HashSet<>()).add(bucketId); + } - public void removeBucketsOfImport(@NonNull ImportId importId) { - map.values().forEach(set -> set.removeIf(bucketId -> bucketId.getImp().equals(importId))); - } + public void removeBucketsOfImport(@NonNull ImportId importId) { + map.values().forEach(set -> set.removeIf(bucketId -> bucketId.getImp().equals(importId))); + } } diff --git a/backend/src/main/java/com/bakdata/conquery/resources/api/ConceptsProcessor.java b/backend/src/main/java/com/bakdata/conquery/resources/api/ConceptsProcessor.java index 9f643cd18b..908aa69235 100644 --- a/backend/src/main/java/com/bakdata/conquery/resources/api/ConceptsProcessor.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/api/ConceptsProcessor.java @@ -26,9 +26,9 @@ import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.datasets.PreviewConfig; import com.bakdata.conquery.models.datasets.concepts.Concept; +import com.bakdata.conquery.models.datasets.concepts.ConceptElement; import com.bakdata.conquery.models.datasets.concepts.FrontEndConceptBuilder; import com.bakdata.conquery.models.datasets.concepts.filters.specific.SelectFilter; -import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeChild; import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept; import com.bakdata.conquery.models.exceptions.ConceptConfigurationException; import com.bakdata.conquery.models.exceptions.ValidatorHelper; @@ -294,7 
+294,7 @@ public ResolvedConceptsResult resolveConceptElements(TreeConcept concept, List(Collections::emptyMap)); + final ConceptElement child = concept.findMostSpecificChild(conceptCode, new CalculatedValue<>(Collections::emptyMap)); if (child != null) { resolvedCodes.add(child.getId()); diff --git a/backend/src/main/java/com/bakdata/conquery/util/progressreporter/ProgressReporterImpl.java b/backend/src/main/java/com/bakdata/conquery/util/progressreporter/ProgressReporterImpl.java index c6c12cf9e0..ce429d4d82 100644 --- a/backend/src/main/java/com/bakdata/conquery/util/progressreporter/ProgressReporterImpl.java +++ b/backend/src/main/java/com/bakdata/conquery/util/progressreporter/ProgressReporterImpl.java @@ -2,6 +2,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import com.fasterxml.jackson.annotation.JsonValue; import lombok.Getter; @@ -10,11 +11,13 @@ @Slf4j public class ProgressReporterImpl implements ProgressReporter { + public long getMax() { + return max.get(); + } - @Getter(onMethod_ = @Override) - private long max = 1; - private long innerProgress = 0; - private long reservedForChildren = 0; + private final AtomicLong max = new AtomicLong(1); + private final AtomicLong innerProgress = new AtomicLong(0); + private final AtomicLong reservedForChildren = new AtomicLong(0); private final List children = new ArrayList<>(); @Getter @@ -53,7 +56,7 @@ public double getProgress() { } public long getAbsoluteProgress() { - long absoluteProgress = innerProgress; + long absoluteProgress = innerProgress.get(); for (ProgressReporterImpl child : children) { absoluteProgress += child.getAbsoluteProgress(); @@ -63,7 +66,7 @@ public long getAbsoluteProgress() { } public long getAbsoluteMax() { - long absoluteMax = max; + long absoluteMax = max.get(); for (ProgressReporterImpl child : children) { absoluteMax += child.getAbsoluteMax(); @@ -78,7 +81,7 @@ public ProgressReporter subJob(long steps) { throw new IllegalStateException("You need to start the Progress Reporter before you can add subjobs"); } - reservedForChildren += steps; + reservedForChildren.addAndGet(steps); ProgressReporterImpl childPr = new ProgressReporterImpl(); childPr.start(); @@ -98,12 +101,12 @@ public void report(int steps) { log.warn("Progress reporter was not started"); return; } - if (innerProgress + steps > max) { + if (innerProgress.get() + steps > max.get()) { log.warn("Progress({}) + ChildProgressReserve({}) + Steps({}) is bigger than the maximum Progress({}). There might be to many reports in the code.", innerProgress, reservedForChildren, steps, max); return; } - innerProgress += steps; + innerProgress.addAndGet(steps); } @Override @@ -114,12 +117,7 @@ public void setMax(long max) { return; } - if (this.max > max) { - log.warn("Max cannot be decreased."); - return; - } - - this.max = max; + this.max.set(max); } @Override @@ -138,7 +136,7 @@ public void done() { log.trace("Done was called before all steps were been reported. 
There might be missing reporting steps in the code."); } - innerProgress = max - reservedForChildren; + innerProgress.set(max.get() - reservedForChildren.get()); } diff --git a/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java b/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java index 3b64377522..ef12549871 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java @@ -17,11 +17,6 @@ import java.util.Map; import java.util.UUID; -import jakarta.ws.rs.client.Entity; -import jakarta.ws.rs.client.Invocation; -import jakarta.ws.rs.core.MediaType; -import jakarta.ws.rs.core.Response; - import com.bakdata.conquery.ConqueryConstants; import com.bakdata.conquery.apiv1.query.ConceptQuery; import com.bakdata.conquery.apiv1.query.Query; @@ -53,6 +48,10 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.univocity.parsers.csv.CsvParser; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.client.Invocation; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; import lombok.NonNull; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; @@ -132,10 +131,6 @@ private static void uploadTable(StandaloneSupport support, Table table) { } } - public static void importTableContents(StandaloneSupport support, RequiredTable[] tables) throws Exception { - importTableContents(support, Arrays.asList(tables)); - } - public static List generateCqpp(StandaloneSupport support, Collection tables) throws Exception { List preprocessedFiles = new ArrayList<>(); List descriptions = new ArrayList<>(); @@ -177,57 +172,49 @@ public static List generateCqpp(StandaloneSupport support, Collection> entity = Entity.entity(Entity.json(""), MediaType.APPLICATION_JSON_TYPE); final Invocation.Builder request = support.getClient() .target(addImport) .request(MediaType.APPLICATION_JSON); - try (final Response response = request - .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))) { - - assertThat(response.getStatusInfo().getFamily()) - .describedAs(new LazyTextDescription(() -> response.readEntity(String.class))) - .isEqualTo(Response.Status.Family.SUCCESSFUL); - } - } - public static void updateCqppFile(StandaloneSupport support, File cqpp, Response.Status.Family expectedResponseFamily, String expectedReason) { - assertThat(cqpp).exists(); + final Invocation invocation = update ? 
request.buildPut(entity) : request.buildPost(entity); - final URI addImport = HierarchyHelper.hierarchicalPath(support.defaultAdminURIBuilder(), AdminDatasetResource.class, "updateImport") - .queryParam("file", cqpp) - .buildFromMap(Map.of( - ResourceConstants.DATASET, support.getDataset().getId() - )); + log.info("sending CQPP with {}", invocation); - final Invocation.Builder request = support.getClient() - .target(addImport) - .request(MediaType.APPLICATION_JSON); - try (final Response response = request - .put(Entity.entity(Entity.json(""), MediaType.APPLICATION_JSON_TYPE))) { + try (final Response response = invocation.invoke()) { assertThat(response.getStatusInfo().getFamily()) .describedAs(new LazyTextDescription(() -> response.readEntity(String.class))) .isEqualTo(expectedResponseFamily); - assertThat(response.getStatusInfo().getReasonPhrase()) - .describedAs(new LazyTextDescription(() -> response.readEntity(String.class))) - .isEqualTo(expectedReason); } } public static void importCqppFiles(StandaloneSupport support, List cqppFiles) { for (File cqpp : cqppFiles) { - importCqppFile(support, cqpp); + uploadCqpp(support, cqpp, false, Response.Status.Family.SUCCESSFUL); } + + support.waitUntilWorkDone(); + + } public static void importTableContents(StandaloneSupport support, Collection tables) throws Exception { List cqpps = generateCqpp(support, tables); + importCqppFiles(support, cqpps); } @@ -275,6 +262,8 @@ public static void updateConcepts(StandaloneSupport support, ArrayNode rawConcep for (Concept concept : concepts) { updateConcept(support, concept, expectedResponseFamily); } + + } private static void updateConcept(@NonNull StandaloneSupport support, @NonNull Concept concept, @NonNull Response.Status.Family expectedResponseFamily) { diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/ImportUpdateTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/ImportUpdateTest.java index 7e35786d72..3291d3ac4f 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/tests/ImportUpdateTest.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/tests/ImportUpdateTest.java @@ -6,8 +6,6 @@ import java.io.File; import java.util.List; -import jakarta.ws.rs.core.Response; - import com.bakdata.conquery.ConqueryConstants; import com.bakdata.conquery.apiv1.query.Query; import com.bakdata.conquery.commands.ShardNode; @@ -32,6 +30,7 @@ import com.bakdata.conquery.util.support.StandaloneSupport; import com.bakdata.conquery.util.support.TestConquery; import com.github.powerlibraries.io.In; +import jakarta.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; @@ -78,7 +77,9 @@ public void execute(String name, TestConquery testConquery) throws Exception { assertThat(cqpps.size()).isEqualTo(tables.size()); LoadingUtil.importCqppFiles(conquery, List.of(cqpps.get(0))); + conquery.waitUntilWorkDone(); + } final Query query = IntegrationUtils.parseQuery(conquery, test.getRawQuery()); @@ -125,7 +126,7 @@ public void execute(String name, TestConquery testConquery) throws Exception { } //Try to update an import that does not exist should throw a Not-Found Webapplication Exception - LoadingUtil.updateCqppFile(conquery, cqpps.get(1), Response.Status.Family.CLIENT_ERROR, "Not Found"); + LoadingUtil.uploadCqpp(conquery, cqpps.get(1), true, Response.Status.Family.CLIENT_ERROR); conquery.waitUntilWorkDone(); //Load manually new data for import and update the concerned import @@ -170,7 +171,10 @@ public void 
execute(String name, TestConquery testConquery) throws Exception { log.info("updating import"); //correct update of the import - LoadingUtil.updateCqppFile(conquery, newPreprocessedFile, Response.Status.Family.SUCCESSFUL, "No Content"); + LoadingUtil.uploadCqpp(conquery, newPreprocessedFile, true, Response.Status.Family.SUCCESSFUL); + conquery.waitUntilWorkDone(); + + conquery.waitUntilWorkDone(); } diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java index 5b3e8280fc..d1f0360479 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java @@ -148,6 +148,8 @@ public void execute(String name, TestConquery testConquery) throws Exception { test.executeTest(support); + storage = support.getMetaStorage(); + {// Auth actual tests User userStored = storage.getUser(user.getId()); assertThat(userStored).isEqualTo(user); diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ConceptUpdateAndDeletionTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ConceptUpdateAndDeletionTest.java index 8795093de6..0b1e1ee887 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ConceptUpdateAndDeletionTest.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ConceptUpdateAndDeletionTest.java @@ -114,9 +114,13 @@ public void execute(String name, TestConquery testConquery) throws Exception { // To perform the update, the old concept will be deleted first and the new concept will be added. That means the deletion of concept is also covered here { log.info("Executing update"); + LoadingUtil.updateConcepts(conquery, test2.getRawConcepts(), Response.Status.Family.SUCCESSFUL); conquery.waitUntilWorkDone(); + log.info("Update executed"); + + } diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ImportDeletionTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ImportDeletionTest.java index e123998d0c..5536f967cf 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ImportDeletionTest.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ImportDeletionTest.java @@ -10,9 +10,6 @@ import java.util.Map; import java.util.zip.GZIPInputStream; -import jakarta.ws.rs.core.MediaType; -import jakarta.ws.rs.core.Response; - import com.bakdata.conquery.ConqueryConstants; import com.bakdata.conquery.apiv1.query.Query; import com.bakdata.conquery.commands.ShardNode; @@ -42,6 +39,8 @@ import com.bakdata.conquery.util.support.StandaloneSupport; import com.bakdata.conquery.util.support.TestConquery; import com.github.powerlibraries.io.In; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; @@ -242,6 +241,9 @@ public void execute(String name, TestConquery testConquery) throws Exception { //import preprocessedFiles conquery.getDatasetsProcessor().addImport(conquery.getNamespace(), new GZIPInputStream(new FileInputStream(preprocessedFile))); conquery.waitUntilWorkDone(); + + + conquery.waitUntilWorkDone(); } // State after reimport. 
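For orientation, a hedged sketch of how a consumer might walk the new cqpp layout with the reworked PreprocessedReader shown earlier: the header comes first and announces the bucket count, then one PreprocessedData blob follows per bucket. The file handling, mapper setup and import paths are assumptions for the example, not code taken from this patch.

import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.models.preproc.PreprocessedData;
import com.bakdata.conquery.models.preproc.PreprocessedHeader;
import com.bakdata.conquery.models.preproc.PreprocessedReader;

public class ReadCqppSketch {

	public static void main(String[] args) throws IOException {
		// cqpp files are gzip-compressed; the reader works on the decompressed stream.
		try (PreprocessedReader reader =
					 new PreprocessedReader(new GZIPInputStream(new FileInputStream(args[0])), Jackson.BINARY_MAPPER.copy())) {

			// Header first: it carries row/entity counts and the number of bucket blobs that follow.
			final PreprocessedHeader header = reader.readHeader();
			System.out.printf("%s: %d rows, %d entities, %d buckets%n",
							  header.getName(), header.getRows(), header.getNumberOfEntities(), header.getNumberOfBuckets());

			// The reader is an Iterator<PreprocessedData>: each element is one self-contained bucket.
			while (reader.hasNext()) {
				final PreprocessedData bucket = reader.next();
				System.out.printf("bucket %d holds %d entities%n", bucket.getBucketId(), bucket.getStarts().size());
			}
		}
	}
}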
diff --git a/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java b/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java index 2ae22e13bf..99e0d08580 100644 --- a/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java +++ b/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java @@ -220,7 +220,7 @@ public void bucketCompoundDateRange() throws JSONException, IOException { ColumnStore startStore = new IntegerDateStore(new ShortArrayStore(new short[]{1, 2, 3, 4}, Short.MIN_VALUE)); ColumnStore endStore = new IntegerDateStore(new ShortArrayStore(new short[]{5, 6, 7, 8}, Short.MIN_VALUE)); - Bucket bucket = new Bucket(0, 4, new ColumnStore[]{startStore, endStore, compoundStore}, Object2IntMaps.emptyMap(), Object2IntMaps.emptyMap(), imp); + Bucket bucket = new Bucket(0, new ColumnStore[]{startStore, endStore, compoundStore}, Object2IntMaps.singleton("0", 0), Object2IntMaps.singleton("0", 4),4, imp); compoundStore.setParent(bucket); @@ -583,7 +583,7 @@ public void serialize() throws IOException, JSONException { final Import imp = new Import(table); imp.setName("import"); - final Bucket bucket = new Bucket(0, 0, new ColumnStore[0], Object2IntMaps.emptyMap(), Object2IntMaps.emptyMap(), imp); + final Bucket bucket = new Bucket(0, new ColumnStore[0], Object2IntMaps.emptyMap(), Object2IntMaps.emptyMap(),0, imp); final CBlock cBlock = CBlock.createCBlock(connector, bucket, 10); diff --git a/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/GroovyIndexedTest.java b/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/GroovyIndexedTest.java index 3afe94bb22..97cef7a5df 100644 --- a/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/GroovyIndexedTest.java +++ b/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/GroovyIndexedTest.java @@ -8,7 +8,6 @@ import java.util.Random; import java.util.function.Supplier; import java.util.stream.Stream; -import jakarta.validation.Validator; import com.bakdata.conquery.io.jackson.Injectable; import com.bakdata.conquery.io.jackson.Jackson; @@ -16,6 +15,7 @@ import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.datasets.Table; import com.bakdata.conquery.models.datasets.concepts.Concept; +import com.bakdata.conquery.models.datasets.concepts.ConceptElement; import com.bakdata.conquery.models.events.MajorTypeId; import com.bakdata.conquery.models.exceptions.ConfigurationException; import com.bakdata.conquery.models.exceptions.JSONException; @@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.github.powerlibraries.io.In; import io.dropwizard.jersey.validation.Validators; +import jakarta.validation.Validator; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.parallel.Execution; @@ -111,11 +112,11 @@ public static void init() throws IOException, JSONException, ConfigurationExcept public void basic(String key, CalculatedValue> rowMap) throws JSONException { log.trace("Searching for {}", key); - ConceptTreeChild idxResult = indexedConcept.findMostSpecificChild(key, rowMap); - ConceptTreeChild oldResult = oldConcept.findMostSpecificChild(key, rowMap); + ConceptElement idxResult = indexedConcept.findMostSpecificChild(key, rowMap); + ConceptElement oldResult = oldConcept.findMostSpecificChild(key, rowMap); assertThat(oldResult.getId()).describedAs("%s hierarchical name", 
key).isEqualTo(idxResult.getId()); } -} \ No newline at end of file +} diff --git a/backend/src/test/java/com/bakdata/conquery/util/support/StandaloneSupport.java b/backend/src/test/java/com/bakdata/conquery/util/support/StandaloneSupport.java index 49042dada3..8f3410c085 100644 --- a/backend/src/test/java/com/bakdata/conquery/util/support/StandaloneSupport.java +++ b/backend/src/test/java/com/bakdata/conquery/util/support/StandaloneSupport.java @@ -73,8 +73,10 @@ public void preprocessTmp(File tmpDir, List descriptions) throws Exception Map.of( "in", tmpDir, "out", tmpDir, - "desc", descriptions - + "desc", descriptions, + "buckets", 10, + "strict", true, + "fast-fail", true ) );
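Finally, a speculative sketch of the manager-side routing that the WorkerHandler, WorkerInformation and ImportBucket changes above enable, i.e. how one bucket read from a cqpp could be handed to a shard. The real wiring lives in ClusterImportHandler and may differ in detail; in particular the ImportBucket constructor shape is only inferred from the removed forBucket() helper.

import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.identifiable.ids.specific.BucketId;
import com.bakdata.conquery.models.messages.namespaces.specific.ImportBucket;
import com.bakdata.conquery.models.worker.WorkerHandler;
import com.bakdata.conquery.models.worker.WorkerInformation;

public class RouteBucketSketch {

	static void route(WorkerHandler handler, Bucket bucket) {
		final BucketId bucketId = bucket.getId();

		// Pick (or lazily create) the responsible worker and persist the bucket->worker mapping.
		final WorkerInformation worker = handler.assignResponsibleWorker(bucketId);

		// Backpressure: block while the shard's job queue is full.
		worker.awaitFreeJobQueue();

		// Ship the bucket; ImportBucket.react() on the shard side calls context.addBucket(bucket).
		// Constructor shape assumed from the removed ImportBucket.forBucket() helper.
		worker.send(new ImportBucket(bucketId.toString(), bucket));

		// Broadcast the updated bucket assignments to all workers.
		handler.sendUpdatedWorkerInformation();
	}
}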