Split imports on preprocess (#3389)
Implementation of bucket splitting at preprocess time:

The prior implementation split data on import, which meant loading large amounts of data only to chunk them. This merge offloads that responsibility onto the preprocessor, using stable hashing of the entity's ID. The hash depends on the number of buckets, which is fixed at preprocessing time.

This merge reduces the memory requirements of the manager node and improves upload time. It also lays the foundation for making preprocessing more memory efficient by compressing buckets separately.
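
For illustration, here is a minimal sketch of what a stable, bucket-count-dependent assignment can look like. The commit's concrete hash function and class names are not visible in this excerpt, so `BucketAssigner`, the use of `String.hashCode()`, and `Math.floorMod` below are assumptions:

```java
// Sketch: deterministic bucket assignment from an entity ID.
// Any stable hash works, provided every preprocessing run of a dataset
// uses the same function and the same bucket count.
public final class BucketAssigner {

    private final int buckets;

    public BucketAssigner(int buckets) {
        this.buckets = buckets;
    }

    public int assign(String entityId) {
        // floorMod keeps the result in [0, buckets), even for negative hash codes.
        return Math.floorMod(entityId.hashCode(), buckets);
    }
}
```

Because the assignment is a pure function of the entity ID and the bucket count, rows can be routed to buckets during preprocessing without the manager ever materializing a full import. This is also why the new `--buckets` option must stay constant per dataset: changing it re-maps every entity to a different bucket.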
awildturtok authored Aug 5, 2024
1 parent c766d18 commit bcad24d
Showing 39 changed files with 660 additions and 699 deletions.
@@ -44,6 +44,7 @@
import net.sourceforge.argparse4j.inf.ArgumentGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.jetbrains.annotations.NotNull;

@Slf4j
@FieldNameConstants
@@ -52,7 +53,7 @@ public class PreprocessorCommand extends ConqueryCommand {
private final List<String> failed = Collections.synchronizedList(new ArrayList<>());
private final List<String> success = Collections.synchronizedList(new ArrayList<>());
private ExecutorService pool;
private boolean isFailFast = false;
private boolean isFailFast;
private boolean isStrict = true;

public PreprocessorCommand() {
@@ -71,14 +72,14 @@ public static boolean requiresProcessing(PreprocessingJob preprocessingJob) {

log.info("EXISTS ALREADY");

int currentHash = preprocessingJob.getDescriptor()
.calculateValidityHash(preprocessingJob.getCsvDirectory(), preprocessingJob.getTag());
final int currentHash = preprocessingJob.getDescriptor()
.calculateValidityHash(preprocessingJob.getCsvDirectory(), preprocessingJob.getTag());


final ObjectMapper om = Jackson.BINARY_MAPPER.copy();
try (final PreprocessedReader parser = new PreprocessedReader(new GZIPInputStream(new FileInputStream(preprocessingJob.getPreprocessedFile())), om)) {

PreprocessedHeader header = parser.readHeader();
final PreprocessedHeader header = parser.readHeader();

if (header.getValidityHash() == currentHash) {
log.info("\tHASH STILL VALID");
@@ -133,13 +134,18 @@ public void configure(Subparser subparser) {
group.addArgument("--fast-fail")
.action(Arguments.storeTrue())
.setDefault(false)
.help("Stop preprocessing and exit with failure if an error occures that prevents the generation of a cqpp.");
.help("Stop preprocessing and exit with failure if an error occurs that prevents the generation of a cqpp.");

group.addArgument("--strict")
.type(new BooleanArgumentType())
.setDefault(true)
.help("Escalate missing files to errors.");

group.addArgument("--buckets")
.type(Integer.class)
.setDefault(100)
.help("Number of buckets to use for id-hashing. This value is required to be a constant per-dataset.");

}

@Override
@@ -150,41 +156,49 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig

// Tag if present is appended to input-file csvs, output-file cqpp and used as id of cqpps

// Seems to be a bug with dropwizard and boolean default-values
isFailFast = Optional.ofNullable(namespace.getBoolean("fast-fail")).orElse(false);
isStrict = Optional.ofNullable(namespace.getBoolean("strict")).orElse(true);
isStrict = Optional.ofNullable(namespace.getBoolean("strict")).orElse(false);

final List<String> tags = namespace.<String>getList("tag");
final List<String> tags = namespace.getList("tag");

final File inDir = namespace.get("in");
final File outDir = namespace.get("out");
final List<File> descriptionFiles = namespace.<File>getList("desc");
final List<File> descriptionFilesRoot = namespace.getList("desc");
final int buckets = namespace.getInt("buckets");


log.info("Preprocessing from command line config.");

final Collection<PreprocessingJob> jobs = new ArrayList<>();
final Collection<PreprocessingJob> jobs = collectJobs(descriptionFilesRoot, tags, inDir, outDir, environment);

if (tags == null || tags.isEmpty()) {
for (File desc : descriptionFiles) {
final List<PreprocessingJob> descriptions =
findPreprocessingDescriptions(desc, inDir, outDir, Optional.empty(), environment.getValidator());
jobs.addAll(descriptions);
}
final List<PreprocessingJob> broken = validateJobs(jobs, environment);

jobs.removeIf(Predicate.not(PreprocessorCommand::requiresProcessing));

preprocessJobs(jobs, buckets, config);


log.info("Successfully Preprocess {} Jobs:", success.size());
success.forEach(desc -> log.info("\tSucceeded Preprocessing for {}", desc));

if (!broken.isEmpty()) {
log.warn("Did not find {} Files", broken.size());
broken.forEach(desc -> log.warn("\tDid not find file for {}", desc));
}
else {
for (String tag : tags) {
for (File desc : descriptionFiles) {
final List<PreprocessingJob> jobDescriptions =
findPreprocessingDescriptions(desc, inDir, outDir, Optional.of(tag), environment.getValidator());

jobs.addAll(jobDescriptions);
}
}
if (isFailed()) {
log.error("Failed {} Preprocessing Jobs:", failed.size());
failed.forEach(desc -> log.error("\tFailed Preprocessing for {}", desc));
doFail();
}
}

List<PreprocessingJob> broken = new ArrayList<>();
@NotNull
private List<PreprocessingJob> validateJobs(Collection<PreprocessingJob> jobs, Environment environment) {
final List<PreprocessingJob> broken = new ArrayList<>();

for (Iterator<PreprocessingJob> iterator = jobs.iterator(); iterator.hasNext(); ) {
for (final Iterator<PreprocessingJob> iterator = jobs.iterator(); iterator.hasNext(); ) {
final PreprocessingJob job = iterator.next();

try {
@@ -213,22 +227,48 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig
log.error("FAILED Preprocessing, files are missing or invalid.");
doFail();
}
return broken;
}

jobs.removeIf(Predicate.not(PreprocessorCommand::requiresProcessing));
@NotNull
private Collection<PreprocessingJob> collectJobs(List<File> descriptionFiles, List<String> tags, File inDir, File outDir, Environment environment)
throws IOException {
final Collection<PreprocessingJob> jobs = new ArrayList<>();

if (tags == null || tags.isEmpty()) {
for (File desc : descriptionFiles) {
final List<PreprocessingJob> descriptions =
findPreprocessingDescriptions(desc, inDir, outDir, Optional.empty(), environment.getValidator());
jobs.addAll(descriptions);
}
}
else {
for (String tag : tags) {
for (File desc : descriptionFiles) {
final List<PreprocessingJob> jobDescriptions =
findPreprocessingDescriptions(desc, inDir, outDir, Optional.of(tag), environment.getValidator());

jobs.addAll(jobDescriptions);
}
}
}
return jobs;
}

private void preprocessJobs(Collection<PreprocessingJob> jobs, int buckets, ConqueryConfig config) throws InterruptedException {
final long totalSize = jobs.stream()
.mapToLong(PreprocessingJob::estimateTotalCsvSizeBytes)
.sum();

log.info("Required to preprocess {} in total", BinaryByteUnit.format(totalSize));

ProgressBar totalProgress = new ProgressBar(totalSize, System.out);
final ProgressBar totalProgress = new ProgressBar(totalSize, System.out);

for (PreprocessingJob job : jobs) {
pool.submit(() -> {
ConqueryMDC.setLocation(job.toString());
try {
Preprocessor.preprocess(job, totalProgress, config);
Preprocessor.preprocess(job, totalProgress, config, buckets);
success.add(job.toString());
}
catch (FileNotFoundException e) {
@@ -246,23 +286,6 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig
pool.awaitTermination(24, TimeUnit.HOURS);

ConqueryMDC.clearLocation();


if (!success.isEmpty()) {
log.info("Successfully Preprocess {} Jobs:", success.size());
success.forEach(desc -> log.info("\tSucceeded Preprocessing for {}", desc));
}

if (!broken.isEmpty()) {
log.warn("Did not find {} Files", broken.size());
broken.forEach(desc -> log.warn("\tDid not find file for {}", desc));
}

if (isFailed()) {
log.error("Failed {} Preprocessing Jobs:", failed.size());
failed.forEach(desc -> log.error("\tFailed Preprocessing for {}", desc));
doFail();
}
}

private void addMissing(PreprocessingJob job) {
@@ -281,7 +304,7 @@ private void addFailed(PreprocessingJob job) {

public List<PreprocessingJob> findPreprocessingDescriptions(File descriptionFiles, File inDir, File outputDir, Optional<String> tag, Validator validator)
throws IOException {
List<PreprocessingJob> out = new ArrayList<>();
final List<PreprocessingJob> out = new ArrayList<>();

final File[] files = descriptionFiles.isFile()
? new File[]{descriptionFiles}
@@ -302,8 +325,7 @@ private boolean isFailed() {
return !failed.isEmpty();
}

private Optional<PreprocessingJob> tryExtractDescriptor(Validator validator, Optional<String> tag, File descriptionFile, File outputDir, File csvDir)
throws IOException {
private Optional<PreprocessingJob> tryExtractDescriptor(Validator validator, Optional<String> tag, File descriptionFile, File outputDir, File csvDir) {
try {
final TableImportDescriptor
descriptor =
@@ -2,7 +2,6 @@

import java.util.Collection;
import java.util.Objects;
import java.util.OptionalInt;

import com.bakdata.conquery.io.storage.xodus.stores.CachedStore;
import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore;
@@ -109,22 +108,13 @@ public int getNumberOfEntities() {
return entity2Bucket.count();
}

public OptionalInt getEntityBucket(String entity) {
final Integer bucket = entity2Bucket.get(entity);

if(bucket == null){
return OptionalInt.empty();
}

return OptionalInt.of(bucket);
public boolean containsEntity(String entity) {
return entity2Bucket.get(entity) != null;
}

public int assignEntityBucket(String entity, int bucketSize) {
final int bucket = (int) Math.ceil((1d + getNumberOfEntities()) / (double) bucketSize);

entity2Bucket.add(entity, bucket);

return bucket;
public void registerEntity(String entity, int bucket) {
entity2Bucket.update(entity, bucket);
}
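
This swap is the heart of the change: the removed `assignEntityBucket` derived the bucket from an arrival-order count on the manager (`ceil((1 + numberOfEntities) / bucketSize)`), so assignment required central coordination, while the new `registerEntity` merely records a bucket that was already computed from the stable hash at preprocess time. A hedged sketch of the new call pattern; the `EntityStorage` stand-in and the surrounding method are assumptions, not the commit's actual classes:

```java
// Stand-in for the real storage class, which this excerpt does not name.
interface EntityStorage {
    void registerEntity(String entity, int bucket);
}

class RegistrationSketch {
    // The bucket is now a pure function of the ID and the bucket count,
    // computed at preprocess time (cf. the hashing sketch above), so the
    // manager only records the mapping instead of deciding it.
    static void register(EntityStorage storage, String entityId, int buckets) {
        final int bucket = Math.floorMod(entityId.hashCode(), buckets);
        storage.registerEntity(entityId, bucket);
    }
}
```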


@@ -72,7 +72,7 @@ private void decorateCBlockStore(IdentifiableStore<CBlock> baseStoreCreator) {


public void addCBlock(CBlock cBlock) {
log.debug("Adding CBlock[{}]", cBlock.getId());
log.trace("Adding CBlock[{}]", cBlock.getId());
cBlocks.add(cBlock);
}

Expand All @@ -81,7 +81,7 @@ public CBlock getCBlock(CBlockId id) {
}

public void removeCBlock(CBlockId id) {
log.debug("Removing CBlock[{}]", id);
log.trace("Removing CBlock[{}]", id);
cBlocks.remove(id);
}

Expand All @@ -90,7 +90,7 @@ public Collection<CBlock> getAllCBlocks() {
}

public void addBucket(Bucket bucket) {
log.debug("Adding Bucket[{}]", bucket.getId());
log.trace("Adding Bucket[{}]", bucket.getId());
buckets.add(bucket);
}

Expand All @@ -99,7 +99,7 @@ public Bucket getBucket(BucketId id) {
}

public void removeBucket(BucketId id) {
log.debug("Removing Bucket[{}]", id);
log.trace("Removing Bucket[{}]", id);
buckets.remove(id);
}

@@ -33,8 +33,7 @@ public void add(KEY key, VALUE value) {

@Override
public VALUE get(KEY key) {
// TODO: 08.01.2020 fk: This assumes that all values have been read at some point!
return cache.get(key);
return cache.computeIfAbsent(key, store::get);
}
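
`computeIfAbsent` turns this into a read-through cache: a miss now falls back to the underlying store instead of returning null, which is exactly what the removed TODO warned about. A minimal, self-contained sketch of the pattern; `ReadThroughCache` and the `Function`-backed store below are assumptions, not the real `CachedStore`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Read-through cache sketch: misses load from the backing store on demand,
// so correctness no longer depends on all values having been read up front.
class ReadThroughCache<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> backingStore;

    ReadThroughCache(Function<K, V> backingStore) {
        this.backingStore = backingStore;
    }

    V get(K key) {
        // Returns the cached value, loading and caching it on a miss.
        return cache.computeIfAbsent(key, backingStore);
    }
}
```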

@Override
