Skip to content

Commit

Permalink
Merge pull request #294 from scalecube/update/enhance-logging
Browse files Browse the repository at this point in the history
Enhanced logging
  • Loading branch information
artem-v authored Jan 20, 2020
2 parents d1cfcd9 + a022c30 commit a7fba99
Show file tree
Hide file tree
Showing 12 changed files with 238 additions and 226 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public OutboundSettings outboundSettings(Address destination) {
public void outboundSettings(Address destination, int lossPercent, int meanDelay) {
OutboundSettings settings = new OutboundSettings(lossPercent, meanDelay);
outboundSettings.put(destination, settings);
LOGGER.debug("Set outbound settings {} from {} to {}", settings, address, destination);
LOGGER.debug("[{}] Set outbound settings {} to {}", address, settings, destination);
}

/**
Expand All @@ -81,21 +81,21 @@ public void outboundSettings(Address destination, int lossPercent, int meanDelay
*/
public void setDefaultOutboundSettings(int lossPercent, int meanDelay) {
defaultOutboundSettings = new OutboundSettings(lossPercent, meanDelay);
LOGGER.debug("Set default outbound settings {} for {}", defaultOutboundSettings, address);
LOGGER.debug("[{}] Set default outbound settings {}", address, defaultOutboundSettings);
}

/** Blocks outbound messages to all destinations. */
public void blockAllOutbound() {
outboundSettings.clear();
setDefaultOutboundSettings(100, 0);
LOGGER.debug("Blocked outbound from {} to all destinations", address);
LOGGER.debug("[{}] Blocked outbound to all destinations", address);
}

/** Unblocks outbound messages to all destinations. */
public void unblockAllOutbound() {
outboundSettings.clear();
setDefaultOutboundSettings(0, 0);
LOGGER.debug("Unblocked outbound from {} to all destinations", address);
LOGGER.debug("[{}] Unblocked outbound to all destinations", address);
}

/**
Expand All @@ -116,7 +116,7 @@ public void blockOutbound(Collection<Address> destinations) {
for (Address destination : destinations) {
outboundSettings.put(destination, new OutboundSettings(100, 0));
}
LOGGER.debug("Blocked outbound from {} to {}", address, destinations);
LOGGER.debug("[{}] Blocked outbound to {}", address, destinations);
}

/**
Expand All @@ -135,7 +135,7 @@ public void unblockOutbound(Address... destinations) {
*/
public void unblockOutbound(Collection<Address> destinations) {
destinations.forEach(outboundSettings::remove);
LOGGER.debug("Unblocked outbound from {} to {}", address, destinations);
LOGGER.debug("[{}] Unblocked outbound {}", address, destinations);
}

/**
Expand Down Expand Up @@ -221,7 +221,7 @@ public InboundSettings inboundSettings(Address destination) {
public void inboundSettings(Address destination, boolean shallPass) {
InboundSettings settings = new InboundSettings(shallPass);
inboundSettings.put(destination, settings);
LOGGER.debug("Set inbound settings {} from {} to {}", settings, address, destination);
LOGGER.debug("[{}] Set inbound settings {} to {}", address, settings, destination);
}

/**
Expand All @@ -231,21 +231,21 @@ public void inboundSettings(Address destination, boolean shallPass) {
*/
public void setDefaultInboundSettings(boolean shallPass) {
defaultInboundSettings = new InboundSettings(shallPass);
LOGGER.debug("Set default inbound settings {} for {}", defaultInboundSettings, address);
LOGGER.debug("[{}] Set default inbound settings {}", address, defaultInboundSettings);
}

/** Blocks inbound messages from all destinations. */
public void blockAllInbound() {
inboundSettings.clear();
setDefaultInboundSettings(false);
LOGGER.debug("Blocked inbound to {} from all destinations", address);
LOGGER.debug("[{}] Blocked inbound from all destinations", address);
}

/** Unblocks inbound messages to all destinations. */
public void unblockAllInbound() {
inboundSettings.clear();
setDefaultInboundSettings(true);
LOGGER.debug("Unblocked inbound to {} from all destinations", address);
LOGGER.debug("[{}] Unblocked inbound from all destinations", address);
}

/**
Expand All @@ -266,7 +266,7 @@ public void blockInbound(Collection<Address> destinations) {
for (Address destination : destinations) {
inboundSettings.put(destination, new InboundSettings(false));
}
LOGGER.debug("Blocked inbound to {} from {}", address, destinations);
LOGGER.debug("[{}] Blocked inbound from {}", address, destinations);
}

/**
Expand All @@ -285,7 +285,7 @@ public void unblockInbound(Address... destinations) {
*/
public void unblockInbound(Collection<Address> destinations) {
destinations.forEach(inboundSettings::remove);
LOGGER.debug("Unblocked inbound to {} from {}", address, destinations);
LOGGER.debug("[{}] Unblocked inbound from {}", address, destinations);
}

/**
Expand Down
31 changes: 13 additions & 18 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,15 @@ private void initLifecycle() {
.then(doStart())
.doOnSuccess(avoid -> onStart.onComplete())
.doOnError(onStart::onError)
.subscribe(
null,
th -> {
LOGGER.error("Cluster member {} failed on start: ", localMember, th);
shutdown.onComplete();
});
.subscribe(null, th -> LOGGER.error("[{}][doStart] Exception occurred:", localMember, th));

shutdown //
shutdown
.then(doShutdown())
.doFinally(s -> onShutdown.onComplete())
.subscribe();
.subscribe(
null,
th ->
LOGGER.warn("[{}][doShutdown] Exception occurred: {}", localMember, th.toString()));
}

/**
Expand Down Expand Up @@ -352,7 +350,7 @@ private ObjectInstance startJmxMonitor0() throws Exception {
}

private void onError(Throwable th) {
LOGGER.error("Received unexpected error: ", th);
LOGGER.error("[{}] Received unexpected error:", localMember, th);
}

private Flux<Message> listenMessage() {
Expand Down Expand Up @@ -481,28 +479,25 @@ public void shutdown() {
private Mono<Void> doShutdown() {
return Mono.defer(
() -> {
LOGGER.info("Cluster member {} is shutting down", localMember);
LOGGER.debug("[{}] Cluster member is shutting down", localMember);
return Flux.concatDelayError(leaveCluster(), dispose(), transport.stop())
.then()
.doFinally(s -> scheduler.dispose())
.doOnSuccess(
avoid -> LOGGER.info("Cluster member {} has been shut down", localMember))
.doOnError(
th ->
LOGGER.warn(
"Cluster member {} failed on shutdown: {}", localMember, th.toString()));
avoid -> LOGGER.debug("[{}] Cluster member has been shutdown", localMember));
});
}

private Mono<Void> leaveCluster() {
return membership
.leaveCluster()
.subscribeOn(scheduler)
.doOnSuccess(s -> LOGGER.debug("Cluster member {} has left a cluster", localMember))
.doOnSubscribe(s -> LOGGER.debug("[{}] Cluster member is leaving a cluster", localMember))
.doOnSuccess(s -> LOGGER.debug("[{}] Cluster member has left a cluster", localMember))
.doOnError(
ex ->
LOGGER.info(
"Cluster member {} failed on leaveCluster: {}", localMember, ex.toString()))
LOGGER.warn(
"[{}][leaveCluster] Exception occurred: {}", localMember, ex.toString()))
.then();
}

Expand Down
Loading

0 comments on commit a7fba99

Please sign in to comment.