diff --git a/backend/pom.xml b/backend/pom.xml index ab20155260..0baa0b2fe7 100644 --- a/backend/pom.xml +++ b/backend/pom.xml @@ -225,13 +225,9 @@ org.apache.shiro shiro-core - 1.13.0 + 2.0.2 - - org.apache.shiro - shiro-cache - org.apache.shiro shiro-config-ogdl @@ -252,14 +248,6 @@ org.apache.shiro shiro-config-core - - org.apache.shiro - shiro-event - - - org.apache.shiro - shiro-lang - diff --git a/backend/src/main/java/com/bakdata/conquery/Conquery.java b/backend/src/main/java/com/bakdata/conquery/Conquery.java index 32cd0a43ca..54215b3e5e 100644 --- a/backend/src/main/java/com/bakdata/conquery/Conquery.java +++ b/backend/src/main/java/com/bakdata/conquery/Conquery.java @@ -27,7 +27,6 @@ import io.dropwizard.core.setup.Environment; import lombok.Getter; import lombok.RequiredArgsConstructor; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.text.StringSubstitutor; import org.glassfish.jersey.internal.inject.AbstractBinder; @@ -38,8 +37,6 @@ public class Conquery extends Application { private final String name; - @Setter - private ManagerNode managerNode; public Conquery() { this("Conquery"); @@ -59,7 +56,7 @@ public void initialize(Bootstrap bootstrap) { bootstrap.addCommand(new ShardCommand()); bootstrap.addCommand(new PreprocessorCommand()); - bootstrap.addCommand(new DistributedStandaloneCommand(this)); + bootstrap.addCommand(new DistributedStandaloneCommand()); bootstrap.addCommand(new RecodeStoreCommand()); bootstrap.addCommand(new MigrateCommand()); @@ -104,13 +101,10 @@ protected Level bootstrapLogLevel() { public void run(ConqueryConfig configuration, Environment environment) throws Exception { ManagerProvider provider = configuration.getSqlConnectorConfig().isEnabled() ? new LocalManagerProvider() : new ClusterManagerProvider(); - run(provider.provideManager(configuration, environment)); - } + Manager manager = provider.provideManager(configuration, environment); + + ManagerNode managerNode = new ManagerNode(); - public void run(Manager manager) throws InterruptedException { - if (managerNode == null) { - managerNode = new ManagerNode(); - } managerNode.run(manager); } } diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java b/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java index e4ab20ddc8..b9ef950d6c 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/QueryProcessor.java @@ -107,35 +107,41 @@ public class QueryProcessor { private Validator validator; - public List getAllQueries(Dataset dataset, HttpServletRequest req, Subject subject, boolean allProviders) { + public List getAllQueries(Dataset dataset, HttpServletRequest req, Subject subject, boolean allProviders) { try(Stream allQueries = storage.getAllExecutions()) { return getQueriesFiltered(dataset.getId(), RequestAwareUriBuilder.fromRequest(req), subject, allQueries, allProviders).toList(); } } - public Stream getQueriesFiltered(DatasetId datasetId, UriBuilder uriBuilder, Subject subject, Stream allQueries, boolean allProviders) { + public Stream getQueriesFiltered(DatasetId datasetId, UriBuilder uriBuilder, Subject subject, Stream allQueries, boolean allProviders) { return allQueries - // The following only checks the dataset, under which the query was submitted, but a query can target more that - // one dataset. 
- .filter(q -> q.getDataset().equals(datasetId)) - // to exclude subtypes from somewhere else - .filter(QueryProcessor::canFrontendRender) - .filter(Predicate.not(ManagedExecution::isSystem)) - .filter(q -> { - ExecutionState state = q.getState(); - return state == ExecutionState.NEW || state == ExecutionState.DONE; - } - ) - .filter(q -> subject.isPermitted(q, Ability.READ)) - .map(mq -> { - final OverviewExecutionStatus status = mq.buildStatusOverview(subject); - - if (mq.isReadyToDownload()) { - status.setResultUrls(getResultAssets(config.getResultProviders(), mq, uriBuilder, allProviders)); - } - return status; - }); + // The following only checks the dataset, under which the query was submitted, but a query can target more than + // one dataset. + .filter(q -> q.getDataset().equals(datasetId)) + // to exclude subtypes from somewhere else + .filter(QueryProcessor::canFrontendRender) + .filter(Predicate.not(ManagedExecution::isSystem)) + .filter(q -> { + ExecutionState state = q.getState(); + return state == ExecutionState.NEW || state == ExecutionState.DONE; + }) + .filter(q -> subject.isPermitted(q, Ability.READ)) + .map(mq -> { + try { + final OverviewExecutionStatus status = mq.buildStatusOverview(subject); + + if (mq.isReadyToDownload()) { + status.setResultUrls(getResultAssets(config.getResultProviders(), mq, uriBuilder, allProviders)); + } + return status; + } + catch (Exception e) { + log.error("FAILED building status for {}", mq, e); + } + return null; + }) + .filter(Objects::nonNull); } /** diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/query/TableExportQuery.java b/backend/src/main/java/com/bakdata/conquery/apiv1/query/TableExportQuery.java index 115fb6b28c..9fd20f9c6c 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/query/TableExportQuery.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/query/TableExportQuery.java @@ -200,7 +200,7 @@ private static Map calculateColumnPositions( for (Column column : table.getConnector().resolve().getResolvedTable().getColumns()) { // ValidityDates are handled separately in column=0 - if (validityDates.stream().anyMatch(vd -> vd.containsColumn(column))) { + if (validityDates.stream().anyMatch(vd -> vd.containsColumn(column.getId()))) { continue; } diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/CQConcept.java b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/CQConcept.java index c52fe7341b..aeecd4471a 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/CQConcept.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/CQConcept.java @@ -148,10 +148,11 @@ public String defaultLabel(Locale locale) { builder.append(" "); for (ConceptElementId id : elements) { - ConceptElement conceptElement = id.resolve(); - if (conceptElement.equals(getConcept())) { + if (id.equals(getConceptId())) { continue; } + + ConceptElement conceptElement = id.resolve(); builder.append(conceptElement.getLabel()).append("+"); } @@ -274,9 +275,7 @@ public RequiredEntities collectRequiredEntities(QueryExecutionContext context) { final Set connectors = getTables().stream().map(CQTable::getConnector).collect(Collectors.toSet()); return new RequiredEntities(context.getBucketManager() - .getEntitiesWithConcepts(getElements().stream() - .>map(ConceptElementId::resolve) - .toList(), + .getEntitiesWithConcepts(getElements(), connectors, context.getDateRestriction())); } diff --git
a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/DateFormat.java b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/DateFormat.java index ece1eacaaa..4deffc64a2 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/DateFormat.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/DateFormat.java @@ -58,7 +58,11 @@ public void readDates(String value, DateReader dateReader, CDateSet out) { DATE_SET { @Override public void readDates(String value, DateReader dateReader, CDateSet out) { - out.addAll(dateReader.parseToCDateSet(value)); + CDateSet parsed = dateReader.parseToCDateSet(value); + if (parsed == null) { + return; + } + out.addAll(parsed); } }, ALL { diff --git a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/EntityResolverUtil.java b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/EntityResolverUtil.java index 3a9be46a25..b26e3a15f5 100644 --- a/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/EntityResolverUtil.java +++ b/backend/src/main/java/com/bakdata/conquery/apiv1/query/concept/specific/external/EntityResolverUtil.java @@ -15,6 +15,7 @@ import com.bakdata.conquery.models.identifiable.mapping.ExternalId; import com.bakdata.conquery.util.DateReader; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; @Slf4j public class EntityResolverUtil { @@ -41,7 +42,7 @@ public static CDateSet[] readDates(String[][] values, List format, DateR but can also don't contribute to any date aggregation. */ if (dateFormats.stream().allMatch(Objects::isNull)) { - // Initialize empty + // Initialize empty, so all lines appear as resolved for (int row = 0; row < values.length; row++) { out[row] = CDateSet.createEmpty(); } @@ -59,10 +60,19 @@ public static CDateSet[] readDates(String[][] values, List format, DateR if (dateFormat == null) { continue; } - dateFormat.readDates(values[row][col], dateReader, dates); + String value = values[row][col]; + + if (StringUtils.isBlank(value)) { + log.trace("Found blank/null value in {}/{} (row/col)", row, col); + continue; + } + + dateFormat.readDates(value, dateReader, dates); } if (dates.isEmpty()) { + // Don't set an empty dateset here, because this flags the line as: unresolvedDate + // TODO It might be better to set an empty dateset nonetheless, because it seems to be intentionally empty, as we had no problem while parsing a value continue; } @@ -73,7 +83,9 @@ public static CDateSet[] readDates(String[][] values, List format, DateR out[row].addAll(dates); } catch (Exception e) { - log.warn("Failed to parse Date from {}", row, e); + // If a value is not parsable, it is included in the exception's cause message (see DateReader) + log.trace("Failed to parse Date in row {}", row, e); + // This catch causes `out[row]` to remain `null` which later flags this line as: unresolvedDate } } @@ -142,6 +154,7 @@ public static String tryResolveId(String[] row, List[] readExtras(String[][] values, List format) { final String[] names = values[0]; + @SuppressWarnings("unchecked") final Map[] extrasByRow = new Map[values.length]; diff --git a/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java index 311c72cf5f..97b67aa1f1 100644 ---
a/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/DistributedStandaloneCommand.java @@ -4,14 +4,13 @@ import java.util.List; import java.util.Vector; -import com.bakdata.conquery.Conquery; import com.bakdata.conquery.mode.cluster.ClusterManager; import com.bakdata.conquery.mode.cluster.ClusterManagerProvider; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.config.XodusStoreFactory; +import com.bakdata.conquery.util.commands.NoOpConquery; import com.bakdata.conquery.util.io.ConqueryMDC; import io.dropwizard.core.cli.ServerCommand; -import io.dropwizard.core.setup.Bootstrap; import io.dropwizard.core.setup.Environment; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -21,56 +20,30 @@ @Getter public class DistributedStandaloneCommand extends ServerCommand implements StandaloneCommand { - private final Conquery conquery; - private ClusterManager manager; private final ManagerNode managerNode = new ManagerNode(); private final List shardNodes = new Vector<>(); + private ClusterManager manager; - // TODO clean up the command structure, so we can use the Environment from EnvironmentCommand - private Environment environment; - - public DistributedStandaloneCommand(Conquery conquery) { - super(conquery, "standalone", "starts a server and a client at the same time."); - this.conquery = conquery; + public DistributedStandaloneCommand() { + super(new NoOpConquery(), "standalone", "starts a manager node and shard node(s) at the same time in a single JVM."); } - // this must be overridden so that @Override - public void run(Bootstrap bootstrap, Namespace namespace, ConqueryConfig configuration) throws Exception { - environment = new Environment( - bootstrap.getApplication().getName(), - bootstrap.getObjectMapper(), - bootstrap.getValidatorFactory(), - bootstrap.getMetricRegistry(), - bootstrap.getClassLoader(), - bootstrap.getHealthCheckRegistry(), - configuration - ); - configuration.getMetricsFactory().configure(environment.lifecycle(), bootstrap.getMetricRegistry()); - configuration.getServerFactory().configure(environment); - - bootstrap.run(configuration, environment); - startStandalone(environment, namespace, configuration); - } + protected void run(Environment environment, Namespace namespace, ConqueryConfig configuration) throws Exception { - public void startStandalone(Environment environment, Namespace namespace, ConqueryConfig config) throws Exception { - // start ManagerNode - ConqueryMDC.setLocation("ManagerNode"); - log.debug("Starting ManagerNode"); - ConqueryConfig managerConfig = config; + ConqueryConfig managerConfig = configuration; - if (config.getStorage() instanceof XodusStoreFactory) { - final Path managerDir = ((XodusStoreFactory) config.getStorage()).getDirectory().resolve("manager"); - managerConfig = config.withStorage(((XodusStoreFactory) config.getStorage()).withDirectory(managerDir)); + if (configuration.getStorage() instanceof XodusStoreFactory) { + final Path managerDir = ((XodusStoreFactory) configuration.getStorage()).getDirectory().resolve("manager"); + managerConfig = configuration.withStorage(((XodusStoreFactory) configuration.getStorage()).withDirectory(managerDir)); } manager = new ClusterManagerProvider().provideManager(managerConfig, environment); - conquery.setManagerNode(managerNode); - conquery.run(manager); + managerNode.run(manager); - for (int id = 0; id < config.getStandalone().getNumberOfShardNodes(); id++) 
{ + for (int id = 0; id < configuration.getStandalone().getNumberOfShardNodes(); id++) { ShardNode sc = new ShardNode(ShardNode.DEFAULT_NAME + id); @@ -78,11 +51,11 @@ public void startStandalone(Environment environment, Namespace namespace, Conque ConqueryMDC.setLocation(sc.getName()); - ConqueryConfig clone = config; + ConqueryConfig clone = configuration; - if (config.getStorage() instanceof XodusStoreFactory) { - final Path managerDir = ((XodusStoreFactory) config.getStorage()).getDirectory().resolve("shard-node" + id); - clone = config.withStorage(((XodusStoreFactory) config.getStorage()).withDirectory(managerDir)); + if (configuration.getStorage() instanceof XodusStoreFactory) { + final Path managerDir = ((XodusStoreFactory) configuration.getStorage()).getDirectory().resolve("shard-node" + id); + clone = configuration.withStorage(((XodusStoreFactory) configuration.getStorage()).withDirectory(managerDir)); } sc.run(clone, environment); @@ -93,6 +66,6 @@ public void startStandalone(Environment environment, Namespace namespace, Conque ConqueryMDC.setLocation("ManagerNode"); log.debug("Starting REST Server"); ConqueryMDC.setLocation(null); - super.run(environment, namespace, config); + super.run(environment, namespace, configuration); } } diff --git a/backend/src/main/java/com/bakdata/conquery/commands/StandaloneCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/StandaloneCommand.java index abf7ec7fe3..7545564291 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/StandaloneCommand.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/StandaloneCommand.java @@ -2,25 +2,15 @@ import java.util.List; -import com.bakdata.conquery.Conquery; import com.bakdata.conquery.mode.Manager; -import com.bakdata.conquery.models.config.ConqueryConfig; -import io.dropwizard.core.setup.Bootstrap; import io.dropwizard.core.setup.Environment; -import net.sourceforge.argparse4j.inf.Namespace; public interface StandaloneCommand { - void startStandalone(Environment environment, Namespace namespace, ConqueryConfig config) throws Exception; - Manager getManager(); List getShardNodes(); - void run(Bootstrap bootstrap, Namespace namespace, ConqueryConfig configuration) throws Exception; - - Conquery getConquery(); - ManagerNode getManagerNode(); Environment getEnvironment(); diff --git a/backend/src/main/java/com/bakdata/conquery/io/jackson/Jackson.java b/backend/src/main/java/com/bakdata/conquery/io/jackson/Jackson.java index 5ba5177574..e54b5c3c3b 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/jackson/Jackson.java +++ b/backend/src/main/java/com/bakdata/conquery/io/jackson/Jackson.java @@ -2,7 +2,8 @@ import java.util.Locale; -import com.bakdata.conquery.io.jackson.serializer.Object2IntMapMixin; +import com.bakdata.conquery.io.jackson.mixin.DefaultSocketSessionConfigMixIn; +import com.bakdata.conquery.io.jackson.mixin.Object2IntMapMixIn; import com.bakdata.conquery.models.auth.permissions.ConqueryPermission; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.core.JsonGenerator; @@ -19,6 +20,7 @@ import com.fasterxml.jackson.module.blackbird.BlackbirdModule; import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; import it.unimi.dsi.fastutil.objects.Object2IntMap; +import org.apache.mina.transport.socket.DefaultSocketSessionConfig; import org.apache.shiro.authz.Permission; public class Jackson { @@ -72,7 +74,8 @@ public static T configure(T objectMapper) { //.setAnnotationIntrospector(new 
RestrictingAnnotationIntrospector()) .setInjectableValues(new MutableInjectableValues()) .addMixIn(Permission.class, ConqueryPermission.class) - .addMixIn(Object2IntMap.class, Object2IntMapMixin.class); + .addMixIn(Object2IntMap.class, Object2IntMapMixIn.class) + .addMixIn(DefaultSocketSessionConfig.class, DefaultSocketSessionConfigMixIn.class); return objectMapper; } diff --git a/backend/src/main/java/com/bakdata/conquery/io/jackson/JacksonUtil.java b/backend/src/main/java/com/bakdata/conquery/io/jackson/JacksonUtil.java index 56ddadc71c..a914394572 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/jackson/JacksonUtil.java +++ b/backend/src/main/java/com/bakdata/conquery/io/jackson/JacksonUtil.java @@ -1,17 +1,11 @@ package com.bakdata.conquery.io.jackson; -import java.io.ByteArrayInputStream; import java.io.InputStream; -import java.io.SequenceInputStream; -import com.bakdata.conquery.io.mina.ChunkedMessage; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonMappingException; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.IteratorUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.translate.JavaUnicodeEscaper; import org.apache.mina.core.buffer.IoBuffer; @@ -19,14 +13,6 @@ @Slf4j @UtilityClass public class JacksonUtil { - public static String toJsonDebug(byte[] bytes) { - return toJsonDebug(IoBuffer.wrap(bytes)); - } - - public static String toJsonDebug(IoBuffer buffer) { - return toJsonDebug(stream(buffer)); - } - /** * Partially read and parse InputStream as Json, directly storing it into String, just for debugging purposes. 
*/ @@ -72,7 +58,7 @@ public static String toJsonDebug(InputStream is) { sb.append('"').append(value).append("\","); break; default: - sb.append(t.toString()); + sb.append(t); log.warn("I don't know how to handle {}", t); break; } @@ -85,33 +71,4 @@ public static String toJsonDebug(InputStream is) { return sb.toString(); } } - - public static InputStream stream(IoBuffer buffer) { - return new ByteArrayInputStream( - buffer.array(), - buffer.position() + buffer.arrayOffset(), - buffer.remaining() - ); - } - - public static InputStream stream(Iterable list) { - return new SequenceInputStream( - IteratorUtils.asEnumeration( - IteratorUtils.transformedIterator( - list.iterator(), - JacksonUtil::stream - ) - ) - ); - } - - public static String toJsonDebug(ChunkedMessage msg) { - return toJsonDebug(msg.createInputStream()); - } - - public static void expect(Class parseTargetType, DeserializationContext ctxt, JsonToken token, JsonToken expected) throws JsonMappingException { - if (token != expected) { - ctxt.reportInputMismatch(parseTargetType, "Expected " + expected + " but found " + token); - } - } } diff --git a/backend/src/main/java/com/bakdata/conquery/io/jackson/mixin/DefaultSocketSessionConfigMixIn.java b/backend/src/main/java/com/bakdata/conquery/io/jackson/mixin/DefaultSocketSessionConfigMixIn.java new file mode 100644 index 0000000000..0c86bf275d --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/io/jackson/mixin/DefaultSocketSessionConfigMixIn.java @@ -0,0 +1,17 @@ +package com.bakdata.conquery.io.jackson.mixin; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.mina.transport.socket.DefaultSocketSessionConfig; + +/** + * MixIn to suppress artificial properties of {@link DefaultSocketSessionConfig}. + */ +@JsonIgnoreProperties(value = { + "throughputCalculationIntervalInMillis", + "readerIdleTimeInMillis", + "writeTimeoutInMillis", + "writerIdleTimeInMillis", + "bothIdleTimeInMillis" +}) +public class DefaultSocketSessionConfigMixIn extends DefaultSocketSessionConfig { +} diff --git a/backend/src/main/java/com/bakdata/conquery/io/jackson/serializer/Object2IntMapMixin.java b/backend/src/main/java/com/bakdata/conquery/io/jackson/mixin/Object2IntMapMixIn.java similarity index 95% rename from backend/src/main/java/com/bakdata/conquery/io/jackson/serializer/Object2IntMapMixin.java rename to backend/src/main/java/com/bakdata/conquery/io/jackson/mixin/Object2IntMapMixIn.java index 76b660ffbd..eac1ca8346 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/jackson/serializer/Object2IntMapMixin.java +++ b/backend/src/main/java/com/bakdata/conquery/io/jackson/mixin/Object2IntMapMixIn.java @@ -1,4 +1,4 @@ -package com.bakdata.conquery.io.jackson.serializer; +package com.bakdata.conquery.io.jackson.mixin; import java.io.IOException; import java.util.Map; @@ -25,9 +25,9 @@ /** * (De-)Serialization Mixin for {@link Object2IntMap}. 
*/ -@JsonDeserialize(using = Object2IntMapMixin.Deserializer.class) +@JsonDeserialize(using = Object2IntMapMixIn.Deserializer.class) @Slf4j -public class Object2IntMapMixin { +public class Object2IntMapMixIn { public static class Deserializer extends StdDeserializer> implements ContextualDeserializer { diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/BinaryJacksonCoder.java b/backend/src/main/java/com/bakdata/conquery/io/mina/BinaryJacksonCoder.java deleted file mode 100644 index a60d742f96..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/BinaryJacksonCoder.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.bakdata.conquery.io.mina; - -import jakarta.validation.Validator; - -import com.bakdata.conquery.models.exceptions.ValidatorHelper; -import com.bakdata.conquery.models.identifiable.NamespacedStorageProvider; -import com.bakdata.conquery.models.messages.network.NetworkMessage; -import com.bakdata.conquery.util.io.EndCheckableInputStream; -import com.fasterxml.jackson.core.JsonParser.Feature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.databind.ObjectWriter; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class BinaryJacksonCoder implements CQCoder> { - - private final Validator validator; - private final ObjectWriter writer; - private final ObjectReader reader; - - public BinaryJacksonCoder(NamespacedStorageProvider namespacedStorageProvider, Validator validator, ObjectMapper objectMapper) { - this.validator = validator; - writer = objectMapper.writerFor(NetworkMessage.class); - reader = namespacedStorageProvider.injectIntoNew(objectMapper.readerFor(NetworkMessage.class)).without(Feature.AUTO_CLOSE_SOURCE); - } - - @Override - public NetworkMessage decode(ChunkedMessage message) throws Exception { - try (EndCheckableInputStream is = message.createInputStream()) { - final Object obj = reader.readValue(is); - if (!is.isAtEnd()) { - throw new IllegalStateException("After reading the JSON message " + obj + " the buffer has still bytes available"); - } - ValidatorHelper.failOnError(log, validator.validate(obj)); - return (NetworkMessage) obj; - } - } - - @Override - public Chunkable encode(NetworkMessage message) throws Exception { - ValidatorHelper.failOnError(log, validator.validate(message)); - - return new Chunkable(message.getMessageId(), writer, message); - } -} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/CQCoder.java b/backend/src/main/java/com/bakdata/conquery/io/mina/CQCoder.java deleted file mode 100644 index aa76894d6f..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/CQCoder.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.bakdata.conquery.io.mina; - -public interface CQCoder { - - OUT decode(ChunkedMessage message) throws Exception; - - Chunkable encode(OUT message) throws Exception; -} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/CQProtocolCodecFilter.java b/backend/src/main/java/com/bakdata/conquery/io/mina/CQProtocolCodecFilter.java deleted file mode 100644 index f38b3b31a6..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/CQProtocolCodecFilter.java +++ /dev/null @@ -1,532 +0,0 @@ -package com.bakdata.conquery.io.mina; - -import java.net.SocketAddress; -import java.util.Queue; - -import org.apache.mina.core.buffer.IoBuffer; -import org.apache.mina.core.file.FileRegion; -import org.apache.mina.core.filterchain.IoFilterAdapter; -import 
org.apache.mina.core.filterchain.IoFilterChain; -import org.apache.mina.core.future.DefaultWriteFuture; -import org.apache.mina.core.future.WriteFuture; -import org.apache.mina.core.session.AbstractIoSession; -import org.apache.mina.core.session.AttributeKey; -import org.apache.mina.core.session.IoSession; -import org.apache.mina.core.write.DefaultWriteRequest; -import org.apache.mina.core.write.NothingWrittenException; -import org.apache.mina.core.write.WriteRequest; -import org.apache.mina.filter.codec.AbstractProtocolDecoderOutput; -import org.apache.mina.filter.codec.AbstractProtocolEncoderOutput; -import org.apache.mina.filter.codec.ProtocolCodecFactory; -import org.apache.mina.filter.codec.ProtocolDecoder; -import org.apache.mina.filter.codec.ProtocolDecoderException; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; -import org.apache.mina.filter.codec.ProtocolEncoder; -import org.apache.mina.filter.codec.ProtocolEncoderException; -import org.apache.mina.filter.codec.ProtocolEncoderOutput; -import org.apache.mina.filter.codec.RecoverableProtocolDecoderException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -//see #167 this class is a copy of the one in the mina project -//it is used because mina on default dumps even very large hex values -public class CQProtocolCodecFilter extends IoFilterAdapter { - /** A logger for this class */ - private static final Logger LOGGER = LoggerFactory.getLogger(CQProtocolCodecFilter.class); - - private static final Class[] EMPTY_PARAMS = new Class[0]; - - private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]); - - private static final AttributeKey ENCODER = new AttributeKey(CQProtocolCodecFilter.class, "encoder"); - - private static final AttributeKey DECODER = new AttributeKey(CQProtocolCodecFilter.class, "decoder"); - - private static final AttributeKey DECODER_OUT = new AttributeKey(CQProtocolCodecFilter.class, "decoderOut"); - - private static final AttributeKey ENCODER_OUT = new AttributeKey(CQProtocolCodecFilter.class, "encoderOut"); - - /** The factory responsible for creating the encoder and decoder */ - private final ProtocolCodecFactory factory; - - /** - * Creates a new instance of ProtocolCodecFilter, associating a factory - * for the creation of the encoder and decoder. - * - * @param factory The associated factory - */ - public CQProtocolCodecFilter(ProtocolCodecFactory factory) { - if (factory == null) { - throw new IllegalArgumentException("factory"); - } - - this.factory = factory; - } - - /** - * Creates a new instance of ProtocolCodecFilter, without any factory. - * The encoder/decoder factory will be created as an inner class, using - * the two parameters (encoder and decoder). 
- * - * @param encoder The class responsible for encoding the message - * @param decoder The class responsible for decoding the message - */ - public CQProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) { - if (encoder == null) { - throw new IllegalArgumentException("encoder"); - } - if (decoder == null) { - throw new IllegalArgumentException("decoder"); - } - - // Create the inner Factory based on the two parameters - this.factory = new ProtocolCodecFactory() { - /** - * {@inheritDoc} - */ - @Override - public ProtocolEncoder getEncoder(IoSession session) { - return encoder; - } - - /** - * {@inheritDoc} - */ - @Override - public ProtocolDecoder getDecoder(IoSession session) { - return decoder; - } - }; - } - - /** - * Creates a new instance of ProtocolCodecFilter, without any factory. - * The encoder/decoder factory will be created as an inner class, using - * the two parameters (encoder and decoder), which are class names. Instances - * for those classes will be created in this constructor. - * - * @param encoderClass The class responsible for encoding the message - * @param decoderClass The class responsible for decoding the message - */ - public CQProtocolCodecFilter(final Class encoderClass, - final Class decoderClass) { - if (encoderClass == null) { - throw new IllegalArgumentException("encoderClass"); - } - if (decoderClass == null) { - throw new IllegalArgumentException("decoderClass"); - } - if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) { - throw new IllegalArgumentException("encoderClass: " + encoderClass.getName()); - } - if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) { - throw new IllegalArgumentException("decoderClass: " + decoderClass.getName()); - } - try { - encoderClass.getConstructor(EMPTY_PARAMS); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("encoderClass doesn't have a public default constructor."); - } - try { - decoderClass.getConstructor(EMPTY_PARAMS); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("decoderClass doesn't have a public default constructor."); - } - - final ProtocolEncoder encoder; - - try { - encoder = encoderClass.newInstance(); - } catch (Exception e) { - throw new IllegalArgumentException("encoderClass cannot be initialized"); - } - - final ProtocolDecoder decoder; - - try { - decoder = decoderClass.newInstance(); - } catch (Exception e) { - throw new IllegalArgumentException("decoderClass cannot be initialized"); - } - - // Create the inner factory based on the two parameters. - this.factory = new ProtocolCodecFactory() { - /** - * {@inheritDoc} - */ - @Override - public ProtocolEncoder getEncoder(IoSession session) throws Exception { - return encoder; - } - - /** - * {@inheritDoc} - */ - @Override - public ProtocolDecoder getDecoder(IoSession session) throws Exception { - return decoder; - } - }; - } - - /** - * {@inheritDoc} - */ - @Override - public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { - if (parent.contains(this)) { - throw new IllegalArgumentException( - "You can't add the same filter instance more than once. Create another instance and add it."); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { - // Clean everything - disposeCodec(parent.getSession()); - } - - /** - * Process the incoming message, calling the session decoder. 
As the incoming - * buffer might contains more than one messages, we have to loop until the decoder - * throws an exception. - * - * while ( buffer not empty ) - * try - * decode ( buffer ) - * catch - * break; - * - */ - @Override - public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { - LOGGER.trace("Processing a MESSAGE_RECEIVED for session {}", session.getId()); - - if (!(message instanceof IoBuffer)) { - nextFilter.messageReceived(session, message); - return; - } - - IoBuffer in = (IoBuffer) message; - ProtocolDecoder decoder = factory.getDecoder(session); - ProtocolDecoderOutput decoderOut = getDecoderOut(session); - - // Loop until we don't have anymore byte in the buffer, - // or until the decoder throws an unrecoverable exception or - // can't decoder a message, because there are not enough - // data in the buffer - while (in.hasRemaining()) { - int oldPos = in.position(); - try { - synchronized (session) { - // Call the decoder with the read bytes - decoder.decode(session, in, decoderOut); - } - // Finish decoding if no exception was thrown. - decoderOut.flush(nextFilter, session); - } catch (Exception e) { - ProtocolDecoderException pde; - if (e instanceof ProtocolDecoderException) { - pde = (ProtocolDecoderException) e; - } else { - pde = new ProtocolDecoderException(e); - } - if (pde.getHexdump() == null) { - // Generate a message hex dump - int curPos = in.position(); - in.position(oldPos); - pde.setHexdump(in.getHexDump(300)); - in.position(curPos); - } - // Fire the exceptionCaught event. - decoderOut.flush(nextFilter, session); - nextFilter.exceptionCaught(session, pde); - // Retry only if the type of the caught exception is - // recoverable and the buffer position has changed. - // We check buffer position additionally to prevent an - // infinite loop. 
- if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) { - break; - } - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { - if (writeRequest instanceof EncodedWriteRequest) { - return; - } - - if (writeRequest instanceof MessageWriteRequest wrappedRequest) { - nextFilter.messageSent(session, wrappedRequest.getOriginalRequest()); - } else { - nextFilter.messageSent(session, writeRequest); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { - Object message = writeRequest.getMessage(); - - // Bypass the encoding if the message is contained in a IoBuffer, - // as it has already been encoded before - if ((message instanceof IoBuffer) || (message instanceof FileRegion)) { - nextFilter.filterWrite(session, writeRequest); - return; - } - - // Get the encoder in the session - ProtocolEncoder encoder = factory.getEncoder(session); - - ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest); - - if (encoder == null) { - throw new ProtocolEncoderException("The encoder is null for the session " + session); - } - - try { - // The following encodes the message, chunks the message AND also flushes the chunks to the processor - // See ChunkWriter::finishBuffer and ProtocolEncoderOutputImpl::flush - encoder.encode(session, message, encoderOut); - - // Call the next filter - nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest)); - } catch (Exception e) { - ProtocolEncoderException pee; - - // Generate the correct exception - if (e instanceof ProtocolEncoderException) { - pee = (ProtocolEncoderException) e; - } else { - pee = new ProtocolEncoderException(e); - } - - throw pee; - } - } - - /** - * {@inheritDoc} - */ - @Override - public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { - // Call finishDecode() first when a connection is closed. - ProtocolDecoder decoder = factory.getDecoder(session); - ProtocolDecoderOutput decoderOut = getDecoderOut(session); - - try { - decoder.finishDecode(session, decoderOut); - } catch (Exception e) { - ProtocolDecoderException pde; - if (e instanceof ProtocolDecoderException) { - pde = (ProtocolDecoderException) e; - } else { - pde = new ProtocolDecoderException(e); - } - throw pde; - } finally { - // Dispose everything - disposeCodec(session); - decoderOut.flush(nextFilter, session); - } - - // Call the next filter - nextFilter.sessionClosed(session); - } - - private static class EncodedWriteRequest extends DefaultWriteRequest { - public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) { - super(encodedMessage, future, destination); - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isEncoded() { - return true; - } - } - - /** - * Wrapper for write request that where filtered by {@link CQProtocolCodecFilter} to recognize the request - * when it's events are bubbled downstream through the filterchain. 
- * - */ - private static class MessageWriteRequest extends DefaultWriteRequest { - public MessageWriteRequest(WriteRequest writeRequest) { - super(writeRequest, writeRequest.getFuture()); - } - - @Override - public Object getMessage() { - return EMPTY_BUFFER; - } - - @Override - public String toString() { - return "MessageWriteRequest, parent : " + super.toString(); - } - } - - private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput { - public ProtocolDecoderOutputImpl() { - // Do nothing - } - - /** - * {@inheritDoc} - */ - @Override - public void flush(NextFilter nextFilter, IoSession session) { - Queue messageQueue = getMessageQueue(); - - while (!messageQueue.isEmpty()) { - nextFilter.messageReceived(session, messageQueue.poll()); - } - } - } - - private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput { - private final IoSession session; - - private final NextFilter nextFilter; - - /** The WriteRequest destination */ - private final SocketAddress destination; - - public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) { - this.session = session; - this.nextFilter = nextFilter; - - // Only store the destination, not the full WriteRequest. - destination = writeRequest.getDestination(); - } - - /** - * {@inheritDoc} - */ - @Override - public WriteFuture flush() { - Queue bufferQueue = getMessageQueue(); - WriteFuture future = null; - - while (!bufferQueue.isEmpty()) { - Object encodedMessage = bufferQueue.poll(); - - if (encodedMessage == null) { - break; - } - - // Flush only when the buffer has remaining. - if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) { - future = new DefaultWriteFuture(session); - nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination)); - } - } - - if (future == null) { - // Creates an empty writeRequest containing the destination - future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST)); - } - - return future; - } - } - - //----------- Helper methods --------------------------------------------- - /** - * Dispose the encoder, decoder, and the callback for the decoded - * messages. - */ - private void disposeCodec(IoSession session) { - // We just remove the two instances of encoder/decoder to release resources - // from the session - disposeEncoder(session); - disposeDecoder(session); - - // We also remove the callback - disposeDecoderOut(session); - } - - /** - * Dispose the encoder, removing its instance from the - * session's attributes, and calling the associated - * dispose method. - */ - private void disposeEncoder(IoSession session) { - ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER); - if (encoder == null) { - return; - } - - try { - encoder.dispose(session); - } catch (Exception e) { - LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')'); - } - } - - /** - * Dispose the decoder, removing its instance from the - * session's attributes, and calling the associated - * dispose method. 
- */ - private void disposeDecoder(IoSession session) { - ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER); - if (decoder == null) { - return; - } - - try { - decoder.dispose(session); - } catch (Exception e) { - LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')'); - } - } - - /** - * Return a reference to the decoder callback. If it's not already created - * and stored into the session, we create a new instance. - */ - private ProtocolDecoderOutput getDecoderOut(IoSession session) { - ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT); - - if (out == null) { - // Create a new instance, and stores it into the session - out = new ProtocolDecoderOutputImpl(); - session.setAttribute(DECODER_OUT, out); - } - - return out; - } - - private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) { - ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT); - - if (out == null) { - // Create a new instance, and stores it into the session - out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest); - session.setAttribute(ENCODER_OUT, out); - } - - return out; - } - - /** - * Remove the decoder callback from the session's attributes. - */ - private void disposeDecoderOut(IoSession session) { - session.removeAttribute(DECODER_OUT); - } -} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkReader.java b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkReader.java deleted file mode 100644 index a5fd805263..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkReader.java +++ /dev/null @@ -1,157 +0,0 @@ -package com.bakdata.conquery.io.mina; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Objects; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import com.bakdata.conquery.io.jackson.JacksonUtil; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.mina.core.buffer.IoBuffer; -import org.apache.mina.core.session.AttributeKey; -import org.apache.mina.core.session.IoSession; -import org.apache.mina.filter.codec.CumulativeProtocolDecoder; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; - -@Slf4j @RequiredArgsConstructor -public class ChunkReader extends CumulativeProtocolDecoder { - - private static final AttributeKey MESSAGE_MANAGER = new AttributeKey(BinaryJacksonCoder.class, "messageManager"); - - private final CQCoder coder; - private final ObjectMapper mapper; - - @Override - protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) { - in.mark(); - if (in.remaining() < ChunkWriter.HEADER_SIZE) { - return false; - } - - boolean last = in.get() == ChunkWriter.LAST_MESSAGE; - int length = in.getInt(); - if(length<0) { - throw new IllegalStateException("Read message length "+length); - } - UUID id = new UUID(in.getLong(), in.getLong()); - - if (in.remaining() < length) { - in.reset(); - return false; - } - - MessageManager messageManager = getMessageManager(session); - - if (last) { - ChunkedMessage 
chunkedMessage = messageManager.finalBuffer(id, in, length); - - try { - out.write(coder.decode(chunkedMessage)); - } catch (Exception e) { - log.error( - "Failed while deserializing the message {}: `{}` (Trying to create a dump as {}.json", - chunkedMessage, - JacksonUtil.toJsonDebug(chunkedMessage), - id, - e - ); - - dumpFailed(id, chunkedMessage.createInputStream()); - } - } - //if not the last part of the message we just store it - else { - messageManager.addBuffer(id, in, length); - } - - return true; - } - - private void dumpFailed(UUID id, InputStream inputStream) { - Path dumps = Path.of("dumps"); - final File dumpFile = dumps.resolve("reading_" + id + "_" + Math.random() + ".json").toFile(); - - try (InputStream is = inputStream) { - Files.createDirectories(dumps); - - JsonNode tree = mapper.readTree(is); - try(OutputStream os = new FileOutputStream(dumpFile)) { - mapper.copy().enable(SerializationFeature.INDENT_OUTPUT).writeValue(os, tree); - } - } catch (Exception exception) { - log.error("Failed to write the error json dump {}.json", id, exception); - } - } - - private MessageManager getMessageManager(IoSession session) { - MessageManager messageManager = (MessageManager) session.getAttribute(MESSAGE_MANAGER); - - if (messageManager == null) { - messageManager = new MessageManager(); - session.setAttribute(MESSAGE_MANAGER, messageManager); - } - return messageManager; - } - - @Getter @RequiredArgsConstructor - public static class MessageManager { - - private final ConcurrentMap messages = new ConcurrentHashMap<>(); - private UUID lastId = null; - private ChunkedMessage.List lastMessage = null; - - public ChunkedMessage finalBuffer(UUID id, IoBuffer in, int length) { - if(Objects.equals(lastId, id) || messages.containsKey(id)) { - IoBuffer copy = IoBuffer.allocate(length); - copy.put(in.array(), in.arrayOffset() + in.position(), length); - copy.flip(); - in.skip(length); - ChunkedMessage.List chunkedMessage = getChunkedMessage(id); - remove(id); - chunkedMessage.addBuffer(copy); - return chunkedMessage; - } - return new ChunkedMessage.Singleton(in.getSlice(length)); - } - - public ChunkedMessage addBuffer(UUID id, IoBuffer in, int length) { - IoBuffer copy = IoBuffer.allocate(length); - copy.put(in.array(), in.arrayOffset() + in.position(), length); - copy.flip(); - in.skip(length); - ChunkedMessage.List chunkedMessage = getChunkedMessage(id); - chunkedMessage.addBuffer(copy); - return chunkedMessage; - } - - private ChunkedMessage.List getChunkedMessage(UUID id) { - if(id.equals(lastId)) { - return lastMessage; - } - ChunkedMessage.List msg = messages.computeIfAbsent(id, a->new ChunkedMessage.List()); - lastId = id; - lastMessage = msg; - return msg; - } - - private void remove(UUID id) { - if(lastId == id) { - lastId = null; - lastMessage = null; - } - messages.remove(id); - } - - } -} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkWriter.java b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkWriter.java deleted file mode 100644 index c5e38d316f..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkWriter.java +++ /dev/null @@ -1,123 +0,0 @@ -package com.bakdata.conquery.io.mina; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.UUID; - -import com.bakdata.conquery.models.config.ClusterConfig; -import com.bakdata.conquery.util.SoftPool; -import com.google.common.primitives.Ints; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.mina.core.buffer.IoBuffer; 
-import org.apache.mina.core.session.IoSession; -import org.apache.mina.filter.codec.ProtocolEncoderAdapter; -import org.apache.mina.filter.codec.ProtocolEncoderOutput; - -@Slf4j -@RequiredArgsConstructor -public class ChunkWriter extends ProtocolEncoderAdapter { - - public static final int HEADER_SIZE = Integer.BYTES + Byte.BYTES + 2 * Long.BYTES; - public static final byte LAST_MESSAGE = 1; - public static final byte CONTINUED_MESSAGE = 0; - @SuppressWarnings("rawtypes") - private final CQCoder coder; - private final SoftPool bufferPool; - - public ChunkWriter(ClusterConfig config, CQCoder coder) { - this.coder = coder; - int bufferSize = Ints.checkedCast(config.getMessageChunkSize().toBytes()); - bufferPool = new SoftPool<>(config, () -> IoBuffer.allocate(bufferSize)); - } - - @SuppressWarnings("unchecked") - @Override - public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { - Chunkable ch = coder.encode(message); - try (ChunkOutputStream cos = new ChunkOutputStream(ch.getId(), out)) { - ch.writeMessage(cos); - } - } - - @RequiredArgsConstructor - private class ChunkOutputStream extends OutputStream { - private final UUID id; - private final ProtocolEncoderOutput out; - private IoBuffer buffer = null; - private boolean closed = false; - - @Override - public void write(int b) throws IOException { - if (closed) { - throw new IllegalStateException(); - } - newBuffer(1); - buffer.put((byte) b); - } - - private void newBuffer(int required) { - if (buffer == null || buffer.remaining() < required) { - if (buffer != null) { - finishBuffer(false); - } - buffer = bufferPool.borrow(); - buffer.position(HEADER_SIZE); - } - } - - private void finishBuffer(boolean end) { - buffer.flip(); - if (buffer.remaining() < HEADER_SIZE) { - throw new IllegalStateException("Buffer of size %s is too small for header of length %s".formatted(buffer.remaining(), HEADER_SIZE)); - } - buffer.put(0, end ? 
LAST_MESSAGE : CONTINUED_MESSAGE); - buffer.putInt(Byte.BYTES, buffer.remaining() - HEADER_SIZE); - buffer.putLong(Byte.BYTES + Integer.BYTES, id.getMostSignificantBits()); - buffer.putLong(Byte.BYTES + Integer.BYTES + Long.BYTES, id.getLeastSignificantBits()); - out.write(buffer); - final IoBuffer currentBuffer = buffer; - out.flush().addListener(future -> { - currentBuffer.clear(); - bufferPool.offer(currentBuffer); - }); - buffer = null; - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - if (closed) { - throw new IllegalStateException(); - } - if (b == null) { - throw new NullPointerException(); - } - else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { - throw new IndexOutOfBoundsException(); - } - else if (len == 0) { - return; - } - - while (len > 0) { - if (buffer == null || !buffer.hasRemaining()) { - newBuffer(len); - } - - int write = Math.min(len, buffer.remaining()); - buffer.put(b, off, write); - len -= write; - off += write; - } - } - - @Override - public void close() throws IOException { - if (!closed) { - newBuffer(0); - finishBuffer(true); - closed = true; - } - } - } -} \ No newline at end of file diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/Chunkable.java b/backend/src/main/java/com/bakdata/conquery/io/mina/Chunkable.java deleted file mode 100644 index c5c3ae65f2..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/Chunkable.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.bakdata.conquery.io.mina; - -import java.io.OutputStream; -import java.util.UUID; - -import com.fasterxml.jackson.databind.ObjectWriter; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -@RequiredArgsConstructor -@Getter -@Slf4j -public class Chunkable { - - private final UUID id; - private final ObjectWriter writer; - private final Object message; - - public void writeMessage(OutputStream out) { - try (OutputStream os = out) { - writer.writeValue(os, message); - } - catch (Exception e) { - log.error("Failed to write message {}: {}", id, message, e); - } - } -} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkedMessage.java b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkedMessage.java deleted file mode 100644 index ee645c4c70..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkedMessage.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.bakdata.conquery.io.mina; - -import java.util.ArrayList; -import java.util.stream.Collectors; - -import com.bakdata.conquery.io.jackson.JacksonUtil; -import com.bakdata.conquery.util.io.EndCheckableInputStream; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import org.apache.mina.core.buffer.IoBuffer; - -public interface ChunkedMessage { - - long size(); - EndCheckableInputStream createInputStream(); - - @Getter @RequiredArgsConstructor - class Singleton implements ChunkedMessage { - - private final IoBuffer buffer; - - @Override - public EndCheckableInputStream createInputStream() { - return new EndCheckableInputStream(JacksonUtil.stream(buffer)); - } - - @Override - public long size() { - return buffer.remaining(); - } - - @Override - public String toString() { - return "ChunkedMessage [buffers=" + buffer.limit() + "]"; - } - } - - @Getter @RequiredArgsConstructor - class List implements ChunkedMessage { - - private final java.util.List buffers = new ArrayList<>(); - - @Override - public EndCheckableInputStream 
createInputStream() { - return new EndCheckableInputStream(JacksonUtil.stream(buffers)); - } - - public void addBuffer(IoBuffer copy) { - buffers.add(copy); - } - - @Override - public long size() { - long size = 0; - for(IoBuffer b:buffers) { - size+=b.remaining(); - } - return size; - } - - @Override - public String toString() { - return "ChunkedMessage [buffers=" + buffers.stream().map(b->b.limit()).collect(Collectors.toList()) + "]"; - } - } -} \ No newline at end of file diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkingFilter.java b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkingFilter.java new file mode 100644 index 0000000000..48f588421d --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkingFilter.java @@ -0,0 +1,123 @@ +package com.bakdata.conquery.io.mina; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import io.dropwizard.util.DataSize; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.mina.core.buffer.IoBuffer; +import org.apache.mina.core.filterchain.IoFilterAdapter; +import org.apache.mina.core.future.DefaultWriteFuture; +import org.apache.mina.core.future.IoFuture; +import org.apache.mina.core.future.IoFutureListener; +import org.apache.mina.core.future.WriteFuture; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.write.DefaultWriteRequest; +import org.apache.mina.core.write.WriteRequest; +import org.jetbrains.annotations.NotNull; + +/** + * Chunks messages to fit them in the socket send buffer size, iff they are larger than the socket send buffer. + * The given ioBuffer is simply sliced into smaller ioBuffers up to the size of {@link ChunkingFilter#socketSendBufferSize}. + */ +@RequiredArgsConstructor +@Slf4j +public class ChunkingFilter extends IoFilterAdapter { + + private final int socketSendBufferSize; + + + + + @Override + public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { + if (!(writeRequest.getMessage() instanceof IoBuffer ioBuffer)) { + throw new IllegalStateException("Filter was added at the wrong place in the filter chain. Only expecting IoBuffers here. Got: " + writeRequest.getMessage()); + } + + // The first 4 bytes hold the object length in bytes + int objectLength = ioBuffer.getInt(ioBuffer.position()); + + + if (objectLength < socketSendBufferSize) { + // IoBuffer is shorter than socket buffer, we can just send it. + log.trace("Sending buffer without chunking: {} (limit = {})", DataSize.bytes(objectLength), DataSize.bytes(socketSendBufferSize)); + super.filterWrite(nextFilter, session, writeRequest); + return; + } + + // Split buffers + int oldPos = ioBuffer.position(); + int oldLimit = ioBuffer.limit(); + int totalSize = ioBuffer.remaining(); + + // TODO unsure if Atomic is needed here + final AtomicInteger writtenChunks = new AtomicInteger(); + final int totalChunks = divideAndRoundUp(totalSize, socketSendBufferSize); + + + ioBuffer.limit(oldPos + socketSendBufferSize); + int newLimit = ioBuffer.limit(); + + // Send the first resized (original) buffer + int chunkCount = 1; + log.trace("Sending chunk {}: {} bytes", chunkCount, ioBuffer.remaining());
+ DefaultWriteFuture future = new DefaultWriteFuture(session); + + IoFutureListener listener = handleWrittenChunk(writeRequest, writtenChunks, totalChunks); + future.addListener(listener); + nextFilter.filterWrite(session, new DefaultWriteRequest(ioBuffer, future)); + + int remainingBytes = oldLimit - newLimit; + + do { + // Size a new Buffer + int nextBufSize = Math.min(remainingBytes, socketSendBufferSize); + IoBuffer nextBuffer = ioBuffer.duplicate(); + nextBuffer.limit(newLimit + nextBufSize); + nextBuffer.position(newLimit); + + // Write chunked buffer + chunkCount++; + log.trace("Sending chunk {}: {} bytes", chunkCount, nextBufSize); + future = new DefaultWriteFuture(session); + + nextFilter.filterWrite(session, new DefaultWriteRequest(nextBuffer, future)); + + future.addListener(listener); + + // Recalculate for next iteration + newLimit = newLimit + nextBufSize; + remainingBytes = remainingBytes - nextBufSize; + + } while(remainingBytes > 0); + } + + private static @NotNull IoFutureListener handleWrittenChunk(WriteRequest writeRequest, AtomicInteger writtenChunks, int totalChunks) { + return f -> { + // Count written chunk and notify original writeRequest on error or success + + WriteFuture chunkFuture = (WriteFuture) f; + WriteFuture originalFuture = writeRequest.getFuture(); + if (!chunkFuture.isWritten()) { + log.warn("Failed to write chunk"); + if (!originalFuture.isDone()) { + originalFuture.setException(new IllegalStateException("Failed to write a chunked ioBuffer", chunkFuture.getException())); + } + return; + } + int writtenChunk = writtenChunks.incrementAndGet(); + if (writtenChunk >= totalChunks) { + log.trace("Sent all {} chunks", writtenChunk); + originalFuture.setWritten(); + } + }; + } + + public static int divideAndRoundUp(int num, int divisor) { + // only for positive values + return (num + divisor - 1) / divisor; + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolDecoder.java b/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolDecoder.java new file mode 100644 index 0000000000..9431afe8ae --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolDecoder.java @@ -0,0 +1,70 @@ +package com.bakdata.conquery.io.mina; + +import java.io.IOException; + +import com.bakdata.conquery.io.jackson.JacksonUtil; +import com.fasterxml.jackson.databind.ObjectReader; +import com.google.common.base.Stopwatch; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.mina.core.buffer.BufferDataException; +import org.apache.mina.core.buffer.IoBuffer; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.filter.codec.CumulativeProtocolDecoder; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; + +@Slf4j +@RequiredArgsConstructor +public class JacksonProtocolDecoder extends CumulativeProtocolDecoder { + + private final ObjectReader objectReader; + + @Override + protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) { + if (!in.prefixedDataAvailable(4, Integer.MAX_VALUE)) { + // Not enough data available, cumulate more + return false; + } + + int length = in.getInt(); + if (length <= 4) { + throw new BufferDataException("Object length should be greater than 4: " + length); + } + + // Resize limit to frame only the object that we want to read now + int oldLimit = in.limit(); + int beforeReadPosition = in.position(); + in.limit(in.position() + length); + int
objectEndLimit = in.limit(); + + try { + // Read the object we are interested in + Stopwatch stopwatch = Stopwatch.createStarted(); + log.trace("BEGIN Decoding message"); + Object o = objectReader.readValue(in.asInputStream()); + log.trace("FINISHED Decoding message in {}", stopwatch); + + out.write(o); + } + catch (IOException e) { // Includes JacksonException + String debuggedMessage = "enable TRACE for Message"; + if (log.isTraceEnabled()) { + // Rewind ordinary read attempt + in.position(beforeReadPosition); + + debuggedMessage = JacksonUtil.toJsonDebug(in.asInputStream()); + + } + log.error("Failed to decode message: {}", debuggedMessage , e); + } + finally { + // Set back the old limit, as the in buffer might already have data for a new object + in.limit(oldLimit); + + // If the debugging decoder did not read all bytes: forward the position to this object's supposed end + in.position(objectEndLimit); + } + + return true; + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java b/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java new file mode 100644 index 0000000000..838738f8b3 --- /dev/null +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java @@ -0,0 +1,55 @@ +package com.bakdata.conquery.io.mina; + +import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.common.base.Stopwatch; +import io.dropwizard.util.DataSize; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.apache.mina.core.buffer.IoBuffer; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.filter.codec.ProtocolEncoderOutput; +import org.apache.mina.filter.codec.serialization.ObjectSerializationEncoder; + +@Slf4j +@RequiredArgsConstructor +public class JacksonProtocolEncoder extends ObjectSerializationEncoder { + + + private final ObjectWriter objectWriter; + + @Getter + @Setter + private int initialBufferCapacityBytes = 64; + + @Override + public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { + IoBuffer buf = IoBuffer.allocate(initialBufferCapacityBytes, false); + buf.setAutoExpand(true); + + + int oldPos = buf.position(); + buf.skip(4); // Make a room for the length field. + + Stopwatch stopwatch = Stopwatch.createStarted(); + log.trace("BEGIN Encoding message"); + objectWriter.writeValue(buf.asOutputStream(), message); + + int objectSize = buf.position() - 4; + if (objectSize > getMaxObjectSize()) { + throw new IllegalArgumentException("The encoded object is too big: " + objectSize + " (> " + getMaxObjectSize() + ')'); + } + + // Fill the length field + int newPos = buf.position(); + buf.position(oldPos); + buf.putInt(newPos - oldPos - 4); + buf.position(newPos); + + buf.flip(); + log.trace("FINISHED Encoding message in {}. Buffer size: {}. 
Message: {}", stopwatch, DataSize.bytes(buf.remaining()), message); + + out.write(buf); + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/MdcFilter.java b/backend/src/main/java/com/bakdata/conquery/io/mina/MdcFilter.java index f79dc4d820..a817b47110 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/MdcFilter.java +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/MdcFilter.java @@ -10,7 +10,7 @@ public class MdcFilter extends CommonEventFilter { private final ThreadLocal callDepth = ThreadLocal.withInitial(() -> 0); - private final String locationFmt; + private final String location; /** * Adapted from {@link org.apache.mina.filter.logging.MdcInjectionFilter} @@ -25,7 +25,7 @@ protected void filter(IoFilterEvent event) throws Exception { if (currentCallDepth == 0) { /* copy context to the MDC when necessary. */ - ConqueryMDC.setLocation(String.format(locationFmt, event.getSession().getLocalAddress().toString())); + ConqueryMDC.setLocation(location + String.format("[%s]", event.getSession().getLocalAddress().toString())); } try { diff --git a/backend/src/main/java/com/bakdata/conquery/io/result/excel/ExcelRenderer.java b/backend/src/main/java/com/bakdata/conquery/io/result/excel/ExcelRenderer.java index 3169c321be..370d5eec4e 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/result/excel/ExcelRenderer.java +++ b/backend/src/main/java/com/bakdata/conquery/io/result/excel/ExcelRenderer.java @@ -12,13 +12,12 @@ import c10n.C10N; import com.bakdata.conquery.internationalization.ExcelSheetNameC10n; -import com.bakdata.conquery.models.common.CDate; +import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.models.auth.entities.User; import com.bakdata.conquery.models.config.ExcelConfig; import com.bakdata.conquery.models.execution.ManagedExecution; import com.bakdata.conquery.models.i18n.I18n; import com.bakdata.conquery.models.identifiable.mapping.PrintIdMapper; -import com.bakdata.conquery.models.identifiable.ids.specific.UserId; import com.bakdata.conquery.models.query.PrintSettings; import com.bakdata.conquery.models.query.SingleTableResult; import com.bakdata.conquery.models.query.resultinfo.ResultInfo; @@ -54,6 +53,7 @@ public class ExcelRenderer { private final ExcelConfig config; private final PrintSettings settings; private final ImmutableMap styles; + public ExcelRenderer(ExcelConfig config, PrintSettings settings) { workbook = new SXSSFWorkbook(); this.config = config; @@ -61,11 +61,12 @@ public ExcelRenderer(ExcelConfig config, PrintSettings settings) { this.settings = settings; } - public void renderToStream(List idHeaders, E exec, OutputStream outputStream, OptionalLong limit, PrintSettings printSettings) + public void renderToStream( + List idHeaders, E exec, OutputStream outputStream, OptionalLong limit, PrintSettings printSettings, MetaStorage storage) throws IOException { final List resultInfosExec = exec.getResultInfos(); - setMetaData(exec); + setMetaData(exec, storage); final SXSSFSheet sheet = workbook.createSheet(C10N.get(ExcelSheetNameC10n.class, I18n.LOCALE.get()).result()); try { @@ -91,12 +92,21 @@ public void renderToStream(List /** * Include meta data in the xlsx such as the title, owner/author, tag and the name of this instance. 
*/ - private void setMetaData(E exec) { + private void setMetaData(E exec, MetaStorage metaStorage) { final POIXMLProperties.CoreProperties coreProperties = workbook.getXSSFWorkbook().getProperties().getCoreProperties(); coreProperties.setTitle(exec.getLabelWithoutAutoLabelSuffix()); - final UserId owner = exec.getOwner(); - coreProperties.setCreator(owner != null ? owner.resolve().getLabel() : config.getApplicationName()); + String creator = config.getApplicationName(); + + if (exec.getOwner() != null) { + final User user = metaStorage.get(exec.getOwner()); + + if (user != null) { + creator = user.getLabel(); + } + } + + coreProperties.setCreator(creator); coreProperties.setKeywords(String.join(" ", exec.getTags())); final POIXMLProperties.ExtendedProperties extendedProperties = workbook.getXSSFWorkbook().getProperties().getExtendedProperties(); extendedProperties.setApplication(config.getApplicationName()); @@ -180,7 +190,8 @@ private int writeBody( // Row 0 is the Header the data starts at 1 final AtomicInteger currentRow = new AtomicInteger(1); - final TypeWriter[] writers = infos.stream().map(info -> writer(info.getType(), info.createPrinter(printerFactory, settings), settings)).toArray(TypeWriter[]::new); + final TypeWriter[] writers = + infos.stream().map(info -> writer(info.getType(), info.createPrinter(printerFactory, settings), settings)).toArray(TypeWriter[]::new); final PrintIdMapper idMapper = settings.getIdMapper(); final int writtenLines = resultLines.mapToInt(l -> writeRowsForEntity(infos, l, currentRow, sheet, writers, idMapper)).sum(); @@ -215,10 +226,32 @@ private void postProcessTable(SXSSFSheet sheet, XSSFTable table, int writtenLine sheet.createFreezePane(size, 1); } + private static TypeWriter writer(ResultType type, Printer printer, PrintSettings settings) { + if (type instanceof ResultType.ListT) { + //Excel cannot handle LIST types so we just toString them. + return (value, cell, styles) -> writeStringCell(cell, value, printer); + } + + return switch (((ResultType.Primitive) type)) { + case BOOLEAN -> (value, cell, styles) -> writeBooleanCell(value, cell, printer); + case INTEGER -> (value, cell, styles) -> writeIntegerCell(value, cell, printer, styles); + case MONEY -> (value, cell, styles) -> writeMoneyCell(value, cell, printer, settings, styles); + case NUMERIC -> (value, cell, styles) -> writeNumericCell(value, cell, printer, styles); + case DATE -> (value, cell, styles) -> writeDateCell(value, cell, printer, styles); + default -> (value, cell, styles) -> writeStringCell(cell, value, printer); + }; + } + /** * Writes the result lines for each entity. */ - private int writeRowsForEntity(List infos, EntityResult internalRow, final AtomicInteger currentRow, SXSSFSheet sheet, TypeWriter[] writers, PrintIdMapper idMapper) { + private int writeRowsForEntity( + List infos, + EntityResult internalRow, + final AtomicInteger currentRow, + SXSSFSheet sheet, + TypeWriter[] writers, + PrintIdMapper idMapper) { final String[] ids = idMapper.map(internalRow).getExternalId(); @@ -286,22 +319,6 @@ private void setColumnWidthsAndUntrack(SXSSFSheet sheet) { } } - private static TypeWriter writer(ResultType type, Printer printer, PrintSettings settings) { - if (type instanceof ResultType.ListT) { - //Excel cannot handle LIST types so we just toString them. 
- return (value, cell, styles) -> writeStringCell(cell, value, printer); - } - - return switch (((ResultType.Primitive) type)) { - case BOOLEAN -> (value, cell, styles) -> writeBooleanCell(value, cell, printer); - case INTEGER -> (value, cell, styles) -> writeIntegerCell(value, cell, printer, styles); - case MONEY -> (value, cell, styles) -> writeMoneyCell(value, cell, printer, settings, styles); - case NUMERIC -> (value, cell, styles) -> writeNumericCell(value, cell, printer, styles); - case DATE -> (value, cell, styles) -> writeDateCell(value, cell, printer, styles); - default -> (value, cell, styles) -> writeStringCell(cell, value, printer); - }; - } - // Type specific cell writers private static void writeStringCell(Cell cell, Object value, Printer printer) { cell.setCellValue((String) printer.apply(value)); diff --git a/backend/src/main/java/com/bakdata/conquery/io/result/excel/ResultExcelProcessor.java b/backend/src/main/java/com/bakdata/conquery/io/result/excel/ResultExcelProcessor.java index 18a0205c2b..7e2c81194f 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/result/excel/ResultExcelProcessor.java +++ b/backend/src/main/java/com/bakdata/conquery/io/result/excel/ResultExcelProcessor.java @@ -10,6 +10,7 @@ import jakarta.ws.rs.core.StreamingOutput; import com.bakdata.conquery.io.result.ResultUtil; +import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.models.auth.entities.Subject; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.config.ExcelConfig; @@ -33,6 +34,8 @@ public class ResultExcelProcessor { // Media type according to https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Common_types public static final MediaType MEDIA_TYPE = new MediaType("application", "vnd.openxmlformats-officedocument.spreadsheetml.sheet"); + + private final MetaStorage metaStorage; private final DatasetRegistry datasetRegistry; private final ConqueryConfig conqueryConfig; @@ -57,7 +60,7 @@ public Response createResult(Su final ExcelRenderer excelRenderer = new ExcelRenderer(excelConfig, settings); final StreamingOutput out = output -> { - excelRenderer.renderToStream(conqueryConfig.getIdColumns().getIdResultInfos(), exec, output, limit, settings); + excelRenderer.renderToStream(conqueryConfig.getIdColumns().getIdResultInfos(), exec, output, limit, settings, metaStorage); log.trace("FINISHED downloading {}", exec.getId()); }; diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java index 0a34c77201..bbc552dafc 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/SerializingStore.java @@ -357,8 +357,8 @@ public static File makeExceptionFileName(@NotNull String keyOfDump, @NotNull Fil } - private static byte[] debugUnGzip(byte[] bytes) throws IOException { - return new GZIPInputStream(new ByteArrayInputStream(bytes)).readAllBytes(); + private static InputStream debugUnGzip(byte[] bytes) throws IOException { + return new GZIPInputStream(new ByteArrayInputStream(bytes)); } private static String sanitiseFileName(@NotNull String name) { diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java index c9828afb38..3bd94f6d4e 
100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionManager.java @@ -1,14 +1,8 @@ package com.bakdata.conquery.mode.cluster; import java.io.IOException; -import java.net.InetSocketAddress; import jakarta.validation.Validator; -import com.bakdata.conquery.io.mina.BinaryJacksonCoder; -import com.bakdata.conquery.io.mina.CQProtocolCodecFilter; -import com.bakdata.conquery.io.mina.ChunkReader; -import com.bakdata.conquery.io.mina.ChunkWriter; -import com.bakdata.conquery.io.mina.MdcFilter; import com.bakdata.conquery.io.mina.MinaAttributes; import com.bakdata.conquery.io.mina.NetworkSession; import com.bakdata.conquery.models.config.ConqueryConfig; @@ -29,7 +23,6 @@ import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; -import org.apache.mina.transport.socket.nio.NioSocketAcceptor; /** * Manager of the connection from the manager to the ConQuery shards. @@ -105,16 +98,10 @@ else if (toManagerNode instanceof SlowMessage slowMessage) { } public void start() throws IOException { - acceptor = new NioSocketAcceptor(); - acceptor.getFilterChain().addFirst("mdc", new MdcFilter("Manager[%s]")); - final ObjectMapper om = internalMapperFactory.createManagerCommunicationMapper(datasetRegistry); - final BinaryJacksonCoder coder = new BinaryJacksonCoder(datasetRegistry, validator, om); - acceptor.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(config.getCluster(), coder), new ChunkReader(coder, om))); - acceptor.setHandler(this); - acceptor.getSessionConfig().setAll(config.getCluster().getMina()); - acceptor.bind(new InetSocketAddress(config.getCluster().getPort())); + acceptor = config.getCluster().getClusterAcceptor(om, this, "Manager"); + log.info("Started ManagerNode @ {}", acceptor.getLocalAddress()); } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java index 4d108b330a..0b3e4ac643 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterConnectionShard.java @@ -5,11 +5,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import com.bakdata.conquery.io.mina.BinaryJacksonCoder; -import com.bakdata.conquery.io.mina.CQProtocolCodecFilter; -import com.bakdata.conquery.io.mina.ChunkReader; -import com.bakdata.conquery.io.mina.ChunkWriter; -import com.bakdata.conquery.io.mina.MdcFilter; import com.bakdata.conquery.io.mina.NetworkSession; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.jobs.JobManager; @@ -119,7 +114,7 @@ private void connectToCluster() { disconnectFromCluster(); - connector = getClusterConnector(workers); + connector = getClusterConnector(); while (true) { try { @@ -166,17 +161,10 @@ private void disconnectFromCluster() { } @NotNull - private NioSocketConnector getClusterConnector(ShardWorkers workers) { + private NioSocketConnector getClusterConnector() { ObjectMapper om = internalMapperFactory.createShardCommunicationMapper(); - final NioSocketConnector connector = new NioSocketConnector(); - - final BinaryJacksonCoder coder = new BinaryJacksonCoder(workers, environment.getValidator(), om); - 
connector.getFilterChain().addFirst("mdc", new MdcFilter("Shard[%s]")); - connector.getFilterChain().addLast("codec", new CQProtocolCodecFilter(new ChunkWriter(config.getCluster(), coder), new ChunkReader(coder, om))); - connector.setHandler(this); - connector.getSessionConfig().setAll(config.getCluster().getMina()); - return connector; + return config.getCluster().getClusterConnector(om, this, "Shard"); } @Override diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterEntityResolver.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterEntityResolver.java index 8aa3d7c720..08fd8a6027 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterEntityResolver.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterEntityResolver.java @@ -1,15 +1,13 @@ package com.bakdata.conquery.mode.cluster; -import static com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolverUtil.collectExtraData; -import static com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolverUtil.readDates; -import static com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolverUtil.tryResolveId; -import static com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolverUtil.verifyOnlySingles; +import static com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolverUtil.*; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Function; +import jakarta.validation.constraints.NotEmpty; import com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolver; import com.bakdata.conquery.apiv1.query.concept.specific.external.EntityResolverUtil; @@ -19,7 +17,6 @@ import com.bakdata.conquery.models.identifiable.mapping.ExternalId; import com.bakdata.conquery.util.DateReader; import com.bakdata.conquery.util.io.IdColumnUtil; -import jakarta.validation.constraints.NotEmpty; public class ClusterEntityResolver implements EntityResolver { @@ -58,18 +55,18 @@ public ResolveStatistic resolveEntities( final String[] row = values[rowNum]; - if (rowDates[rowNum] == null) { - unresolvedDate.add(row); - continue; - } - + // Try to resolve the id first, because it has higher priority for the uploader than the dates String resolvedId = tryResolveId(row, readers, mapping); - if (resolvedId == null) { unresolvedId.add(row); continue; } + if (rowDates[rowNum] == null) { + unresolvedDate.add(row); + continue; + } + // read the dates from the row resolved.put(resolvedId, rowDates[rowNum]); diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java index fed8e82097..7026df5796 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java @@ -18,10 +18,10 @@ import com.bakdata.conquery.models.datasets.concepts.Concept; import com.bakdata.conquery.models.datasets.concepts.Connector; import com.bakdata.conquery.models.events.Bucket; +import com.bakdata.conquery.models.identifiable.ids.specific.BucketId; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; import com.bakdata.conquery.models.identifiable.ids.specific.TableId; -import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; import 
com.bakdata.conquery.models.messages.namespaces.specific.AddImport; import com.bakdata.conquery.models.messages.namespaces.specific.ImportBucket; import com.bakdata.conquery.models.messages.namespaces.specific.RemoveImportJob; @@ -35,6 +35,7 @@ import lombok.AllArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.mina.core.future.WriteFuture; /** * Handler of {@link Import} requests that realizes them both on the manager and the cluster's shards. @@ -48,10 +49,10 @@ public class ClusterImportHandler implements ImportHandler { @SneakyThrows @Override public void updateImport(Namespace namespace, InputStream inputStream) { - handleImport(namespace, inputStream, true, datasetRegistry); + handleImport(namespace, inputStream, true); } - private static void handleImport(Namespace namespace, InputStream inputStream, boolean update, DatasetRegistry datasetRegistry) throws IOException { + private static void handleImport(Namespace namespace, InputStream inputStream, boolean update) throws IOException { try (PreprocessedReader parser = new PreprocessedReader(inputStream, namespace.getPreprocessMapper())) { // We parse semi-manually as the incoming file consist of multiple documents we read progressively: // 1) the header to check metadata @@ -61,10 +62,10 @@ private static void handleImport(Namespace namespace, InputStream inputStream, b final Table table = validateImportable(((DistributedNamespace) namespace), header, update); - readAndDistributeImport(((DistributedNamespace) namespace), table, header, parser, datasetRegistry); + readAndDistributeImport(((DistributedNamespace) namespace), table, header, parser); try(Stream> allConcepts = namespace.getStorage().getAllConcepts();) { - clearDependentConcepts(allConcepts, table); + clearDependentConcepts(allConcepts, table.getId()); } } } @@ -110,7 +111,7 @@ else if (processedImport != null) { return table; } - private static void readAndDistributeImport(DistributedNamespace namespace, Table table, PreprocessedHeader header, PreprocessedReader reader, DatasetRegistry datasetRegistry) { + private static void readAndDistributeImport(DistributedNamespace namespace, Table table, PreprocessedHeader header, PreprocessedReader reader) { final TableId tableId = new TableId(namespace.getDataset().getId(), header.getTable()); final ImportId importId = new ImportId(tableId, header.getName()); @@ -133,14 +134,20 @@ private static void readAndDistributeImport(DistributedNamespace namespace, Tabl final Bucket bucket = Bucket.fromPreprocessed(table, container, imp); - log.trace("DONE reading bucket `{}`, contains {} entities.", bucket.getId(), bucket.entities().size()); + final BucketId bucketId = bucket.getId(); + log.trace("DONE reading bucket `{}`, contains {} entities.", bucketId, bucket.entities().size()); - final WorkerInformation responsibleWorker = namespace.getWorkerHandler().assignResponsibleWorker(bucket.getId()); + final WorkerInformation responsibleWorker = namespace.getWorkerHandler().assignResponsibleWorker(bucketId); - sendBucket(bucket, responsibleWorker); + sendBucket(bucket, responsibleWorker).addListener((f) -> { + if(((WriteFuture)f).isWritten()) { + log.trace("Sent Bucket {}", bucketId); + return; + } + log.warn("Failed to send Bucket {}", bucketId); + }); // NOTE: I want the bucket to be GC'd as early as possible, so I just store the part(s) I need later. 
- collectedEntities.put(bucket.getBucket(), bucket.entities()); } @@ -152,10 +159,10 @@ private static void readAndDistributeImport(DistributedNamespace namespace, Tabl } - private static void clearDependentConcepts(Stream> allConcepts, Table table) { + private static void clearDependentConcepts(Stream> allConcepts, TableId table) { allConcepts.map(Concept::getConnectors) .flatMap(List::stream) - .filter(con -> con.getResolvedTableId().equals(table.getId())) + .filter(con -> con.getResolvedTableId().equals(table)) .map(Connector::getConcept) .forEach(Concept::clearMatchingStats); } @@ -163,20 +170,19 @@ private static void clearDependentConcepts(Stream> allConcepts, Table /** * select, then send buckets. */ - public static WorkerId sendBucket(Bucket bucket, WorkerInformation responsibleWorker) { + public static WriteFuture sendBucket(Bucket bucket, WorkerInformation responsibleWorker) { responsibleWorker.awaitFreeJobQueue(); log.trace("Sending Bucket[{}] to {}", bucket.getId(), responsibleWorker.getId()); - responsibleWorker.send(new ImportBucket(bucket.getId().toString(), bucket)); + return responsibleWorker.send(new ImportBucket(bucket.getId().toString(), bucket)); - return responsibleWorker.getId(); } @SneakyThrows @Override public void addImport(Namespace namespace, InputStream inputStream) { - handleImport(namespace, inputStream, false, datasetRegistry); + handleImport(namespace, inputStream, false); } @Override @@ -186,7 +192,7 @@ public void deleteImport(Import imp) { final DistributedNamespace namespace = datasetRegistry.get(id); try(Stream> allConcepts = namespace.getStorage().getAllConcepts()) { - clearDependentConcepts(allConcepts, imp.getTable().resolve()); + clearDependentConcepts(allConcepts, imp.getTable()); } namespace.getStorage().removeImport(imp.getId()); diff --git a/backend/src/main/java/com/bakdata/conquery/models/auth/AuthorizationController.java b/backend/src/main/java/com/bakdata/conquery/models/auth/AuthorizationController.java index 655e93da5b..72b3f186b2 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/auth/AuthorizationController.java +++ b/backend/src/main/java/com/bakdata/conquery/models/auth/AuthorizationController.java @@ -34,11 +34,12 @@ import org.apache.shiro.SecurityUtils; import org.apache.shiro.authc.pam.FirstSuccessfulStrategy; import org.apache.shiro.authc.pam.ModularRealmAuthenticator; +import org.apache.shiro.lang.util.LifecycleUtils; import org.apache.shiro.mgt.DefaultSecurityManager; import org.apache.shiro.realm.AuthorizingRealm; import org.apache.shiro.realm.Realm; -import org.apache.shiro.util.LifecycleUtils; import org.glassfish.hk2.utilities.binding.AbstractBinder; +import org.jetbrains.annotations.NotNull; /** * The central class for the initialization of authorization and authentication. 
@@ -78,7 +79,7 @@ public final class AuthorizationController implements Managed { @Getter private DropwizardResourceConfig unprotectedAuthAdmin; - public AuthorizationController(MetaStorage storage, ConqueryConfig config, Environment environment, AdminServlet adminServlet) { + public AuthorizationController(@NotNull MetaStorage storage, @NotNull ConqueryConfig config, @NotNull Environment environment, AdminServlet adminServlet) { this.storage = storage; this.config = config; this.environment = environment; diff --git a/backend/src/main/java/com/bakdata/conquery/models/auth/basic/LocalAuthenticationRealm.java index 581b8015f9..31abe04329 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/auth/basic/LocalAuthenticationRealm.java +++ b/backend/src/main/java/com/bakdata/conquery/models/auth/basic/LocalAuthenticationRealm.java @@ -42,8 +42,8 @@ import org.apache.shiro.authc.AuthenticationToken; import org.apache.shiro.authc.CredentialsException; import org.apache.shiro.authc.IncorrectCredentialsException; +import org.apache.shiro.lang.util.Destroyable; import org.apache.shiro.realm.AuthenticatingRealm; -import org.apache.shiro.util.Destroyable; /** * This realm stores credentials in a local database ({@link XodusStore}). Upon @@ -53,7 +53,7 @@ * authorization related user information that is saved in the * {@link MetaStorage}. So adding or removing a user in this realm does * not change the {@link MetaStorage}. {@link Conquery} interacts with - * this realm using the Shiro frame work. However, endusers can interface it + * this realm using the Shiro framework. However, end users can interface it * through specific endpoints that are registered by this realm. 
*/ @Slf4j diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java index f404aa1a48..e94c1ee3ce 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java @@ -1,16 +1,30 @@ package com.bakdata.conquery.models.config; +import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import jakarta.validation.Valid; +import jakarta.validation.constraints.Max; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; +import com.bakdata.conquery.io.mina.ChunkingFilter; +import com.bakdata.conquery.io.mina.JacksonProtocolDecoder; +import com.bakdata.conquery.io.mina.JacksonProtocolEncoder; +import com.bakdata.conquery.io.mina.MdcFilter; +import com.bakdata.conquery.models.messages.network.NetworkMessage; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.databind.ObjectMapper; import io.dropwizard.core.Configuration; -import io.dropwizard.util.DataSize; import io.dropwizard.util.Duration; import io.dropwizard.validation.PortRange; import lombok.Getter; import lombok.Setter; +import org.apache.mina.core.service.IoHandler; +import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.transport.socket.DefaultSocketSessionConfig; +import org.apache.mina.transport.socket.nio.NioSocketAcceptor; +import org.apache.mina.transport.socket.nio.NioSocketConnector; @Getter @Setter @@ -22,7 +36,7 @@ public class ClusterConfig extends Configuration { private InetAddress managerURL = InetAddress.getLoopbackAddress(); @Valid @NotNull - private MinaConfig mina = new MinaConfig(); + private DefaultSocketSessionConfig mina = new DefaultSocketSessionConfig(); @Min(1) private int entityBucketSize = 1000; @@ -30,6 +44,24 @@ public class ClusterConfig extends Configuration { private Duration heartbeatTimeout = Duration.minutes(1); private Duration connectRetryTimeout = Duration.seconds(30); + /** + * Defines the maximum buffer size, inclusive of the 4-byte length header. Objects larger than this size cannot be sent over the cluster. + *

+ * Only modify this for testing purposes. + */ + @Max(Integer.MAX_VALUE - 4) + @Min(64) // not practical + private int maxIoBufferSizeBytes = Integer.MAX_VALUE - 4; + + /** + * Defines the starting buffer allocation size. A larger value can reduce reallocations, but increases memory demand. + *

+ * Only modify this for testing purposes. + */ + @Max(Integer.MAX_VALUE - 4) + @Min(64) // Mina's default + private int initialIoBufferSizeBytes = 8192; // 8kb + /** * @see com.bakdata.conquery.models.messages.namespaces.specific.CollectColumnValuesJob * @@ -44,27 +76,6 @@ public class ClusterConfig extends Configuration { */ private int networkSessionMaxQueueLength = 5; - /** - * {@link org.apache.mina.core.buffer.IoBuffer} size, that mina allocates. - * We assume a pagesize of 4096 bytes == 4 kibibytes - */ - @NotNull - @Valid - private DataSize messageChunkSize = DataSize.kibibytes(4); - - /** - * How long the soft pool cleaner waits before reducing the pool size down to softPoolBaselineSize. - */ - @NotNull - @Valid - private Duration softPoolCleanerPause = Duration.seconds(10); - - /** - * The number of soft references the soft pool should retain after cleaning. - * The actual number of {@link org.apache.mina.core.buffer.IoBuffer} - */ - private long softPoolBaselineSize = 100; - /** * Amount of backpressure before jobs can volunteer to block to send messages to their shards. *

@@ -72,4 +83,56 @@ public class ClusterConfig extends Configuration { */ @Min(0) private int backpressure = 1500; + + @JsonIgnore + public NioSocketConnector getClusterConnector(ObjectMapper om, IoHandler ioHandler, String mdcLocation) { + + final NioSocketConnector connector = new NioSocketConnector(); + + JacksonProtocolEncoder encoder = new JacksonProtocolEncoder(om.writerFor(NetworkMessage.class)); + encoder.setMaxObjectSize(maxIoBufferSizeBytes); + encoder.setInitialBufferCapacityBytes(initialIoBufferSizeBytes); + + ProtocolCodecFilter codecFilter = new ProtocolCodecFilter( + encoder, + new JacksonProtocolDecoder(om.readerFor(NetworkMessage.class)) + ); + connector.getFilterChain().addFirst("mdc", new MdcFilter(mdcLocation)); + if (mina.getSendBufferSize() > 0) { + connector.getFilterChain().addLast("chunk", new ChunkingFilter(mina.getSendBufferSize())); + } + connector.getFilterChain().addLast("codec", codecFilter); + + connector.setHandler(ioHandler); + connector.getSessionConfig().setAll(getMina()); + + return connector; + } + + @JsonIgnore + public NioSocketAcceptor getClusterAcceptor(ObjectMapper om, IoHandler ioHandler, String mdcLocation) throws IOException { + NioSocketAcceptor acceptor = new NioSocketAcceptor(); + + + JacksonProtocolEncoder encoder = new JacksonProtocolEncoder(om.writerFor(NetworkMessage.class)); + encoder.setMaxObjectSize(maxIoBufferSizeBytes); + encoder.setInitialBufferCapacityBytes(initialIoBufferSizeBytes); + + ProtocolCodecFilter codecFilter = new ProtocolCodecFilter( + encoder, + new JacksonProtocolDecoder(om.readerFor(NetworkMessage.class)) + ); + + acceptor.getFilterChain().addFirst("mdc", new MdcFilter(mdcLocation)); + if (mina.getSendBufferSize() > 0) { + acceptor.getFilterChain().addLast("chunk", new ChunkingFilter(mina.getSendBufferSize())); + } + acceptor.getFilterChain().addLast("codec", codecFilter); + + acceptor.setHandler(ioHandler); + acceptor.getSessionConfig().setAll(getMina()); + acceptor.bind(new InetSocketAddress(getPort())); + + return acceptor; + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/PreviewConfig.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/PreviewConfig.java index 568a31adfd..a3639bbee3 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/PreviewConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/PreviewConfig.java @@ -3,7 +3,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import jakarta.validation.Valid; @@ -180,7 +179,6 @@ public Listresolve) - .filter(Objects::nonNull) .collect(Collectors.toList()); } @@ -191,7 +189,6 @@ public List resolveSearchFilters() { return searchFilters.stream() .map(FilterId::resolve) - .filter(Objects::nonNull) .map(Filter::getId) .toList(); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/ValidityDate.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/ValidityDate.java index de44285ec2..171ead0ee0 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/ValidityDate.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/ValidityDate.java @@ -1,5 +1,6 @@ package com.bakdata.conquery.models.datasets.concepts; +import java.util.function.BiFunction; import javax.annotation.CheckForNull; import javax.annotation.Nullable; @@ -38,6 +39,9 @@ public class ValidityDate extends Labeled 
implements NamespacedI @EqualsAndHashCode.Exclude private Connector connector; + @JsonIgnore + private BiFunction extractor; + public static ValidityDate create(Column column) { final ValidityDate validityDate = new ValidityDate(); validityDate.setColumn(column.getId()); @@ -53,37 +57,15 @@ public static ValidityDate create(Column startColumn, Column endColumn) { @CheckForNull public CDateRange getValidityDate(int event, Bucket bucket) { - // I spent a lot of time trying to create two classes implementing single/multi-column valditiy dates separately. - // JsonCreator was not happy, and I could not figure out why. This is probably the most performant implementation that's not two classes. - - if (getColumn() != null) { - final Column resolvedColumn = getColumn().resolve(); - if (bucket.has(event, resolvedColumn)) { - return bucket.getAsDateRange(event, resolvedColumn); - } - - return null; + if (extractor == null){ + //TODO this is just a workaround: We should actually be using Initializing, which sadly gives us issues with LoadingUtil + init(); } - final Column startColumn = getStartColumn() != null ? getStartColumn().resolve() : null; - final Column endColumn = getEndColumn() != null ? getEndColumn().resolve() : null; - - final boolean hasStart = bucket.has(event, startColumn); - final boolean hasEnd = bucket.has(event, endColumn); - - if (!hasStart && !hasEnd) { - return null; - } - - final int start = hasStart ? bucket.getDate(event, startColumn) : Integer.MIN_VALUE; - final int end = hasEnd ? bucket.getDate(event, endColumn) : Integer.MAX_VALUE; - - return CDateRange.of(start, end); + return extractor.apply(event, bucket); } - // TODO use Id as parameter - public boolean containsColumn(Column column) { - final ColumnId id = column.getId(); + public boolean containsColumn(ColumnId id) { return id.equals(getColumn()) || id.equals(getStartColumn()) || id.equals(getEndColumn()); } @@ -110,4 +92,37 @@ public DatasetId getDataset() { public ValidityDateId createId() { return new ValidityDateId(connector.getId(), getName()); } + + public void init() { + // Initialize extractor early to avoid resolve and dispatch in very hot code. Hopefully boxing can be elided. + if (column != null) { + final Column resolvedColumn = column.resolve(); + + extractor = (event, bucket) -> { + if (bucket.has(event, resolvedColumn)) { + return bucket.getAsDateRange(event, resolvedColumn); + } + + return null; + }; + return; + } + + final Column resolvedStartColumn = startColumn.resolve(); + final Column resolvedEndColumn = endColumn.resolve(); + + extractor = (event, bucket) -> { + final boolean hasStart = bucket.has(event, resolvedStartColumn); + final boolean hasEnd = bucket.has(event, resolvedEndColumn); + + if (!hasStart && !hasEnd) { + return null; + } + + final int start = hasStart ? bucket.getDate(event, resolvedStartColumn) : Integer.MIN_VALUE; + final int end = hasEnd ? 
bucket.getDate(event, resolvedEndColumn) : Integer.MAX_VALUE; + + return CDateRange.of(start, end); + }; + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/select/concept/ConceptColumnSelect.java index e66d654c6e..cf9a36e003 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/select/concept/ConceptColumnSelect.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/select/concept/ConceptColumnSelect.java @@ -1,6 +1,5 @@ package com.bakdata.conquery.models.datasets.concepts.select.concept; -import java.util.Collections; import java.util.Set; import com.bakdata.conquery.apiv1.query.concept.specific.CQConcept; @@ -76,6 +75,7 @@ public ResultType getResultType() { @Override public SelectConverter createConverter() { + //TODO bind Select to converter here return new ConceptColumnSelectConverter(); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java index 437ee1afb7..3b9c0edba0 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java +++ b/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java @@ -21,6 +21,7 @@ import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept; import com.bakdata.conquery.models.identifiable.ids.specific.BucketId; import com.bakdata.conquery.models.identifiable.ids.specific.CBlockId; +import com.bakdata.conquery.models.identifiable.ids.specific.ConceptElementId; import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId; import com.bakdata.conquery.models.identifiable.ids.specific.ConnectorId; import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; @@ -276,8 +277,12 @@ private int getBucket(String id) { /** * Collects all Entities that have any of the concepts on the connectors within a specific time frame. 
*/ - public Set getEntitiesWithConcepts(Collection> concepts, Set connectors, CDateSet restriction) { - final long requiredBits = ConceptNode.calculateBitMask(concepts); + public Set getEntitiesWithConcepts(Collection> concepts, Set connectors, CDateSet restriction) { + List> resolvedConcepts = concepts.stream() + .>map(ConceptElementId::resolve) + .toList(); + + final long requiredBits = ConceptNode.calculateBitMask(resolvedConcepts); + final Set out = new HashSet<>(); diff --git a/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java index b0ae7c9b8f..36098fa984 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java +++ b/backend/src/main/java/com/bakdata/conquery/models/execution/ManagedExecution.java @@ -314,9 +314,12 @@ public void setStatusBase(@NonNull Subject subject, @NonNull ExecutionStatus sta status.setContainsDates(containsDates); if (owner != null) { - User user = owner.resolve(); - status.setOwner(user.getId()); - status.setOwnerName(user.getLabel()); + User user = metaStorage.get(owner); + + if(user != null) { + status.setOwner(user.getId()); + status.setOwnerName(user.getLabel()); + } } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/configs/FormConfig.java index 8e1ef40ecf..d86dfb1491 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/configs/FormConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/configs/FormConfig.java @@ -6,7 +6,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.function.Consumer; @@ -94,7 +93,7 @@ public FormConfigId createId() { * actual form field values. */ public FormConfigOverviewRepresentation overview(MetaStorage storage, Subject subject) { - String ownerName = getOwnerName(); + String ownerName = getOwnerName(storage); return FormConfigOverviewRepresentation.builder() .id(getId()) @@ -110,15 +109,26 @@ public FormConfigOverviewRepresentation overview(MetaStorage storage, Subject su } @JsonIgnore - private @Nullable String getOwnerName() { - return Optional.ofNullable(owner).map(UserId::resolve).map(User.class::cast).map(User::getLabel).orElse(null); + @Nullable + private String getOwnerName(MetaStorage metaStorage) { + if (owner == null){ + return null; + } + + User resolved = metaStorage.get(owner); + + if (resolved == null){ + return null; + } + + return resolved.getLabel(); } /** * Return the full representation of the configuration with the configured form fields and meta data. */ public FormConfigFullRepresentation fullRepresentation(MetaStorage storage, Subject requestingUser){ - String ownerName = getOwnerName(); + String ownerName = getOwnerName(storage); /* Calculate which groups can see this query. * This is usually not done very often and should be reasonably fast, so don't cache this. 
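[Editor's note] The owner lookups changed in this PR (ExcelRenderer.setMetaData, ManagedExecution.setStatusBase and FormConfig.getOwnerName above) all follow the same null-safe pattern: resolve the owner through the MetaStorage and tolerate users that were deleted in the meantime, instead of calling UserId#resolve and risking an exception. Below is a minimal sketch of that shared pattern; the helper class and method names are hypothetical and not part of this PR, and it assumes MetaStorage#get(UserId) returns null for ids that no longer resolve, as the changed code above does:

import javax.annotation.Nullable;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.models.auth.entities.User;
import com.bakdata.conquery.models.identifiable.ids.specific.UserId;

// Hypothetical helper (not part of this PR) illustrating the null-safe owner lookup.
public final class OwnerLabels {

	private OwnerLabels() {
	}

	// Returns the owner's label, or the given fallback if there is no owner
	// or the owning user was deleted from the MetaStorage in the meantime.
	public static String ownerLabel(@Nullable UserId owner, MetaStorage storage, String fallback) {
		if (owner == null) {
			return fallback;
		}

		// Assumption: MetaStorage#get returns null when the id no longer resolves.
		User user = storage.get(owner);

		return user != null ? user.getLabel() : fallback;
	}
}

Using a fallback (for example the application name, as ExcelRenderer does) keeps exports and status views working even when the owning user has been removed.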
diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/util/DateContext.java index 1a1c1c2f82..96270441f3 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/util/DateContext.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/util/DateContext.java @@ -96,7 +96,7 @@ public static Function> getDateRangeSubdivider(Align int alignedPerResolution = resolution.getAmountForAlignment(alignment).orElseThrow(() -> new ConqueryError.ExecutionCreationPlanDateContextError(alignment, resolution)); if (alignedPerResolution == 1) { - // When the alignment fits the resolution we can use the the alignment subdivision directly + // When the alignment fits the resolution we can use the alignment subdivision directly return (dateRange) -> alignment.getSubdivider().apply(dateRange); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java index 17f8e43906..79078d30c9 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/CollectColumnValuesJob.java @@ -37,6 +37,7 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.mina.core.future.WriteFuture; /** @@ -71,7 +72,8 @@ public void react(Worker context) throws Exception { .collect(Collectors.groupingBy(Bucket::getTable)); } - final ListeningExecutorService jobsExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_THREADS)); + BasicThreadFactory threadFactory = (new BasicThreadFactory.Builder()).namingPattern(this.getClass().getSimpleName() + "-Worker-%d").build(); + final ListeningExecutorService jobsExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_THREADS, threadFactory)); final AtomicInteger done = new AtomicInteger(); @@ -88,7 +90,7 @@ public void react(Worker context) throws Exception { .flatMap(bucket -> ((StringStore) bucket.getStore(column)).streamValues()) .collect(Collectors.toSet()); - log.trace("Finished collections values for column {} as number {}", column, done.incrementAndGet()); + log.trace("Finished collecting {} values for column {}", values.size(), column); // Chunk values, to produce smaller messages Iterable> partition = Iterables.partition(values, columValueChunkSize); @@ -97,14 +99,18 @@ column.getId(), values.size(), columValueChunkSize ); + int i = 0; for (List chunk : partition) { // Send values to manager RegisterColumnValues message = new RegisterColumnValues(getMessageId(), context.getInfo().getId(), column.getId(), chunk); WriteFuture send = context.send(message); - send.awaitUninterruptibly(); + log.trace("Finished sending chunk {} for column '{}'", i++, column.getId()); } + + getProgressReporter().report(1); + log.trace("Finished collecting values for column {} as number {}", column, done.incrementAndGet()); }); } ) @@ -128,6 +134,7 @@ public void react(Worker context) throws Exception { // We may do this, because we own this specific ExecutorService. 
jobsExecutorService.shutdown(); + getProgressReporter().done(); log.info("Finished collecting values from these columns: {}", Arrays.toString(columns.toArray())); context.send(new FinalizeReactionMessage(getMessageId(), context.getInfo().getId())); diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java index 72ec04754e..394b119bff 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java @@ -83,7 +83,6 @@ public void execute() throws Exception { calculateConceptMatches(resolved, matchingStats, worker); final WriteFuture writeFuture = worker.send(new UpdateElementMatchingStats(worker.getInfo().getId(), matchingStats)); - writeFuture.awaitUninterruptibly(); progressReporter.report(1); }, worker.getJobsExecutorService()) diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/TableExportQueryPlan.java b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/TableExportQueryPlan.java index 1de21de4b8..28923c7140 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/TableExportQueryPlan.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/queryplan/TableExportQueryPlan.java @@ -173,7 +173,7 @@ private Object[] collectRow(int totalColumns, CQTable exportDescription, Bucket for (Column column : connector.getResolvedTable().getColumns()) { // ValidityDates are handled separately. - if (validityDate != null && validityDate.containsColumn(column)){ + if (validityDate != null && validityDate.containsColumn(column.getId())){ continue; } diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java b/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java index 4984115649..5404597fa7 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/results/FormShardResult.java @@ -27,7 +27,7 @@ public FormShardResult(ManagedExecutionId formId, ManagedExecutionId subQueryId, * Distribute the result to a sub query. 
*/ @Override - public void addResult(DistributedExecutionManager executionManager) { + protected void addResult(DistributedExecutionManager executionManager) { final ManagedInternalForm managedInternalForm = (ManagedInternalForm) executionManager.getExecution(getFormId()); final ManagedQuery subQuery = managedInternalForm.getSubQuery(getQueryId()); diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/results/ShardResult.java b/backend/src/main/java/com/bakdata/conquery/models/query/results/ShardResult.java index 8c5a192593..4c827857d6 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/results/ShardResult.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/results/ShardResult.java @@ -82,7 +82,7 @@ public synchronized void finish(@NonNull List results, Optional groups) { @Path("{" + GROUP_ID + "}/" + USERS_PATH_ELEMENT + "/{" + USER_ID + "}") @POST - public Response addUserToGroup(@PathParam(GROUP_ID) Group group, @PathParam(USER_ID) User user) { + public void addUserToGroup(@PathParam(GROUP_ID) Group group, @PathParam(USER_ID) User user) { processor.addUserToGroup(group, user); - return Response.ok().build(); } @Path("{" + GROUP_ID + "}/" + USERS_PATH_ELEMENT + "/{" + USER_ID + "}") @DELETE - public Response deleteUserFromGroup(@PathParam(GROUP_ID) Group group, @PathParam(USER_ID) UserId user) { + public void deleteUserFromGroup(@PathParam(GROUP_ID) Group group, @PathParam(USER_ID) UserId user) { processor.deleteUserFromGroup(group, user); - return Response.ok().build(); } @Path("{" + GROUP_ID + "}/" + ROLES_PATH_ELEMENT + "/{" + ROLE_ID + "}") @DELETE - public Response deleteRoleFromUser(@PathParam(GROUP_ID) Group group, @PathParam(ROLE_ID) RoleId role) { + public void deleteRoleFromUser(@PathParam(GROUP_ID) Group group, @PathParam(ROLE_ID) RoleId role) { processor.deleteRoleFrom(group, role); - return Response.ok().build(); } @Path("{" + GROUP_ID + "}/" + ROLES_PATH_ELEMENT + "/{" + ROLE_ID + "}") @POST - public Response addRoleToUser(@PathParam(GROUP_ID) Group group, @PathParam(ROLE_ID) Role role) { + public void addRoleToUser(@PathParam(GROUP_ID) Group group, @PathParam(ROLE_ID) Role role) { processor.addRoleTo(group, role); - return Response.ok().build(); } } diff --git a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/RoleResource.java b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/RoleResource.java index 5997d07245..061ff1d150 100644 --- a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/RoleResource.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/RoleResource.java @@ -13,7 +13,6 @@ import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; -import jakarta.ws.rs.core.Response; import com.bakdata.conquery.models.auth.entities.Role; import com.bakdata.conquery.models.exceptions.JSONException; @@ -29,9 +28,8 @@ public class RoleResource { private final AdminProcessor processor; @POST - public Response postRole(Role role) throws JSONException { + public void postRole(Role role) throws JSONException { processor.addRole(role); - return Response.ok().build(); } @GET @@ -41,14 +39,13 @@ public Collection getRoles() { @Path("{" + ROLE_ID + "}") @GET - public Response getRole(@PathParam(ROLE_ID) Role role) throws JSONException { - return Response.ok(role).build(); + public Role getRole(@PathParam(ROLE_ID) Role role) throws JSONException { + return role; } @Path("{" + ROLE_ID + "}") @DELETE - public Response 
deleteRole(@PathParam(ROLE_ID) RoleId role) throws JSONException { + public void deleteRole(@PathParam(ROLE_ID) RoleId role) { processor.deleteRole(role); - return Response.ok().build(); } } diff --git a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/UIProcessor.java b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/UIProcessor.java index d2ab8df297..ad631360e0 100644 --- a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/UIProcessor.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/UIProcessor.java @@ -139,6 +139,15 @@ private FrontendRoleContent getFrontendRoleContent(RoleId id) { return FrontendRoleContent.builder().id(id).build(); } + + public FrontendUserContent getUserContent(UserId id) { + User user = getStorage().getUser(id); + if (user != null) { + return getUserContent(user); + } + return FrontendUserContent.builder().id(id).build(); + } + public FrontendUserContent getUserContent(User user) { final Collection availableGroups = new ArrayList<>(getStorage().getAllGroups().toList()); availableGroups.removeIf(g -> g.containsMember(user)); diff --git a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/UserResource.java b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/UserResource.java index 690083d2d0..8c0748feea 100644 --- a/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/UserResource.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/admin/rest/UserResource.java @@ -14,7 +14,6 @@ import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; -import jakarta.ws.rs.core.Response; import com.bakdata.conquery.models.auth.entities.Role; import com.bakdata.conquery.models.auth.entities.User; @@ -35,42 +34,37 @@ public Collection getUsers() { } @POST - public Response postUser(@Valid User user) { + public void postUser(@Valid User user) { processor.addUser(user); - return Response.ok().build(); } @POST @Path("upload") - public Response postUsers(@NotEmpty List users) { + public void postUsers(@NotEmpty List users) { processor.addUsers(users); - return Response.ok().build(); } @Path("{" + USER_ID + "}") @GET - public Response getUser(@PathParam(USER_ID) User user) { - return Response.ok(user).build(); + public User getUser(@PathParam(USER_ID) User user) { + return user; } @Path("{" + USER_ID + "}") @DELETE - public Response deleteUser(@PathParam(USER_ID) UserId user) { + public void deleteUser(@PathParam(USER_ID) UserId user) { processor.deleteUser(user); - return Response.ok().build(); } @Path("{" + USER_ID + "}/" + ROLES_PATH_ELEMENT + "/{" + ROLE_ID + "}") @DELETE - public Response deleteRoleFromUser(@PathParam(USER_ID) User user, @PathParam(ROLE_ID) RoleId role) { + public void deleteRoleFromUser(@PathParam(USER_ID) User user, @PathParam(ROLE_ID) RoleId role) { processor.deleteRoleFrom(user, role); - return Response.ok().build(); } @Path("{" + USER_ID + "}/" + ROLES_PATH_ELEMENT + "/{" + ROLE_ID + "}") @POST - public Response addRoleToUser(@PathParam(USER_ID) User user, @PathParam(ROLE_ID) Role role) { + public void addRoleToUser(@PathParam(USER_ID) User user, @PathParam(ROLE_ID) Role role) { processor.addRoleTo(user, role); - return Response.ok().build(); } } diff --git a/backend/src/main/java/com/bakdata/conquery/resources/api/DatasetQueryResource.java b/backend/src/main/java/com/bakdata/conquery/resources/api/DatasetQueryResource.java index a1e939f1b7..616a29894d 100644 --- 
a/backend/src/main/java/com/bakdata/conquery/resources/api/DatasetQueryResource.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/api/DatasetQueryResource.java @@ -87,7 +87,7 @@ public ExternalUploadResult upload(@Auth Subject subject, @Valid ExternalUpload @GET - public List getAllQueries(@Auth Subject subject, @QueryParam("all-providers") Optional allProviders) { + public List getAllQueries(@Auth Subject subject, @QueryParam("all-providers") Optional allProviders) { subject.authorize(dataset, Ability.READ); diff --git a/backend/src/main/java/com/bakdata/conquery/resources/api/FilterResource.java b/backend/src/main/java/com/bakdata/conquery/resources/api/FilterResource.java index 76e6aba671..5135140ed8 100644 --- a/backend/src/main/java/com/bakdata/conquery/resources/api/FilterResource.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/api/FilterResource.java @@ -43,7 +43,7 @@ public class FilterResource extends HAuthorized { @POST @Path("resolve") public ConceptsProcessor.ResolvedFilterValues resolveFilterValues(FilterValues filterValues) { - subject.isPermitted(filter.getDataset().resolve(), Ability.READ); + subject.isPermitted(filter.getDataset(), Ability.READ); subject.isPermitted(filter.getConnector().findConcept(), Ability.READ); return processor.resolveFilterValues((SelectFilter) filter, filterValues.values()); @@ -53,7 +53,7 @@ public ConceptsProcessor.ResolvedFilterValues resolveFilterValues(FilterValues f @POST @Path("autocomplete") public ConceptsProcessor.AutoCompleteResult autocompleteTextFilter(@Valid FilterResource.AutocompleteRequest request) { - subject.isPermitted(filter.getDataset().resolve(), Ability.READ); + subject.isPermitted(filter.getDataset(), Ability.READ); subject.isPermitted(filter.getConnector().findConcept(), Ability.READ); if (!(filter instanceof SelectFilter)) { diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/CQConceptConverter.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/CQConceptConverter.java index 1bc33168cc..81e8287e6d 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/CQConceptConverter.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/cqelement/concept/CQConceptConverter.java @@ -12,6 +12,7 @@ import com.bakdata.conquery.models.datasets.Column; import com.bakdata.conquery.models.datasets.Table; import com.bakdata.conquery.models.datasets.concepts.ConceptElement; +import com.bakdata.conquery.models.datasets.concepts.Connector; import com.bakdata.conquery.models.datasets.concepts.select.Select; import com.bakdata.conquery.models.datasets.concepts.select.concept.ConceptColumnSelect; import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeChild; @@ -194,8 +195,10 @@ private static Stream collectConditions(CQTable cqTable, Concept } private static Optional convertConnectorCondition(CQTable cqTable, SqlFunctionProvider functionProvider) { - return Optional.ofNullable(cqTable.getConnector().resolve().getCondition()) - .map(condition -> condition.convertToSqlCondition(CTConditionContext.create(cqTable.getConnector().resolve(), functionProvider))); + final Connector connector = cqTable.getConnector().resolve(); + + return Optional.ofNullable(connector.getCondition()) + .map(condition -> condition.convertToSqlCondition(CTConditionContext.create(connector, functionProvider))); } private static Optional getDateRestriction(ConversionContext context, Optional validityDate) { diff --git 
diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/HanaSqlFunctionProvider.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/HanaSqlFunctionProvider.java
index 1dcd4286cc..29f800e979 100644
--- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/HanaSqlFunctionProvider.java
+++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/HanaSqlFunctionProvider.java
@@ -320,7 +320,7 @@ private ColumnDateRange toColumnDateRange(ValidityDate validityDate) {
 		Column endColumn;
 
 		// if no end column is present, the only existing column is both start and end of the date range
-		if (validityDate.getEndColumn() == null) {
+		if (validityDate.getColumn() != null) {
 			Column column = validityDate.getColumn().resolve();
 			startColumn = column;
 			endColumn = column;
diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/PostgreSqlFunctionProvider.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/PostgreSqlFunctionProvider.java
index 8b1d166155..32ea34f4e0 100644
--- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/PostgreSqlFunctionProvider.java
+++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/dialect/PostgreSqlFunctionProvider.java
@@ -122,10 +122,12 @@ public ColumnDateRange forValidityDate(ValidityDate validityDate, CDateRange dat
 	@Override
 	public ColumnDateRange forArbitraryDateRange(DaterangeSelectOrFilter daterangeSelectOrFilter) {
 		String tableName = daterangeSelectOrFilter.getTable().getName();
-		if (daterangeSelectOrFilter.getEndColumn() != null) {
-			return ofStartAndEnd(tableName, daterangeSelectOrFilter.getStartColumn().resolve(), daterangeSelectOrFilter.getEndColumn().resolve());
+
+		if (daterangeSelectOrFilter.getColumn() != null) {
+			return ofSingleColumn(tableName, daterangeSelectOrFilter.getColumn().resolve());
 		}
-		return ofSingleColumn(tableName, daterangeSelectOrFilter.getColumn().resolve());
+
+		return ofStartAndEnd(tableName, daterangeSelectOrFilter.getStartColumn().resolve(), daterangeSelectOrFilter.getEndColumn().resolve());
 	}
 
 	@Override
@@ -311,10 +313,12 @@ private ColumnDateRange toColumnDateRange(CDateRange dateRestriction) {
 
 	private ColumnDateRange toColumnDateRange(ValidityDate validityDate) {
 		String tableName = validityDate.getConnector().getResolvedTableId().getTable();
-		if (validityDate.getEndColumn() != null) {
-			return ofStartAndEnd(tableName, validityDate.getStartColumn().resolve(), validityDate.getEndColumn().resolve());
+
+		if (validityDate.getColumn() != null) {
+			return ofSingleColumn(tableName, validityDate.getColumn().resolve());
 		}
-		return ofSingleColumn(tableName, validityDate.getColumn().resolve());
+
+		return ofStartAndEnd(tableName, validityDate.getStartColumn().resolve(), validityDate.getEndColumn().resolve());
 	}
 
 	private ColumnDateRange ofSingleColumn(String tableName, Column column) {
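Both dialects now branch on the single date column instead of the end column: a validity date (or arbitrary date range) is configured either as one column or as a start/end pair, and checking `getColumn() != null` makes the single-column case the primary branch with the start/end pair as the fallback. A reduced sketch of the shared decision logic (the record and method here are illustrative, not the project's types):

```java
public class DateRangeBranchSketch {

	// Either `column` is set, or `startColumn` and `endColumn` are.
	record DateRangeSource(String column, String startColumn, String endColumn) {}

	static String toRangeSql(DateRangeSource source) {
		if (source.column() != null) {
			// Single column: it marks both start and end of the range.
			return "daterange(%s, %s)".formatted(source.column(), source.column());
		}
		// Otherwise the start/end pair is expected to be fully set.
		return "daterange(%s, %s)".formatted(source.startColumn(), source.endColumn());
	}

	public static void main(String[] args) {
		System.out.println(toRangeSql(new DateRangeSource("event_date", null, null)));
		System.out.println(toRangeSql(new DateRangeSource(null, "valid_from", "valid_to")));
	}
}
```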
diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conversion/model/aggregator/SumSqlAggregator.java b/backend/src/main/java/com/bakdata/conquery/sql/conversion/model/aggregator/SumSqlAggregator.java
index 5974b7536a..684544bb4a 100644
--- a/backend/src/main/java/com/bakdata/conquery/sql/conversion/model/aggregator/SumSqlAggregator.java
+++ b/backend/src/main/java/com/bakdata/conquery/sql/conversion/model/aggregator/SumSqlAggregator.java
@@ -98,14 +98,18 @@ private enum SumDistinctCteStep implements CteStep {
 
 	@Override
 	public ConnectorSqlSelects connectorSelect(SumSelect sumSelect, SelectContext selectContext) {
+		NameGenerator nameGenerator = selectContext.getNameGenerator();
+		String alias = nameGenerator.selectName(sumSelect);
+
 		Column sumColumn = sumSelect.getColumn().resolve();
 		Column subtractColumn = sumSelect.getSubtractColumn() != null ? sumSelect.getSubtractColumn().resolve() : null;
+
 		List<Column> distinctByColumns = sumSelect.getDistinctByColumn().stream().map(ColumnId::resolve).toList();
-		NameGenerator nameGenerator = selectContext.getNameGenerator();
-		String alias = nameGenerator.selectName(sumSelect);
+
 		ConnectorSqlTables tables = selectContext.getTables();
 
 		CommonAggregationSelect sumAggregationSelect;
+
 		if (!distinctByColumns.isEmpty()) {
 			SqlIdColumns ids = selectContext.getIds();
 			sumAggregationSelect = createDistinctSumAggregationSelect(sumColumn, distinctByColumns, alias, ids, tables, nameGenerator);
diff --git a/backend/src/main/java/com/bakdata/conquery/tasks/PermissionCleanupTask.java b/backend/src/main/java/com/bakdata/conquery/tasks/PermissionCleanupTask.java
index 98c00a29ad..784e8f9620 100644
--- a/backend/src/main/java/com/bakdata/conquery/tasks/PermissionCleanupTask.java
+++ b/backend/src/main/java/com/bakdata/conquery/tasks/PermissionCleanupTask.java
@@ -133,7 +133,7 @@ public static & Owned, ID extends Id> int del
 			if (wpermission.getInstances().size() != 1) {
 				log.trace("Skipping permission {} because it refers to multiple instances.", wpermission);
 			}
-			ID executionId = null;
+			ID executionId;
 			try {
 				executionId = idParser.parse(wpermission.getInstances().iterator().next());
 			}
@@ -144,14 +144,17 @@ public static & Owned, ID extends Id> int del
 			E execution = instanceStorageExtractor.apply(executionId);
 
 			if (execution == null) {
-				log.trace("The execution referenced in permission {} does not exist. Skipping permission");
+				log.trace("The execution referenced in permission {} does not exist. Skipping permission", wpermission);
 				continue;
 			}
 
 			if (!user.isOwner(execution)) {
-				log.trace("The user is not owner of the instance. Keeping the permission. User: {}, Owner: {}, Instance: {}, Permission: {}", user.getId(), execution
-						.getOwner(), execution
-						.getId(), wpermission);
+				log.trace("The user is not owner of the instance. Keeping the permission. User: {}, Owner: {}, Instance: {}, Permission: {}",
+						  user.getId(),
+						  execution.getOwner(),
+						  execution.getId(),
+						  wpermission
+				);
 				continue;
 			}
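The first `log.trace` fix above supplies a previously missing argument: with SLF4J, every `{}` placeholder needs a matching argument, otherwise the braces are printed literally and the value in question never appears in the log. Placeholders also defer formatting until the level is actually enabled. A minimal self-contained illustration (not project code):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PlaceholderDemo {

	private static final Logger log = LoggerFactory.getLogger(PlaceholderDemo.class);

	public static void main(String[] args) {
		String permission = "queries:read:abc123";

		// Bug pattern: no argument given, so the output contains a literal "{}"
		// and the interesting value is lost.
		log.trace("The execution referenced in permission {} does not exist.");

		// Fixed: the placeholder is filled in; formatting only happens
		// if TRACE is enabled for this logger.
		log.trace("The execution referenced in permission {} does not exist.", permission);
	}
}
```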
diff --git a/backend/src/main/java/com/bakdata/conquery/util/DateReader.java b/backend/src/main/java/com/bakdata/conquery/util/DateReader.java
index 83797bb912..9513ee88e5 100644
--- a/backend/src/main/java/com/bakdata/conquery/util/DateReader.java
+++ b/backend/src/main/java/com/bakdata/conquery/util/DateReader.java
@@ -8,6 +8,7 @@
 import java.util.Locale;
 import java.util.Set;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
 import com.bakdata.conquery.models.common.CDateSet;
 import com.bakdata.conquery.models.common.daterange.CDateRange;
@@ -53,7 +54,7 @@ public class DateReader {
 	 * All available formats for parsing.
 	 */
 	@JsonIgnore
-	private List dateFormats;
+	private final List dateFormats;
 	/**
 	 * Parsed values cache.
 	 */
@@ -63,9 +64,9 @@ public class DateReader {
 			.concurrencyLevel(10)
 			.build(CacheLoader.from(this::tryParseDate));
 	@JsonIgnore
-	private List rangeStartEndSeperators;
+	private final List rangeStartEndSeperators;
 	@JsonIgnore
-	private List dateSetLayouts;
+	private final List dateSetLayouts;
 
 	@JsonCreator
 	public DateReader(Set dateParsingFormats, List rangeStartEndSeperators, List dateSetLayouts) {
@@ -136,6 +137,7 @@ private CDateRange parseToCDateRange(String value, String sep) {
 	/**
 	 * Try parsing the String value to a LocalDate.
 	 */
+	@Nullable
 	public LocalDate parseToLocalDate(String value) throws ParsingException {
 		if (Strings.isNullOrEmpty(value)) {
 			return null;
@@ -153,6 +155,7 @@ public LocalDate parseToLocalDate(String value) throws ParsingException {
 	/**
 	 * Try and parse value to CDateSet using all available layouts, but starting at the last known successful one.
 	 */
+	@Nullable
 	public CDateSet parseToCDateSet(String value) {
 		if (Strings.isNullOrEmpty(value)) {
 			return null;
diff --git a/backend/src/main/java/com/bakdata/conquery/util/SoftPool.java b/backend/src/main/java/com/bakdata/conquery/util/SoftPool.java
deleted file mode 100644
index 9f58bd4d72..0000000000
--- a/backend/src/main/java/com/bakdata/conquery/util/SoftPool.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package com.bakdata.conquery.util;
-
-import java.lang.ref.SoftReference;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
-
-import com.bakdata.conquery.models.config.ClusterConfig;
-import com.google.common.util.concurrent.Uninterruptibles;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class SoftPool<T> {
-
-	private final ConcurrentLinkedDeque<SoftReference<T>> pool = new ConcurrentLinkedDeque<>();
-	private final AtomicLong poolSize = new AtomicLong(0);
-	private final Supplier<T> supplier;
-	private final Thread poolCleaner;
-	private final long softPoolBaselineSize;
-	private final long cleanerPauseSeconds;
-
-	public SoftPool(ClusterConfig config, Supplier<T> supplier) {
-		this.supplier = supplier;
-
-		softPoolBaselineSize = config.getSoftPoolBaselineSize();
-		cleanerPauseSeconds = config.getSoftPoolCleanerPause().toSeconds();
-
-		if (softPoolBaselineSize <= 0 || cleanerPauseSeconds <= 0) {
-			log.debug("Not creating a Cleaner.");
-			poolCleaner = null;
-			return;
-		}
-
-		poolCleaner = new Thread(this::cleanPool, "SoftPool Cleaner");
-		// Should not prevent the JVM shutdown -> daemon
-		poolCleaner.setDaemon(true);
-		poolCleaner.start();
-	}
-
-	/**
-	 * Offer/return a reusable object to the pool.
-	 *
-	 * @param v the object to return to the pool.
-	 */
-	public void offer(T v) {
-		pool.addLast(new SoftReference<>(v));
-
-		final long currentPoolSize = poolSize.incrementAndGet();
-
-		log.trace("Pool size: {} (offer)", currentPoolSize);
-	}
-
-	/**
-	 * Returns a reusable element from the pool if available or
-	 * returns a new element from the provided supplier.
-	 */
-	public T borrow() {
-		SoftReference<T> result;
-
-		// First check the pool for available/returned elements
-		while ((result = pool.poll()) != null) {
-			final long currentPoolSize = poolSize.decrementAndGet();
-
-			log.trace("Pool size: {} (borrow)", currentPoolSize);
-
-			// The pool had an element, inspect if it is still valid
-			final T elem = result.get();
-			if (elem != null) {
-				// Return valid element
-				return elem;
-			}
-			// Referenced element was already garbage collected. Poll further
-		}
-		// Pool was empty -- request a new element
-		return supplier.get();
-	}
-
-	/**
-	 * Trims the pool in a custom interval so that soft references get purged earlier
-	 */
-	private void cleanPool() {
-		while (true) {
-			Uninterruptibles.sleepUninterruptibly(cleanerPauseSeconds, TimeUnit.SECONDS);
-
-			log.trace("Running pool cleaner");
-			while (poolSize.get() > softPoolBaselineSize) {
-				// Poll until we reached the baseline
-				borrow();
-			}
-		}
-	}
-}
diff --git a/backend/src/test/java/com/bakdata/conquery/api/form/config/FormConfigTest.java b/backend/src/test/java/com/bakdata/conquery/api/form/config/FormConfigTest.java
index 7c7c16a04a..0dbf8e02c9 100644
--- a/backend/src/test/java/com/bakdata/conquery/api/form/config/FormConfigTest.java
+++ b/backend/src/test/java/com/bakdata/conquery/api/form/config/FormConfigTest.java
@@ -297,6 +297,7 @@ public void getConfigs() {
 
 	@Test
 	public void patchConfig() {
+		// PREPARE
 		user.addPermission(DatasetPermission.onInstance(Ability.READ, datasetId));
 
 		Group group1 = new Group("test1", "test1", storage);
diff --git a/backend/src/test/java/com/bakdata/conquery/integration/sql/SqlStandaloneCommand.java b/backend/src/test/java/com/bakdata/conquery/integration/sql/SqlStandaloneCommand.java
index 6504e804a2..5b21e5491b 100644
--- a/backend/src/test/java/com/bakdata/conquery/integration/sql/SqlStandaloneCommand.java
+++ b/backend/src/test/java/com/bakdata/conquery/integration/sql/SqlStandaloneCommand.java
@@ -3,7 +3,6 @@
 import java.util.Collections;
 import java.util.List;
 
-import com.bakdata.conquery.Conquery;
 import com.bakdata.conquery.commands.ManagerNode;
 import com.bakdata.conquery.commands.ShardNode;
 import com.bakdata.conquery.commands.StandaloneCommand;
@@ -12,9 +11,9 @@
 import com.bakdata.conquery.mode.local.LocalManagerProvider;
 import com.bakdata.conquery.models.config.ConqueryConfig;
 import com.bakdata.conquery.models.worker.LocalNamespace;
+import com.bakdata.conquery.util.commands.NoOpConquery;
 import com.bakdata.conquery.util.io.ConqueryMDC;
 import io.dropwizard.core.cli.ServerCommand;
-import io.dropwizard.core.setup.Bootstrap;
 import io.dropwizard.core.setup.Environment;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -24,27 +23,11 @@
 @Getter
 public class SqlStandaloneCommand extends ServerCommand implements StandaloneCommand {
 
-	private final Conquery conquery;
-	private ManagerNode managerNode = new ManagerNode();
+	private final ManagerNode managerNode = new ManagerNode();
 	private DelegateManager manager;
-	private Environment environment;
 
-	public SqlStandaloneCommand(Conquery conquery) {
-		super(conquery, "standalone", "starts a sql server and a client at the same time.");
-		this.conquery = conquery;
-	}
-
-	@Override
-	public void startStandalone(Environment environment, Namespace namespace, ConqueryConfig config) throws Exception {
-		ConqueryMDC.setLocation("ManagerNode");
-		log.debug("Starting ManagerNode");
-		this.manager = new LocalManagerProvider(new TestSqlDialectFactory()).provideManager(config, environment);
-		this.conquery.setManagerNode(managerNode);
-		this.conquery.run(manager);
-		// starts the Jersey Server
-		log.debug("Starting REST Server");
-		ConqueryMDC.setLocation(null);
-		super.run(environment, namespace, config);
+	public SqlStandaloneCommand() {
+		super(new NoOpConquery(), "standalone", "starts a sql server and a client at the same time.");
 	}
 
 	@Override
@@ -53,23 +36,14 @@ public List getShardNodes() {
 	}
 
 	@Override
-	public void run(Bootstrap bootstrap, Namespace namespace, ConqueryConfig configuration) throws Exception {
-		environment = new Environment(
-				bootstrap.getApplication().getName(),
-				bootstrap.getObjectMapper(),
-				bootstrap.getValidatorFactory(),
-				bootstrap.getMetricRegistry(),
-				bootstrap.getClassLoader(),
-				bootstrap.getHealthCheckRegistry(),
-				configuration
-		);
-		configuration.getMetricsFactory().configure(environment.lifecycle(), bootstrap.getMetricRegistry());
-		configuration.getServerFactory().configure(environment);
-
-		bootstrap.run(configuration, environment);
-		startStandalone(environment, namespace, configuration);
+	protected void run(Environment environment, Namespace namespace, ConqueryConfig configuration) throws Exception {
+		ConqueryMDC.setLocation("ManagerNode");
+		log.debug("Starting ManagerNode");
+		this.manager = new LocalManagerProvider(new TestSqlDialectFactory()).provideManager(configuration, environment);
+		managerNode.run(manager);
+		// starts the Jersey Server
+		log.debug("Starting REST Server");
+		ConqueryMDC.setLocation(null);
+		super.run(environment, namespace, configuration);
 	}
-
-
-
 }
diff --git a/backend/src/test/java/com/bakdata/conquery/io/mina/MinaStackTest.java b/backend/src/test/java/com/bakdata/conquery/io/mina/MinaStackTest.java
new file mode 100644
index 0000000000..15a3629a66
--- /dev/null
+++ b/backend/src/test/java/com/bakdata/conquery/io/mina/MinaStackTest.java
@@ -0,0 +1,277 @@
+package com.bakdata.conquery.io.mina;
+
+import static java.lang.Math.toIntExact;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import com.bakdata.conquery.io.cps.CPSType;
+import com.bakdata.conquery.io.jackson.Jackson;
+import com.bakdata.conquery.models.config.ClusterConfig;
+import com.bakdata.conquery.models.messages.network.NetworkMessage;
+import com.bakdata.conquery.models.messages.network.NetworkMessageContext;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.dropwizard.util.DataSize;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.future.WriteFuture;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+@Slf4j
+public class MinaStackTest {
+
+	private static final ClusterConfig CLUSTER_CONFIG = new ClusterConfig();
+	private static final ObjectMapper OM = Jackson.BINARY_MAPPER.copy();
+	private static final ConcurrentLinkedQueue<NetworkMessage<?>> SERVER_RECEIVED_MESSAGES = new ConcurrentLinkedQueue<>();
+
+	private static NioSocketAcceptor SERVER;
+
+	@BeforeAll
+	public static void beforeAll() throws IOException {
+
+		CLUSTER_CONFIG.setPort(0);
+		CLUSTER_CONFIG.setMaxIoBufferSizeBytes(toIntExact(DataSize.mebibytes(10).toBytes()));
+
+		// This enables the Chunking filter, which triggers for messages > 1 MebiByte
+		CLUSTER_CONFIG.getMina().setSendBufferSize(toIntExact(DataSize.mebibytes(1).toBytes()));
+
+		// Server
+		SERVER = CLUSTER_CONFIG.getClusterAcceptor(OM, new IoHandlerAdapter() {
+			@Override
+			public void sessionOpened(IoSession session) {
+				log.info("Session to {} established", session.getRemoteAddress());
+			}
+
+			@Override
+			public void messageReceived(IoSession session, Object message) {
+				SERVER_RECEIVED_MESSAGES.add((NetworkMessage<?>) message);
+				log.trace("Received {} messages", SERVER_RECEIVED_MESSAGES.size());
+			}
+
+			@Override
+			public void exceptionCaught(IoSession session, Throwable cause) {
+				fail("Server caught an Exception", cause);
+			}
+		}, "Server");
+
+	}
+
+	@BeforeEach
+	public void beforeEach() {
+		SERVER_RECEIVED_MESSAGES.clear();
+	}
+
+	@Test
+	void smokeTest() {
+
+		NioSocketConnector client = CLUSTER_CONFIG.getClusterConnector(OM, new IoHandlerAdapter() {
+			@Override
+			public void sessionOpened(IoSession session) {
+				log.info("Session to {} established", session.getRemoteAddress());
+			}
+		}, "Client");
+
+		try {
+
+			ConnectFuture connect = client.connect(SERVER.getLocalAddress());
+
+			connect.awaitUninterruptibly();
+			IoSession clientSession = connect.getSession();
+
+			NetworkMessage<?> input = new TestMessage(RandomStringUtils.randomAscii(1000));
+
+			WriteFuture write = clientSession.write(input);
+
+			write.awaitUninterruptibly();
+
+			await().atMost(1, TimeUnit.SECONDS).until(() -> !SERVER_RECEIVED_MESSAGES.isEmpty());
+			assertThat(SERVER_RECEIVED_MESSAGES).containsExactlyInAnyOrder(input);
+
+			clientSession.closeNow().awaitUninterruptibly();
+		}
+		finally {
+			client.dispose();
+
+		}
+	}
+
+	/**
+	 * This test requires a little RAM because we hold the messages twice to compare sender and receiver payloads.
+	 */
+	@Test
+	void concurrentWriting() {
+		final int clientCount = 20;
+		final int messagesPerClient = 500;
+		final int minMessageLength = toIntExact(DataSize.kibibytes(1).toBytes());
+		final int maxMessageLength = toIntExact(DataSize.kibibytes(100).toBytes());
+
+		final ConcurrentLinkedQueue<NetworkMessage<?>> messagesWritten = new ConcurrentLinkedQueue<>();
+		final List<CompletableFuture<Void>> clientThreads = new ArrayList<>();
+
+		ExecutorService executorService = Executors.newFixedThreadPool(10);
+		try {
+			for (int clientI = 0; clientI < clientCount; clientI++) {
+				final int clientNumber = clientI;
+				CompletableFuture<Void> clientThread = CompletableFuture.runAsync(() -> {
+					NioSocketConnector client = CLUSTER_CONFIG.getClusterConnector(OM, new IoHandlerAdapter() {
+						@Override
+						public void sessionOpened(IoSession session) {
+							log.info("Session to {} established", session.getRemoteAddress());
+						}
+
+						@Override
+						public void messageSent(IoSession session, Object message) {
+							log.trace("Message written: {} bytes", ((TestMessage) message).data.getBytes().length);
+						}
+
+						@Override
+						public void exceptionCaught(IoSession session, Throwable cause) {
+							fail("Client[%d] caught an Exception".formatted(clientNumber), cause);
+						}
+					}, "Client");
+					try {
+						// Connect
+						ConnectFuture connect = client.connect(SERVER.getLocalAddress());
+						connect.awaitUninterruptibly();
+						IoSession clientSession = connect.getSession();
+
+						for (int i = 0; i < messagesPerClient; i++) {
+							NetworkMessage<?> input = new TestMessage(RandomStringUtils.randomAscii(minMessageLength, maxMessageLength));
+
+							WriteFuture writeFuture = clientSession.write(input);
+							writeFuture.addListener((f) -> {
+								if (!((WriteFuture) f).isWritten()) {
+									fail("Failed to write a message");
+								}
+								messagesWritten.add(input);
+							});
+							writeFuture.awaitUninterruptibly();
+						}
+					}
+					finally {
+						client.dispose();
+					}
+				}, executorService);
+				clientThreads.add(clientThread);
+			}
+
+			// Wait until all clients completed writing
+			CompletableFuture.allOf(clientThreads.toArray(new CompletableFuture[0])).join();
+
+			// Wait until all messages are received
+			await().atMost(10, TimeUnit.SECONDS).until(() -> SERVER_RECEIVED_MESSAGES.size() == messagesWritten.size());
+
+			// Check that the messages are correct
+			assertThat(SERVER_RECEIVED_MESSAGES).containsExactlyInAnyOrderElementsOf(messagesWritten);
+
+		}
+		finally {
+			executorService.shutdownNow();
+		}
+
+	}
+
+	private static Stream<Arguments> dataSizes() {
+		return Stream.of(
+				Arguments.of(DataSize.bytes(10), true),
+				Arguments.of(DataSize.kibibytes(10), true),
+				Arguments.of(DataSize.mebibytes(9), true), // Uses chunking
+				Arguments.of(DataSize.mebibytes(10), false) // Is too large for jackson encoder
+		);
+	}
+
+	@ParameterizedTest
+	@MethodSource("dataSizes")
+	void messageSizes(DataSize dataSize, boolean shouldPass) {
+		NioSocketConnector client = CLUSTER_CONFIG.getClusterConnector(OM, new IoHandlerAdapter() {
+			@Override
+			public void sessionOpened(IoSession session) {
+				log.info("Session to {} established", session.getRemoteAddress());
+			}
+
+			@Override
+			public void exceptionCaught(IoSession session, Throwable cause) {
+				log.trace("Failed to write message (probably expected)", cause);
+			}
+		}, "Client");
+
+		try {
+
+			ConnectFuture connect = client.connect(SERVER.getLocalAddress());
+
+			connect.awaitUninterruptibly();
+			IoSession clientSession = connect.getSession();
+
+			NetworkMessage<?> input = new TestMessage(RandomStringUtils.randomAscii(toIntExact(dataSize.toBytes())));
+
+			WriteFuture write = clientSession.write(input);
+
+			write.awaitUninterruptibly();
+
+			assertThat(write.isWritten()).isEqualTo(shouldPass);
+
+			Assertions.setMaxStackTraceElementsDisplayed(200);
+			if (!shouldPass) {
+				assertThat(write.getException()).hasCauseInstanceOf(IllegalArgumentException.class);
+			}
+
+			clientSession.closeNow().awaitUninterruptibly();
+		}
+		finally {
+			client.dispose();
+
+		}
+	}
+
+	@AfterAll
+	public static void afterAll() {
+		SERVER.dispose();
+	}
+
+	public static class TestNetworkMessageContext extends NetworkMessageContext {
+
+		public TestNetworkMessageContext(NetworkSession session) {
+			super(session, 0);
+		}
+	}
+
+	@CPSType(id = "TEST_MSG", base = NetworkMessage.class)
+	@RequiredArgsConstructor(onConstructor_ = @JsonCreator)
+	@Getter
+	@EqualsAndHashCode(callSuper = false)
+	public static class TestMessage extends NetworkMessage<TestNetworkMessageContext> {
+
+		private final String data;
+
+		@Override
+		public void react(TestNetworkMessageContext context) {
+			// Do nothing
+		}
+	}
+}
diff --git a/backend/src/test/java/com/bakdata/conquery/io/result/excel/ExcelResultRenderTest.java b/backend/src/test/java/com/bakdata/conquery/io/result/excel/ExcelResultRenderTest.java
index 0145874c60..9b6d27cece 100644
--- a/backend/src/test/java/com/bakdata/conquery/io/result/excel/ExcelResultRenderTest.java
+++ b/backend/src/test/java/com/bakdata/conquery/io/result/excel/ExcelResultRenderTest.java
@@ -84,7 +84,7 @@ void writeAndRead() throws IOException {
 
 		final ExcelRenderer renderer = new ExcelRenderer(new ExcelConfig(), printSettings);
 
-		renderer.renderToStream(ResultTestUtil.getIdFields(), mquery, output, OptionalLong.empty(), printSettings);
+		renderer.renderToStream(ResultTestUtil.getIdFields(), mquery, output, OptionalLong.empty(), printSettings, metaStorage);
 
 		final InputStream inputStream = new ByteArrayInputStream(output.toByteArray());
diff --git a/backend/src/test/java/com/bakdata/conquery/models/auth/LocalAuthRealmTest.java b/backend/src/test/java/com/bakdata/conquery/models/auth/LocalAuthRealmTest.java
index ec15af7650..4b1567dec0 100644
--- a/backend/src/test/java/com/bakdata/conquery/models/auth/LocalAuthRealmTest.java
+++ b/backend/src/test/java/com/bakdata/conquery/models/auth/LocalAuthRealmTest.java
@@ -22,7 +22,7 @@
 import org.apache.shiro.authc.BearerToken;
 import org.apache.shiro.authc.CredentialsException;
 import org.apache.shiro.authc.IncorrectCredentialsException;
-import org.apache.shiro.util.LifecycleUtils;
+import org.apache.shiro.lang.util.LifecycleUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
diff --git a/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java b/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java
index 12e34d050f..4277c90ecc 100644
--- a/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java
+++ b/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java
@@ -16,7 +16,6 @@
 import jakarta.ws.rs.client.Client;
 import jakarta.ws.rs.core.UriBuilder;
 
-import com.bakdata.conquery.Conquery;
 import com.bakdata.conquery.commands.DistributedStandaloneCommand;
 import com.bakdata.conquery.commands.ShardNode;
 import com.bakdata.conquery.commands.StandaloneCommand;
@@ -177,12 +176,12 @@ public void waitUntilWorkDone() {
 				if (Duration.ofNanos(System.nanoTime() - started).toSeconds() > 10) {
 					started = System.nanoTime();
 
-					log.warn("waiting for done work for a long time", new Exception());
+					log.warn("Waiting for done work for a long time", new Exception("This Exception marks the stacktrace, to show where we are waiting."));
 				}
 			} while (true);
 		}
 
-		log.trace("all jobs finished");
+		log.trace("All jobs finished");
 	}
 
 	public UriBuilder defaultAdminURIBuilder() {
@@ -230,10 +229,10 @@ public void beforeAll() throws Exception {
 		// define server
 		dropwizard = new DropwizardTestSupport<>(TestBootstrappingConquery.class, config, app -> {
 			if (config.getSqlConnectorConfig().isEnabled()) {
-				standaloneCommand = new SqlStandaloneCommand((Conquery) app);
+				standaloneCommand = new SqlStandaloneCommand();
 			}
 			else {
-				standaloneCommand = new DistributedStandaloneCommand((Conquery) app);
+				standaloneCommand = new DistributedStandaloneCommand();
 			}
 			return (Command) standaloneCommand;
 		});
diff --git a/frontend/src/js/model/table.ts b/frontend/src/js/model/table.ts
index 8c380f597d..ac6393648d 100644
--- a/frontend/src/js/model/table.ts
+++ b/frontend/src/js/model/table.ts
@@ -58,7 +58,7 @@ export function tableIsIncludedInIds(
   tableIds: string[],
 ) {
   return tableIds.some(
-    (id) => table.id.toLowerCase().indexOf(id.toLowerCase()) !== -1,
+    (id) => table.connectorId.toLowerCase().indexOf(id.toLowerCase()) !== -1,
   );