进程和线程基础教程见进程和线程
即使是单核处理器也支持多线程执行代码,CPU通过给每个线程分配CPU时间片来实现 这个机制。时间片是CPU分配给各个线程的时间,因为时间片非常短,所以CPU通过不停地切 换线程执行,让我们感觉多个线程是同时执行的,时间片一般是几十毫秒(ms)。
CPU通过时间片分配算法来循环执行任务,当前任务执行一个时间片后会切换到下一个 任务。但是,在切换前会保存上一个任务的状态,以便下次切换回这个任务时,可以再加载这 个任务的状态。所以任务从保存到再加载的过程就是一次上下文切换。
减少上下文切换的方法有无锁并发编程、CAS算法、使用最少线程和使用协程。
- 无锁并发编程。多线程竞争锁时,会引起上下文切换,所以多线程处理数据时,可以用一 些办法来避免使用锁,如将数据的ID按照Hash算法取模分段,不同的线程处理不同段的数据。
- CAS算法。Java的Atomic包使用CAS算法来更新数据,而不需要加锁。
- 使用最少线程。避免创建不需要的线程,比如任务很少,但是创建了很多线程来处理,这样会造成大量线程都处于等待状态。
- 协程:在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换。
死锁是指两个或两个以上的线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉那他们都将无法推进下去,如果系统资源充足,线程的资源请求都能够得到满足,死锁出现的可能性就很低,否则就会因争夺有限的资源而陷入死锁。
如果此时有一个线程A,按照先锁a再获得锁b的的顺序获得锁,而在此同时又有另外一个线程B,按照先锁b再锁a的顺序获得锁。
/**
* @author wardseptember
* @create 2020-09-22 19:44
*/
class HoldLockDemo implements Runnable {
String lockA, lockB;
public HoldLockDemo(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName()+"\t 获得"+lockA+"\t 尝试获得"+lockB);
synchronized (lockB) {
}
}
}
}
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new HoldLockDemo(lockA, lockB), "Thread A").start();
new Thread(new HoldLockDemo(lockB, lockA), "Thread B").start();
}
}
jps -l # 查看DeadLockDemo进程号
jstack 进程号 # 查看异常信息
- 避免一个线程同时获取多个锁。
- 避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源。
- 尝试使用定时锁,使用lock.tryLock(timeout)来替代使用内部锁机制。
线程安全定义:
当多个线程访问同一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替运行,也不需要进行额外的同步,或者在调用方进行任何其他的协调操作,调用这个对象的行为都可以获取正确的结果,那这个对象是线程安全的。
出现线程安全的问题一般是因为主内存和工作内存数据不一致性和重排序导致的,而解决线程安全的问题最重要的就是理解这两种问题是怎么来的,那么,理解它们的核心在于理解java内存模型(JMM)。
Java Memory Model(JMM)是Java内存模型,本身是一种抽象的概念并不真实存在,它描述的是一组规则或者规范,通过这组规范定义了程序中各个变量(包括实例字段、静态字段和构成数组对象的元素)的访问方式。
JMM关于同步的规定:
- 线程解锁前,必须把共享变量的值刷新回主内存
- 线程加锁前,必须读取主内存的最新值到自己的工作内存
- 加锁解锁是同一把锁
- JMM抽象结构模型
我们知道CPU的处理速度和主存的读写速度不是一个量级的,为了平衡这种巨大的差距,每个CPU都会有缓存。因此,共享变量会先放在主存中,每个线程都有属于自己的工作内存,并且会把位于主存中的共享变量拷贝到自己的工作内存,之后的读写操作均使用位于工作内存的变量副本,并在某个时刻将工作内存的变量副本写回到主存中去。JMM就从抽象层次定义了这种方式,并且JMM决定了一个线程对共享变量的写入何时对其他线程是可见的。
- 重排序
在不改变程序执行结果的前提下,尽可能提高并行度。JMM对底层尽量减少约束,使其能够发挥自身优势。因此,在执行程序时,为了提高性能,编译器和处理器常常会对指令进行重排序。一般重排序可以分为以下三种:
-
编译器优化的重排序。编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序;
-
指令级并行的重排序。现代处理器采用了指令级并行技术来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序;
-
内存系统的重排序。由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行的。
1属于编译器重排序,而2和3统称为处理器重排序。这些重排序会导致线程安全的问题,一个很经典的例子就是DCL问题,这个在以后的文章中会具体去聊。针对编译器重排序,JMM的编译器重排序规则会禁止一些特定类型的编译器重排序;针对处理器重排序,编译器在生成指令序列的时候会通过插入内存屏障指令来禁止某些特殊的处理器重排序。
如果两个操作访问同一个变量,且这两个操作有一个为写操作,此时这两个操作就存在数据依赖性这里就存在三种情况:1. 读后写;2.写后写;3. 写后读,者三种操作都是存在数据依赖性的,如果重排序会对最终执行结果会存在影响。编译器和处理器在重排序时,会遵守数据依赖性,编译器和处理器不会改变存在数据依赖性关系的两个操作的执行顺序
- as-if-serial
as-if-serial语义的意思是:不管怎么重排序(编译器和处理器为了提供并行度),(单线程)程序的执行结果不能被改变。
- Happens-before
happens-before的概念来指定两个操作之间的执行顺序。由于这两个操作可以在一个线程之内,也可以是在不同线程之间。因此,JMM可以通过happens-before关系向程序员提供跨线程的内存可见性保证(如果A线程的写操作a与B线程的读操作b之间存在happens-before关系,尽管a操作和b操作在不同的线程中执行,但JMM向程序员保证a操作将对b操作可见)。具体的定义为:
1)如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。
2)两个操作之间存在happens-before关系,并不意味着Java平台的具体实现必须要按照happens-before关系指定的顺序来执行。如果重排序之后的执行结果,与按happens-before关系来执行的结果一致,那么这种重排序并不非法(也就是说,JMM允许这种重排序)。
上面的1)是JMM对程序员的承诺。从程序员的角度来说,可以这样理解happens-before关系:如果A happens-before B,那么Java内存模型将向程序员保证——A操作的结果将对B可见,且A的执行顺序排在B之前。注意,这只是Java内存模型向程序员做出的保证!
上面的2)是JMM对编译器和处理器重排序的约束原则。正如前面所言,JMM其实是在遵循一个基本原则:只要不改变程序的执行结果(指的是单线程程序和正确同步的多线程程序),编译器和处理器怎么优化都行。JMM这么做的原因是:程序员对于这两个操作是否真的被重排序并不关心,程序员关心的是程序执行时的语义不能被改变(即执行结果不能被改变)。因此,happens-before关系本质上和as-if-serial语义是一回事。
- as-if-serial VS happens-before
-
as-if-serial语义保证单线程内程序的执行结果不被改变,happens-before关系保证正确同步的多线程程序的执行结果不被改变。
-
as-if-serial语义给编写单线程程序的程序员创造了一个幻境:单线程程序是按程序的顺序来执行的。happens-before关系给编写正确同步的多线程程序的程序员创造了一个幻境:正确同步的多线程程序是按happens-before指定的顺序来执行的。
-
as-if-serial语义和happens-before这么做的目的,都是为了在不改变程序执行结果的前提下,尽可能地提高程序执行的并行度。
TODO
Happens-before原则待补充。
原子性是指一个操作是不可中断的,要么全部执行成功要么全部执行失败,有着“同生共死”的感觉。即使在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程所干扰。
synchronized语义表示锁在同一时刻只能由一个线程进行获取,当锁被占用后,其他线程只能等待。因此,synchronized语义就要求线程在访问读写共享变量时只能“串行”执行,因此synchronized具有有序性。
在java内存模型中说过,为了性能优化,编译器和处理器会进行指令重排序;也就是说java程序天然的有序性可以总结为:如果在本线程内观察,所有的操作都是有序的;如果在一个线程观察另一个线程,所有的操作都是无序的。
可见性是指当一个线程修改了共享变量后,其他线程能够立即得知这个修改。
synchronized: 具有原子性,有序性和可见性; volatile:具有有序性和可见性
public class CreateNewThread {
// 第一种创建方式
static class MyThread extends Thread {
@Override
public void run() {
System.out.println("T1");
}
}
// 第二种创建方式
static class Myrun implements Runnable {
@Override
public void run() {
System.out.println("T2");
}
}
public static void main(String[] args) {
new MyThread().start();
new Thread(new Myrun()).start();
new Thread(() -> {
System.out.println("T3");
}).start();
}
}
启动线程的三种方式:
- 继承Thread
- 实现Runnable
- Executors.newCachedThread 线程池启动
对某个对象上锁,锁的不是代码,是对象。
synchronized可以保持可见性、原子性和有序性。
下面两种上锁方式效果是一样的:
// 这块代码锁的是o这个对象
public class JUC02_Synchronized {
private int count = 10;
private Object o = new Object();
public void m() {
synchronized (o) {
count--;
}
}
}
// 这块代码锁的是JUC02_Synchronized2实例化的对象
public class JUC02_Synchronized2 {
private int count = 10;
private Object o = new Object();
public synchronized void m() {
count--;
}
}
// 对class进行上锁
public class JUC02_Synchronized3 {
private int count = 10;
private Object o = new Object();
public synchronized static void main(String[] args) { // 锁的是JUC02_Synchronized3.class
System.out.println("hello");
}
}
-
对于普通同步方法,锁是当前实例对象。
-
对于静态同步方法,锁是当前类的Class对象。
-
对于同步方法块,锁是Synchonized括号里配置的对象。
synchronized用的锁是存在Java对象头里的。如果对象是数组类型,则虚拟机用3个字宽 (Word)存储对象头,如果对象是非数组类型,则用2字宽存储对象头。在32位虚拟机中,1字宽 等于4字节,即32bit。
在32位虚拟机下,Mark word是32bit大小,在64位虚拟机下,Mark Word是64bit大小的。
一个同步方法可以调用另外一个同步方法,一个线程已经拥有某个对象的锁,再次申请的时候仍然会得到该对象的锁。
注意:是同一个对象,同一把锁。
public class JUC02_Synchronized4 {
synchronized void m1() {
System.out.println("m1 start");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
m2();
System.out.println("m1 start");
}
synchronized void m2() {
System.out.println("m2 start");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("m2 start");
}
public synchronized static void main(String[] args) {
new JUC02_Synchronized4().m1();
}
}
默认情况下,程序出现异常,锁会被释放。
TODO
偏向锁—>自旋锁—>重量级锁
刚开始Synchronised对 对象加锁后,只是在对象头上记录下当前线程ID,并不是真正“锁”住对象,此时是偏向锁;如果有别的线程(假设为线程2)也想访问此对象,此时锁将升级为自旋锁,线程2会一直循环请求读此对象,如果循环了10次依然没有读到此对象,此时锁将升级为重量级锁。当为重量级锁时,如果有更多线程想来访问此对象,将进入一个等待队列里面,直到当前加锁线程访问完此对象。
加锁代码执行时间短,线程数少,用自旋锁;
加锁代码执行时间长,线程数多,用系统锁。
package com.paddx.test.concurrent;
public class SynchronizedDemo {
public void method() {
synchronized (this) {
System.out.println("Method 1 start");
}
}
}
上面代码反编译结果为:
每个对象有一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的所有权,过程如下:
-
如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者。
-
如果线程已经占有该monitor,只是重新进入,则进入monitor的进入数加1.
-
如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权。
执行monitorexit的线程必须是objectref所对应的monitor的所有者。
指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。
package com.paddx.test.concurrent;
public class SynchronizedMethod {
public synchronized void method() {
System.out.println("Hello World!");
}
}
反编译结果:
从反编译的结果来看,方法的同步并没有通过指令monitorenter和monitorexit来完成(理论上其实也可以通过这两条指令来实现),不过相对于普通方法,其常量池中多了ACC_SYNCHRONIZED标示符。JVM就是根据该标示符来实现方法的同步的:当方法调用时,调用指令将会检查方法的 ACC_SYNCHRONIZED 访问标志是否被设置,如果设置了,执行线程将先获取monitor,获取成功之后才能执行方法体,方法执行完后再释放monitor。在方法执行期间,其他任何线程都无法再获得同一个monitor对象。 其实本质上没有区别,只是方法的同步是一种隐式的方式来实现,无需通过字节码来完成。
volatile作用:
-
保证线程可见性
import java.util.concurrent.TimeUnit; public class T01_HelloVolatile { /*volatile*/ boolean running = true; // 对比一下有无volatile的情况下,整个程序运行结果的区别 void m() { System.out.println("m start"); while(running) { } System.out.println("m end!"); } public static void main(String[] args) { T01_HelloVolatile t = new T01_HelloVolatile(); new Thread(t::m, "t1").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } t.running = false; } }
running变量如果不加volatile,"m end!"不会输出,加了volatile之后才会输出。
执行main和执行m的线程之间会有一个共享内存,共享内存保存类的成员变量,每个线程只是从共享内存里面把running变量复制一份副本,每次修改副本会立即更新running变量值,但是m方法并不会立即读共享内存里面的被修改后running变量,导致"m end!"一直不输出。加了volatile之后,main和m的执行线程互相可见,running被修改,其他线程立即能知道。
- MESI
- 缓存一致性协议
-
禁止指令重排序
-
DCL单例模式
单例模式是指内存里只有一个实例。下面Singleton就是一个单例类。
public class Singleton { private static final Singleton INSTANCE = new Singleton(); private Singleton() {}; public static Singleton getInstance() { return INSTANCE; } public static void main(String[] args) { Singleton s1 = Singleton.getInstance(); Singleton s2 = Singleton.getInstance(); System.out.println(s1 == s2); } }
-
Double Check Lock
Synchronized进一步细粒化后,如果使用双重检查,大概率不会运行出错,但是还是线程不安全的,要想实现线程安全,必须要加volatile。加volatile禁止指令重排序保证运行结果正确。
public class DoubleCheck { private static volatile DoubleCheck INSTANCE; // JIT private DoubleCheck() { } public static DoubleCheck getInstance() { // 省略业务代码 if (INSTANCE == null) { // 双重检查 synchronized (DoubleCheck.class) { if(INSTANCE == null) { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } INSTANCE = new DoubleCheck(); } } } return INSTANCE; } public void m() { System.out.println("m"); } public static void main(String[] args) { for(int i=0; i < 100; i++) { new Thread(() -> { System.out.println(DoubleCheck.getInstance().hashCode()); }).start(); } } }
-
-
volatile不能保证原子性
Volatile并不能保证多个线程共同修改running变量说带来的不一致性问题,也就是说volatile不能替代synchronized。
import java.util.ArrayList; import java.util.List; public class VolatileVsSync { volatile int count = 0; /*synchronized*/ void m() { for (int i = 0; i < 10000; i++) { count++; } } public static void main(String[] args) { VolatileVsSync t = new VolatileVsSync(); List<Thread> threads = new ArrayList<Thread>(); for (int i = 0; i < 10; i++) { threads.add(new Thread(t::m, "thread-" + i)); } threads.forEach((o) -> o.start()); threads.forEach((o) -> { try { o.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(t.count); } }
count++被编译成字节码,会分成三个指令,第一个从主内存拿到原始count,第二个在工作线程中执行+1操作,第三把累加后的值写回主内存。
上面的程序输出count的结果并不是10000,这是因为volatile不能保证原子性导致的脏读。对m()加synchronized关键字可以保证原子性,count最后结果一定是10000。
第二种保持volatile原子性的方法是,用AtmoicInteger。
-
volatile 引用类型(包括数组)只能保证引用本身的可见性,不能保证内部字段的可见性。
下面代码几秒之内并不会输出"m end!"。
import java.util.concurrent.TimeUnit; public class T02_VolatileReference1 { boolean running = true; volatile static T02_VolatileReference1 T = new T02_VolatileReference1(); void m() { System.out.println("m start"); while(running) { } System.out.println("m end!"); } public static void main(String[] args) { new Thread(T::m, "t1").start(); //lambda表达式 new Thread(new Runnable( run() {m()} try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } T.running = false; } }
- 可见性实现原理
在生成汇编代码时会在volatile修饰的共享变量进行写操作的时候会多出Lock前缀的指令(具体的大家可以使用一些工具去看一下,这里我就只把结果说出来)。我们想这个Lock指令肯定有神奇的地方,那么Lock前缀的指令在多核处理器下会发现什么事情了?主要有这两个方面的影响:
- 将当前处理器缓存行的数据写回系统内存;
- 这个写回内存的操作会使得其他CPU里缓存了该内存地址的数据无效
为了提高处理速度,处理器不直接和内存进行通信,而是先将系统内存的数据读到内部缓存(L1,L2或其他)后再进行操作,但操作完不知道何时会写到内存。如果对声明了volatile的变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存。但是,就算写回到内存,如果其他处理器缓存的值还是旧的,再执行计算操作就会有问题。所以,在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器对这个数据进行修改操作的时候,会重新从系统内存中把数据读到处理器缓存里。因此,经过分析我们可以得出如下结论:
- Lock前缀的指令会引起处理器缓存写回内存;
- 一个处理器的缓存回写到内存会导致其他处理器的缓存失效;
- 当处理器发现本地缓存失效后,就会从内存中重读该变量数据,即可以获取当前最新值。
这样针对volatile变量通过这样的机制就使得每个线程都能获得该变量的最新值。
- 禁止指令重排序实现原理
volatile通过添加内存屏障,实现禁止指令重排序。
JMM内存屏障分为四类见下图:
java编译器会在生成指令系列时在适当的位置会插入内存屏障指令来禁止特定类型的处理器重排序。
JMM采取了保守策略:
- 在每个volatile写操作的前面插入一个StoreStore屏障;
- 在每个volatile写操作的后面插入一个StoreLoad屏障;
- 在每个volatile读操作的后面插入一个LoadLoad屏障;
- 在每个volatile读操作的后面插入一个LoadStore屏障。
需要注意的是:volatile写是在前面和后面分别插入内存屏障,而volatile读操作是在后面插入两个内存屏障
StoreStore屏障:禁止上面的普通写和下面的volatile写重排序;
StoreLoad屏障:防止上面的volatile写与下面可能有的volatile读/写重排序
LoadLoad屏障:禁止下面所有的普通读操作和上面的volatile读重排序
LoadStore屏障:禁止下面所有的普通写操作和上面的volatile读重排序
参考链接
- 公平锁是指多个线程按照申请锁的顺序来获取锁
- 非公平锁,是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁,多个线程争夺锁。在高并发情况下,有可能会造成优先级反转或者饥饿现象。
reentranlock和synchronized默认是非公平锁。
可重入锁又名递归锁,指的是同一线程外层函数获得锁后,内层递归函数仍然能获得该锁的代码,即线程可以进入任何一个它已经拥有的锁,所同步着得代码块。
可重入锁最大的作用是避免死锁。
reentranlock和synchronized也是可重入锁。
自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU。
实现一个自旋锁
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author wardseptember
* @create 2020-09-21 21:25
*/
public class SpinLockDemo {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void lock() {
Thread thread = Thread.currentThread();
System.out.println(thread.getName()+"\t come in");
while (!atomicReference.compareAndSet(null, thread)) {
}
}
public void unLock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(thread.getName()+"\t invoked myUnlock");
}
public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();
new Thread(() -> {
spinLockDemo.lock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
spinLockDemo.unLock();
},"thread1").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
spinLockDemo.lock();
spinLockDemo.unLock();
}, "thread2").start();
}
}
- 独占锁是指改锁一次只能被一个线程所持有
- 共享锁指该锁可被多个线程所持有
ReentrantLock和Synchronized都是独占锁;ReentrantReadWriteLock其读锁是共享锁,其写锁是独占锁。
读写锁使用演示:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author wardseptember
* @create 2020-09-21 23:31
*/
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public void put(String key, Object value) {
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"\t 正在写入:" + key);
try {
TimeUnit.MICROSECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
} map.put(key, value);
System.out.println(Thread.currentThread().getName()+"\t 写入完成");
} catch (Exception e){
e.printStackTrace();
} finally {
rwLock.writeLock().unlock();
}
}
public Object get(String key) {
rwLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"\t 正在读取");
try {
TimeUnit.MICROSECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object result = map.get(key);
System.out.println(Thread.currentThread().getName()+"\t 正在读取: "+result);
return result;
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
return null;
}
public void clearMap() {
map.clear();
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 0; i < 5; i++) {
final int keyValue = i;
new Thread(() -> {
myCache.put(keyValue+"", keyValue+ "");
}, i+"").start();
}
for (int i = 0; i < 5; i++) {
final int keyValue = i;
new Thread(() -> {
myCache.get(keyValue+"");
}, i+"").start();
}
}
}
总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制和CAS算法实现。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库提供的类似于write_condition机制,其实都是提供的乐观锁。在Java中java.util.concurrent.atomic
包下面的原子变量类就是使用了乐观锁的一种实现方式CAS实现的。
总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。Java中synchronized
和ReentrantLock
等独占锁就是悲观锁思想的实现。
从上面对两种锁的介绍,我们知道两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下(多读场景),即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果是多写的情况,一般会经常产生冲突,这就会导致上层应用会不断的进行retry,这样反倒是降低了性能,所以一般多写的场景下用悲观锁就比较合适。
一般是在数据表中加上一个数据版本号version字段,表示数据被修改的次数,当数据被修改时,version值会加一。当线程A要更新数据值时,在读取数据的同时也会读取version值,在提交更新时,若刚才读取到的version值为当前数据库中的version值相等时才更新,否则重试更新操作,直到更新成功。
在修改之前,会有一个期望值,和要修改的值,如果期望值和我们设想的不一样,就不修改;如果一样就修改该值。CAS修改操作有CPU原语支持,中间不能中断,保存结果无误。
- ABA 问题
如果一个变量V初次读取的时候是A值,并且在准备赋值的时候检查到它仍然是A值,那我们就能说明它的值没有被其他线程修改过了吗?很明显是不能的,因为在这段时间它的值可能被改为其他值,然后又改回A,那CAS操作就会误认为它从来没有被修改过。这个问题被称为CAS操作的 "ABA"问题。
JDK 1.5 以后的 AtomicStampedReference 类
就提供了此种能力,其中的 compareAndSet 方法
就是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
- 循环时间长开销大
自旋CAS(也就是不成功就一直循环执行直到成功)如果长时间不成功,会给CPU带来非常大的执行开销。 如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。
- 只能保证一个共享变量的原子操作
CAS 只对单个共享变量有效,当操作涉及跨多个共享变量时 CAS 无效。但是从 JDK 1.5开始,提供了AtomicReference类
来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作.所以我们可以使用锁或者利用AtomicReference类
把多个共享变量合并成一个共享变量来操作。
简单的来说CAS适用于写比较少的情况下(多读场景,冲突一般较少),synchronized适用于写比较多的情况下(多写场景,冲突一般较多)
- 对于资源竞争较少(线程冲突较轻)的情况,使用synchronized同步锁进行线程阻塞和唤醒切换以及用户态内核态间的切换操作额外浪费消耗cpu资源;而CAS基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能。
- 对于资源竞争严重(线程冲突严重)的情况,CAS自旋的概率会比较大,从而浪费更多的CPU资源,效率低于synchronized。
补充: Java并发编程这个领域中synchronized关键字一直都是元老级的角色,很久之前很多人都会称它为 “重量级锁” 。但是,在JavaSE 1.6之后进行了主要包括为了减少获得锁和释放锁带来的性能消耗而引入的 偏向锁 和 轻量级锁 以及其它各种优化之后变得在某些情况下并不是那么重了。synchronized的底层实现主要依靠 Lock-Free 的队列,基本思路是 自旋后阻塞,竞争切换后继续竞争锁,稍微牺牲了公平性,但获得了高吞吐量。在线程冲突较少的情况下,可以获得和CAS类似的性能;而线程冲突严重的情况下,性能远高于CAS。
自旋锁(spinlock):是指当一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,直到获取到锁才会退出循环。
获取锁的线程一直处于活跃状态,但是并没有执行任何有效的任务,使用这种锁会造成busy-waiting。
Synchronized优化一般指的是同步代码块中代码语句越少越好,就是锁细化;还有一种是锁的粗化。
import java.util.concurrent.TimeUnit;
public class FineCoarseLock {
int count = 0;
synchronized void m1() {
//do sth need not sync
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//业务逻辑中只有下面这句需要sync,这时不应该给整个方法上锁
count ++;
//do sth need not sync
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
void m2() {
//do sth need not sync
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//业务逻辑中只有下面这句需要sync,这时不应该给整个方法上锁
//采用细粒度的锁,可以使线程争用时间变短,从而提高效率
synchronized(this) {
count ++;
}
//do sth need not sync
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
-
Synchronized锁定某对象o后,如果o的属性发生变化,不影响锁使用;但如果o变成另一个对象,则锁定的对象发生变化。应该避免将锁定对象的引用变成另外的对象。
public class SyncSameObject { /*final*/ Object o = new Object(); void m() { synchronized(o) { while(true) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } } } public static void main(String[] args) { SyncSameObject t = new SyncSameObject(); //启动第一个线程 new Thread(t::m, "t1").start(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } //创建第二个线程 Thread t2 = new Thread(t::m, "t2"); // 锁对象发生改变,所以t2线程得以执行,如果注释掉这句话,线程2将永远得不到执行机会 t.o = new Object(); t2.start(); } }
-
Synchronized不能用于锁String常量、Integer、Long,容易出问题。
在下面的例子中,m1和m2其实锁定的是同一个对象,这种情况还会发生比较诡异的现象,比如你用到了一个类库,在该类库中代码锁定了字符串“Hello”,但是你读不到源码,所以你在自己的代码中也锁定了"Hello",这时候就有可能发生非常诡异的死锁阻塞,因为你的程序和你用到的类库不经意间使用了同一把锁。
public class DoNotLockString { String s1 = "Hello"; String s2 = "Hello"; void m1() { synchronized(s1) { } } void m2() { synchronized(s2) { } } }
CAS是原子操作,可以保证线程安全。
下面代码无锁,也可以保证count最后结果是10000。
public class UseCAS {
AtomicInteger count = new AtomicInteger();
void m() {
for (int i = 0; i < 10000; i++) {
count.incrementAndGet(); // count++
}
}
public static void main(String[] args) {
UseCAS t = new UseCAS();
List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < 10; i++) {
threads.add(new Thread(t::m, "thread-" + i));
}
threads.forEach((o) -> o.start());
threads.forEach((o) -> {
try {
o.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(t.count);
}
}
在修改之前,会有一个期望值,和要修改的值,如果期望值和我们设想的不一样,就不修改;如果一样就修改该值。CAS修改操作有CPU原语支持,中间不能中断,保存结果无误。
是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据,Unsafe类存在于sun.misc包中,其内部方法操作可以像C的指针一样直接操作内存。Java中CAS操作的执行依赖于Unsafe类的方法。
valueOffset表示该变量值在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的。
变量value用volatile修饰,保证了多线程之间的内存可见性。
调用Unsafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令,这是一种完成依赖于硬件的功能,通过它实现了原子操作。由于CAS是一种系统原语,原语属于操作系统用语范畴,是由于若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是锁CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。
ABA问题指如果期望值是A,操作之前被改成B,又改回A。如果是基础类型,没有影响;如果是引用类型,可能会有问题。
ABA问题可以用版本号解决,使用AtmoicStampedReference类。
分段锁
同一个线程内,锁可重入,就是可以再加一把锁,实际上没有加,而是直接调用。
可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁。
reentrantlock可用于替代synchronized,它比synchronized的功能更强大。
-
Reentranlock必须要手动释放锁,并且加锁次数和释放锁次数要一样。synchronized如果遇到异常的话,jvm会自动释放锁。
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class T02_ReentrantLock2 { Lock lock = new ReentrantLock(); void m1() { try { lock.lock(); // synchronized(this) for (int i = 0; i < 10; i++) { TimeUnit.SECONDS.sleep(1); System.out.println(i); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } void m2() { try { lock.lock(); System.out.println("m2 ..."); } finally { lock.unlock(); } } public static void main(String[] args) { T02_ReentrantLock2 rl = new T02_ReentrantLock2(); new Thread(rl::m1).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(rl::m2).start(); } }
从上面的代码执行结果可以看出,另一个线程必须等当前线程使用完这个加锁对象,才能继续使用。如果是同一个线程,就可以直接继续使用,因为reentranlock是可重入锁。
-
Reentranlock有尝试加锁的功能
reentrantlock可以进行“尝试锁定”tryLock,如果这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待。
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class T03_ReentrantLock3 { Lock lock = new ReentrantLock(); void m1() { try { lock.lock(); for (int i = 0; i < 10; i++) { TimeUnit.SECONDS.sleep(1); System.out.println(i); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } /** * 使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行 * 可以根据tryLock的返回值来判定是否锁定 * 也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中 */ void m2() { boolean locked = false; try { locked = lock.tryLock(5, TimeUnit.SECONDS); System.out.println("m2 ..." + locked); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (locked) { lock.unlock(); } } } public static void main(String[] args) { T03_ReentrantLock3 rl = new T03_ReentrantLock3(); new Thread(rl::m1).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(rl::m2).start(); } }
-
使用ReentrantLock还可以调用lockInterruptibly方法,可以对线程interrupt方法做出响应。
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; public class T04_ReentrantLock4 { public static void main(String[] args) { Lock lock = new ReentrantLock(); Thread t1 = new Thread(()->{ try { lock.lock(); System.out.println("t1 start"); TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); System.out.println("t1 end"); } catch (InterruptedException e) { System.out.println("interrupted!"); } finally { lock.unlock(); } }); t1.start(); Thread t2 = new Thread(()->{ try { //lock.lock(); lock.lockInterruptibly(); //可以对interrupt()方法做出响应 System.out.println("t2 start"); TimeUnit.SECONDS.sleep(5); System.out.println("t2 end"); } catch (InterruptedException e) { System.out.println("interrupted!"); } finally { lock.unlock(); } }); t2.start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } t2.interrupt(); // 打断线程2的等待 } }
-
ReentrantLock还可以指定为公平锁
import java.util.concurrent.locks.ReentrantLock; public class T05_ReentrantLock5 extends Thread { // 参数为true表示为公平锁,请对比输出结果 private static ReentrantLock lock = new ReentrantLock(true); @Override public void run() { for(int i = 0; i < 100; i++) { lock.lock(); try{ System.out.println(Thread.currentThread().getName()+"获得锁"); }finally{ lock.unlock(); } } } public static void main(String[] args) { T05_ReentrantLock5 rl = new T05_ReentrantLock5(); Thread th1=new Thread(rl); Thread th2=new Thread(rl); th1.start(); th2.start(); } }
countDownLatch这个类使一个主线程等待其他线程执行完毕后再执行。
它是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。
它其实是作用于线程当中的,它就像一个门栓,一开始是关闭的,所有希望通过该门的线程都需要等待,然后开始倒计时,当倒计时一到,等待的所有线程都可以通过。
要注意的是,它是一次性的,打开之后就不能关上了。
- 主线程等待其他线程执行完毕再执行
import java.util.concurrent.CountDownLatch;
public class T06_TestCountDownLatch {
public static void main(String[] args) {
usingJoin();
usingCountDownLatch();
}
private static void usingCountDownLatch() {
Thread[] threads = new Thread[100];
CountDownLatch latch = new CountDownLatch(threads.length);
for(int i=0; i<threads.length; i++) {
threads[i] = new Thread(()->{
int result = 0;
for(int j=0; j < 10000; j++) {
result += j;
}
latch.countDown();
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
try {
latch.await(); // 等待这100线程结束
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end latch");
}
private static void usingJoin() {
Thread[] threads = new Thread[100];
for(int i=0; i<threads.length; i++) {
threads[i] = new Thread(()->{
int result = 0;
for(int j=0; j < 10000; j++) {
result += j;
}
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("end join");
}
}
- CountDownLatch还可以用于同时启动多个线程
public class CountDownLatchDemo {
static class TaskThread extends Thread {
CountDownLatch latch;
public TaskThread(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
latch.await();
System.out.println(getName() + "start" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException{
int threadNum = 10;
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < threadNum; ++i) {
TaskThread taskThread = new TaskThread(latch);
taskThread.start();
}
latch.countDown();
}
}
从字面上的意思可以知道,这个类的中文意思是“循环栅栏”。大概的意思就是一个可循环利用的屏障。
它的作用就是会让所有线程都等待完成后才会继续下一步行动。
举个例子,就像生活中我们会约朋友们到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,但是这个餐厅规定必须等到所有人到齐之后才会让我们进去。这里的朋友们就是各个线程,餐厅就是 CyclicBarrier。
public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)
- parties 是参与线程的个数
- 第二个构造方法有一个 Runnable 参数,这个参数的意思是最后一个到达线程要做的任务
public class CyclicBarrierDemo {
static class TaskThread extends Thread {
CyclicBarrier barrier;
public TaskThread(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(getName() + " 到达栅栏 A");
barrier.await();
System.out.println(getName() + " 冲破栅栏 A");
Thread.sleep(2000);
System.out.println(getName() + " 到达栅栏 B");
barrier.await();
System.out.println(getName() + " 冲破栅栏 B");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int threadNum = 5;
CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "满足threadNum,冲");
}
});
for(int i = 0; i < threadNum; i++) {
new TaskThread(barrier).start();
}
}
}
可以用于多线程计算数据,最后合并计算结果的场景。
-
CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
-
CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。
在Phaser中,它把多个线程协作执行的任务划分为多个阶段,编程时需要明确各个阶段的任务,每个阶段都可以有任意个参与者,线程都可以随时注册并参与到某个阶段。
构造
Phaser创建后,初始阶段编号为0,构造函数中指定初始参与个数。
注册:Registration
Phaser支持通过register()和bulkRegister(int parties)方法来动态调整注册任务的数量。
import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class PhaserDemo {
static Random r = new Random();
static MarriagePhaser phaser = new MarriagePhaser();
static void milliSleep(int milli) {
try {
TimeUnit.MILLISECONDS.sleep(milli);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
phaser.bulkRegister(7);
for(int i = 0; i < 5; i++) {
new Thread(new Person("p" + i)).start();
}
new Thread(new Person("新郎")).start();
new Thread(new Person("新娘")).start();
}
static class MarriagePhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0: // 第一阶段做的事
System.out.println("所有人到齐了!" + registeredParties);
System.out.println();
return false;
case 1:
System.out.println("所有人吃完了!" + registeredParties);
System.out.println();
return false;
case 2:
System.out.println("所有人离开了!" + registeredParties);
System.out.println();
return false;
case 3:
System.out.println("婚礼结束!新郎新娘抱抱!" + registeredParties);
return true;
default:
return true;
}
}
}
static class Person implements Runnable {
String name;
public Person(String name) {
this.name = name;
}
public void arrive() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 到达现场!\n", name);
phaser.arriveAndAwaitAdvance();
}
public void eat() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 吃完!\n", name);
phaser.arriveAndAwaitAdvance();
}
public void leave() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 离开!\n", name);
phaser.arriveAndAwaitAdvance();
}
private void hug() {
if(name.equals("新郎") || name.equals("新娘")) {
milliSleep(r.nextInt(1000));
System.out.printf("%s 洞房!\n", name);
phaser.arriveAndAwaitAdvance();
} else {
phaser.arriveAndDeregister();
//phaser.register()
}
}
@Override
public void run() {
arrive();
eat();
leave();
hug();
}
}
}
-
写锁,排它锁
-
读锁,共享锁
ReadWriteLock管理一组锁,一个是只读的锁,一个是写锁。 Java并发库中ReetrantReadWriteLock实现了ReadWriteLock接口并添加了可重入的特性。
- 非公平模式(默认) 当以非公平初始化时,读锁和写锁的获取的顺序是不确定的。非公平锁主张竞争获取,可能会延缓一个或多个读或写线程,但是会比公平锁有更高的吞吐量。
- 公平模式 当以公平模式初始化时,线程将会以队列的顺序获取锁。当当前线程释放锁后,等待时间最长的写锁线程就会被分配写锁;或者有一组读线程组等待时间比写线程长,那么这组读线程组将会被分配读锁。
锁降级:从写锁变成读锁;锁升级:从读锁变成写锁
ReentrantReadWriteLock支持锁降级,不支持锁升级。
ReetrantReadWriteLock读锁使用共享模式,即:同时可以有多个线程并发地读数据。写锁使用独占模式,换句话说,读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。
读写锁之间为互斥,即不能同时对加锁对象操作。ReetrantReadWriteLock读写锁的实现中,需要注意的,当有读锁时,写锁就不能获得;而当有写锁时,除了获得写锁的这个线程可以获得读锁外,其他线程不能获得读锁。
public class ReadWriteLockDemo {
private static int value;
static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
public static void read(Lock lock) {
try {
lock.lock();
Thread.sleep(1000);
System.out.println("read over!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void write(Lock lock, int v) {
try {
lock.lock();
Thread.sleep(1000);
System.out.println("write over!");
value = v;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
Runnable readRunnable = () -> read(readLock);
Runnable writeRunnable = () -> write(writeLock, new Random().nextInt());
for(int i = 0; i < 18; i++) {
new Thread(readRunnable).start();
}
for(int i = 0; i < 2; i++) {
new Thread(writeRunnable).start();
}
}
}
TODO
Semaphore 是一个计数信号量,必须由获取它的线程释放。
常用于限制可以访问某些资源的线程数量,例如通过 Semaphore 限流。
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
public static void main(String[] args) {
// 允许一个线程同时执行
Semaphore s = new Semaphore(1);
// 允许两个线程同时执行,第二个参数为true,表示公平模式
//Semaphore s = new Semaphore(2, true);
new Thread(()->{
try {
s.acquire();
System.out.println("T1 running...");
Thread.sleep(200);
System.out.println("T1 running...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
s.release();
}
}).start();
new Thread(()->{
try {
s.acquire();
System.out.println("T2 running...");
Thread.sleep(200);
System.out.println("T2 running...");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
java.util.concurrent包中的Exchanger类可用于两个线程之间交换信息。可简单地将Exchanger对象理解为一个包含两个格子的容器,通过exchanger方法可以向两个格子中填充信息。当两个格子中的均被填充时,该对象会自动将两个格子的信息交换,然后返回给线程,从而实现两个线程的信息交换。
import java.util.concurrent.Exchanger;
public class TestExchanger {
static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Thread(()->{
String s = "T1";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t1").start();
new Thread(()->{
String s = "T2";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t2").start();
}
}
LockSupport
是一个线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,当然阻塞之后肯定得有唤醒的方法。
常用方法:
public static void park(Object blocker); // 暂停当前线程
public static void parkNanos(Object blocker, long nanos); // 暂停当前线程,不过有超时时间的限制
public static void parkUntil(Object blocker, long deadline); // 暂停当前线程,直到某个时间
public static void park(); // 无期限暂停当前线程
public static void parkNanos(long nanos); // 暂停当前线程,不过有超时时间的限制
public static void parkUntil(long deadline); // 暂停当前线程,直到某个时间
public static void unpark(Thread thread); // 恢复当前线程
public static Object getBlocker(Thread t);
-
park
不需要获取某个对象的锁 -
park和unpark
可以实现类似wait和notify
的功能,但是并不和wait和notify
交叉,也就是说unpark
不会对wait
起作用,notify
也不会对park
起作用。 -
park和unpark
的使用不会出现死锁的情况 -
相对于线程的
stop和resume
,park和unpark
的先后顺序并不是那么严格。stop和resume
如果顺序反了,会出现死锁现象。而park和unpark
却不会。park和unpark
会对每个线程维持一个许可(boolean值)。- unpark调用时,如果当前线程还未进入park,则许可为true
- park调用时,判断许可是否为true,如果是true,则继续往下执行;如果是false,则等待,直到许可为true
-
blocker的作用是在dump线程的时候看到阻塞对象的信息
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class TestLockSupport {
public static void main(String[] args) {
Thread t = new Thread(()->{
for (int i = 0; i < 10; i++) {
System.out.println(i);
if(i == 5) {
LockSupport.park(); // 无限期暂停当前线程,直到unpark执行
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
LockSupport.unpark(t);
}
}
实现一个容器,提供两个方法,add、size,写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
- 使用wait、notify
import java.util.ArrayList;
import java.util.List;
/**
* @author wardseptember
* @create 2020-07-05 20:30
*/
public class WaitAndNotify {
volatile List lists = new ArrayList();
public void add(Object o) {
lists.add(o);
}
public int size() {
return lists.size();
}
static Object lock = new Object();
public static void main(String[] args) {
WaitAndNotify waitAndNotify = new WaitAndNotify();
Thread t2 = new Thread(() -> {
synchronized (lock) {
System.out.println("t2 启动");
if (waitAndNotify.size() != 5) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t2 结束");
// 通知t1继续执行
lock.notify();
}
});
Thread t1 = new Thread(() -> {
synchronized (lock) {
System.out.println("t1 启动");
for (int i = 0; i < 10; i++) {
waitAndNotify.add(i);
System.out.println("add " + i);
if (waitAndNotify.size() == 5) {
lock.notify();
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
});
t2.start();
t1.start();
}
}
- 使用CountDownLatch
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author wardseptember
* @create 2020-07-05 20:30
*/
public class CountDownLatch_Taobao {
volatile List lists = new ArrayList();
public void add(Object o) {
lists.add(o);
}
public int size() {
return lists.size();
}
public static void main(String[] args) {
CountDownLatch countDownLatch1 = new CountDownLatch(1);
CountDownLatch countDownLatch2 = new CountDownLatch(1);
CountDownLatch_Taobao latchTaobao = new CountDownLatch_Taobao();
Thread t2 = new Thread(() -> {
System.out.println("t2 启动");
if (latchTaobao.size() != 5) {
try {
// t2 暂停
countDownLatch1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t2 结束");
// 放行t1
countDownLatch2.countDown();
});
Thread t1 = new Thread(() -> {
System.out.println("t1 启动");
for (int i = 0; i < 10; i++) {
latchTaobao.add(i);
System.out.println("add " + i);
if (latchTaobao.size() == 5) {
// 放行t2
countDownLatch1.countDown();
try {
// t1暂停
countDownLatch2.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
t2.start();
t1.start();
}
}
- 使用LockSupport
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
/**
* @author wardseptember
* @create 2020-07-05 20:30
*/
public class LockSupport_Taobao {
volatile List lists = new ArrayList();
public void add(Object o) {
lists.add(o);
}
public int size() {
return lists.size();
}
static Thread t1 = null, t2 = null;
public static void main(String[] args) {
LockSupport_Taobao latchTaobao = new LockSupport_Taobao();
t2 = new Thread(() -> {
System.out.println("t2 启动");
if (latchTaobao.size() != 5) {
LockSupport.park(t2);
}
System.out.println("t2 结束");
LockSupport.unpark(t1);
});
t1 = new Thread(() -> {
System.out.println("t1 启动");
for (int i = 0; i < 10; i++) {
latchTaobao.add(i);
System.out.println("add " + i);
if (latchTaobao.size() == 5) {
LockSupport.unpark(t2);
LockSupport.park(t1);
}
}
});
t2.start();
t1.start();
}
}
写一个固定容量同步容器,拥有put和get方法,以及getCount方法,能够支持2个生产者线程以及10个消费者线程的阻塞调用。
- 使用wait和notify/notifyAll来实现
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
/**
* @author wardseptember
* @create 2020-07-05 21:36
*/
public class MyContainer1<T> {
final private LinkedList<T> list = new LinkedList<>();
final private int MAX = 10;
public synchronized void put(T t) {
while (list.size() == MAX) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(t);
this.notifyAll();
}
public synchronized T get() {
T t = null;
while (list.size() == 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
t = list.removeFirst();
this.notifyAll();
return t;
}
public static void main(String[] args) {
MyContainer1<String> c = new MyContainer1<>();
// 启动消费者线程
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for(int j = 0; j < 5; j++) {
System.out.println(c.get());
}
}, "c" + i).start();
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 启动生产者线程
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for(int j = 0; j < 25; j++) {
c.put(Thread.currentThread().getName() + " " + j);
}
}, "p" + i).start();
}
}
}
- 使用ReentrantLock
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author wardseptember
* @create 2020-07-05 21:36
*/
public class MyContainer2<T> {
final private LinkedList<T> list = new LinkedList<>();
final private int MAX = 10;
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();
public void put(T t) {
try {
lock.lock();
while (list.size() == MAX) {
producer.await();
}
list.add(t);
consumer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public T get() {
T t = null;
try {
lock.lock();
while (list.size() == 0) {
consumer.await();
}
t = list.removeFirst();
producer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return t;
}
public static void main(String[] args) {
MyContainer2<String> c = new MyContainer2<>();
// 启动消费者线程
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for(int j = 0; j < 5; j++) {
System.out.println(c.get());
}
}, "c" + i).start();
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 启动生产者线程
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for(int j = 0; j < 25; j++) {
c.put(Thread.currentThread().getName() + " " + j);
}
}, "p" + i).start();
}
}
}
-
synchronized是关键字属于JVM层面
monitorenter,底层是通过monitor对象来完成,其实wait/notify等方法也依赖于monitor对象,只有在同步块或者方法中才能调用wait/notify等方法
monitorexit
-
lock是具体类(java.util.concurrent.locks.lock)是api层面的锁
synchronized不需要用户去手动释放锁,当synchronized代码执行完后系统会自动让线程释放对锁的占用。
reentrantlock需要用户去手动释放锁,若没有主动释放锁,就有可能导致出现死锁的现象。
synchronized不可中断,除非抛出异常或者正常运行完成
Reentranlock 可中断:
- 设置超时方法 trylock(long timeout, TimeUnit unit)
- lockInterruptibly放代码块中,调用interrupt()方法可中断
synchronized是非公平锁
reentrantlock可以非公平锁也可以是公平锁,默认为非公平锁。
synchronized不可以
Reentrantlock用来实现分组唤醒需要唤醒的线程们,可以精确唤醒,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。
双向链表加一个volatile int state,链表的每一个节点是一个线程。
VarHandle是指向某个对象的引用。可以用于原子性操作。
- 普通属性也可以通过VarHandle进行原子性操作
- 比反射快,直接操作二进制码
ThreadLocal是线程独有,别的线程不能修改。
ThreadLocal用途
- 声明式事务,保证同一个Connection
见之前教程引用
软引用,只有在内存不够用时才会被回收,多用于做缓存。
弱引用,只要进行垃圾回收,弱引用就会被回收,多用于容器里面,ThreadLocal是其一个应用例子。weakHashMap
ThreadLocal使用完了,手动remove掉。
虚引用
插入效率低一些,查询时效率高。
教程见ConcurrentHashMap详解(基于1.7和1.8)
是一个有序的并发Map。
读多写少可用CopyOnWriteList,写时加锁,读时不加锁。
两个线程交替输出A1B2C3...Z26
- 使用LockSupport
import java.util.concurrent.locks.LockSupport;
/**
* @author wardseptember
* @create 2020-07-06 16:50
*/
public class LockSupport_Huawei {
static Thread t1 = null, t2 = null;
public static void main(String[] args) {
char[] aI = "1234567".toCharArray();
char[] aC = "ABCDEFG".toCharArray();
t1 = new Thread(() -> {
for (char c : aC) {
System.out.println(c);
LockSupport.unpark(t2);
LockSupport.park();
}
}, "t1");
t2 = new Thread(() -> {
for (char c : aI) {
LockSupport.park();
System.out.println(c);
LockSupport.unpark(t1);
}
}, "t2");
t1.start();
t2.start();
}
}
- 使用wait和notify
/**
* @author wardseptember
* @create 2020-07-05 20:30
*/
public class WaitAndNotify_Huawei {
static Object lock = new Object();
public static void main(String[] args) {
WaitAndNotify_Huawei waitAndNotify = new WaitAndNotify_Huawei();
char[] aI = "1234567".toCharArray();
char[] aC = "ABCDEFG".toCharArray();
Thread t2 = new Thread(() -> {
synchronized (lock) {
for (char c : aC) {
System.out.println(c);
try {
lock.notify();
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lock.notify();
}
});
Thread t1 = new Thread(() -> {
synchronized (lock) {
for (char c : aI) {
System.out.println(c);
try {
lock.notify();
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lock.notify();
}
});
t2.start();
t1.start();
}
}
在多线程领域:所谓阻塞,即在某些情况下会挂起线程,一旦条件满足,被挂起的线程又会自动被唤醒。
- 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞
- 当阻塞队列是满时,往队列里添加元素的操作将会被阻塞
Queue提供了一些对线程友好的API,Offer、peek、poll
BlockingQueue提供Put take有阻塞的功能
由数组结构组成的有界阻塞队列
由链表结构组成的有界阻塞队列,默认大小为Integer.MAX_VALUE.
实现了生产者消费者模式,queue为空不能取,能往里面加;queue满了只能取,不能添加。
支持优先级排序的无界阻塞队列
使用优先级队列实现的延迟无界阻塞队列。
java延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用poll()方法会返回null值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于0来判断。延时队列不能存放空元素。
延时队列实现了Iterator接口,但iterator()遍历顺序不保证是元素的实际存放顺序。
不存储元素的阻塞队列,也即单个元素的队列。
容量为零。等着有人拿,才能往里装,相当于直接递到另一个线程手里,多用于两个线程之间交换数据。
transferQueue是往queue装数据,然后等着被取走,阻塞着等着。
由链表结构组成的无界阻塞队列。
由链表结构组成的双向阻塞队列
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author wardseptember
* @create 2020-09-22 14:20
*/
class MyResource {
private volatile boolean FLAG = true;
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void myProduct() throws Exception {
String data = null;
boolean retValue;
while (FLAG) {
data = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName()+"\t 插入队列"+data+"成功");
} else {
System.out.println(Thread.currentThread().getName()+"\t 插入队列"+data+"失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName()+"\t 生成停止");
}
public void myConsumer() throws Exception {
String result = null;
while (FLAG) {
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if (result == null || result.equalsIgnoreCase("")) {
FLAG = true;
System.out.println(Thread.currentThread().getName()+"\t 超过2秒钟没有取到,消费者退出");
return;
}
System.out.println(Thread.currentThread().getName()+"\t 消费队列" + result + "成功");
}
}
public void stop() throws Exception {
this.FLAG = false;
}
}
public class BlockingQueueProductConsumer {
public static void main(String[] args) {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
try {
myResource.myProduct();
} catch (Exception e) {
e.printStackTrace();
}
}, "product").start();
new Thread(() -> {
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "consumer").start();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
myResource.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。
主要特点:线程复用、控制最大并发数和管理线程
-
降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
-
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
-
提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源, 还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开 来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。
在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线 程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程 也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。
在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器 (Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到 硬件处理器上。
从图中可以看出,应用程序通过Executor框架控制上层的调度;而下层的调度由操作系统 内核控制,下层的调度不受应用程序的控制。
Executor框架主要由3大部分组成如下。
-
任务。包括被执行任务需要实现的接口:Runnable接口或Callable接口。
-
任务的执行。包括任务执行机制的核心接口Executor,以及继承自Executor的 ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口 (ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
-
异步计算的结果。包括接口Future和实现Future接口的FutureTask类。
简介:
-
Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开 来。
-
ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。 ·ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
-
Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
-
Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled- ThreadPoolExecutor执行。
主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一 个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。
然后可以把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command));或者也可以把Runnable对象或Callable对象提交给ExecutorService执行(Executor- Service.submit(Runnable task)或ExecutorService.submit(Callable<T>task))
。
如果执行ExecutorService.submit(...),ExecutorService将返回一个实现Future接口的对象 (到目前为止的JDK中,返回的是FutureTask对象)。由于FutureTask实现了Runnable,程序员也可 以创建FutureTask,然后直接交给ExecutorService执行。
最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行
FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。
- 使用Executor
import java.util.concurrent.Executor;
/**
* @author wardseptember
* @create 2020-07-08 12:35
*/
public class MyExecutor implements Executor {
public static void main(String[] args) {
new MyExecutor().execute(() -> System.out.println("hello executor"));
}
@Override
public void execute(Runnable command) {
command.run();
}
}
- 使用Callable
import java.util.concurrent.*;
/**
* @author wardseptember
* @create 2020-07-08 13:46
*/
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> c = new Callable<String>() {
@Override
public String call() throws Exception {
return "hello callable";
}
};
ExecutorService service = Executors.newCachedThreadPool();
// 异步
Future<String> future = service.submit(c);
// 阻塞
System.out.println(future.get());
service.shutdown();
}
}
- FutureTask
FutureTask实现了RunnableFuture接口,RunnableFuture继承自Runnable、Future,因此FutureTask可以直接丢给线程运行,并且能获取返回值。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
/**
* @author wardseptember
* @create 2020-07-08 13:56
*/
public class FutureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> task = new FutureTask<>(() -> {
TimeUnit.SECONDS.sleep(1);
return 1000;
});
// 异步操作
new Thread(task).start();
// 阻塞操作
System.out.println(task.get());
}
}
Executor框架的主要成员:ThreadPoolExecutor、ScheduledThreadPoolExecutor、 Future接口、Runnable接口、Callable接口和Executors。
- ThreadPoolExecutor
ThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建3种类型的
ThreadPoolExecutor: SingleThreadExecutor、FixedThreadPool和CachedThreadPool。
- ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建2种类型的ScheduledThreadPoolExecutor,如下:
- ScheduledThreadPoolExecutor,包含若干个线程的ScheduledThreadPoolExecutor。
- SingleThreadScheduledExecutor,只包含一个线程的ScheduledThreadPoolExecutor。
- Future接口
Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。当我们把Runnable 接口或Callable接口的实现类提交(submit)给ThreadPoolExecutor或 ScheduledThreadPoolExecutor时,ThreadPoolExecuto或ScheduledThreadPoolExecutor会向我们 返回一个FutureTask对象。
- Runnable接口和Callable接口
Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled- ThreadPoolExecutor执行。它们之间的区别是Runnable不会返回结果,而Callable可以返回结 果。
除了可以自己创建实现Callable接口的对象外,还可以使用工厂类Executors来把一个 Runnable包装成一个Callable。
CompletableFuture管理多个Future结果等等。
Executor框架最核心的类是ThreadPoolExecutor,它是线程池的实现类,主要由下列4个组件构成。
-
corePool:核心线程池的大小。
-
maximumPool:最大线程池的大小。
-
BlockingQueue:用来暂时保存任务的工作队列。
-
RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或ThreadPoolExecutor已经饱和 时(达到了最大线程池大小且工作队列已满),execute()方法将要调用的Handler。
ThreadPoolExecutor构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
序号 | 名称 | 类型 | 含义 |
---|---|---|---|
1 | corePoolSize | int | 核心线程池大小 |
2 | maximumPoolSize | int | 最大线程池大小 |
3 | keepAliveTime | long | 线程最大空闲时间 |
4 | unit | TimeUnit | 时间单位 |
5 | workQueue | BlockingQueue<Runnable> |
线程等待队列 |
6 | threadFactory | ThreadFactory | 线程创建工厂 |
7 | handler | RejectedExecutionHandler | 拒绝策略 |
- corePoolSize、maximumPoolSize
线程池执行器将会根据corePoolSize和maximumPoolSize自动地调整线程池大小。
当在execute(Runnable)方法中提交新任务并且少于corePoolSize线程正在运行时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。 如果有多于corePoolSize但小于maximumPoolSize线程正在运行,则仅当队列已满时才会创建新线程。
- prestartCoreThread
prestartCoreThread 核心线程预启动
在默认情况下,只有当新任务到达时,才开始创建和启动核心线程,但是我们可以使用 prestartCoreThread()
和 prestartAllCoreThreads()
方法动态调整。
如果使用非空队列构建池,则可能需要预先启动线程。
方法 | 作用 |
---|---|
prestartCoreThread() | 创一个空闲任务线程等待任务的到达 |
prestartAllCoreThreads() | 创建核心线程池数量的空闲任务线程等待任务的到达 |
- ThreadFactory 线程工厂
新线程使用ThreadFactory创建。 如果未另行指定,则使用Executors.defaultThreadFactory默认工厂,使其全部位于同一个ThreadGroup中,并且具有相同的NORM_PRIORITY优先级和非守护进程状态。
通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护进程状态等。如果ThreadCactory在通过从newThread返回null询问时未能创建线程,则执行程序将继续,但可能无法执行任何任务。
- Keep-alive times 线程存活时间
如果线程池当前拥有超过corePoolSize的线程,那么多余的线程在空闲时间超过keepAliveTime时会被终止 ( 请参阅getKeepAliveTime(TimeUnit) )。这提供了一种在不积极使用线程池时减少资源消耗的方法。
如果池在以后变得更加活跃,则应构建新线程。 也可以使用方法setKeepAliveTime(long,TimeUnit)
进行动态调整。
防止空闲线程在关闭之前终止,可以使用如下方法:
setKeepAliveTime(Long.MAX_VALUE,TimeUnit.NANOSECONDS);
默认情况下,keep-alive策略仅适用于存在超过corePoolSize线程的情况。 但是,只要keepAliveTime值不为零,方法allowCoreThreadTimeOut(boolean)
也可用于将此超时策略应用于核心线程。
- `BlockingQueue
BlockingQueu用于存放提交的任务,队列的实际容量与线程池大小相关联。
-
如果当前线程池任务线程数量小于核心线程池数量,执行器总是优先创建一个任务线程,而不是从线程队列中取一个空闲线程。
-
如果当前线程池任务线程数量大于核心线程池数量,执行器总是优先从线程队列中取一个空闲线程,而不是创建一个任务线程。
-
如果当前线程池任务线程数量大于核心线程池数量,且队列中无空闲任务线程,将会创建一个任务线程,直到线程数量等于maximumPoolSize;如果线程数量等于maximumPoolSize,线程池无空闲线程,且队列是满的,则任务将会被拒绝。
主要有三种队列策略:
- Direct handoffs 直接握手队列 Direct handoffs 的一个很好的默认选择是 SynchronousQueue,它将任务交给线程而不需要保留。这里,如果没有线程立即可用来运行它,那么排队任务的尝试将失败,因此将构建新的线程。 此策略在处理可能具有内部依赖关系的请求集时避免锁定。Direct handoffs 通常需要无限制的maximumPoolSizes来避免拒绝新提交的任务。 但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致线程数量会无限增长问题。
- Unbounded queues 无界队列** 当所有corePoolSize线程繁忙时,使用无界队列(例如,没有预定义容量的LinkedBlockingQueue)将导致新任务在队列中等待,从而导致maximumPoolSize的值没有任何作用。当每个任务互不影响,完全独立于其他任务时,这可能是合适的; 例如,在网页服务器中, 这种队列方式可以用于平滑瞬时大量请求。但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致队列无限增长问题。
- Bounded queues 有界队列 一个有界的队列(例如,一个ArrayBlockingQueue)和有限的maximumPoolSizes配置有助于防止资源耗尽,但是难以控制。队列大小和maximumPoolSizes需要 相互权衡:
- 使用大队列和较小的maximumPoolSizes可以最大限度地减少CPU使用率,操作系统资源和上下文切换开销,但会导致人为的低吞吐量。如果任务经常被阻塞(比如I/O限制),那么系统可以调度比我们允许的更多的线程。
- 使用小队列通常需要较大的maximumPoolSizes,这会使CPU更繁忙,但可能会遇到不可接受的调度开销,这也会降低吞吐量。
-
Rejected tasks 拒绝任务 拒绝任务有两种情况:1. 线程池已经被关闭;2. 任务队列已满且maximumPoolSizes已满; 无论哪种情况,都会调用RejectedExecutionHandler的rejectedExecution方法。预定义了四种处理策略:
- AbortPolicy:默认的策略,抛出RejectedExecutionException运行时异常;
- CallerRunsPolicy:在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务;
- DiscardPolicy:直接丢弃新提交的任务;
- DiscardOldestPolicy:如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新尝试执行任务(如果失败,则重复这一过程); 我们可以自己定义RejectedExecutionHandler,以适应特殊的容量和队列策略场景
-
Hook methods 钩子方法
ThreadPoolExecutor为提供了每个任务执行前后提供了钩子方法,重写beforeExecute(Thread,Runnable)
和afterExecute(Runnable,Throwable)
方法来操纵执行环境; 例如,重新初始化ThreadLocals,收集统计信息或记录日志等。此外,terminated()
在Executor完全终止后需要完成后会被调用,可以重写此方法,以执行任殊处理。
注意:如果hook或回调方法抛出异常,内部的任务线程将会失败并结束。
- Queue maintenance 维护队列
getQueue()
方法可以访问任务队列,一般用于监控和调试。绝不建议将这个方法用于其他目的。当在大量的队列任务被取消时,remove()
和purge()
方法可用于回收空间。
- Finalization 关闭
如果程序中不在持有线程池的引用,并且线程池中没有线程时,线程池将会自动关闭。如果您希望确保即使用户忘记调用 shutdown()
方法也可以回收未引用的线程池,使未使用线程最终死亡。那么必须通过设置适当的 keep-alive times 并设置allowCoreThreadTimeOut(boolean) 或者 使 corePoolSize下限为0 。
一般情况下,线程池启动后建议手动调用shutdown()关闭。
import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author wardseptember
* @create 2020-07-08 14:54
*/
public class ThreadPoolExecutorDemo {
public static void main(String[] args) throws IOException {
int corePoolSize = 2;
int maxmunPoolSize = 4;
long keepAliveTime = 10;
TimeUnit unit = TimeUnit.SECONDS;
ArrayBlockingQueue<Runnable> blockingDeque = new ArrayBlockingQueue<>(2);
ThreadFactory threadFactory = new NameThreadFactory();
RejectedExecutionHandler handler = new MyIgnorePolicy();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxmunPoolSize, keepAliveTime,
unit, blockingDeque, threadFactory, handler);
// 预启动所有核心线程
// threadPoolExecutor.prestartAllCoreThreads();
for (int i = 1; i <= 10; ++i) {
MyTask task = new MyTask(String.valueOf(i));
threadPoolExecutor.execute(task);
}
// 阻塞主线程
System.in.read();
threadPoolExecutor.shutdown();
}
static class NameThreadFactory implements ThreadFactory {
private final AtomicInteger atomicInteger = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + atomicInteger.getAndIncrement());
System.out.println(t.getName() + "被创建");
return t;
}
}
public static class MyIgnorePolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
doLog(r, executor);
}
private void doLog(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + "被拒绝");
}
}
static class MyTask implements Runnable {
private String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println(this.toString() + "正在运行");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String getName() {
return name;
}
@Override
public String toString() {
return "MyTask [name=" + name + "]";
}
}
}
运行结果
my-thread-1被创建
my-thread-2被创建
my-thread-3被创建
my-thread-4被创建
MyTask [name=2]正在运行
MyTask [name=1]正在运行
MyTask [name=5]正在运行
MyTask [name=6]正在运行
MyTask [name=7]被拒绝
MyTask [name=8]被拒绝
MyTask [name=9]被拒绝
MyTask [name=10]被拒绝
MyTask [name=3]正在运行
MyTask [name=4]正在运行
按照上面的三条规则可以知道,1、2任务来时直接创建核心线程处理,当3、4任务来时并不会直接创建线程处理,而是进入阻塞队列,当阻塞队列慢时,且核心线程没有空闲,则会创建非核心线程,所以可以看到5、6任务正在运行;当线程数达到最大值,且所有线程都在忙碌,阻塞队列也是满的,则会直接拒绝任务,调用RejectedExecutionHandler。
单线程线程池,可以保证任务顺序执行,还可以进行生命周期管理。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author wardseptember
* @create 2020-07-09 16:01
*/
public class SingleThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int j = i;
executorService.execute(() -> {
System.out.println(j + " " + Thread.currentThread().getName());
});
}
executorService.shutdown();
}
}
CachedThreadPool是一个没有核心线程的线程池,当新任务来临时,如果非核心线程在忙碌,则直接新创建一个线程。SynchronousQueue是一个手把手传递的一个阻塞队列,可见上面的教程。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author wardseptember
* @create 2020-07-09 16:23
*/
public class CachedPoolDemo {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
System.out.println(service);
for (int i = 0; i < 2; i++) {
service.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println(service);
TimeUnit.SECONDS.sleep(62);
System.out.println(service);
service.shutdown();
}
}
FixedThreadPool是一个固定线程数的线程池。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @author wardseptember
* @create 2020-07-09 16:30
*/
public class FixedThreadPoolDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
long start = System.currentTimeMillis();
getPrime(1, 200000);
long end = System.currentTimeMillis();
System.out.println(end - start);
final int cpuCoreNum = 4;
ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
MyTask t2 = new MyTask(80001, 130000);
MyTask t3 = new MyTask(130001, 170000);
MyTask t4 = new MyTask(170001, 200000);
Future<List<Integer>> f1 = service.submit(t1);
Future<List<Integer>> f2 = service.submit(t2);
Future<List<Integer>> f3 = service.submit(t3);
Future<List<Integer>> f4 = service.submit(t4);
start = System.currentTimeMillis();
f1.get();
f2.get();
f3.get();
f4.get();
end = System.currentTimeMillis();
System.out.println(end - start);
}
static class MyTask implements Callable<List<Integer>> {
int startPos, endPos;
MyTask(int s, int e) {
this.startPos = s;
this.endPos = e;
}
@Override
public List<Integer> call() throws Exception {
List<Integer> r = getPrime(startPos, endPos);
return r;
}
}
static boolean isPrime(int num) {
for(int i=2; i<=num/2; i++) {
if(num % i == 0) {
return false;
}
}
return true;
}
static List<Integer> getPrime(int start, int end) {
List<Integer> results = new ArrayList<>();
for(int i=start; i<=end; i++) {
if(isPrime(i)) {
results.add(i);
}
}
return results;
}
}
ScheduledThreadPoolExecutor是定时任务线程池,其本质上依然是ThreadPoolExecutor。
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author wardseptember
* @create 2020-07-09 16:39
*/
public class ScheduledPoolDemo {
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
service.scheduleAtFixedRate(()->{
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}, 0, 500, TimeUnit.MILLISECONDS);
}
}
不使用Executors创建线程池,因为FixedThreadPool和SingleThreadPool,允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM;CachedThreadPool和ScheduledThreadPool允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
// 1. `ctl`,可以看做一个int类型的数字,高3位表示线程池状态,低29位表示worker数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 2. `COUNT_BITS`,`Integer.SIZE`为32,所以`COUNT_BITS`为29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 3. `CAPACITY`,线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 4. 线程池有5种状态,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 5. `runStateOf()`,获取线程池状态,通过按位与操作,低29位将全部变成0
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 6. `workerCountOf()`,获取线程池worker数量,通过按位与操作,高3位将全部变成0
private static int workerCountOf(int c) { return c & CAPACITY; }
// 7. `ctlOf()`,根据线程池状态和线程池worker数量,生成ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
// 8. `runStateLessThan()`,线程池状态小于xx
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 9. `runStateAtLeast()`,线程池状态大于等于xx
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 基本类型参数校验
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// 空指针校验
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
// 根据传入参数`unit`和`keepAliveTime`,将存活时间转换为纳秒存到变量`keepAliveTime `中
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// worker数量比核心线程数小,直接创建worker执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// worker数量超过核心线程数,任务直接进入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。
// 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。
if (! isRunning(recheck) && remove(command))
reject(command);
// 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。
// 这儿有3点需要注意:
// 1. 线程池不是运行状态时,addWorker内部会判断线程池状态
// 2. addWorker第2个参数表示是否创建核心线程
// 3. addWorker返回false,则说明任务执行失败,需要执行reject操作
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 外层自旋
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 这个条件写得比较难懂,我对其进行了调整,和下面的条件等价
// (rs > SHUTDOWN) ||
// (rs == SHUTDOWN && firstTask != null) ||
// (rs == SHUTDOWN && workQueue.isEmpty())
// 1. 线程池状态大于SHUTDOWN时,直接返回false
// 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
// 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内层自旋
for (;;) {
int wc = workerCountOf(c);
// worker数量超过容量,直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用CAS的方式增加worker数量。
// 若增加成功,则直接跳出外层循环进入到第二部分
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 线程池状态发生变化,对外层循环进行自旋
if (runStateOf(c) != rs)
continue retry;
// 其他情况,直接内层循环进行自旋即可
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// worker的添加必须是串行的,因此需要加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 这儿需要重新检查线程池状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// worker已经调用过了start()方法,则不再创建worker
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// worker创建并添加到workers成功
workers.add(w);
// 更新`largestPoolSize`变量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动worker线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 这儿是Worker的关键所在,使用了线程工厂创建了一个线程。传入的参数为当前worker
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// 省略代码...
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 调用unlock()是为了让外部可以中断
w.unlock(); // allow interrupts
// 这个变量用于判断是否进入过自旋(while循环)
boolean completedAbruptly = true;
try {
// 这儿是自旋
// 1. 如果firstTask不为null,则执行firstTask;
// 2. 如果firstTask为null,则调用getTask()从队列获取任务。
// 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
while (task != null || (task = getTask()) != null) {
// 这儿对worker进行加锁,是为了达到下面的目的
// 1. 降低锁范围,提升性能
// 2. 保证每个worker执行的任务是串行的
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池正在停止,则对当前线程进行中断操作
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。
// 这两个方法在当前类里面为空实现。
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 帮助gc
task = null;
// 已完成任务数加一
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 自旋操作被退出,说明线程池正在结束
processWorkerExit(w, completedAbruptly);
}
}
CPU核数+1
- 由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如CPU核数 * 2
- CPU核数 / (1 - 阻塞系数),阻塞系数在0.8 - 0.9之间
- 分解汇总的任务
- 用很少的线程可以执行很多的任务
- CPU密集型
ForkJoinPool并不适合所有场景; 特别适合用于“分而治之”,递归计算的算法;
ForkJoinPool会将大任务分成小任务,然后小任务由各个线程去运行,这个过程就是Fork;将运行的结果汇总到一起,这个过程就是Join。ForkJoinPool线程池内每一个线程都维护一个阻塞队列,当某个线程执行完队列里的任务时就会去抢着执行别的线程队列里的任务。
RecursiveAction是用于定义任务和计算方式,其继承自ForkJoinTask,ForkJoinTask实现了Future、Serializable接口。
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
/**
* @author wardseptember
* @create 2020-07-09 17:41
*/
public class ForkJoinPoolDemo {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random r = new Random();
static long finallySum = 0L;
static {
for(int i=0; i<nums.length; i++) {
nums[i] = r.nextInt(100);
}
System.out.println("流处理结果: " + Arrays.stream(nums).sum()); // stream api
}
static class AddTask extends RecursiveAction {
int start, end;
AddTask(int s, int e) {
start = s;
end = e;
}
@Override
protected void compute() {
if (end - start <= MAX_NUM) {
long sum = 0L;
for (int i = start; i < end; i++) {
sum += nums[i];
}
finallySum += sum;
System.out.println("from:" + start + " to:" + end + " = " + sum);
} else {
int mid = start + (end - start) / 2;
AddTask task1 = new AddTask(start, mid);
AddTask task2 = new AddTask(mid, end);
task1.fork();
task2.fork();
}
}
}
public static void main(String[] args) throws IOException, InterruptedException {
ForkJoinPool fjp = new ForkJoinPool();
AddTask task = new AddTask(0, nums.length);
fjp.execute(task);
ForkJoinPoolDemo temp = new ForkJoinPoolDemo();
TimeUnit.SECONDS.sleep(2);
System.out.println("ForkJoinPool计算结果:" + finallySum);
System.in.read();
}
}
RecursiveTask跟RecursiveAction,不过他有返回值。
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* @author wardseptember
* @create 2020-07-09 17:41
*/
public class ForkJoinPoolDemo2 {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random r = new Random();
static {
for(int i=0; i<nums.length; i++) {
nums[i] = r.nextInt(100);
}
// stream api
System.out.println("流处理结果: " + Arrays.stream(nums).sum());
}
static class AddTaskReturn extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
int start, end;
AddTaskReturn(int s, int e) {
start = s;
end = e;
}
@Override
protected Long compute() {
if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) {
sum += nums[i];
}
return sum;
}
int middle = start + (end-start)/2;
AddTaskReturn subTask1 = new AddTaskReturn(start, middle);
AddTaskReturn subTask2 = new AddTaskReturn(middle, end);
subTask1.fork();
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
public static void main(String[] args) throws IOException {
ForkJoinPool fjp = new ForkJoinPool();
AddTaskReturn task = new AddTaskReturn(0, nums.length);
fjp.execute(task);
long result = task.join();
System.out.println("ForkJoinPool计算结果:" + result);
ForkJoinPoolDemo2 temp = new ForkJoinPoolDemo2();
System.in.read();
}
}
WorkStealingPool实际上直接新建一个ForkJoinPool对象。
线程池内每一个线程都维护一个阻塞队列,当某个线程执行完队列里的任务时就会去抢着执行别的线程队列里的任务。
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author wardseptember
* @create 2020-07-09 18:36
*/
public class WorkStealingPoolDemo {
public static void main(String[] args) throws IOException {
ExecutorService service = Executors.newWorkStealingPool();
System.out.println(Runtime.getRuntime().availableProcessors());
service.execute(new R(1000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000)); //daemon
service.execute(new R(2000));
// 由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
System.in.read();
}
static class R implements Runnable {
int time;
R(int t) {
this.time = t;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(time + " " + Thread.currentThread().getName());
}
}
}
WorkStealingPool就是ForkJoinPool,为什么还要他的存在?相当于帮我们定义了一个ForkJoinPool线程池,方便使用。
并发是指任务提交,并行是指任务执行。parallelStream计算一个并行执行的api。
Lamaba编程是1.8新添加的,之后会专门整理一个使用教程和其实现原理。
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
* @author wardseptember
* @create 2020-07-09 18:41
*/
public class ParallelStreamAPIDemo {
public static void main(String[] args) {
List<Integer> nums = new ArrayList<>();
Random r = new Random();
for(int i = 0; i < 10000; i++) {
nums.add(1000000 + r.nextInt(1000000));
}
// System.out.println(nums);
long start = System.currentTimeMillis();
nums.forEach(v->isPrime(v));
long end = System.currentTimeMillis();
System.out.println(end - start);
// 使用parallel stream api
start = System.currentTimeMillis();
nums.parallelStream().forEach(ParallelStreamAPIDemo::isPrime);
end = System.currentTimeMillis();
System.out.println(end - start);
}
static boolean isPrime(int num) {
for(int i = 2; i <= num / 2; i++) {
if (num % i == 0) {
return false;
}
}
return true;
}
}
多线程条件下会报java.util.ConcurrentModificationException
-
使用new Vector<>();其加锁sync,实现线程安全
-
Collections.synchronizedList(new ArrayList<>());
-
写时复制,使用
new CopyOnWriteArrayList<>();
CopyOnWrite容器即写时复制的容器,往一个容器添加元素的时候,不直接往当前容器Object[]添加,而是先将当前容器Object[]进行copy,复制出一个新的容器Object[] newElements,然后新的容器Object[] newElements里添加元素,添加完元素后,再将原容器的引用指向新的容器setArray(newElements);这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁;因为当前容器不会添加任何元素,所以CopyOnWrite容器也是一种读写分离的思想,读和写在不同的容器里面。
hashset线程不安全,HashSet底层是HashMap
-
Collections.synchronizedSet(new HashSet<>());
-
写时复制,使用
new CopyOnWriteArraySet<>();
底层还是CopyOnWriteArrayList
hashmap线程不安全,使用ConcurrentHashMap.
http://openjdk.java.net/projects/code-tools/jmh/
JMH是java测试工具。
-
创建Maven项目,添加依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <encoding>UTF-8</encoding> <java.version>1.8</java.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <groupId>mashibing.com</groupId> <artifactId>HelloJMH2</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <!-- https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-core --> <dependency> <groupId>org.openjdk.jmh</groupId> <artifactId>jmh-core</artifactId> <version>1.21</version> </dependency> <!-- https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-generator-annprocess --> <dependency> <groupId>org.openjdk.jmh</groupId> <artifactId>jmh-generator-annprocess</artifactId> <version>1.21</version> <scope>test</scope> </dependency> </dependencies> </project>
-
idea安装JMH插件 JMH plugin v1.0.3
-
由于用到了注解,打开运行程序注解配置
compiler -> Annotation Processors -> Enable Annotation Processing
-
定义需要测试类PS (ParallelStream)
package com.mashibing.jmh; import java.util.ArrayList; import java.util.List; import java.util.Random; public class PS { static List<Integer> nums = new ArrayList<>(); static { Random r = new Random(); for (int i = 0; i < 10000; i++) nums.add(1000000 + r.nextInt(1000000)); } static void foreach() { nums.forEach(v->isPrime(v)); } static void parallel() { nums.parallelStream().forEach(PS::isPrime); } static boolean isPrime(int num) { for(int i=2; i<=num/2; i++) { if(num % i == 0) return false; } return true; } }
-
写单元测试
这个测试类一定要在test package下面
package com.mashibing.jmh; import org.openjdk.jmh.annotations.Benchmark; import static org.junit.jupiter.api.Assertions.*; public class PSTest { @Benchmark public void testForEach() { PS.foreach(); } }
-
运行测试类,如果遇到下面的错误:
ERROR: org.openjdk.jmh.runner.RunnerException: ERROR: Exception while trying to acquire the JMH lock (C:\WINDOWS\/jmh.lock): C:\WINDOWS\jmh.lock (拒绝访问。), exiting. Use -Djmh.ignoreLock=true to forcefully continue. at org.openjdk.jmh.runner.Runner.run(Runner.java:216) at org.openjdk.jmh.Main.main(Main.java:71)
这个错误是因为JMH运行需要访问系统的TMP目录,解决办法是:
打开RunConfiguration -> Environment Variables -> include system environment viables
-
阅读测试报告
-
Warmup 预热,由于JVM中对于特定代码会存在优化(本地化),预热对于测试结果很重要
-
Mesurement 总共执行多少次测试
-
Timeout
-
Threads 线程数,由fork指定
-
Benchmark mode 基准测试的模式
-
Benchmark 测试哪一段代码
disruptor是一个单机版的高性能的消息队列,使用CAS实现,把性能发挥到了极致。
主页:http://lmax-exchange.github.io/disruptor/
源码:https://github.com/LMAX-Exchange/disruptor
GettingStarted: https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
api: http://lmax-exchange.github.io/disruptor/docs/index.html
maven: https://mvnrepository.com/artifact/com.lmax/disruptor
对比ConcurrentLinkedQueue : 链表实现
JDK中没有ConcurrentArrayQueue
Disruptor是数组实现的
无锁,高并发,使用环形Buffer,直接覆盖(不用清除)旧的数据,降低GC频率
实现了基于事件的生产者消费者模式(观察者模式)
环形队列
RingBuffer的序号,指向下一个可用的元素
采用数组实现,没有首尾指针
对比ConcurrentLinkedQueue,用数组实现的速度更快
假如长度为8,当添加到第12个元素的时候在哪个序号上呢?用12%8决定
当Buffer被填满的时候到底是覆盖还是等待,由Producer决定
长度设为2的n次幂,利于二进制计算,例如:12%8 = 12 & (8 - 1) pos = num & (size -1)
-
定义Event - 队列中需要处理的元素
-
定义Event工厂,用于填充队列
这里牵扯到效率问题:disruptor初始化的时候,会调用Event工厂,对ringBuffer进行内存的提前分配
GC产频率会降低
-
定义EventHandler(消费者),处理容器中的元素
long sequence = ringBuffer.next(); // Grab the next sequence
try {
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.set(8888L); // Fill with data
} finally {
ringBuffer.publish(sequence);
}
//===============================================================
EventTranslator<LongEvent> translator1 = new EventTranslator<LongEvent>() {
@Override
public void translateTo(LongEvent event, long sequence) {
event.set(8888L);
}
};
ringBuffer.publishEvent(translator1);
//===============================================================
EventTranslatorOneArg<LongEvent, Long> translator2 = new EventTranslatorOneArg<LongEvent, Long>() {
@Override
public void translateTo(LongEvent event, long sequence, Long l) {
event.set(l);
}
};
ringBuffer.publishEvent(translator2, 7777L);
//===============================================================
EventTranslatorTwoArg<LongEvent, Long, Long> translator3 = new EventTranslatorTwoArg<LongEvent, Long, Long>() {
@Override
public void translateTo(LongEvent event, long sequence, Long l1, Long l2) {
event.set(l1 + l2);
}
};
ringBuffer.publishEvent(translator3, 10000L, 10000L);
//===============================================================
EventTranslatorThreeArg<LongEvent, Long, Long, Long> translator4 = new EventTranslatorThreeArg<LongEvent, Long, Long, Long>() {
@Override
public void translateTo(LongEvent event, long sequence, Long l1, Long l2, Long l3) {
event.set(l1 + l2 + l3);
}
};
ringBuffer.publishEvent(translator4, 10000L, 10000L, 1000L);
//===============================================================
EventTranslatorVararg<LongEvent> translator5 = new EventTranslatorVararg<LongEvent>() {
@Override
public void translateTo(LongEvent event, long sequence, Object... objects) {
long result = 0;
for(Object o : objects) {
long l = (Long)o;
result += l;
}
event.set(result);
}
};
ringBuffer.publishEvent(translator5, 10000L, 10000L, 10000L, 10000L);
package com.mashibing.disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
public class Main03
{
public static void main(String[] args) throws Exception
{
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ringBuffer.publishEvent((event, sequence) -> event.set(10000L));
System.in.read();
}
}
ProducerType有两种模式 Producer.MULTI和Producer.SINGLE
默认是MULTI,表示在多线程模式下产生sequence
如果确认是单线程生产者,那么可以指定SINGLE,效率会提升
如果是多个生产者(多线程),但模式指定为SINGLE,会出什么问题呢?
-
(常用)BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费。
-
BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu
-
LiteBlockingWaitStrategy:线程阻塞等待生产者唤醒,与BlockingWaitStrategy相比,区别在signalNeeded.getAndSet,如果两个线程同时访问一个访问waitfor,一个访问signalAll时,可以减少lock加锁次数.
-
LiteTimeoutBlockingWaitStrategy:与LiteBlockingWaitStrategy相比,设置了阻塞时间,超过时间后抛异常。
-
PhasedBackoffWaitStrategy:根据时间参数和传入的等待策略来决定使用哪种等待策略
-
TimeoutBlockingWaitStrategy:相对于BlockingWaitStrategy来说,设置了等待时间,超过后抛异常
-
(常用)YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu
-
(常用)SleepingWaitStrategy : sleep
默认:disruptor.setDefaultExceptionHandler()
覆盖:disruptor.handleExceptionFor().with()