Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge Release #3581

Merged
merged 35 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
49a3f54
Bump org.codehaus.mojo:flatten-maven-plugin from 1.4.0 to 1.6.0
dependabot[bot] Feb 12, 2024
10499bd
Bump com.zaxxer:HikariCP from 5.0.1 to 5.1.0
dependabot[bot] May 23, 2024
eee0d50
Merge pull request #3545 from ingef/fix/reintegrate-action-shallow-ch…
thoniTUB Sep 2, 2024
366b048
Merge branch 'develop' into reintegrate-main
thoniTUB Sep 2, 2024
6df68b5
Merge pull request #3547 from ingef/reintegrate-main
thoniTUB Sep 2, 2024
583b990
Don't render folder if no children match
Kadrian Sep 6, 2024
5d0faf9
refactor concept search and concept tree list
Kadrian Sep 6, 2024
e871d72
Merge pull request #3549 from ingef/fix-search-for-nested-struct-nodes
Kadrian Sep 6, 2024
cba386c
Merge branch 'develop' into update-concept-tree-node-styles
Kadrian Sep 6, 2024
7254523
Merge pull request #3550 from ingef/update-concept-tree-node-styles
Kadrian Sep 7, 2024
758f6bb
Bump org.apache.logging.log4j:log4j-to-slf4j from 2.19.0 to 2.24.0
dependabot[bot] Sep 9, 2024
43a3616
fixes skipping of structure node when not roots are set and sets pare…
thoniTUB Sep 9, 2024
aee7fe9
Merge branch 'develop' into fix/structure-node-skip
thoniTUB Sep 9, 2024
6b24411
Merge pull request #3551 from ingef/dependabot/maven/org.apache.loggi…
thoniTUB Sep 9, 2024
52a3668
Merge pull request #3552 from ingef/fix/structure-node-skip
thoniTUB Sep 9, 2024
3235ec0
Bump org.testcontainers:postgresql from 1.17.6 to 1.20.1
dependabot[bot] Sep 10, 2024
081358a
Merge pull request #3553 from ingef/dependabot/maven/org.testcontaine…
thoniTUB Sep 17, 2024
d760e58
fix casting date to integer not long
awildturtok Sep 19, 2024
d6d6352
Merge pull request #3567 from ingef/fix/localdate-cast
awildturtok Sep 19, 2024
9e54f8f
Merge branch 'develop' into reintegrate-main
thoniTUB Sep 19, 2024
3a1dd53
fixes multithreaded access of NumberFormat causing issues
awildturtok Sep 19, 2024
6953fea
fixes copy paste error for money printing
awildturtok Sep 19, 2024
e76a4c6
fixes non breaking space comparison with Excel
awildturtok Sep 23, 2024
1ca9216
Merge pull request #3569 from ingef/fix/DecimalFormat-threadsafety
awildturtok Sep 23, 2024
7bff5bd
Merge pull request #3411 from ingef/dependabot/maven/com.zaxxer-Hikar…
awildturtok Sep 23, 2024
c4584b3
Merge pull request #3268 from ingef/dependabot/maven/org.codehaus.moj…
awildturtok Sep 23, 2024
ee0fc52
Merge branch 'develop' into reintegrate-main
thoniTUB Sep 25, 2024
17e01ed
Merge pull request #3561 from ingef/reintegrate-main
github-actions[bot] Sep 25, 2024
0af5056
Merge branch 'develop' into reintegrate-main
thoniTUB Sep 25, 2024
3aa7b83
Merge pull request #3575 from ingef/reintegrate-main
awildturtok Sep 25, 2024
99446aa
Merge branch 'develop' into reintegrate-main
thoniTUB Sep 30, 2024
4a10e08
Merge pull request #3577 from ingef/reintegrate-main
awildturtok Sep 30, 2024
a195f81
show warning in ui if user or role could not be resolved
thoniTUB Oct 7, 2024
aee3231
Merge pull request #3593 from ingef/fix/resolve-render-bug
thoniTUB Oct 16, 2024
c636dd8
does not issue messages on mina process thread, adds logging
thoniTUB Oct 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions backend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.19.0</version>
<version>2.24.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
Expand Down Expand Up @@ -350,7 +350,7 @@
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>5.0.1</version>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
Expand All @@ -367,7 +367,7 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>1.17.6</version>
<version>1.20.1</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.future.DefaultWriteFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -40,11 +41,16 @@ public WriteFuture send(final NetworkMessage<?> message) {
}
}
catch (InterruptedException e) {
log.error("Unexpected interruption", e);
return send(message);
log.error("Unexpected interruption, while trying to queue: {}", message, e);
return DefaultWriteFuture.newNotWrittenFuture(session, e);
}
WriteFuture future = session.write(message);
future.addListener(f -> queuedMessages.remove(message));
future.addListener(f -> {
if(f instanceof WriteFuture writeFuture && ! writeFuture.isWritten()) {
log.error("Could not write message: {}", message, writeFuture.getException());
}
queuedMessages.remove(message);
});

