Skip to content

Commit

Permalink
remember to add the new files
Browse files Browse the repository at this point in the history
  • Loading branch information
stalep committed Oct 15, 2023
1 parent cd0fe6d commit bffa76b
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.hyperfoil.tools.horreum.converter;

import io.smallrye.reactive.messaging.MessageConverter;
import io.smallrye.reactive.messaging.amqp.IncomingAmqpMetadata;
import io.vertx.core.json.JsonObject;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Message;

import java.lang.reflect.Type;
@ApplicationScoped
public class JsonToEventConverter implements MessageConverter {
@Override
public boolean canConvert(Message<?> in, Type target) {
return in.getMetadata(IncomingAmqpMetadata.class)
.map(meta -> meta.getContentType().equals("application/json") && target instanceof Class)
.orElse(false);
}

@Override
public Message<?> convert(Message<?> in, Type target) {
return in.withPayload(((JsonObject) in.getPayload()).mapTo((Class<?>) target));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.hyperfoil.tools.horreum.svc;

import io.quarkus.runtime.Startup;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import jakarta.ws.rs.Produces;
import org.eclipse.microprofile.context.ManagedExecutor;

@Singleton
@Startup
public class HorreumThreadProvider {
//should make these configurable
private final static int POOL_SIZE = 10;
private final static int POOL_QUEUE_SIZE = 100;

@Startup
@Singleton
@Produces
@Named("datasetServiceExecutor")
ManagedExecutor managedCustomExecutor() {
System.out.println("HorreumServiceExecutor created (should only happen once)!.");

return ManagedExecutor.builder()
.maxAsync(POOL_SIZE)
.maxQueued(POOL_QUEUE_SIZE)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.hyperfoil.tools.horreum.api.data.*;
import io.hyperfoil.tools.horreum.api.data.Extractor;
import io.hyperfoil.tools.horreum.bus.MessageBusChannels;
import io.hyperfoil.tools.horreum.entity.FingerprintDAO;
import io.hyperfoil.tools.horreum.entity.data.*;
import io.hyperfoil.tools.horreum.mapper.DataSetMapper;
import io.hyperfoil.tools.horreum.mapper.LabelMapper;
Expand All @@ -20,10 +19,8 @@
import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
import io.quarkus.panache.common.Page;
import io.quarkus.panache.common.Sort;
import io.quarkus.runtime.Startup;
import io.quarkus.security.identity.SecurityIdentity;

import io.smallrye.common.annotation.Blocking;
import jakarta.annotation.security.PermitAll;
import jakarta.annotation.security.RolesAllowed;
import jakarta.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -52,10 +49,6 @@
import java.util.stream.StreamSupport;

import jakarta.ws.rs.DefaultValue;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.hibernate.Hibernate;
import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults;
import org.hibernate.query.NativeQuery;
Expand Down Expand Up @@ -128,10 +121,6 @@ public URI create(URI baseURI, String segment) {
@Inject
DatasetServiceImpl datasetService;

@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 100)
@Channel("dataset-requests")
Emitter<DataSet.EventNew> dataSetEmitter;

@WithToken
@WithRoles
@PermitAll
Expand Down Expand Up @@ -703,8 +692,6 @@ public Integer addOrUpdateLabel(int schemaId, Label labelDTO) {
existing.persistAndFlush();

emitLabelChanged(existing);
// LabelDAO finalExisting = existing;
// mediator.executeBlocking(() -> emitLabelChanged(finalExisting));
}
return label.id;
}
Expand All @@ -728,9 +715,7 @@ private void emitLabelChanged(LabelDAO label) {
}

for(var datasetId : datasetIds) {
// datasetService.onNewDataset(testId, datasetId, label.id, true);
// datasetService.calculateLabels(testId, datasetId, label.id, true);
dataSetEmitter.send(new DataSet.EventNew(datasetId, testId, 0, label.id, true));
mediator.queueDatasetEvents(new DataSet.EventNew(datasetId, testId, 0, label.id, true));
}
}
catch (NoResultException nre) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;

@ApplicationScoped
public class ServiceMediator {
Expand Down Expand Up @@ -63,8 +66,12 @@ public class ServiceMediator {
@Inject
private SchemaServiceImpl schemaService;

@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 100)
@Channel("dataset-requests")
Emitter<DataSet.EventNew> dataSetEmitter;

@Inject
@NamedInstance("customServiceExecutor")
@NamedInstance("datasetServiceExecutor")
ManagedExecutor executor;

public ServiceMediator() {
Expand Down Expand Up @@ -126,13 +133,16 @@ void newChange(Change.Event event) {
@Blocking
@ActivateRequestContext
public void processDatasetEvents(DataSet.EventNew newEvent) {
// datasetService.onNewDatasetNoLock(newEvent);
executor.runAsync(() -> {
datasetService.onNewDatasetNoLock(newEvent);
validateDataset(newEvent.datasetId);
});
}

void queueDatasetEvents(DataSet.EventNew event) {
dataSetEmitter.send(event);
}

void dataPointsProcessed(DataPoint.DatasetProcessedEvent event) {
experimentService.onDatapointsCreated(event);
}
Expand Down

0 comments on commit bffa76b

Please sign in to comment.