Skip to content

Commit

Permalink
Merge pull request Terracotta-OSS#190 from mathieucarbou/bug_context
Browse files Browse the repository at this point in the history
Fixing context of stats and notif
  • Loading branch information
anthonydahanne authored Nov 21, 2016
2 parents 9fffda4 + d81599c commit 5839156
Showing 1 changed file with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,17 +308,31 @@ public void pushBestEffortsData(long consumerId, PlatformServer sender, String n

case TOPIC_SERVER_ENTITY_NOTIFICATION: {
ContextualNotification notification = (ContextualNotification) data;
Context serverEntityContext = getServerEntity(sender.getServerName(), consumerId).getContext();
notification.setContext(notification.getContext().with(serverEntityContext));
Context notificationContext = notification.getContext();
if (notificationContext.contains(ServerEntity.CONSUMER_ID)) {
consumerId = Long.parseLong(notificationContext.get(ServerEntity.CONSUMER_ID));
}
Context context = getOptionalServerEntity(sender.getServerName(), consumerId)
.map(ServerEntity::getContext)
.orElseGet(() -> stripe.getServerByName(sender.getServerName())
.map(Server::getContext)
.orElseGet(stripe::getContext));
notification.setContext(notification.getContext().with(context));
fireNotification(notification);
break;
}

case TOPIC_SERVER_ENTITY_STATISTICS: {
ContextualStatistics[] statistics = (ContextualStatistics[]) data;
Context serverEntityContext = getServerEntity(sender.getServerName(), consumerId).getContext();
for (ContextualStatistics statistic : statistics) {
statistic.setContext(statistic.getContext().with(serverEntityContext));
Context statContext = statistic.getContext();
long cid = statContext.contains(ServerEntity.CONSUMER_ID) ? Long.parseLong(statContext.get(ServerEntity.CONSUMER_ID)) : consumerId;
Context context = getOptionalServerEntity(sender.getServerName(), cid)
.map(ServerEntity::getContext)
.orElseGet(() -> stripe.getServerByName(sender.getServerName())
.map(Server::getContext)
.orElseGet(stripe::getContext));
statistic.setContext(statistic.getContext().with(context));
}
fireStatistics(statistics);
break;
Expand Down Expand Up @@ -443,6 +457,18 @@ private synchronized ServerEntity getServerEntity(String serverName, long consum
.<IllegalStateException>orElseThrow(() -> newIllegalTopologyState("Server entity " + consumerId + " not found on server " + serverName + " within the current topology on active server " + getCurrentActive().getServerName()));
}

private synchronized Optional<ServerEntity> getOptionalServerEntity(String serverName, long consumerId) {
Map<Long, ServerEntityIdentifier> map = entities.get(serverName);
if (map == null) {
return Optional.empty();
}
ServerEntityIdentifier serverEntityIdentifier = map.get(consumerId);
if (serverEntityIdentifier == null) {
return Optional.empty();
}
return stripe.getServerByName(serverName).flatMap(server -> server.getServerEntity(serverEntityIdentifier));
}

private Server getCurrentActive() {
if (currentActive == null) {
throw newIllegalTopologyState("Current server is not active!");
Expand Down

0 comments on commit 5839156

Please sign in to comment.