Skip to content

Commit

Permalink
Merge branch 'develop' into feature/use-caffeine-cache
Browse files Browse the repository at this point in the history
  • Loading branch information
thoniTUB committed Dec 16, 2024
2 parents d674c77 + ef752fc commit a7f8f7a
Show file tree
Hide file tree
Showing 66 changed files with 973 additions and 1,473 deletions.
14 changes: 1 addition & 13 deletions backend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,9 @@
<dependency>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-core</artifactId>
<version>1.13.0</version>
<version>2.0.2</version>
<exclusions>
<!-- All these transitive deps are already bundled in shiro core-->
<exclusion>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-cache</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-config-ogdl</artifactId>
Expand All @@ -252,14 +248,6 @@
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-config-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-event</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-lang</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand Down
14 changes: 4 additions & 10 deletions backend/src/main/java/com/bakdata/conquery/Conquery.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.dropwizard.core.setup.Environment;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.text.StringSubstitutor;
import org.glassfish.jersey.internal.inject.AbstractBinder;
Expand All @@ -38,8 +37,6 @@
public class Conquery extends Application<ConqueryConfig> {

private final String name;
@Setter
private ManagerNode managerNode;

public Conquery() {
this("Conquery");
Expand All @@ -59,7 +56,7 @@ public void initialize(Bootstrap<ConqueryConfig> bootstrap) {

bootstrap.addCommand(new ShardCommand());
bootstrap.addCommand(new PreprocessorCommand());
bootstrap.addCommand(new DistributedStandaloneCommand(this));
bootstrap.addCommand(new DistributedStandaloneCommand());
bootstrap.addCommand(new RecodeStoreCommand());
bootstrap.addCommand(new MigrateCommand());

Expand Down Expand Up @@ -104,13 +101,10 @@ protected Level bootstrapLogLevel() {
public void run(ConqueryConfig configuration, Environment environment) throws Exception {
ManagerProvider provider = configuration.getSqlConnectorConfig().isEnabled() ?
new LocalManagerProvider() : new ClusterManagerProvider();
run(provider.provideManager(configuration, environment));
}
Manager manager = provider.provideManager(configuration, environment);

ManagerNode managerNode = new ManagerNode();

public void run(Manager manager) throws InterruptedException {
if (managerNode == null) {
managerNode = new ManagerNode();
}
managerNode.run(manager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,35 +107,41 @@ public class QueryProcessor {
private Validator validator;


public List<ExecutionStatus> getAllQueries(Dataset dataset, HttpServletRequest req, Subject subject, boolean allProviders) {
public List<? extends ExecutionStatus> getAllQueries(Dataset dataset, HttpServletRequest req, Subject subject, boolean allProviders) {
try(Stream<ManagedExecution> allQueries = storage.getAllExecutions()) {
return getQueriesFiltered(dataset.getId(), RequestAwareUriBuilder.fromRequest(req), subject, allQueries, allProviders).toList();
}
}

public Stream<ExecutionStatus> getQueriesFiltered(DatasetId datasetId, UriBuilder uriBuilder, Subject subject, Stream<ManagedExecution> allQueries, boolean allProviders) {
public Stream<? extends ExecutionStatus> getQueriesFiltered(DatasetId datasetId, UriBuilder uriBuilder, Subject subject, Stream<ManagedExecution> allQueries, boolean allProviders) {

return allQueries
// The following only checks the dataset, under which the query was submitted, but a query can target more than
// one dataset.
.filter(q -> q.getDataset().equals(datasetId))
// to exclude subtypes from somewhere else
.filter(QueryProcessor::canFrontendRender)
.filter(Predicate.not(ManagedExecution::isSystem))
.filter(q -> {
ExecutionState state = q.getState();
return state == ExecutionState.NEW || state == ExecutionState.DONE;
}
)
.filter(q -> subject.isPermitted(q, Ability.READ))
.map(mq -> {
final OverviewExecutionStatus status = mq.buildStatusOverview(subject);

if (mq.isReadyToDownload()) {
status.setResultUrls(getResultAssets(config.getResultProviders(), mq, uriBuilder, allProviders));
}
return status;
});
// The following only checks the dataset, under which the query was submitted, but a query can target more than
// one dataset.
.filter(q -> q.getDataset().equals(datasetId))
// to exclude subtypes from somewhere else
.filter(QueryProcessor::canFrontendRender)
.filter(Predicate.not(ManagedExecution::isSystem))
.filter(q -> {
ExecutionState state = q.getState();
return state == ExecutionState.NEW || state == ExecutionState.DONE;
})
.filter(q -> subject.isPermitted(q, Ability.READ))
.map(mq -> {
try {
final OverviewExecutionStatus status = mq.buildStatusOverview(subject);

if (mq.isReadyToDownload()) {
status.setResultUrls(getResultAssets(config.getResultProviders(), mq, uriBuilder, allProviders));
}
return status;
}
catch (Exception e) {
log.error("FAILED building status for {}", mq, e);
}
return null;
})
.filter(Objects::nonNull);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ private static Map<ColumnId, Integer> calculateColumnPositions(
for (Column column : table.getConnector().resolve().getResolvedTable().getColumns()) {

// ValidityDates are handled separately in column=0
if (validityDates.stream().anyMatch(vd -> vd.containsColumn(column))) {
if (validityDates.stream().anyMatch(vd -> vd.containsColumn(column.getId()))) {
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,11 @@ public String defaultLabel(Locale locale) {
builder.append(" ");

for (ConceptElementId<?> id : elements) {
ConceptElement<?> conceptElement = id.resolve();
if (conceptElement.equals(getConcept())) {
if (id.equals(getConceptId())) {
continue;
}

ConceptElement<?> conceptElement = id.resolve();
builder.append(conceptElement.getLabel()).append("+");
}

Expand Down Expand Up @@ -274,9 +275,7 @@ public RequiredEntities collectRequiredEntities(QueryExecutionContext context) {
final Set<ConnectorId> connectors = getTables().stream().map(CQTable::getConnector).collect(Collectors.toSet());

return new RequiredEntities(context.getBucketManager()
.getEntitiesWithConcepts(getElements().stream()
.<ConceptElement<?>>map(ConceptElementId::resolve)
.toList(),
.getEntitiesWithConcepts(getElements(),
connectors, context.getDateRestriction()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ public void readDates(String value, DateReader dateReader, CDateSet out) {
DATE_SET {
@Override
public void readDates(String value, DateReader dateReader, CDateSet out) {
out.addAll(dateReader.parseToCDateSet(value));
CDateSet parsed = dateReader.parseToCDateSet(value);
if (parsed == null ) {
return;
}
out.addAll(parsed);
}
},
ALL {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.bakdata.conquery.models.identifiable.mapping.ExternalId;
import com.bakdata.conquery.util.DateReader;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

@Slf4j
public class EntityResolverUtil {
Expand All @@ -41,7 +42,7 @@ public static CDateSet[] readDates(String[][] values, List<String> format, DateR
but may also not contribute to any date aggregation.
*/
if (dateFormats.stream().allMatch(Objects::isNull)) {
// Initialize empty
// Initialize empty, so all lines appear as resolved
for (int row = 0; row < values.length; row++) {
out[row] = CDateSet.createEmpty();
}
Expand All @@ -59,10 +60,19 @@ public static CDateSet[] readDates(String[][] values, List<String> format, DateR
if (dateFormat == null) {
continue;
}
dateFormat.readDates(values[row][col], dateReader, dates);
String value = values[row][col];

if (StringUtils.isBlank(value)) {
log.trace("Found blank/null value in {}/{} (row/col)", row,col);
continue;
}

dateFormat.readDates(value, dateReader, dates);
}

if (dates.isEmpty()) {
// Don't set an empty dateset here, because this flags the line as: unresolvedDate
// TODO It might be better to set an empty dateset nonetheless, because it seems to be intentionally empty, as we had no problem while parsing a value
continue;
}

Expand All @@ -73,7 +83,9 @@ public static CDateSet[] readDates(String[][] values, List<String> format, DateR
out[row].addAll(dates);
}
catch (Exception e) {
log.warn("Failed to parse Date from {}", row, e);
// If a value is not parsable, it is included in the exceptions cause message (see DateReader)
log.trace("Failed to parse Date in row {}", row, e);
// This catch causes `out[row]` to remain `null` which later flags this line as: unresolvedDate
}
}

Expand Down Expand Up @@ -142,6 +154,7 @@ public static String tryResolveId(String[] row, List<Function<String[], External
*/
public static Map<String, String>[] readExtras(String[][] values, List<String> format) {
final String[] names = values[0];
@SuppressWarnings("unchecked")
final Map<String, String>[] extrasByRow = new Map[values.length];


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
import java.util.List;
import java.util.Vector;

import com.bakdata.conquery.Conquery;
import com.bakdata.conquery.mode.cluster.ClusterManager;
import com.bakdata.conquery.mode.cluster.ClusterManagerProvider;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.config.XodusStoreFactory;
import com.bakdata.conquery.util.commands.NoOpConquery;
import com.bakdata.conquery.util.io.ConqueryMDC;
import io.dropwizard.core.cli.ServerCommand;
import io.dropwizard.core.setup.Bootstrap;
import io.dropwizard.core.setup.Environment;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -21,68 +20,42 @@
@Getter
public class DistributedStandaloneCommand extends ServerCommand<ConqueryConfig> implements StandaloneCommand {

private final Conquery conquery;
private ClusterManager manager;
private final ManagerNode managerNode = new ManagerNode();
private final List<ShardNode> shardNodes = new Vector<>();
private ClusterManager manager;

// TODO clean up the command structure, so we can use the Environment from EnvironmentCommand
private Environment environment;

public DistributedStandaloneCommand(Conquery conquery) {
super(conquery, "standalone", "starts a server and a client at the same time.");
this.conquery = conquery;
public DistributedStandaloneCommand() {
super(new NoOpConquery(), "standalone", "starts a manager node and shard node(s) at the same time in a single JVM.");
}

// this must be overridden so that
@Override
public void run(Bootstrap<ConqueryConfig> bootstrap, Namespace namespace, ConqueryConfig configuration) throws Exception {
environment = new Environment(
bootstrap.getApplication().getName(),
bootstrap.getObjectMapper(),
bootstrap.getValidatorFactory(),
bootstrap.getMetricRegistry(),
bootstrap.getClassLoader(),
bootstrap.getHealthCheckRegistry(),
configuration
);
configuration.getMetricsFactory().configure(environment.lifecycle(), bootstrap.getMetricRegistry());
configuration.getServerFactory().configure(environment);

bootstrap.run(configuration, environment);
startStandalone(environment, namespace, configuration);
}
protected void run(Environment environment, Namespace namespace, ConqueryConfig configuration) throws Exception {

public void startStandalone(Environment environment, Namespace namespace, ConqueryConfig config) throws Exception {
// start ManagerNode
ConqueryMDC.setLocation("ManagerNode");
log.debug("Starting ManagerNode");

ConqueryConfig managerConfig = config;
ConqueryConfig managerConfig = configuration;

if (config.getStorage() instanceof XodusStoreFactory) {
final Path managerDir = ((XodusStoreFactory) config.getStorage()).getDirectory().resolve("manager");
managerConfig = config.withStorage(((XodusStoreFactory) config.getStorage()).withDirectory(managerDir));
if (configuration.getStorage() instanceof XodusStoreFactory) {
final Path managerDir = ((XodusStoreFactory) configuration.getStorage()).getDirectory().resolve("manager");
managerConfig = configuration.withStorage(((XodusStoreFactory) configuration.getStorage()).withDirectory(managerDir));
}

manager = new ClusterManagerProvider().provideManager(managerConfig, environment);

conquery.setManagerNode(managerNode);
conquery.run(manager);
managerNode.run(manager);

for (int id = 0; id < config.getStandalone().getNumberOfShardNodes(); id++) {
for (int id = 0; id < configuration.getStandalone().getNumberOfShardNodes(); id++) {

ShardNode sc = new ShardNode(ShardNode.DEFAULT_NAME + id);

shardNodes.add(sc);

ConqueryMDC.setLocation(sc.getName());

ConqueryConfig clone = config;
ConqueryConfig clone = configuration;

if (config.getStorage() instanceof XodusStoreFactory) {
final Path managerDir = ((XodusStoreFactory) config.getStorage()).getDirectory().resolve("shard-node" + id);
clone = config.withStorage(((XodusStoreFactory) config.getStorage()).withDirectory(managerDir));
if (configuration.getStorage() instanceof XodusStoreFactory) {
final Path managerDir = ((XodusStoreFactory) configuration.getStorage()).getDirectory().resolve("shard-node" + id);
clone = configuration.withStorage(((XodusStoreFactory) configuration.getStorage()).withDirectory(managerDir));
}

sc.run(clone, environment);
Expand All @@ -93,6 +66,6 @@ public void startStandalone(Environment environment, Namespace namespace, Conque
ConqueryMDC.setLocation("ManagerNode");
log.debug("Starting REST Server");
ConqueryMDC.setLocation(null);
super.run(environment, namespace, config);
super.run(environment, namespace, configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,15 @@

import java.util.List;

import com.bakdata.conquery.Conquery;
import com.bakdata.conquery.mode.Manager;
import com.bakdata.conquery.models.config.ConqueryConfig;
import io.dropwizard.core.setup.Bootstrap;
import io.dropwizard.core.setup.Environment;
import net.sourceforge.argparse4j.inf.Namespace;

public interface StandaloneCommand {

void startStandalone(Environment environment, Namespace namespace, ConqueryConfig config) throws Exception;

Manager getManager();

List<ShardNode> getShardNodes();

void run(Bootstrap<ConqueryConfig> bootstrap, Namespace namespace, ConqueryConfig configuration) throws Exception;

Conquery getConquery();

ManagerNode getManagerNode();

Environment getEnvironment();
Expand Down
Loading

0 comments on commit a7f8f7a

Please sign in to comment.