Skip to content

Commit

Permalink
move ConceptTreeCaches into BucketManager (#3598)
Browse files Browse the repository at this point in the history
  • Loading branch information
awildturtok authored Oct 15, 2024
1 parent 985bf94 commit 64504ca
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
Expand All @@ -21,7 +20,6 @@
import com.bakdata.conquery.models.exceptions.JSONException;
import com.bakdata.conquery.models.identifiable.ids.specific.ConceptElementId;
import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId;
import com.bakdata.conquery.models.identifiable.ids.specific.ImportId;
import com.bakdata.conquery.models.query.PrintSettings;
import com.bakdata.conquery.util.CalculatedValue;
import com.fasterxml.jackson.annotation.JsonIgnore;
Expand Down Expand Up @@ -49,8 +47,6 @@ public class TreeConcept extends Concept<ConceptTreeConnector> implements Concep

@JsonIgnore
private final List<ConceptTreeNode<?>> localIdMap = new ArrayList<>();
@JsonIgnore
private final Map<ImportId, ConceptTreeCache> caches = new ConcurrentHashMap<>();
@Getter
@Setter
@Valid
Expand All @@ -73,10 +69,6 @@ public Concept<?> findConcept() {
return getConcept();
}

public ConceptTreeCache getCache(ImportId imp) {
return caches.get(imp);
}

@Override
public ConceptTreeNode<?> getParent() {
return null;
Expand Down Expand Up @@ -144,13 +136,7 @@ private ConceptTreeChild findMostSpecificChild(
return best;
}

public void initializeIdCache(ImportId importId) {
caches.computeIfAbsent(importId, id -> new ConceptTreeCache(this));
}

public void removeImportCache(ImportId imp) {
caches.remove(imp);
}

/**
* Method to get the element of this concept tree that has the specified local ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.bakdata.conquery.io.storage.WorkerStorage;
import com.bakdata.conquery.models.common.CDateSet;
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.ConceptElement;
import com.bakdata.conquery.models.datasets.concepts.Connector;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeCache;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeConnector;
import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept;
import com.bakdata.conquery.models.identifiable.ids.specific.BucketId;
import com.bakdata.conquery.models.identifiable.ids.specific.CBlockId;
import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId;
import com.bakdata.conquery.models.identifiable.ids.specific.ConnectorId;
import com.bakdata.conquery.models.identifiable.ids.specific.ImportId;
import com.bakdata.conquery.models.identifiable.ids.specific.TableId;
Expand Down Expand Up @@ -69,6 +72,8 @@ public class BucketManager {
@Getter
private final int entityBucketSize;

private final Map<ConceptId, Map<ImportId, ConceptTreeCache>> treeCaches = new ConcurrentHashMap<>();

public static BucketManager create(Worker worker, WorkerStorage storage, int entityBucketSize) {
final Map<ConnectorId, Int2ObjectMap<Map<BucketId, CBlockId>>> connectorCBlocks = new HashMap<>();
final Map<TableId, Int2ObjectMap<List<BucketId>>> tableBuckets = new HashMap<>();
Expand All @@ -85,9 +90,7 @@ public static BucketManager create(Worker worker, WorkerStorage storage, int ent
registerBucket(bucket, entity2Bucket, tableBuckets);
});

storage.getAllCBlocks().forEach(cBlock ->
registerCBlock(cBlock, connectorCBlocks)
);
storage.getAllCBlocks().forEach(cBlock -> registerCBlock(cBlock, connectorCBlocks));

return new BucketManager(worker.getJobManager(), storage, worker, entity2Bucket, connectorCBlocks, tableBuckets, entityBucketSize);
}
Expand All @@ -107,10 +110,9 @@ private static void registerBucket(Bucket bucket, Object2IntMap<String> entity2B
entity2Bucket.put(entity, bucket.getBucket());
}

tableBuckets
.computeIfAbsent(bucket.getTable(), id -> new Int2ObjectAVLTreeMap<>())
.computeIfAbsent(bucket.getBucket(), n -> new ArrayList<>())
.add(bucket.getId());
tableBuckets.computeIfAbsent(bucket.getTable(), id -> new Int2ObjectAVLTreeMap<>())
.computeIfAbsent(bucket.getBucket(), n -> new ArrayList<>())
.add(bucket.getId());
}

/**
Expand All @@ -127,9 +129,7 @@ private static void registerCBlock(CBlock cBlock, Map<ConnectorId, Int2ObjectMap
public void fullUpdate() {
final CalculateCBlocksJob job = new CalculateCBlocksJob(storage, this, worker.getJobsExecutorService());

storage.getAllConcepts()
.filter(TreeConcept.class::isInstance)
.flatMap(concept -> concept.getConnectors().stream().map(ConceptTreeConnector.class::cast))
storage.getAllConcepts().filter(TreeConcept.class::isInstance).flatMap(concept -> concept.getConnectors().stream().map(ConceptTreeConnector.class::cast))

.forEach(connector -> storage.getAllBucketIds().forEach(bucketId -> {

Expand Down Expand Up @@ -182,23 +182,16 @@ public void removeTable(TableId table) {

// It's possible no buckets were registered yet
if (removed != null) {
removed.values()
.stream()
.flatMap(List::stream)
.forEach(this::removeBucket);
removed.values().stream().flatMap(List::stream).forEach(this::removeBucket);
}

storage.removeTable(table);
}

public void removeBucket(BucketId bucket) {
storage.getAllCBlockIds()
.filter(cblock -> cblock.getBucket().equals(bucket))
.forEach(this::removeCBlock);
storage.getAllCBlockIds().filter(cblock -> cblock.getBucket().equals(bucket)).forEach(this::removeCBlock);

tableToBuckets.getOrDefault(bucket.getImp().getTable(), Int2ObjectMaps.emptyMap())
.getOrDefault(bucket.getBucket(), Collections.emptyList())
.remove(bucket);
tableToBuckets.getOrDefault(bucket.getImp().getTable(), Int2ObjectMaps.emptyMap()).getOrDefault(bucket.getBucket(), Collections.emptyList()).remove(bucket);

storage.removeBucket(bucket);
}
Expand All @@ -221,23 +214,20 @@ public Set<String> getEntities() {
* Remove all buckets comprising the import. Which will in-turn remove all CBLocks.
*/
public void removeImport(ImportId imp) {
storage.getAllBucketIds()
.filter(bucket -> bucket.getImp().equals(imp))
.forEach(this::removeBucket);
storage.getAllBucketIds().filter(bucket -> bucket.getImp().equals(imp)).forEach(this::removeBucket);


storage.getAllConcepts()
.filter(TreeConcept.class::isInstance)
.forEach(concept -> ((TreeConcept) concept).removeImportCache(imp));
.forEach(concept -> removeConceptTreeCacheByImport(concept.getId(), imp));

storage.removeImport(imp);
}

public List<BucketId> getEntityBucketsForTable(Entity entity, TableId table) {
final int bucketId = getBucket(entity.getId());

return tableToBuckets.getOrDefault(table, Int2ObjectMaps.emptyMap())
.getOrDefault(bucketId, Collections.emptyList());
return tableToBuckets.getOrDefault(table, Int2ObjectMaps.emptyMap()).getOrDefault(bucketId, Collections.emptyList());
}

private int getBucket(String id) {
Expand Down Expand Up @@ -276,14 +266,12 @@ public Set<String> getEntitiesWithConcepts(Collection<ConceptElement<?>> concept
public Map<BucketId, CBlockId> getEntityCBlocksForConnector(Entity entity, ConnectorId connector) {
final int bucketId = getBucket(entity.getId());

return connectorToCblocks.getOrDefault(connector, Int2ObjectMaps.emptyMap())
.getOrDefault(bucketId, Collections.emptyMap());
return connectorToCblocks.getOrDefault(connector, Int2ObjectMaps.emptyMap()).getOrDefault(bucketId, Collections.emptyMap());
}

public boolean hasEntityCBlocksForConnector(Entity entity, ConnectorId connector) {
final int bucketId = getBucket(entity.getId());
final Map<BucketId, CBlockId> cblocks = connectorToCblocks.getOrDefault(connector, Int2ObjectMaps.emptyMap())
.getOrDefault(bucketId, Collections.emptyMap());
final Map<BucketId, CBlockId> cblocks = connectorToCblocks.getOrDefault(connector, Int2ObjectMaps.emptyMap()).getOrDefault(bucketId, Collections.emptyMap());

for (BucketId bucket : cblocks.keySet()) {
if (bucket.resolve().containsEntity(entity.getId())) {
Expand Down Expand Up @@ -311,13 +299,12 @@ public void removeConcept(Concept<?> concept) {

// It's possible that no data has been loaded yet
if (removed != null) {
removed.values().stream()
.map(Map::values)
.flatMap(Collection::stream)
.forEach(storage::removeCBlock);
removed.values().stream().map(Map::values).flatMap(Collection::stream).forEach(storage::removeCBlock);
}
}

removeConceptTreeCacheByConcept(concept.getId());

storage.removeConcept(concept.getId());
}

Expand All @@ -341,4 +328,16 @@ public void addConcept(Concept<?> concept) {
}


/**
 * Returns the {@link ConceptTreeCache} for the given concept/import pair, lazily creating
 * both the per-concept map and the cache itself on first access.
 * Thread-safe: both levels are {@link ConcurrentHashMap}s.
 */
public ConceptTreeCache getConceptTreeCache(TreeConcept concept, ImportId imp) {
	// First level: one cache map per concept, created on demand.
	final Map<ImportId, ConceptTreeCache> conceptCaches =
			treeCaches.computeIfAbsent(concept.getId(), conceptId -> new ConcurrentHashMap<>());

	// Second level: one cache per import of that concept.
	return conceptCaches.computeIfAbsent(imp, importId -> new ConceptTreeCache(concept));
}

/**
 * Drops the cached {@link ConceptTreeCache} for a single import of the given concept.
 * Safe to call for concepts that never had a cache created (no-op in that case).
 */
public void removeConceptTreeCacheByImport(ConceptId concept, ImportId imp) {
	// NOTE: treeCaches.get(concept) may be null — caches are created lazily in
	// getConceptTreeCache, so a concept that was never queried has no entry.
	// removeImport calls this for every TreeConcept in storage, so an unguarded
	// get(...).remove(...) would throw an NPE here.
	final Map<ImportId, ConceptTreeCache> conceptCaches = treeCaches.get(concept);

	if (conceptCaches != null) {
		conceptCaches.remove(imp);
	}
}

/**
 * Drops all cached {@link ConceptTreeCache}s (across all imports) for the given concept.
 * No-op if the concept never had any caches. Called when the concept itself is removed.
 */
public void removeConceptTreeCacheByConcept(ConceptId concept) {
	treeCaches.remove(concept);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ public static long estimateMemoryBytes(long entities, long entries, double depth
);
}

public static CBlock createCBlock(ConceptTreeConnector connector, Bucket bucket, int bucketSize) {
public static CBlock createCBlock(ConceptTreeConnector connector, Bucket bucket, BucketManager bucketManager) {
final int bucketSize = bucketManager.getEntityBucketSize();
final int root = bucket.getBucket() * bucketSize;

final int[][] mostSpecificChildren = calculateSpecificChildrenPaths(bucket, connector);
final int[][] mostSpecificChildren = calculateSpecificChildrenPaths(bucket, connector, bucketManager);
//TODO Object2LongMap
final Map<String, Long> includedConcepts = calculateConceptElementPathBloomFilter(bucketSize, bucket, mostSpecificChildren);
final Map<String, CDateRange> entitySpans = calculateEntityDateIndices(bucket);
Expand All @@ -105,12 +106,12 @@ public static CBlock createCBlock(ConceptTreeConnector connector, Bucket bucket,
* Calculates the path for each event from the root of the {@link TreeConcept} to the most specific {@link ConceptTreeChild}
* denoted by the individual {@link ConceptTreeChild#getPrefix()}.
*/
private static int[][] calculateSpecificChildrenPaths(Bucket bucket, ConceptTreeConnector connector) {
private static int[][] calculateSpecificChildrenPaths(Bucket bucket, ConceptTreeConnector connector, BucketManager bucketManager) {
if (connector.getColumn() == null) {
return calculateSpecificChildrenPathsWithoutColumn(bucket, connector);
}

return calculateSpecificChildrenPathsWithColumn(bucket, connector);
return calculateSpecificChildrenPathsWithColumn(bucket, connector, bucketManager);
}

/**
Expand Down Expand Up @@ -244,13 +245,11 @@ private static int[][] calculateSpecificChildrenPathsWithoutColumn(Bucket bucket
* Calculates the path for each event from the root of the {@link TreeConcept} to the most specific {@link ConceptTreeChild}
* denoted by the individual {@link ConceptTreeChild#getPrefix()}.
*/
private static int[][] calculateSpecificChildrenPathsWithColumn(Bucket bucket, ConceptTreeConnector connector) {
private static int[][] calculateSpecificChildrenPathsWithColumn(Bucket bucket, ConceptTreeConnector connector, BucketManager bucketManager) {

final Column column = connector.getColumn().resolve();

connector.getConcept().initializeIdCache(bucket.getImp());

final ConceptTreeCache cache = connector.getConcept().getCache(bucket.getImp());
final ConceptTreeCache cache = bucketManager.getConceptTreeCache(connector.getConcept(), bucket.getImp());
final int[] rootPrefix = connector.getConcept().getPrefix();

final IntFunction<Map<String, Object>> mapCalculator = bucket.mapCalculator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void run() {

log.trace("BEGIN calculating CBlock for {}", getCBlockId());

final CBlock cBlock = CBlock.createCBlock(getConnector(), getBucket(), bucketManager.getEntityBucketSize());
final CBlock cBlock = CBlock.createCBlock(getConnector(), getBucket(), bucketManager);

log.trace("DONE calculating CBlock for {}", getCBlockId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,8 +687,7 @@ public void cBlock() throws IOException, JSONException {

workerStorage.addBucket(bucket);

final CBlock cBlock = CBlock.createCBlock(connector, bucket, 10);

final CBlock cBlock = new CBlock(bucket.getId(), connector.getId(), 0, Collections.emptyMap(), Collections.emptyMap(), new int[0][]);

SerializationTestUtil.forType(CBlock.class)
.objectMappers(getShardInternalMapper())
Expand Down

0 comments on commit 64504ca

Please sign in to comment.