Skip to content

Commit

Permalink
Implement Select filter value auto-completion for SQL mode (#3277)
Browse files Browse the repository at this point in the history
  • Loading branch information
jnsrnhld authored Jan 24, 2024
1 parent d9b1016 commit 867236f
Show file tree
Hide file tree
Showing 27 changed files with 350 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.mode.Manager;
import com.bakdata.conquery.mode.StorageHandler;
import com.bakdata.conquery.models.auth.AuthorizationController;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.forms.frontendconfiguration.FormScanner;
import com.bakdata.conquery.models.i18n.I18n;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.Namespace;
import com.bakdata.conquery.models.worker.Worker;
import com.bakdata.conquery.resources.ResourcesProvider;
import com.bakdata.conquery.resources.admin.AdminServlet;
Expand Down Expand Up @@ -244,19 +246,21 @@ public void loadNamespaces() {


ExecutorService loaders = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
DatasetRegistry<? extends Namespace> registry = getDatasetRegistry();

// Namespaces load their storage themselves, so they can inject Namespace relevant objects into stored objects
final Collection<NamespaceStorage> namespaceStorages = getConfig().getStorage().discoverNamespaceStorages();
StorageHandler storageHandler = registry.getStorageHandler();
final Collection<NamespaceStorage> namespaceStorages = getConfig().getStorage().discoverNamespaceStorages(storageHandler);
for (NamespaceStorage namespaceStorage : namespaceStorages) {
loaders.submit(() -> {
getDatasetRegistry().createNamespace(namespaceStorage);
registry.createNamespace(namespaceStorage);
});
}


loaders.shutdown();
while (!loaders.awaitTermination(1, TimeUnit.MINUTES)) {
final int coundLoaded = getDatasetRegistry().getDatasets().size();
final int coundLoaded = registry.getDatasets().size();
log.debug("Waiting for Worker namespaces to load. {} are already finished. {} pending.", coundLoaded, namespaceStorages.size()
- coundLoaded);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.bakdata.conquery.ConqueryConstants;
import com.bakdata.conquery.io.storage.xodus.stores.KeyIncludingStore;
import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore;
import com.bakdata.conquery.mode.StorageHandler;
import com.bakdata.conquery.models.config.StoreFactory;
import com.bakdata.conquery.models.datasets.PreviewConfig;
import com.bakdata.conquery.models.datasets.concepts.StructureNode;
Expand Down Expand Up @@ -40,8 +41,8 @@ public class NamespaceStorage extends NamespacedStorage {

protected SingletonStore<Dictionary> primaryDictionary;

public NamespaceStorage(StoreFactory storageFactory, String pathName, Validator validator) {
super(storageFactory, pathName, validator);
public NamespaceStorage(StoreFactory storageFactory, String pathName, Validator validator, StorageHandler storageHandler) {
super(storageFactory, pathName, validator, storageHandler);
}

public EncodedDictionary getPrimaryDictionary() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.bakdata.conquery.io.storage.xodus.stores.KeyIncludingStore;
import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore;
import com.bakdata.conquery.mode.StorageHandler;
import com.bakdata.conquery.models.config.StoreFactory;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Dataset;
Expand Down Expand Up @@ -52,6 +53,9 @@ public abstract class NamespacedStorage extends ConqueryStorage {
@Getter
private final StoreFactory storageFactory;

@Getter
private final StorageHandler storageHandler;

@Getter
private final Validator validator;

Expand All @@ -62,10 +66,11 @@ public abstract class NamespacedStorage extends ConqueryStorage {
protected IdentifiableStore<Import> imports;
protected IdentifiableStore<Concept<?>> concepts;

public NamespacedStorage(StoreFactory storageFactory, String pathName, Validator validator) {
public NamespacedStorage(StoreFactory storageFactory, String pathName, Validator validator, StorageHandler storageHandler) {
this.pathName = pathName;
this.storageFactory = storageFactory;
this.validator = validator;
this.storageHandler = storageHandler;
}

public void openStores(ObjectMapper objectMapper) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.bakdata.conquery.io.storage.xodus.stores.KeyIncludingStore;
import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore;
import com.bakdata.conquery.mode.cluster.ClusterStorageHandler;
import com.bakdata.conquery.models.config.StoreFactory;
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.events.Bucket;
Expand All @@ -28,7 +29,7 @@ public class WorkerStorage extends NamespacedStorage {
private IdentifiableStore<CBlock> cBlocks;

public WorkerStorage(StoreFactory storageFactory, Validator validator, String pathName) {
super(storageFactory, pathName, validator);
super(storageFactory, pathName, validator, new ClusterStorageHandler());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
import javax.validation.Validator;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.mode.cluster.ClusterStorageHandler;
import com.bakdata.conquery.mode.local.SqlStorageHandler;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.index.IndexService;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.models.worker.LocalNamespace;
import com.bakdata.conquery.models.worker.Namespace;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import io.dropwizard.setup.Environment;

/**
Expand All @@ -27,16 +32,39 @@ static InternalObjectMapperCreator newInternalObjectMapperCreator(ConqueryConfig
return new InternalObjectMapperCreator(config, validator);
}

static <N extends Namespace> DatasetRegistry<N> createDatasetRegistry(NamespaceHandler<N> namespaceHandler, ConqueryConfig config,
InternalObjectMapperCreator creator) {
static DatasetRegistry<DistributedNamespace> createDistributedDatasetRegistry(
NamespaceHandler<DistributedNamespace> namespaceHandler,
ConqueryConfig config,
InternalObjectMapperCreator creator
) {
ClusterStorageHandler storageHandler = new ClusterStorageHandler();
return createDatasetRegistry(namespaceHandler, creator, storageHandler, config);
}

static DatasetRegistry<LocalNamespace> createLocalDatasetRegistry(
NamespaceHandler<LocalNamespace> namespaceHandler,
ConqueryConfig config,
InternalObjectMapperCreator creator,
SqlExecutionService sqlExecutionService
) {
SqlStorageHandler storageHandler = new SqlStorageHandler(sqlExecutionService);
return createDatasetRegistry(namespaceHandler, creator, storageHandler, config);
}

private static <N extends Namespace> DatasetRegistry<N> createDatasetRegistry(
NamespaceHandler<N> namespaceHandler,
InternalObjectMapperCreator creator,
StorageHandler storageHandler,
ConqueryConfig config
) {
final IndexService indexService = new IndexService(config.getCsv().createCsvParserSettings(), config.getIndex().getEmptyLabel());
DatasetRegistry<N> datasetRegistry = new DatasetRegistry<>(
config.getCluster().getEntityBucketSize(),
config,
creator,
namespaceHandler,
indexService
indexService,
storageHandler
);
MetaStorage storage = new MetaStorage(config.getStorage(), datasetRegistry);
datasetRegistry.setMetaStorage(storage);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.bakdata.conquery.mode;

import java.util.stream.Stream;

import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.models.datasets.Column;

public interface StorageHandler {

Stream<String> lookupColumnValues(NamespaceStorage namespaceStorage, Column column);

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public ClusterManager provideManager(ConqueryConfig config, Environment environm
final InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, environment.getValidator());
final ClusterState clusterState = new ClusterState();
final NamespaceHandler<DistributedNamespace> namespaceHandler = new ClusterNamespaceHandler(clusterState, config, creator);
final DatasetRegistry<DistributedNamespace> datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator);
final DatasetRegistry<DistributedNamespace> datasetRegistry = ManagerProvider.createDistributedDatasetRegistry(namespaceHandler, config, creator);
creator.init(datasetRegistry);

final ClusterConnectionManager connectionManager =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.bakdata.conquery.mode.cluster;

import java.util.stream.Stream;

import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.mode.StorageHandler;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.ImportColumn;
import com.bakdata.conquery.models.events.stores.root.StringStore;

public class ClusterStorageHandler implements StorageHandler {

@Override
public Stream<String> lookupColumnValues(NamespaceStorage namespaceStorage, Column column) {
return namespaceStorage.getAllImports().stream()
.filter(imp -> imp.getTable().equals(column.getTable()))
.flatMap(imp -> {
final ImportColumn importColumn = imp.getColumns()[column.getPosition()];
return ((StringStore) importColumn.getTypeDescription()).iterateValues();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.bakdata.conquery.sql.conversion.dialect.HanaSqlDialect;
import com.bakdata.conquery.sql.conversion.dialect.PostgreSqlDialect;
import com.bakdata.conquery.sql.conversion.dialect.SqlDialect;
import com.bakdata.conquery.sql.execution.ResultSetProcessorFactory;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import io.dropwizard.setup.Environment;
import org.jooq.DSLContext;

Expand All @@ -34,8 +36,14 @@ public DelegateManager<LocalNamespace> provideManager(ConqueryConfig config, Env
DSLContext dslContext = DslContextFactory.create(sqlConnectorConfig);
SqlDialect sqlDialect = createSqlDialect(sqlConnectorConfig, dslContext);
SqlContext sqlContext = new SqlContext(sqlConnectorConfig, sqlDialect);
NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, creator, sqlContext);
DatasetRegistry<LocalNamespace> datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, creator);

SqlExecutionService sqlExecutionService = new SqlExecutionService(
sqlDialect.getDSLContext(),
ResultSetProcessorFactory.create(sqlDialect)
);

NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, creator, sqlContext, sqlExecutionService);
DatasetRegistry<LocalNamespace> datasetRegistry = ManagerProvider.createLocalDatasetRegistry(namespaceHandler, config, creator, sqlExecutionService);
creator.init(datasetRegistry);

return new DelegateManager<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.bakdata.conquery.models.worker.LocalNamespace;
import com.bakdata.conquery.sql.SqlContext;
import com.bakdata.conquery.sql.conquery.SqlExecutionManager;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
Expand All @@ -20,16 +21,18 @@ public class LocalNamespaceHandler implements NamespaceHandler<LocalNamespace> {
private final ConqueryConfig config;
private final InternalObjectMapperCreator mapperCreator;
private final SqlContext sqlContext;
private final SqlExecutionService sqlExecutionService;

@Override
public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, IndexService indexService) {
NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(namespaceStorage, config, mapperCreator, indexService);
ExecutionManager executionManager = new SqlExecutionManager(sqlContext, metaStorage);
ExecutionManager executionManager = new SqlExecutionManager(sqlContext, sqlExecutionService, metaStorage);
return new LocalNamespace(
namespaceData.getPreprocessMapper(),
namespaceData.getCommunicationMapper(),
namespaceStorage,
executionManager,
sqlExecutionService,
namespaceData.getJobManager(),
namespaceData.getFilterSearch(),
namespaceData.getIndexService(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.bakdata.conquery.mode.local;

import java.util.stream.Stream;

import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.mode.StorageHandler;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import lombok.extern.slf4j.Slf4j;
import org.jooq.DSLContext;
import org.jooq.Record1;
import org.jooq.Select;
import org.jooq.impl.DSL;

@Slf4j
public class SqlStorageHandler implements StorageHandler {

private final SqlExecutionService sqlExecutionService;
private final DSLContext dslContext;

public SqlStorageHandler(SqlExecutionService sqlExecutionService) {
this.sqlExecutionService = sqlExecutionService;
this.dslContext = sqlExecutionService.getDslContext();
}

@Override
public Stream<String> lookupColumnValues(NamespaceStorage namespaceStorage, Column column) {
Select<Record1<Object>> columValuesQuery = dslContext.selectDistinct(DSL.field(DSL.name(column.getName())))
.from(DSL.table(DSL.name(column.getTable().getName())));
return queryForDistinctValues(columValuesQuery);
}

private Stream<String> queryForDistinctValues(Select<Record1<Object>> columValuesQuery) {
try {
return sqlExecutionService.fetchStream(columValuesQuery)
.map(record -> record.get(0, String.class))
// the database might return null or a blank string as a distinct value
.filter(value -> value != null && !value.isBlank());
}
catch (Exception e) {
log.error("Expecting exactly 1 column in Result when querying for distinct values of a column. Query: {}.", columValuesQuery, e);
}
return Stream.empty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.io.storage.WorkerStorage;
import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore;
import com.bakdata.conquery.mode.StorageHandler;
import com.bakdata.conquery.models.auth.entities.Group;
import com.bakdata.conquery.models.auth.entities.Role;
import com.bakdata.conquery.models.auth.entities.User;
Expand Down Expand Up @@ -37,7 +38,7 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, property = "type")
public interface StoreFactory {

Collection<NamespaceStorage> discoverNamespaceStorages();
Collection<NamespaceStorage> discoverNamespaceStorages(StorageHandler storageHandler);

Collection<WorkerStorage> discoverWorkerStorages();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.bakdata.conquery.io.storage.xodus.stores.StoreInfo;
import com.bakdata.conquery.io.storage.xodus.stores.WeakCachedStore;
import com.bakdata.conquery.io.storage.xodus.stores.XodusStore;
import com.bakdata.conquery.mode.StorageHandler;
import com.bakdata.conquery.models.auth.entities.Group;
import com.bakdata.conquery.models.auth.entities.Role;
import com.bakdata.conquery.models.auth.entities.User;
Expand Down Expand Up @@ -93,7 +94,6 @@
@CPSType(id = "XODUS", base = StoreFactory.class)
public class XodusStoreFactory implements StoreFactory {


/**
* The store names are created by hand here because the abstraction of {@link BigStore}
* creates two stores. Defining the expected stores like this, does not require a lot or complicated logic.
Expand Down Expand Up @@ -198,8 +198,8 @@ public ExecutorService getReaderExecutorService() {
Multimaps.synchronizedSetMultimap(MultimapBuilder.hashKeys().hashSetValues().build());

@Override
public Collection<NamespaceStorage> discoverNamespaceStorages() {
return loadNamespacedStores("dataset_", (storePath) -> new NamespaceStorage(this, storePath, getValidator()), NAMESPACE_STORES);
public Collection<NamespaceStorage> discoverNamespaceStorages(StorageHandler storageHandler) {
return loadNamespacedStores("dataset_", (storePath) -> new NamespaceStorage(this, storePath, getValidator(), storageHandler), NAMESPACE_STORES);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.bakdata.conquery.models.dictionary.Dictionary;
import com.bakdata.conquery.models.dictionary.MapDictionary;
import com.bakdata.conquery.models.events.MajorTypeId;
import com.bakdata.conquery.models.events.stores.root.StringStore;
import com.bakdata.conquery.models.identifiable.IdMutex;
import com.bakdata.conquery.models.identifiable.Labeled;
import com.bakdata.conquery.models.identifiable.ids.NamespacedIdentifiable;
Expand Down Expand Up @@ -156,16 +155,10 @@ public TrieSearch<FrontendValue> createTrieSearch(IndexConfig config, NamespaceS

final TrieSearch<FrontendValue> search = new TrieSearch<>(suffixLength, config.getSearchSplitChars());

storage.getAllImports().stream()
.filter(imp -> imp.getTable().equals(getTable()))
.flatMap(imp -> {
final ImportColumn importColumn = imp.getColumns()[getPosition()];

return ((StringStore) importColumn.getTypeDescription()).iterateValues();
})
storage.getStorageHandler()
.lookupColumnValues(storage, this)
.map(value -> new FrontendValue(value, value))
.onClose(() -> log.debug("DONE processing values for {}", getId()))

.forEach(feValue -> search.addItem(feValue, FilterSearch.extractKeywords(feValue)));


Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.bakdata.conquery.models.error;

import java.sql.SQLException;
import java.util.Set;
import java.util.UUID;

Expand Down Expand Up @@ -256,7 +255,7 @@ public String getMessageTemplate(ErrorMessages errorMessages) {
@CPSType(base = ConqueryError.class, id = "CQ_SQL_ERROR")
@RequiredArgsConstructor(onConstructor_ = {@JsonCreator})
public static class SqlError extends ConqueryError {
private final SQLException error;
private final Exception error;

@Override
public String getMessageTemplate(ErrorMessages errorMessages) {
Expand Down
Loading

0 comments on commit 867236f

Please sign in to comment.