diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java index f9f21ca313..14f7121837 100644 --- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java +++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java @@ -105,9 +105,12 @@ public void run() { Event event; try { event = eventQueue.take(); + if (LOG.isTraceEnabled()) { + LOG.trace("AsyncDispatcher taken event: {}", event); + } } catch(InterruptedException ie) { if (!stopped) { - LOG.warn("AsyncDispatcher thread interrupted", ie); + LOG.warn("AsyncDispatcher thread interrupted (while taking event)", ie); } return; } @@ -140,6 +143,8 @@ public void setDrainEventsOnStop() { @Override protected void serviceStop() throws Exception { + LOG.info("AsyncDispatcher serviceStop called, drainEventsOnStop: {}, drained: {}, eventQueue size: {}", + drainEventsOnStop, drained, eventQueue.size()); if (drainEventsOnStop) { blockNewEvents = true; LOG.info("AsyncDispatcher is draining to stop, ignoring any new events."); @@ -148,7 +153,7 @@ protected void serviceStop() throws Exception { TezConfiguration.TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT_DEFAULT); synchronized (waitForDrained) { - while (!drained && eventHandlingThread.isAlive() && System.currentTimeMillis() < endTime) { + while (!eventQueue.isEmpty() && eventHandlingThread.isAlive() && System.currentTimeMillis() < endTime) { waitForDrained.wait(1000); LOG.info( "Waiting for AsyncDispatcher to drain. Current queue size: {}, handler thread state: {}", @@ -364,9 +369,12 @@ public void handle(Event event) { } try { eventQueue.put(event); + if (LOG.isTraceEnabled()) { + LOG.trace("AsyncDispatcher put event: {}", event); + } } catch (InterruptedException e) { if (!stopped) { - LOG.warn("AsyncDispatcher thread interrupted", e); + LOG.warn("AsyncDispatcher thread interrupted (while putting event): {}", event, e); } throw new YarnRuntimeException(e); }