Skip to content

Commit affbaa8

Browse files
authored
Fix ConsumeDriver running status (#748)
1 parent 3549537 commit affbaa8

File tree

2 files changed

+11
-1
lines changed

2 files changed

+11
-1
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ Release Notes.
2828
* Add Caffeine plugin as optional.
2929
* Add Undertow 2.1.7.final+ worker thread pool metrics.
3030
* Support for tracking in spring gateway versions 4.1.2 and above.
31+
* Fix `ConsumeDriver` running status concurrency issues.
3132

3233
All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/222?closed=1)
3334

apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
* Pool of consumers <p> Created by wusheng on 2016/10/25.
2828
*/
2929
public class ConsumeDriver<T> implements IDriver {
30-
private boolean running;
30+
private volatile boolean running;
3131
private ConsumerThread[] consumerThreads;
3232
private Channels<T> channels;
3333
private ReentrantLock lock;
@@ -88,6 +88,9 @@ public void begin(Channels channels) {
8888
}
8989
lock.lock();
9090
try {
91+
if (running) {
92+
return;
93+
}
9194
this.allocateBuffer2Thread();
9295
for (ConsumerThread consumerThread : consumerThreads) {
9396
consumerThread.start();
@@ -124,8 +127,14 @@ private void allocateBuffer2Thread() {
124127

125128
@Override
126129
public void close(Channels channels) {
130+
if (!running) {
131+
return;
132+
}
127133
lock.lock();
128134
try {
135+
if (!running) {
136+
return;
137+
}
129138
this.running = false;
130139
for (ConsumerThread consumerThread : consumerThreads) {
131140
consumerThread.shutdown();

0 commit comments

Comments
 (0)