-
Notifications
You must be signed in to change notification settings - Fork 2
/
线程同步器原理剖析
214 lines (199 loc) · 11.8 KB
/
线程同步器原理剖析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
CountDownLatch 和 CyclicBarrier 和 Semaphore 的用法,以及相互之间的差别
一、CountDownLatch:
场景举例:进行并行查询数据库,返回结果后进行计算,也就是说各个线程要全部运行完毕,才能进行下一步的计算,这时候可以用 CountDownLatch。
public class ParallelTest {
// 此处实现了固定大小的线程池,可根据需要进行其他实现,每次不再新建线程池实例
private static ExecutorService executor = Executors.newFixedThreadPool(20);
private CountDownLatch countDownLatch;
//构造方法参数为并行线程的数量,并且每次new CountDownLatch对象,因为不可重复使用
public ParallelTest(Integer nThreads) {
this.countDownLatch = new CountDownLatch(nThreads);
}
public static void main(String[] args) throws Exception {
ParallelTest simpleParallelTaskExecutor = new ParallelTest(2);
Future<Integer> integerFuture = simpleParallelTaskExecutor.addTask(() -> {
//此处模拟执行数据查询等任务
Thread.sleep(2000);
return 1;
});
Future<Integer> integerFuture1 = simpleParallelTaskExecutor.addTask(() -> {
Thread.sleep(1000);
return 2;
});
simpleParallelTaskExecutor.checkDone(3000);
Integer integer = integerFuture.get();
Integer integer1 = integerFuture1.get();
System.out.println(integer);
System.out.println(integer1);
}
// 调用此方法向线程池中添加任务,此处对 Callable 进行了简单包装,为了执行完任务调用 countDownLatch.countDown();
public <V> Future<V> addTask(Callable<V> task) {
return executor.submit(new WrapperThread<>(task, countDownLatch));
}
//可设置超时时间,检查任务是否运行完毕
public boolean checkDone(long milliseconds) throws InterruptedException {
return countDownLatch.await(milliseconds, TimeUnit.MILLISECONDS);
}
//Callable包装类,为了执行完任务调用countDownLatch.countDown();
public class WrapperThread<V> implements Callable<V> {
private Callable<? extends V> callable;
private CountDownLatch countDownLatch;
public WrapperThread(Callable callable, CountDownLatch countDownLatch) {
this.callable = callable;
this.countDownLatch = countDownLatch;
}
@Override
public V call() throws Exception {
//此处直接调用callable.call();和直接调用thread.run()类似,没有起新的线程此处和加入的任务内部是同一个线程。
V call = callable.call();
this.countDownLatch.countDown();
return call;
}
}
}
https://juejin.im/post/5cccee27e51d453a8f348bb2
允许一个或多个线程等待其他线程完成操作。
CountDownLatch如何工作?
CountDownLatch是通过维护一个计数器 cnt 来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,调用 countDown() 方法会让计数器的值就会减1。当计数器值到达0时,
它表示所有的线程已经完成了任务,那些因为调用 await() 方法而在等待的线程就会被唤醒。
API:
countDownLatch.countDown(); // 使CountDownLatch初始值N减1;
countDownLatch.await(); // 调用该方法的线程等到构造方法传入的N减到0的时候,才能继续往下执行;
await(long timeout, TimeUnit unit); // 与上面的await方法功能一致,只不过这里有了时间限制,调用该方法的线程等到指定的timeout时间后,不管N是否减至为0,都会继续往下执行;
long getCount(); // 获取当前CountDownLatch维护的值;
DEMO:
public static void main(String[] args) throws InterruptedException {
final int totalThread = 10;
CountDownLatch countDownLatch = new CountDownLatch(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.out.print("run..");
countDownLatch.countDown(); // state 为 0时,调用LockSuppor.unpark 唤醒
});
}
countDownLatch.await(); // 新建一个 SHARE 节点入队尾,调用LockSupport.park 挂起
System.out.println("end");
executorService.shutdown();
}
流程分析:
public void countDown() {
sync.releaseShared(1);// 调用的 AQS 的方法 public final boolean releaseShared(int arg) 如下
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 调用方法如下
doReleaseShared(); // 关键是这一步 这是 AQS 中的方法
return true;
}
return false;
}
CountDownLatch 内部类 Sync 中的方法:
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState(); // 获取当前state属性的值
if (c == 0) // 如果state为0,则说明当前计数器已经计数完成,直接返回
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc)) // 使用CAS算法对state进行设置
return nextc == 0; // 设置成功后返回当前是否为最后一个设置 state 的线程
}
}
private void doReleaseShared() {
for (;;) {
Node h = head; // 记录等待队列中的头结点的线程
if (h != null && h != tail) { // 头结点不为空,且头结点不等于尾节点
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // SIGNAL状态表示当前节点正在等待被唤醒
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 清除当前节点的等待状态
continue;
unparkSuccessor(h); // 唤醒当前节点的下一个节点
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head) // 如果h还是指向头结点,说明前面这段代码执行过程中没有其他线程对头结点进行过处理
break;
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 清除当前节点的等待状态
Node s = node.next;
if (s == null || s.waitStatus > 0) { // s的等待状态大于0说明该节点中的线程已经被外部取消等待了
s = null;
// 从队列尾部往前遍历,找到最后一个处于等待状态的节点,用s记录下来
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒离传入节点最近的处于等待状态的节点线程
}
可以看到,unparkSuccessor(Node) 方法的作用是唤醒离传入节点最近的一个处于等待状态的线程,使其继续往下执行。前面我们讲到过,等待队列中的线程可能有多个,而调用 countDown() 方法的线程只唤醒了一个处于等待状态的线程,
这里剩下的等待线程是如何被唤醒的呢?其实这些线程是被当前唤醒的线程唤醒的。
具体的我们可以看看 await()方法的具体执行过程。如下是 await()方法的代码:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
await() 方法实际还是调用了 Sync 对象的方法 acquireSharedInterruptibly(int)方法,如下是该方法的具体实现:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
CountDownLatch 内部类 Sync 中的方法:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
可以看到 acquireSharedInterruptibly(int) 方法判断当前线程是否需要以共享状态获取执行权限,这里 tryAcquireShared(int) 方法是 AbstractQueuedSynchronizer 中的一个模板方法,其具体实现在前面的 Sync 类中,
可以看到,其主要是判断 state 是否为零,如果为零则返回1,表示当前线程不需要进行权限获取,可直接执行后续代码,返回 -1 则表示当前线程需要进行共享权限。
具体的获取执行权限的代码在 doAcquireSharedInterruptibly(int) 方法中,如下是该方法的具体实现:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 使用当前线程创建一个共享模式的节点
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor(); // 获取当前节点的前一个节点
if (p == head) { // 判断前一个节点是否为头结点
int r = tryAcquireShared(arg); // 查看当前线程是否获取到了执行权限
if (r >= 0) { // 大于0表示获取了执行权限
setHeadAndPropagate(node, r); // 将当前节点设置为头结点,并且唤醒后面处于等待状态的节点
p.next = null; // help GC
failed = false;
return;
}
}
// 走到这一步说明没有获取到执行权限,就使当前线程进入“搁置”状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在 doAcquireSharedInterruptibly(int) 方法中,首先使用当前线程创建一个共享模式的节点。然后在一个 for 循环中判断当前线程是否获取到执行权限,如果有(r >= 0判断)则将当前节点设置为头节点,
并且唤醒后续处于共享模式的节点;如果没有,则对调用 shouldParkAfterFailedAcquire(Node, Node) 和 parkAndCheckInterrupt() 方法使当前线程处于“搁置”状态,该“搁置”状态是由操作系统进行的,
这样可以避免该线程无限循环而获取不到执行权限,造成资源浪费,这里也就是线程处于等待状态的位置,也就是说当线程被阻塞的时候就是阻塞在这个位置。当有多个线程调用await()方法而进入等待状态时,
这几个线程都将等待在此处。这里回过头来看前面的 countDown() 方法,其会唤醒处于等待队列中离头节点最近的一个处于等待状态的线程,也就是说该线程被唤醒之后会继续从这个位置开始往下执行,
此时执行到 tryAcquireShared(int)方法时,发现 r 大于0(因为 state 已经被置为 0 了),该线程就会调用 setHeadAndPropagate(Node, int) 方法,并且退出当前循环,也就开始执行 awat() 方法之后的代码。
这里我们看看 setHeadAndPropagate(Node, int) 方法的具体实现:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node); // 将当前节点设置为头节点
// 检查唤醒过程是否需要往下传递,并且检查头结点的等待状态
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) // 如果下一个节点是尝试以共享状态获取获取执行权限的节点,则将其唤醒
doReleaseShared();
}
}
setHeadAndPropagate(Node, int) 方法主要作用是设置当前节点为头结点,并且将唤醒工作往下传递,在传递的过程中,其会判断被传递的节点是否是以共享模式尝试获取执行权限的,
如果不是,则传递到该节点处为止(一般情况下,等待队列中都只会都是处于共享模式或者处于独占模式的节点)。也就是说,头结点会依次唤醒后续处于共享状态的节点,这也就是共享锁与独占锁的实现方式。
这里 doReleaseShared() 方法也就是我们前面讲到的会将离头结点最近的一个处于等待状态的节点唤醒的方法。
二、CyclicBarrier
https://juejin.im/post/5ae755256fb9a07ac3634067
三、Semaphore