Skip to content

Commit

Permalink
Merge pull request #3612 from ingef/reintegrate-main
Browse files Browse the repository at this point in the history
Reintegrate Main
  • Loading branch information
awildturtok authored Nov 4, 2024
2 parents 7b9bd0f + 827f73f commit 14a13ab
Show file tree
Hide file tree
Showing 31 changed files with 483 additions and 383 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.future.DefaultWriteFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -40,11 +41,16 @@ public WriteFuture send(final NetworkMessage<?> message) {
}
}
catch (InterruptedException e) {
log.error("Unexpected interruption", e);
return send(message);
log.error("Unexpected interruption, while trying to queue: {}", message, e);
return DefaultWriteFuture.newNotWrittenFuture(session, e);
}
WriteFuture future = session.write(message);
future.addListener(f -> queuedMessages.remove(message));
future.addListener(f -> {
if(f instanceof WriteFuture writeFuture && ! writeFuture.isWritten()) {
log.error("Could not write message: {}", message, writeFuture.getException());
}
queuedMessages.remove(message);
});

return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,40 @@ public void sessionCreated(IoSession session) {

@Override
public void sessionOpened(IoSession session) {
final NetworkSession networkSession = new NetworkSession(session);
NetworkSession networkSession = new NetworkSession(session);

context = new NetworkMessageContext.ShardNodeNetworkContext(networkSession, workers, config, environment);
log.info("Connected to ManagerNode @ `{}`", session.getRemoteAddress());
// Schedule ShardNode and Worker registration, so we don't block this thread which does the actual sending
scheduler.schedule(() -> {
context = new NetworkMessageContext.ShardNodeNetworkContext(networkSession, workers, config, environment);
log.info("Connected to ManagerNode @ `{}`", session.getRemoteAddress());

// Authenticate with ManagerNode
context.send(new AddShardNode());
// Authenticate with ManagerNode
context.send(new AddShardNode());

for (Worker w : workers.getWorkers().values()) {
w.setSession(networkSession);
final WorkerInformation info = w.getInfo();
for (Worker w : workers.getWorkers().values()) {
w.setSession(new NetworkSession(session));
WorkerInformation info = w.getInfo();
log.info("Sending worker identity '{}'", info.getName());
networkSession.send(new RegisterWorker(info));
}
}, 0, TimeUnit.SECONDS);

log.info("Sending worker identity '{}'", info.getName());
networkSession.send(new RegisterWorker(info));
}

scheduleIdleLogger(scheduler, session, config.getCluster().getIdleTimeOut());
}

private static void scheduleIdleLogger(ScheduledExecutorService scheduler, IoSession session, Duration timeout) {
scheduler.scheduleAtFixedRate(
() -> {
final Duration elapsed = Duration.milliseconds(System.currentTimeMillis() - session.getLastIoTime());
if (elapsed.compareTo(timeout) > 0) {
log.trace("No message sent or received since {}", elapsed);
}
},
timeout.toSeconds(), timeout.toSeconds() / 2, TimeUnit.SECONDS
);
}

@Override
public void sessionClosed(IoSession session) {
log.info("Disconnected from ManagerNode.");
Expand All @@ -96,56 +111,6 @@ public void sessionClosed(IoSession session) {
}
}

@Override
public void sessionIdle(IoSession session, IdleStatus status) {
log.trace("Session idle {}.", status);
}

@Override
public void exceptionCaught(IoSession session, Throwable cause) {
log.error("Exception caught", cause);
}


@Override
public void messageReceived(IoSession session, Object message) {
if (!(message instanceof MessageToShardNode)) {
log.error("Unknown message type {} in {}", message.getClass(), message);
return;
}

log.trace("{} received {} from {}", environment.getName(), message.getClass().getSimpleName(), session.getRemoteAddress());
final ReactingJob<MessageToShardNode, NetworkMessageContext.ShardNodeNetworkContext> job = new ReactingJob<>((MessageToShardNode) message, context);

if (message instanceof SlowMessage slowMessage) {
slowMessage.setProgressReporter(job.getProgressReporter());
jobManager.addSlowJob(job);
}
else {
jobManager.addFastJob(job);
}
}


@Override
public void messageSent(IoSession session, Object message) {
log.trace("Message sent: {}", message);
}


@Override
public void inputClosed(IoSession session) {
log.info("Input closed.");
session.closeNow();
scheduler.schedule(this::disconnectFromCluster, 0, TimeUnit.SECONDS);
}


@Override
public void event(IoSession session, FilterEvent event) throws Exception {
log.trace("Event handled: {}", event);
}

private void connectToCluster() {
final InetSocketAddress address = new InetSocketAddress(
config.getCluster().getManagerURL().getHostAddress(),
Expand Down Expand Up @@ -184,7 +149,6 @@ private void connectToCluster() {
}
}


private void disconnectFromCluster() {
if (future != null) {
future.cancel();
Expand All @@ -201,7 +165,6 @@ private void disconnectFromCluster() {
}
}


@NotNull
private NioSocketConnector getClusterConnector(ShardWorkers workers) {
ObjectMapper om = internalMapperFactory.createShardCommunicationMapper();
Expand All @@ -216,17 +179,50 @@ private NioSocketConnector getClusterConnector(ShardWorkers workers) {
return connector;
}

@Override
public void sessionIdle(IoSession session, IdleStatus status) {
log.trace("Session idle {}.", status);
}

private static void scheduleIdleLogger(ScheduledExecutorService scheduler, IoSession session, Duration timeout) {
scheduler.scheduleAtFixedRate(
() -> {
final Duration elapsed = Duration.milliseconds(System.currentTimeMillis() - session.getLastIoTime());
if (elapsed.compareTo(timeout) > 0) {
log.trace("No message sent or received since {}", elapsed);
}
},
timeout.toSeconds(), timeout.toSeconds() / 2, TimeUnit.SECONDS
);
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
log.error("Exception caught", cause);
}

@Override
public void messageReceived(IoSession session, Object message) {
if (!(message instanceof MessageToShardNode)) {
log.error("Unknown message type {} in {}", message.getClass(), message);
return;
}

log.trace("{} received {} from {}", environment.getName(), message.getClass().getSimpleName(), session.getRemoteAddress());
ReactingJob<MessageToShardNode, NetworkMessageContext.ShardNodeNetworkContext> job = new ReactingJob<>((MessageToShardNode) message, context);

if (message instanceof SlowMessage slowMessage) {
slowMessage.setProgressReporter(job.getProgressReporter());
jobManager.addSlowJob(job);
}
else {
jobManager.addFastJob(job);
}
}

@Override
public void messageSent(IoSession session, Object message) {
log.trace("Message sent: {}", message);
}

@Override
public void inputClosed(IoSession session) {
log.info("Input closed.");
session.closeNow();
scheduler.schedule(this::disconnectFromCluster, 0, TimeUnit.SECONDS);
}

@Override
public void event(IoSession session, FilterEvent event) throws Exception {
log.trace("Event handled: {}", event);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,12 @@ public void updateStorage() {

@Override
public GroupId createId() {
GroupId groupId = new GroupId(name);
groupId.setMetaStorage(getMetaStorage());
return groupId;
return new GroupId(name);
}

public synchronized void removeMember(User user) {
if (members.remove(user.getId())) {
log.trace("Removed user {} from group {}", user.getId(), getId());
public synchronized void removeMember(UserId user) {
if (members.remove(user)) {
log.trace("Removed user {} from group {}", user, getId());
updateStorage();
}
}
Expand All @@ -86,9 +84,9 @@ public synchronized void addRole(Role role) {
}
}

public synchronized void removeRole(Role role) {
if (roles.remove(role.getId())) {
log.trace("Removed role {} from group {}", role.getId(), getId());
public synchronized void removeRole(RoleId role) {
if (roles.remove(role)) {
log.trace("Removed role {} from group {}", role, getId());
updateStorage();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,14 @@

import java.util.Set;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.models.auth.permissions.ConqueryPermission;
import com.bakdata.conquery.models.identifiable.ids.specific.RoleId;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;

public interface RoleOwner {

void addRole(Role role);

void removeRole(Role role);
void removeRole(RoleId role);

/**
* Return a copy of the roles hold by the owner.
Expand Down
Loading

0 comments on commit 14a13ab

Please sign in to comment.