return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,39 @@ public void sessionCreated(IoSession session) {
public void sessionOpened(IoSession session) {
NetworkSession networkSession = new NetworkSession(session);

context = new NetworkMessageContext.ShardNodeNetworkContext(networkSession, workers, config, environment.getValidator());
log.info("Connected to ManagerNode @ `{}`", session.getRemoteAddress());
// Schedule ShardNode and Worker registration, so we don't block this thread which does the actual sending
scheduler.schedule(() -> {
context = new NetworkMessageContext.ShardNodeNetworkContext(networkSession, workers, config, environment.getValidator());
log.info("Connected to ManagerNode @ `{}`", session.getRemoteAddress());

// Authenticate with ManagerNode
context.send(new AddShardNode());

for (Worker w : workers.getWorkers().values()) {
w.setSession(new NetworkSession(session));
WorkerInformation info = w.getInfo();
log.info("Sending worker identity '{}'", info.getName());
networkSession.send(new RegisterWorker(info));
}
}, 0, TimeUnit.SECONDS);

// Authenticate with ManagerNode
context.send(new AddShardNode());

for (Worker w : workers.getWorkers().values()) {
w.setSession(new NetworkSession(session));
WorkerInformation info = w.getInfo();
log.info("Sending worker identity '{}'", info.getName());
networkSession.send(new RegisterWorker(info));
}

scheduleIdleLogger(scheduler, session, config.getCluster().getIdleTimeOut());
}

private static void scheduleIdleLogger(ScheduledExecutorService scheduler, IoSession session, Duration timeout) {
scheduler.scheduleAtFixedRate(
() -> {
final Duration elapsed = Duration.milliseconds(System.currentTimeMillis() - session.getLastIoTime());
if (elapsed.compareTo(timeout) > 0) {
log.trace("No message sent or received since {}", elapsed);
}
},
timeout.toSeconds(), timeout.toSeconds() / 2, TimeUnit.SECONDS
);
}

@Override
public void sessionClosed(IoSession session) {
log.info("Disconnected from ManagerNode.");
Expand All @@ -96,56 +113,6 @@ public void sessionClosed(IoSession session) {
}
}

@Override
public void sessionIdle(IoSession session, IdleStatus status) {
log.trace("Session idle {}.", status);
}

@Override
public void exceptionCaught(IoSession session, Throwable cause) {
log.error("Exception caught", cause);
}


@Override
public void messageReceived(IoSession session, Object message) {
if (!(message instanceof MessageToShardNode)) {
log.error("Unknown message type {} in {}", message.getClass(), message);
return;
}

log.trace("{} received {} from {}", environment.getName(), message.getClass().getSimpleName(), session.getRemoteAddress());
ReactingJob<MessageToShardNode, NetworkMessageContext.ShardNodeNetworkContext> job = new ReactingJob<>((MessageToShardNode) message, context);

if (message instanceof SlowMessage slowMessage) {
slowMessage.setProgressReporter(job.getProgressReporter());
jobManager.addSlowJob(job);
}
else {
jobManager.addFastJob(job);
}
}


@Override
public void messageSent(IoSession session, Object message) {
log.trace("Message sent: {}", message);
}


@Override
public void inputClosed(IoSession session) {
log.info("Input closed.");
session.closeNow();
scheduler.schedule(this::disconnectFromCluster, 0, TimeUnit.SECONDS);
}


@Override
public void event(IoSession session, FilterEvent event) throws Exception {
log.trace("Event handled: {}", event);
}

private void connectToCluster() {
InetSocketAddress address = new InetSocketAddress(
config.getCluster().getManagerURL().getHostAddress(),
Expand Down Expand Up @@ -184,7 +151,6 @@ private void connectToCluster() {
}
}


private void disconnectFromCluster() {
if (future != null) {
future.cancel();
Expand All @@ -201,7 +167,6 @@ private void disconnectFromCluster() {
}
}


@NotNull
private NioSocketConnector getClusterConnector(IdResolveContext workers) {
ObjectMapper om = internalMapperFactory.createShardCommunicationMapper();
Expand All @@ -216,19 +181,65 @@ private NioSocketConnector getClusterConnector(IdResolveContext workers) {
return connector;
}

@Override
public void sessionIdle(IoSession session, IdleStatus status) {
log.trace("Session idle {}.", status);
}

private static void scheduleIdleLogger(ScheduledExecutorService scheduler, IoSession session, Duration timeout) {
scheduler.scheduleAtFixedRate(
() -> {
final Duration elapsed = Duration.milliseconds(System.currentTimeMillis() - session.getLastIoTime());
if (elapsed.compareTo(timeout) > 0) {
log.trace("No message sent or received since {}", elapsed);
}
},
timeout.toSeconds(), timeout.toSeconds() / 2, TimeUnit.SECONDS
);
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
log.error("Exception caught", cause);
}

@Override
public void messageReceived(IoSession session, Object message) {
if (!(message instanceof MessageToShardNode)) {
log.error("Unknown message type {} in {}", message.getClass(), message);
return;
}

log.trace("{} received {} from {}", environment.getName(), message.getClass().getSimpleName(), session.getRemoteAddress());
ReactingJob<MessageToShardNode, NetworkMessageContext.ShardNodeNetworkContext> job = new ReactingJob<>((MessageToShardNode) message, context);

if (message instanceof SlowMessage slowMessage) {
slowMessage.setProgressReporter(job.getProgressReporter());
jobManager.addSlowJob(job);
}
else {
jobManager.addFastJob(job);
}
}

@Override
public void messageSent(IoSession session, Object message) {
log.trace("Message sent: {}", message);
}

@Override
public void inputClosed(IoSession session) {
log.info("Input closed.");
session.closeNow();
scheduler.schedule(this::disconnectFromCluster, 0, TimeUnit.SECONDS);
}

@Override
public void event(IoSession session, FilterEvent event) throws Exception {
log.trace("Event handled: {}", event);
}

@Override
public void start() throws Exception {


jobManager = new JobManager(environment.getName(), config.isFailOnError());

scheduler = environment.lifecycle().scheduledExecutorService("cluster-connection-shard").build();
// Connect async as the manager might not be up jet or is started by a test in succession
scheduler.schedule(this::connectToCluster, 0, TimeUnit.MINUTES);

scheduler.scheduleAtFixedRate(this::reportJobManagerStatus, 30, 1, TimeUnit.SECONDS);

}

private void reportJobManagerStatus() {
if (context == null || !context.isConnected()) {
Expand Down Expand Up @@ -257,20 +268,6 @@ private void reportJobManagerStatus() {
}
}

@Override
public void start() throws Exception {


jobManager = new JobManager(environment.getName(), config.isFailOnError());

scheduler = environment.lifecycle().scheduledExecutorService("cluster-connection-shard").build();
// Connect async as the manager might not be up jet or is started by a test in succession
scheduler.schedule(this::connectToCluster, 0, TimeUnit.MINUTES);

scheduler.scheduleAtFixedRate(this::reportJobManagerStatus, 30, 1, TimeUnit.SECONDS);

}

public boolean isBusy() {
return jobManager.isSlowWorkerBusy();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ public Set<ConqueryPermission> getEffectivePermissions() {
return permissions;
}

public synchronized void addMember(User user) {
if (members.add(user.getId())) {
log.trace("Added user {} to group {}", user.getId(), getId());
updateStorage();
}
}

@Override
public void updateStorage() {
storage.updateGroup(this);
Expand All @@ -55,16 +62,9 @@ public GroupId createId() {
return new GroupId(name);
}

public synchronized void addMember(User user) {
if (members.add(user.getId())) {
log.trace("Added user {} to group {}", user.getId(), getId());
updateStorage();
}
}

public synchronized void removeMember(User user) {
if (members.remove(user.getId())) {
log.trace("Removed user {} from group {}", user.getId(), getId());
public synchronized void removeMember(UserId user) {
if (members.remove(user)) {
log.trace("Removed user {} from group {}", user, getId());
updateStorage();
}
}
Expand All @@ -84,9 +84,9 @@ public synchronized void addRole(Role role) {
}
}

public synchronized void removeRole(Role role) {
if (roles.remove(role.getId())) {
log.trace("Removed role {} from group {}", role.getId(), getId());
public synchronized void removeRole(RoleId role) {
if (roles.remove(role)) {
log.trace("Removed role {} from group {}", role, getId());
updateStorage();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,14 @@

import java.util.Set;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.models.auth.permissions.ConqueryPermission;
import com.bakdata.conquery.models.identifiable.ids.specific.RoleId;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;

public interface RoleOwner {

void addRole(Role role);

void removeRole(Role role);
void removeRole(RoleId role);

/**
* Return a copy of the roles hold by the owner.
Expand Down
Loading
Loading