Skip to content

Commit

Permalink
enhanced health added ability to mark a service queue as down
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Hightower committed Jul 9, 2015
1 parent 9338ae3 commit 57d2dc2
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.advantageous.qbit.service.health;

import io.advantageous.qbit.annotation.Service;
import io.advantageous.qbit.queue.QueueCallBackHandler;
import io.advantageous.qbit.service.ServiceContext;
import io.advantageous.qbit.service.ServiceQueue;
import io.advantageous.qbit.util.Timer;

import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -49,7 +49,9 @@ private void check() {
if (duration > checkInIntervalMS) {
lastCheckTime = now;

boolean failing = ServiceContext.serviceContext().currentService().failing();
ServiceQueue serviceQueue = ServiceContext.serviceContext().currentService();

boolean failing = serviceQueue.failing();

if (!failing) {
healthServiceAsync.checkInOk(serviceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

import static io.advantageous.qbit.QBit.factory;
import static io.advantageous.qbit.service.ServiceContext.serviceContext;

Expand Down Expand Up @@ -103,7 +101,6 @@ public class BaseServiceQueueImpl implements ServiceQueue {
private Transformer<Response<Object>, Response> responseObjectTransformer = new NoOpResponseTransformer();
private final CallbackManager callbackManager;
private final QueueCallBackHandler queueCallBackHandler;
private final AtomicBoolean failing = new AtomicBoolean();

public BaseServiceQueueImpl(final String rootAddress,
final String serviceAddress,
Expand Down Expand Up @@ -402,13 +399,15 @@ private void start(final ServiceMethodHandler serviceMethodHandler,

@Override
public void init() {

serviceThreadLocal.set(BaseServiceQueueImpl.this);
queueCallBackHandler.queueInit();
serviceMethodHandler.init();
serviceThreadLocal.set(null);
}

@Override
public void receive(MethodCall<Object> methodCall) {

queueCallBackHandler.beforeReceiveCalled();
doHandleMethodCall(methodCall, serviceMethodHandler);
queueCallBackHandler.afterReceiveCalled();
Expand All @@ -423,6 +422,7 @@ public void empty() {

@Override
public void startBatch() {
serviceThreadLocal.set(BaseServiceQueueImpl.this);
serviceMethodHandler.startBatch();
queueCallBackHandler.queueStartBatch();
}
Expand All @@ -436,9 +436,13 @@ public void limit() {

@Override
public void shutdown() {

serviceThreadLocal.set(BaseServiceQueueImpl.this);
handle();
serviceMethodHandler.shutdown();
queueCallBackHandler.queueShutdown();

serviceThreadLocal.set(null);
}

@Override
Expand All @@ -449,6 +453,7 @@ public void idle() {
if (callbackManager!=null) {
callbackManager.process(0);
}
serviceThreadLocal.set(null);
}


Expand Down Expand Up @@ -578,6 +583,7 @@ public void flush() {
}


private AtomicBoolean failing = new AtomicBoolean();
@Override
public boolean failing() {
return failing.get();
Expand Down

0 comments on commit 57d2dc2

Please sign in to comment.