Skip to content

Commit

Permalink
does not issue messages on mina process thread, adds logging
Browse files Browse the repository at this point in the history
  • Loading branch information
thoniTUB committed Oct 28, 2024
1 parent aee3231 commit c636dd8
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.future.DefaultWriteFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -40,11 +41,16 @@ public WriteFuture send(final NetworkMessage<?> message) {
}
}
catch (InterruptedException e) {
log.error("Unexpected interruption", e);
return send(message);
log.error("Unexpected interruption, while trying to queue: {}", message, e);
return DefaultWriteFuture.newNotWrittenFuture(session, e);
}
WriteFuture future = session.write(message);
future.addListener(f -> queuedMessages.remove(message));
future.addListener(f -> {
if(f instanceof WriteFuture writeFuture && ! writeFuture.isWritten()) {
log.error("Could not write message: {}", message, writeFuture.getException());
}
queuedMessages.remove(message);
});

return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,39 @@ public void sessionCreated(IoSession session) {
public void sessionOpened(IoSession session) {
NetworkSession networkSession = new NetworkSession(session);

context = new NetworkMessageContext.ShardNodeNetworkContext(networkSession, workers, config, environment.getValidator());
log.info("Connected to ManagerNode @ `{}`", session.getRemoteAddress());
// Schedule ShardNode and Worker registration, so we don't block this thread which does the actual sending
scheduler.schedule(() -> {
context = new NetworkMessageContext.ShardNodeNetworkContext(networkSession, workers, config, environment.getValidator());
log.info("Connected to ManagerNode @ `{}`", session.getRemoteAddress());

// Authenticate with ManagerNode
context.send(new AddShardNode());

for (Worker w : workers.getWorkers().values()) {
w.setSession(new NetworkSession(session));
WorkerInformation info = w.getInfo();
log.info("Sending worker identity '{}'", info.getName());
networkSession.send(new RegisterWorker(info));
}
}, 0, TimeUnit.SECONDS);

// Authenticate with ManagerNode
context.send(new AddShardNode());

for (Worker w : workers.getWorkers().values()) {
w.setSession(new NetworkSession(session));
WorkerInformation info = w.getInfo();
log.info("Sending worker identity '{}'", info.getName());
networkSession.send(new RegisterWorker(info));
}

scheduleIdleLogger(scheduler, session, config.getCluster().getIdleTimeOut());
}

private static void scheduleIdleLogger(ScheduledExecutorService scheduler, IoSession session, Duration timeout) {
scheduler.scheduleAtFixedRate(
() -> {
final Duration elapsed = Duration.milliseconds(System.currentTimeMillis() - session.getLastIoTime());
if (elapsed.compareTo(timeout) > 0) {
log.trace("No message sent or received since {}", elapsed);
}
},
timeout.toSeconds(), timeout.toSeconds() / 2, TimeUnit.SECONDS
);
}

@Override
public void sessionClosed(IoSession session) {
log.info("Disconnected from ManagerNode.");
Expand All @@ -96,56 +113,6 @@ public void sessionClosed(IoSession session) {
}
}

@Override
public void sessionIdle(IoSession session, IdleStatus status) {
log.trace("Session idle {}.", status);
}

@Override
public void exceptionCaught(IoSession session, Throwable cause) {
log.error("Exception caught", cause);
}


@Override
public void messageReceived(IoSession session, Object message) {
if (!(message instanceof MessageToShardNode)) {
log.error("Unknown message type {} in {}", message.getClass(), message);
return;
}

log.trace("{} received {} from {}", environment.getName(), message.getClass().getSimpleName(), session.getRemoteAddress());
ReactingJob<MessageToShardNode, NetworkMessageContext.ShardNodeNetworkContext> job = new ReactingJob<>((MessageToShardNode) message, context);

if (message instanceof SlowMessage slowMessage) {
slowMessage.setProgressReporter(job.getProgressReporter());
jobManager.addSlowJob(job);
}
else {
jobManager.addFastJob(job);
}
}


@Override
public void messageSent(IoSession session, Object message) {
log.trace("Message sent: {}", message);
}


@Override
public void inputClosed(IoSession session) {
log.info("Input closed.");
session.closeNow();
scheduler.schedule(this::disconnectFromCluster, 0, TimeUnit.SECONDS);
}


@Override
public void event(IoSession session, FilterEvent event) throws Exception {
log.trace("Event handled: {}", event);
}

private void connectToCluster() {
InetSocketAddress address = new InetSocketAddress(
config.getCluster().getManagerURL().getHostAddress(),
Expand Down Expand Up @@ -184,7 +151,6 @@ private void connectToCluster() {
}
}


private void disconnectFromCluster() {
if (future != null) {
future.cancel();
Expand All @@ -201,7 +167,6 @@ private void disconnectFromCluster() {
}
}


@NotNull
private NioSocketConnector getClusterConnector(IdResolveContext workers) {
ObjectMapper om = internalMapperFactory.createShardCommunicationMapper();
Expand All @@ -216,19 +181,65 @@ private NioSocketConnector getClusterConnector(IdResolveContext workers) {
return connector;
}

@Override
public void sessionIdle(IoSession session, IdleStatus status) {
log.trace("Session idle {}.", status);
}

private static void scheduleIdleLogger(ScheduledExecutorService scheduler, IoSession session, Duration timeout) {
scheduler.scheduleAtFixedRate(
() -> {
final Duration elapsed = Duration.milliseconds(System.currentTimeMillis() - session.getLastIoTime());
if (elapsed.compareTo(timeout) > 0) {
log.trace("No message sent or received since {}", elapsed);
}
},
timeout.toSeconds(), timeout.toSeconds() / 2, TimeUnit.SECONDS
);
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
log.error("Exception caught", cause);
}

@Override
public void messageReceived(IoSession session, Object message) {
if (!(message instanceof MessageToShardNode)) {
log.error("Unknown message type {} in {}", message.getClass(), message);
return;
}

log.trace("{} received {} from {}", environment.getName(), message.getClass().getSimpleName(), session.getRemoteAddress());
ReactingJob<MessageToShardNode, NetworkMessageContext.ShardNodeNetworkContext> job = new ReactingJob<>((MessageToShardNode) message, context);

if (message instanceof SlowMessage slowMessage) {
slowMessage.setProgressReporter(job.getProgressReporter());
jobManager.addSlowJob(job);
}
else {
jobManager.addFastJob(job);
}
}

@Override
public void messageSent(IoSession session, Object message) {
log.trace("Message sent: {}", message);
}

@Override
public void inputClosed(IoSession session) {
log.info("Input closed.");
session.closeNow();
scheduler.schedule(this::disconnectFromCluster, 0, TimeUnit.SECONDS);
}

@Override
public void event(IoSession session, FilterEvent event) throws Exception {
log.trace("Event handled: {}", event);
}

@Override
public void start() throws Exception {


jobManager = new JobManager(environment.getName(), config.isFailOnError());

scheduler = environment.lifecycle().scheduledExecutorService("cluster-connection-shard").build();
// Connect async as the manager might not be up jet or is started by a test in succession
scheduler.schedule(this::connectToCluster, 0, TimeUnit.MINUTES);

scheduler.scheduleAtFixedRate(this::reportJobManagerStatus, 30, 1, TimeUnit.SECONDS);

}

private void reportJobManagerStatus() {
if (context == null || !context.isConnected()) {
Expand Down Expand Up @@ -257,20 +268,6 @@ private void reportJobManagerStatus() {
}
}

@Override
public void start() throws Exception {


jobManager = new JobManager(environment.getName(), config.isFailOnError());

scheduler = environment.lifecycle().scheduledExecutorService("cluster-connection-shard").build();
// Connect async as the manager might not be up jet or is started by a test in succession
scheduler.schedule(this::connectToCluster, 0, TimeUnit.MINUTES);

scheduler.scheduleAtFixedRate(this::reportJobManagerStatus, 30, 1, TimeUnit.SECONDS);

}

public boolean isBusy() {
return jobManager.isSlowWorkerBusy();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.bakdata.conquery.command;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.core.GenericType;
import jakarta.ws.rs.core.Response;
Expand Down Expand Up @@ -42,18 +44,16 @@ public class DistributedCommandsTest {
this.getCluster().setConnectRetryTimeout(CONNECT_RETRY_TIMEOUT);
this.setStorage(new NonPersistentStoreFactory());
}};

private static final DropwizardAppExtension<ConqueryConfig> MANAGER = new DropwizardAppExtension<>(
Conquery.class,
CONQUERY_CONFIG_MANAGER,
ServerCommand::new
);

private static final DropwizardAppExtension<ConqueryConfig> SHARD = new DropwizardAppExtension<>(
Conquery.class,
CONQUERY_CONFIG_SHARD,
application -> new ShardCommand()
);
private static final DropwizardAppExtension<ConqueryConfig> MANAGER = new DropwizardAppExtension<>(
Conquery.class,
CONQUERY_CONFIG_MANAGER,
ServerCommand::new
);

@Test
@Order(0)
Expand Down Expand Up @@ -83,27 +83,28 @@ void checkHttpUpManager() {

@Test
@Order(1)
void clusterEstablished() throws InterruptedException {
void clusterEstablished() {
Client client = MANAGER.client();

// Wait for Shard to be connected
// May use https://github.com/awaitility/awaitility here in the future
Thread.sleep(2 * CONNECT_RETRY_TIMEOUT.toMilliseconds());
await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
Response response = client.target(
String.format("http://localhost:%d/healthcheck", MANAGER.getAdminPort()))
.request()
.get();

Response response = client.target(
String.format("http://localhost:%d/healthcheck", MANAGER.getAdminPort()))
.request()
.get();


assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getStatus()).isEqualTo(200);

Map<String, GenericHealthCheckResult> healthCheck = response.readEntity(new GenericType<>() {
});

Map<String, GenericHealthCheckResult> healthCheck = response.readEntity(new GenericType<>() {
assertThat(healthCheck).containsKey("cluster");
assertThat(healthCheck.get("cluster").healthy).isTrue();
assertThat(healthCheck.get("cluster").getMessage()).isEqualTo(String.format(ClusterHealthCheck.HEALTHY_MESSAGE_FMT, 1));
});

assertThat(healthCheck).containsKey("cluster");
assertThat(healthCheck.get("cluster").healthy).isTrue();
assertThat(healthCheck.get("cluster").getMessage()).isEqualTo(String.format(ClusterHealthCheck.HEALTHY_MESSAGE_FMT, 1));
}

@Data
Expand Down

0 comments on commit c636dd8

Please sign in to comment.