产生并发安全问题的原因
在计算机架构中,一直有一个核心矛盾,就是CPU、内存、I/O设备这三者的读写速度差异很大,为了平衡读写速度:
- 给CPU增加了缓存,用来平衡CPU和内存的速度差异,会导致数据可见性问题。
- 又增加了进程和线程,来复用CPU缓存,用来平衡CPU和I/O设备的速度差异,线程切换会带来原子性问题。
- 编译程序也会优化指令执行顺序,让程序更加合理的运用缓存,会带来有序性问题。
⭐CPU缓存导致的可见性问题:
在单核时代,所有线程都运行在同一个CPU上,所有线程操作的都是同一个CPU缓存,一个线程对缓存的读写,对另外一个线程是可见的,所以不会有可见性问题。
在多核时代,每个CPU都有自己的缓存,线程运行在不同的CPU上,这些线程操作的不是同一个CPU缓存,一个线程对变量的写操作对另一个线程就不具备可见性了。
比如有两个线程对同一个共享变量做+1操作,每个线程循环100次,最终得到的值是100~200之间的随机数,而不是200。
Java中可以通过volatile关键字禁用CPU缓存,解决可见性问题。
也就是说,对于volatile字段的写操作其它线程是可以看到的。
volatile关键字解决不了
i++
操作,因为还会有原子性问题。
⭐线程切换导致的原子性问题:
我们写代码时,一行表达式通常会被编译成多条指令,比如i++
就需要3条指令才能完成:
- 把
i
从内存加载到CPU寄存器。 - 在寄存器中执行+1操作。
- 最后把结果写回内存。
假设有两个线程对共享变量i
做+1
操作:
- 线程A把
i=0
加载到寄存器。 - 然后切换线程,线程B执行完
+1
操作并写回内存。 - 线程A会以寄存器中的值为准,
+1
后并写回内存,最终的结果就是1。
原子性是指:多个操作的执行过程,不被中断的特性被称为原子性。
在Java中可以通过Synchronized解决原子性问题。
⭐编译优化带来的有序性问题:
有一个经典的案例就是,单例模式的双重检测机制。之所以要做双重检测,是因为new
关键字经过编译优化后,指令序列可能会发生变化。
new关键字包含三个指令:
- 分配一块内存
- 在内存上初始化对象
- 把内存地址赋值给变量
优化之后的指令序列可能是:
- 分配一块内存
- 把内存地址赋值给变量
- 在内存上初始化对象
假设有两个线程同时获取这个单例对象:
- 线程A发现对象为空,就开始加锁并创建对象,线程A执行
new
操作的时候,先把内存地址赋值给了成员变量,但是内存中的对象还没有初始化。- 然后线程切换,线程B发现对象不为空,就直接返回当前未被初始化的对象,如果调用这个对象的方法,就会出现空指针异常。
对于volatile字段,Java编译器会在该字段的读写操作前后插入内存屏障,来禁止指令重排序。
⭐线程池
线程池可以重复利用已创建的线程,降低线程创建和销毁造成的消耗。
当收到请求时,不需要等待线程创建就能立即执行。
线程池的逻辑是这样的:
- 线程池内部会维护一个任务队列。
- 在收到一个任务后,首先会判断当前线程池中的任务数量是否小于核心线程的数量,如果小于的话,创建一个线程,开始执行任务。
- 如果任务数量大于或者等于核心线程数,就会把新的任务放到等待队列中。
- 如果等待队列满了,就会开始创建新的线程,如果创建的线程数量达到最大线程数的配置之后,就会触发拒绝策略(默认是抛异常)。
- 如果创建线程失败,也会开始执行拒绝策略。
实际上,优先把任务放到队列中,比较适合CPU密集型的任务,因为CPU密集型的任务执行时,CPU占用率会比较高,只需要创建和CPU核心数差不多的线程数量就好,线程多了反而会造成上下文切换,降低执行效率。
但是我们平时开发的Web项目,通常都属于IO密集型,所以Tomcat线程池就是当线程数超过核心线程数之后,会优先创建线程,直到超过最大线程数,才会把请求放到等待队列。
这样是更适合IO密集型场景的。
⭐核心参数
corePoolSize:核心线程数,线程池中最少的线程数量。
maximumPoolSize:表示线程池最大线程数,如果线程池的核心线程都在运行中,并且等待队列也满了之后,就会开始创建额外的线程,额外线程的数量最多不会超过这个参数指定的数量。
workQueue:等待队列,如果线程池收到的任务大于核心线程的数量,额外的任务就会放到这个等待队列中。
keepAliveTime:如果一个线程在这个参数指定的时间内都没有收到新的任务,并且线程池中的线程数量大于核心线程数,这些多余的线程就会被回收掉。
unit:keepAliveTime 参数的时间单位。
threadFactory:ThreadFactory是一个接口,可以重写
newThread()
方法自定义创建线程的逻辑。handler:如果线程池中核心线程都在运行的时候,会把新的任务放到等待队列中,如果等待队列满了之后,就会开始创建新的线程,如果创建的线程数量达到最大线程数的配置之后,就会触发拒绝策略了。
线程池提供了4种拒绝策略:
- ThreadPoolExecutor.AbortPolicy:抛出异常来拒绝处理新任务,这是默认的拒绝策略。
- ThreadPoolExecutor.CallerRunsPolicy:让提交任务的线程来执行这个任务。
- ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。
- ThreadPoolExecutor.DiscardOldestPolicy:把最早进入等待队列的任务丢弃掉。
- 也可以通过实现
RejectedExecutionHandler
接口,重写rejectedExecution
方法来自定义拒绝策略。
⭐线程池大小如何定义
线程池的大小,要看具体的应用场景。
CPU密集型:
如果大部分功能都是纯CPU计算,那就是CPU密集型程序。比如对内存中的数据做大量运算。
对于CPU密集型应用,一个4核的CPU,每个核心对应一个线程,理论上创建4个线程就可以了,创建再多的线程只会增加线程切换的成本。所以线程的数量 = CPU核心数就是合适的。
不过线程有些时候可能会因为一些原因进入阻塞状态,所以可以多设置一个线程,保证CPU的利用率。线程数量可以设置为CPU核心数+1。
I/O密集型:
CPU计算和I/O操作交叉执行,但是因为I/O操作耗时比较长,所以这种场景一般称为I/O密集型计算。
(网络、磁盘、数据库都属于I/O操作)
一个单核计算机,假设我们需要从表1中查询数据,计算完成后,再写入到表2中。假设有A、B、C三个线程:
- 线程A查询数据(I/O操作),线程B和线程C阻塞;(I/O设备利用率100%,CPU利用率0%)
- 线程A获取到数据后,在内存中计算(CPU操作),线程B开始查询数据(I/O操作)。(I/O设备利用率100%,CPU利用率100%)
- 线程A计算完成后,继续执行最后的写表操作(I/O操作),线程B获取到数据后开始计算(CPU操作),线程C查询数据(I/O操作)。(I/O设备利用率100%,CPU利用率100%)
- 线程数量可以设置为3.
**对于多核CPU最佳线程数 = CPU核心数 * [1 + (I/O耗时 / CPU耗时)]**。
对于4核8线程这种超线程技术,是指一个CPU物理核心包含两个逻辑核心,这两个逻辑核心也还是会存在竞争,所以这两个逻辑核心并不能完全并行运行。所以不管是4还是8都不一定是最合适的,还需要根据具体的测试结果进行调整。
⭐如何获取执行结果
ThreadPoolExecutor提供了3个submit()方法配合FutureTask工具类来支持获得线程池的返回值。
这三个submit()方法的返回值都是Future接口,Future接口提供了2个get()方法用来获取返回值(一个包含超时机制,一个没有超时机制)。
这两个get()方法都是阻塞的,如果任务没有执行完,会导致调用方被阻塞,直到任务完成或者超时才会被唤醒。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 // 创建线程池
ExecutorService executor = ThreadPoolExecutor.newFixedThreadPool(1);
public static void main(String[] args) {
// 创建结果引用
Result result = new Result();
result.setData("zhangsan");
// 提交任务
Future<Result> future = executor.submit(new Task(result), result);
// 获取线程池返回值
Result res = future.get();
res.getData(); // lisi
}
// 自定义线程类
public static class Task implements Runnable {
private final Result result;
public Task(Result result) {
this.result = result;
}
public void run() {
result.setData("lisi");
}
}
线程的生命周期
Java中的线程一共有5种状态,分别是:
初始化状态:指的是线程已经被创建了,但是还没有被分配CPU执行,是在编程语言层面被创建,在操作系统层面还没被创建。
可运行状态:指的是线程在操作系统层面线程已经被创建出来了,可以分配给CPU执行。
运行状态:当有空闲的CPU时,操作系统就会把CPU分配给可运行状态的线程,被分配的线程就会变成运行状态。
阻塞状态:运行状态中的线程,如果调用了一个阻塞API,线程就会变为阻塞状态,并且让出CPU的使用权。
终止状态:线程执行完run()方法,会自动进入终止状态。也可以手动调用
stop()
或者interrupt()
方法终止一个线程。stop()
和interrupt()
方法的区别:stop()
方法会立刻杀死线程,线程持有的锁也会不释放,那其它线程就再也没有机会获得这个锁了。- 类似的方法还有
suspend()
和resume()
。
- 类似的方法还有
interrupt()
只会通知线程,线程还可以继续执行后续的操作。
阻塞线程的方式
- **sleep()**:
- sleep方法可以指定一个以毫秒为单位的时间,线程在这个时间内会进入阻塞状态。
- 不会释放锁,会让出CPU时间片,等待再次调度。
- **yield()**:
- yield方法会让出cpu时间片,但是线程还是会处于可执行的状态,当下次获得时间片之后还是会正常执行。
- 不会释放锁。
- **wait()**:
- wait方法会使线程进入阻塞状态,wait方法可以指定一个以毫秒为单位的时间,时间到了之后会自动唤醒。如果不指定时间则需要通过notify()或者notifyAll()方法来唤醒,否则会一直阻塞。
- 会释放锁,会让出CPU时间片,等待再次调度。
- **join()**:
- 当线程A调用线程B的join方法,线程A就会进入阻塞状态,直到线程B运行结束,线程A才会进入可执行状态。
- 底层还是调用wait方法,会释放锁
- **notify()和notifyAll()**:
- notify()会随机唤醒一个等待中的线程,notifyAll()会唤醒所有等待中的线程。但是notify()有可能导致一些线程无法直接被唤醒。
- 假设有A、B两个资源,线程1申请到了A、线程2申请到了B、线程3再次申请A会进入等待队列,线程4申请B会进入等待队列。这个时候等待队列中有3、4两个线程。线程1归还资源后,如果用notify()来通知等待队列中的线程,有可能被通知的线程是4,但是线程4申请的是资源B,所以还会继续等待。
死锁
产生死锁的4个条件:
- 互斥:共享资源只能被一个线程占用。
- 占有且等待:线程A占有资源A,在等待资源B的时候,不释放资源A。
- 不可抢占:其它线程不能抢占线程A已经占有的资源。
- 循环等待:线程A等待线程B占有的资源,线程B等待线程A占有的资源。
预防死锁:
对资源进行加锁时,可以先对资源排序,然后按照顺序加锁。这样就不会出现线程1占用资源A,线程2占用资源B,双方都等待对方释放资源了。
1 | public class Account { |
等待-通知优化循环等待:
可以通过synchronized关键字配合wait()、notify()、notifyAll()这三个方法实现等待通知机制。
就是在synchronized代码块中,判断条件如果不满足,就调用wait()方法让当前线程进入等待状态。其他线程执行完毕之后,调用notifyAll()方法唤醒等待队列中的线程。
1 | public class Allocator { |
⭐锁定义
⭐乐观锁 / 悲观锁
乐观锁:乐观锁认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断数据是否发生变更。如果数据已经被其他线程更新,则报错或者重试。乐观锁适合读多写少操作。
悲观锁:悲观锁认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改。
悲观锁适合写操作多的场景。
synchronized关键字和Lock接口的实现类都是悲观锁。
⭐偏向锁
偏向锁认为,同一把锁只会被一个线程多次获得,不存在多线程竞争。
在JVM中,当一个线程获取偏向锁时,会在对象头中的标记字段中存储当前线程的ID,在加锁和解锁时,只需要判断标记字段中是否保存着当前线程的ID就可以了。
⭐轻量级锁
当偏向锁被多个线程访问时,就会升级为轻量级锁。
加锁时,JVM会在当前线程的栈帧分配一块空间,用来保存锁记录(Lock Record),然后把锁对象的标记字段拷贝到锁记录中。
- 如果更新成功,就说明加锁成功。
- 如果更新失败,就说明有其它线程获取当前锁,Java虚拟机就会把这把锁升级为重量级锁,并阻塞当前线程。
解锁时,Java虚拟机会通过CAS操作,比较锁对象的标记字段是否是锁记录的地址。
⭐重量级锁
重量级锁会阻塞所有加锁失败的线程,解锁时再唤醒这些线程。
⭐自旋锁 / 适应性自旋锁
自旋锁:因为阻塞或者唤醒一个Java线程,需要操作系统切换CPU状态来完成,如果代码中的逻辑比较简单,状态转换消耗的时间有可能比代码执行的时间还要长。
所以为了减少减少线程切换带来的消耗,可以让没有获取到锁的线程去执行一个无意义的循环。如果循环结束后锁已经被释放了,当前线程就可以避免切换线程的开销。
自旋锁也是有缺点的,自旋虽然可以避免切换线程的开销,但是会占用处理器时间。如果锁被占用的时间很短,自旋锁的效果就很好,否则自旋只会白白浪费处理器资源。所以自旋一定要有次数限制(默认是10次,可以通过-XX:PreBlockSpin
修改)。
适应性自旋锁:
JDK1.6引入了适应性自旋锁,适应性自旋表示自旋的次数不固定,而是根据以往能否通过自旋获取到锁来决定。
如果通过自旋获取到锁的概率非常高,Java虚拟机就会允许自旋更长的时间。
如果某个锁只有很少的次数能通过自旋获取到,Java虚拟机就会减少自旋时间或者跳过自旋,直接阻塞线程。
⭐可重入锁 / 不可重入锁
如果一个类中有多个synchronized方法,这些方法之间相互调用,就会对同一把锁重复加锁。
如果是可重入锁,就可以正常加锁。
如果是不可重入锁,就会产生死锁。
ReentrantLock 和 synchronized 都是可重入锁。
⭐公平锁 / 非公平锁
公平锁:
公平锁是指多个线程直接在队列中排队,队列中第一个线程才能获得锁。
公平锁的优点是每个线程都能获取到锁,不会饿死。
缺点是效率比非公平锁低,队列中除了第一个线程都会阻塞,CPU唤醒线程的开销会很大。
ReentrantLock在构造函数中提供了是否公平锁的初始化方式,默认是非公平锁。传入true就是公平锁。
非公平锁:
非公平锁是指多个线程加锁时,直接尝试获取锁,获取不到的才会进入等待队列。如果某一个线程尝试获取锁时,锁刚好可用,这个线程可以直接获取到锁。
非公平锁的优点是可以减少唤醒线程的开销,因为线程有几率不阻塞直接获取到锁。
缺点是等待队列中的线程可能会饿死,或者要等很久才能获取到锁。
synchronized是非公平锁,ReentantLock默认也是非公平锁。
⭐独占锁 / 共享锁
独占锁:独占锁只能被一个线程持有。
synchronized和JUC中Lock接口的实现类都是互斥锁。
共享锁:共享锁可以被多个线程持有,共享锁与独占锁之间是互斥的。获得共享锁的线程只能读数据,不能修改数据。
ReentrantReadWriteLock
类中有两把锁:ReadLock
和WriteLock
,可以分别加读锁和写锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 public class Cache<K, V> {
private final Map<K, V> cache = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// 读锁
private final Lock readLock = lock.readLock();
// 写锁
private final Lock writeLock = lock.writeLock();
// 如果缓存不存在,就加载数据到缓存中
public V get(K key) {
V value = null;
try {
// 加读锁
readLock.lock();
value = cache.get(key);
} finally {
readLock.unlock();
}
if (value != null) {
return value;
}
try {
// 加写锁
writeLock.lock();
// 查询数据库
value = fromDB(key);
cache.put(key, value);
} finally {
writeLock.unlock();
}
return value;
}
}
⭐CAS
CAS全称 Compare And Swap(比较并替换),是一种无锁算法。
单核 CPU 和多核 CPU 下都能够保证的原子性。
在多核CPU环境下:
- 通过总线锁,保证修改操作的互斥性
- 通过缓存一致性协议,保证CPU缓存中的值对其它核心可见
- 通过内存屏障,保证多线程下的有序性
CAS算法需要三个参数:
- 需要修改的原值
- 要比较的期望值
- 要写入的新值
当 “原值” 等于 “期望值” 时,通过原子操作用 “新值” 替换 “原值”,否则就重新执行CAS流程。
CAS虽然很高效,但是也有两个问题:
ABA问题:CAS会检查 “原值” 是否发生变化,如果原来是A,后来变成了B,然后又变成了A,那么CAS进行检查的时候会发现 “原值” 没有变化过,但实际上是有变化的。
解决思路就是给数据添加版本号,每次更新数据都把版本号加一。
JDK1.5提供了
AtomicStampedReference
类来解决ABA问题,具体的逻辑封装在compareAndSet()
方法中:1
2
3
4
5// 第一个参数是原值,第二个参数是版本号
AtomicStampedReference<String> asf = new AtomicStampedReference<String>("zhangsan", 1);
// 第一个参数是期望值,第二个参数是新值,第三个参数是期望版本号,第四个参数是新版本号
boolean b = asf.compareAndSet("zhangsan", "lisi", 1, 2);只能保证单个变量的原子操作,多个共享变量无法保证原子操作。
JDK1.5提供了
AtomicReference
类来保证对象之间的原子性,可以把多个变量放在一个对象里面进行原子操作。
⭐JUC常用原子类
基本类型:
AtomicInteger、AtomicLong、AtomicBoolean。
主要方法:
getAndIncrement()
:原子化的i++
。getAndDecrement()
:原子化的i--
。incrementAndGet()
:原子化的++i
。decrementAndGet()
:原子化的--i
。compareAndSet()
:CAS操作,返回是否成功。
引用类型:
AtomicReference、AtomicStampedReference、AtomicMarkableReference。可以实现引用类型对象属性的原子化更新。
主要方法:
compareAndSet()
:CAS操作,返回是否成功。
数组:
AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray。可以实现原子化更新数组中的每一个元素。
主要方法:
compareAndSet()
:CAS操作,返回是否成功。
累加器:
DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder。只能用来执行累加操作,相比原子化的基本类型,计算速度更快,但是不支持compareAndSet()
方法。
主要方法:
increment()
:累加。decrement()
:减少。
⭐AQS
AbstractQueuedSynchronizer(AQS):它是AQS体系的核心基类,实现了锁的基本结构,内部的数据结构是链表,它继承自AOS。
AbstractOwnableSynchronizer(AOS):用来记录锁当前的持有者。
ReentrantLock是锁的具体实现,它继承自Lock抽象类,用内部类Sync继承AQS,所以ReentrantLock具体的锁实现都是委托Sync完成的。
Sync有两个子类,分别是FairSync(公平锁)和NonfairSync(非公平锁)
获取锁:
调用tryLock()
方法获取锁时,会委托给Sync来处理。
首先会判断当前的锁状态,state如果等于0,表示当前锁没有被占用,公平锁和非公平锁会有不同的处理逻辑:
- 如果是公平锁:需要先判断等待队列中有没有其它线程在排队。
- 如果有,就返回false并进入等待队列。
- 如果没有,就返回true,然后通过CAS操作更新state,如果更新成功,说明成功获取锁,就设置锁的拥有者为当前线程。否则就进入等待队列。
- 如果是非公平锁:通过CAS操作更新state。
- 如果更新成功,说明成功获取锁,就设置锁的拥有者为当前线程。
- 如果更新失败,就直接进入等待队列。
如果state不等于0,说明锁已经被占用,会判断当前线程是不是锁的持有者:
- 如果是,就通过CAS更新state。
- 如果不是,就进入等待队列。
释放锁:
调用unlock()
方法释放锁时,会委托给Sync来处理。
释放锁主要包含这样几个步骤:
- 判断当前线程释放是锁的持有者,如果不是则抛异常。
- 判断state是否为0,如果为0,表示锁已释放,需要唤醒等待队列中的线程。
⭐ThreadLocal
比如MVC架构的Web应用,在Service层无法直接获取HttpServletRequest,需要通过Controller以参数的形式传递进来。可以用ThreadLocal解决这种过度传参的问题。
可以在拦截器中把HttpServletRequest放到ThreadLocal中,Service层就可以直接获取了。需要注意的是要在拦截器中请求结束的回调方法中清除ThreadLocal,不然可能会导致内存泄漏。
工作原理
Thread内部维护了一个ThreadLocalMap对象,ThreadLocalMap是ThreadLocal中的内部类,
ThreadLocal只是一个代理类,内部并不保存任何数据。
ThreadLocalMap内部用一个Entry数组保存数据,Entry对ThreadLocal是弱引用。
ThreadLocalMap保存数据的时候,会根据ThreadLocal对象的hash值定位数组的位置,所以也会有Hash冲突:
在遇到Hash冲突时,如果是同一个ThreadLocal则替换Entry中的value,否则就直接寻找下一个空闲的位置。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16private void set(ThreadLocal<?> key, Object value) {
ThreadLocal.ThreadLocalMap.Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len - 1);
for (ThreadLocal.ThreadLocalMap.Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get(); // 这里的get()方法是Reference类中的
if (k == key) { // 如果是同一个ThreadLocal,就直接替换Value
e.value = value;
return;
}
if (k == null) { // 如果目标位置为空,则插入数据,否则继续找下一个位置
replaceStaleEntry(key, value, i);
return;
}
}
}
ThreadLocalMap获取数据的时候,也是根据ThreadLocal对象的hash值获取数组中对应的数据,获取到数据之后还需要判断是否是同一个ThreadLocal,如果不是就继续找下一个。
1
2
3
4
5
6
7
8
9
10private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key) { // 判断是否是同一个ThreadLocal
return e;
} else {
// getEntryAfterMiss(): 会遍历ThreadLocalMap中所有Entry,直到找到ThreadLocal相同的元素
return getEntryAfterMiss(key, i, e);
}
}
之所以要用Entry数组保存数据,是因为一个Thread可以创建多个ThreadLocal,这些ThreadLocal保存的数据只会放在同一个ThreadLocalMap中。
内存泄漏
线程池中的线程可能会一直存活下去,这就意味着Thread持有的ThreadLocalMap不会被回收,然后ThreadLocalMap中的Entry对ThreadLocal是弱引用,所以ThreadLocal是可以被回收掉的。但是Entry对Value是强引用的,所以Value无法被回收也无法被访问到,就会导致内存泄漏。
所以在写代码时,使用完ThreadLocal一定要在finally代码块中调用remove()
方法。
⭐ReentrantLock
ReentrantLock继承自Lock接口,它是可重入锁。
可以通过构造函数设置为公平锁,默认是非公平锁。
lock()
:获取锁,如果锁被其它线程持有,则阻塞当前线程,直到获取到锁。tryLock()
:尝试获取锁,如果获取到锁则返回true,否则返回false。不会阻塞当前线程。
1
2
3
4
5
6
7
8
9
10
11 public class T {
Lock lock = new ReentrantLock();
public void foo(int value) {
try {
lock.lock();
value++;
} finally {
lock.unlock();
}
}
}
⭐ReentrantReadWriteLock
ReentrantReadWriteLock实现了ReadWriteLock接口,它是支持可重入的读写锁。
可以通过构造函数设置为公平锁,默认是非公平锁。
ReadWriteLock不支持锁升级:不释放读锁的前提下,无法再获取写锁。
ReadWriteLock支持锁降级:不释放写锁的前提下,可以再次获取读锁。
常用写法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 public class Cache<K, V> {
private final Map<K, V> cache = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// 读锁
private final Lock readLock = lock.readLock();
// 写锁
private final Lock writeLock = lock.writeLock();
// 如果缓存不存在,就加载数据到缓存中
public V get(K key) {
V value = null;
try {
// 加读锁
readLock.lock();
value = cache.get(key);
} finally {
readLock.unlock();
}
if (value != null) {
return value;
}
try {
// 加写锁
writeLock.lock();
// 查询数据库
value = fromDB(key);
cache.put(key, value);
} finally {
writeLock.unlock();
}
return value;
}
}
⭐StampedLock
StampedLock是JDK1.8提供的,它的性能比ReadWriteLock(读写锁)更好。但是StampedLock不支持重入。
StampedLock支持三种锁模式:写锁、悲观读锁、乐观读。
- 它允许多个线程同时获取悲观读锁,只允许一个线程获取写锁,写锁和悲观读锁是互斥的。
- 当多个线程同时进行乐观读时,是允许一个线程获取写锁的,并不是所有的写操作都会阻塞。
StampedLock之所以比ReadWriteLock性能好,就是因为StampedLock支持乐观读,乐观读不会加锁的,所以性能上会好很多。
StampedLock进行写锁、读锁、乐观读的时候,都会返回一个long类型的stamp,解锁时需要传入stamp。
需要注意的是,StampedLock是是支持锁升级和降级的,可以通过tryConvertToReadLock()和tryConvertToWriteLock()方法,但是这两个会返回新的stamp,在解锁时一定要使用这个新的stamp。
读模板:
1
2
3
4
5
6
7
8
9
10
11
12
13
14 private final StampedLock sl = new StampedLock();
// 乐观读
long stamp = sl.tryOptimisticRead();
// 判断乐观读期间是否发生写操作,如果有则返回则返回false
if (!sl.validate(stamp)) {
// 如果发生了写操作,则升级为悲观读锁
stamp = sl.readLock();
try {
// 开始读操作....
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}写模板:
1
2
3
4
5
6
7
8
9 private final StampedLock sl = new StampedLock();
// 加写锁
long stamp = sl.writeLock();
try {
// 进行写操作
} finally {
// 释放写锁
sl.unlockWrite(stamp);
}
⭐Condition
Condition是条件变量,可以用来做异步转同步。
Condition中主要有三个方法:await()
、signal()
、signalAll()
,对应的是Object类的wait()
、notify()
、notifyAll()
。
await()
:进入等待状态并释放锁。signal()
:唤醒一个等待线程。signalAll()
:唤醒所有等待线程。
比如需要等待某一个异步接口返回结果之后才能执行后续的逻辑,就可以在调用接口之前执行await()
方法阻塞当前线程,获取到返回值之后执行signal()
方法唤醒当前线程。
异步转同步:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Object response;
public Object get(int timeout) {
long start = System.nanoTime();
try {
lock.lock();
while (response == null) {
// 等待结果返回
condition.await(timeout);
long cur = System.nanoTime();
if (response != null || cur - start > timeout) {
break;
}
}
} finally {
lock.unlock();
}
if (response == null) {
throw new TimeoutException();
}
return response;
}
// 发送异步请求
public void send() {
new Thread(() -> {
Response res = sendRequest();
doReceived(res);
}).start();
}
// 返回结果时调用该方法
private void doRecived(Response res) {
try {
lock.lock();
response = res;
// 获取到返回值后,唤醒线程
condition.signal();
} finally {
lock.unlock();
}
}
⭐Semaphore
Semaphore可以让多个线程同时访问某一个资源,可以用来实现简单的限流器。
acquire()
:Semaphore中的计数器+1,当计数器达到阈值,会阻塞当前线程,直到计数器减少。release()
:Semaphore计数器-1。
使用Semaphore时需要注意线程安全问题,因为Semaphore可以让多个线程进入临界区。
1
2
3
4
5
6
7
8
9
10
11
12
13
14 public class Counter {
private int value;
// 初始化信号量
private Semaphore semaphore = new Semaphore(5);
public void add() {
try {
semaphore.acquire();
value++;
} finally {
semaphore.release();
}
}
}
⭐CountDownLatch
CountDownLatch是一个线程同步工具,可以让父线程等待子线程执行的执行结果。
CountDownLatch可以理解为是一个计数器,在初始化时可以指定计数器的值,通常是子线程或者任务的数量,让主线程调用await()方法进入等待状态,子线程执行完毕就调用countDown()方法让计数器减一,当计数器为0时CountDownLatch会唤醒主线程。
CountDownLatch的计数器是不能循环利用的,计数器的值一旦减到0,再次调用await()方法会直接通过。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 CountDownLatch latch = new CountDownLatch(2);
P p = null;
D d = null;
new Thread(() -> {
p = getP();
latch.countDown();
}).start();
new Thread(() -> {
d = getD();
latch.countDown();
}).start();
latch.await();
Diff diff = check(p, d);
save(diff);
⭐CyclicBarrier
CyclicBarrier是一个线程同步工具,可以让一组线程之间相互等待。
在初始化时可以指定计数器的值,子线程执行完之后调用await()方法,进入等待状态并让计数器减一。计数器为0时会重置计数器的值,并让最后一个调用await()方法的线程触发回调函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 Vector<P> pos;
Vector<D> dos;
Executor executor = Executors.newFixedThreadPool(1);
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
// 将回调函数提交到线程池中执行
executor.execute(() -> check())
});
public void check() {
P p = pos.remove(0);
D d = dos.remove(0);
Diff diff = check(p,d);
save(diff);
}
public void checkAll() {
new Thread(() -> {
while (存在未对账的数据) {
P p = getP();
pos.add(p);
cyclicBarrier.await();
}
}).start();
new Thread(() -> {
while (存在未对账的数据) {
D d = getD();
dos.add(d);
cyclicBarrier.await();
}
}).start();
}
⭐FutureTask
FutureTask实现了Runnable接口和Future接口,所以可以把FutureTask作为任务提交给线程池去执行,也可以用来获取任务的执行结果。
它有两个构造函数:
1 | public FutureTask(Callable<V> callable) {} |
- 第一个构造函数的参数是Callable接口,Callable接口中的call()方法是有返回值的,调用FutureTask的get()方法得到的就是call()方法的返回值。
- 第二个构造函数的参数是Runnable接口和结果引用,调用FutureTask的get()方法得到的就是结果引用。
⭐CompletableFuture
CompletableFuture继承了Future接口和CompletionStage接口。
跟传统的Future相比,CompletableFuture可以更方便的处理任务之间的关联关系。
默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认的线程数是CPU的核心数。
在开发时尽量使用自定义的线程池,因为如果所有的CompletableFuture都使用同一个线程池,如果有一些任务执行了一些很慢的I/O操作,就会导致线程池中所有的任务都阻塞,会影响整个系统的性能,所以尽量根据不同的业务类型创建不同的线程池。
主要方法:
描述串行关系:
thenApply(function)
:把前一个任务的执行结果,交给后面的Function
处理(有返回值)。thenAccept(consumer)
:把前一个任务的执行结果,交给后面的Consume
处理(没有返回值)。thenRun(runnable)
:忽略前一个任务的结果,执行额外的逻辑(没有返回值)。thenCompose(function)
:会把前一个CompletableFuture
的结果传递给内部的CompletableFuture
,最后返回一个新的CompletableFuture
对象。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(new Supplier<Integer>() {
public Integer get() {
int number = new Random().nextInt(30);
System.out.println("第一次运算:" + number);
return number;
}
}).thenCompose(new Function<Integer, CompletionStage<Integer>>() {
public CompletionStage<Integer> apply(Integer param) {
return CompletableFuture.supplyAsync(new Supplier<Integer>() {
public Integer get() {
int number = param * 2;
System.out.println("第二次运算:" + number);
return number;
}
});
}
});
描述AND聚合关系:
thenCombine(other, function)
:两个任务都要有返回值,可以对两个任务的结果做一些额外的处理(有返回值)。thenAcceptBoth(other, consumer)
:两个任务都要有返回值,可以对两个任务的结果做一些额外的处理(没有返回值)。runAfterBoth(other, runnable)
:不需要前面的任务有返回值,可以执行一些额外的逻辑(没有返回值)。
描述OR聚合关系:
applyToEither(other, function)
:两个任务都要有返回值,任意一个任务先结束,就会执行(有返回值)。acceptEither(other, consumer)
:两个任务都要有返回值,任意一个任务先结束,就会执行(没有返回值)。runAfterEither(other, runnable)
:不需要前面的任务有返回值,任意一个任务先结束,就会执行(没有返回值)。
异常处理:
exceptionally(function)
:前面的任务报错时就会执行,类似于catch{}代码块。whenComplete(consumer)
:无论是否发生异常都会执行,相当于finally{}代码块,可以配合exceptionally
使用(没有返回值)。handle(function)
:无论是否发生异常都会执行,相当于finally{}代码块(有返回值)。
方法名带有Async表示会异步执行function、consumer或者runnable。
CompletableFuture提供4个静态方法来创建对象:
1 | // 使用默认线程池 |
runAsync()方法接收的是Runnable接口,所以它是没有返回值的。
supplyAsync()方法接收的是Supplyer函数式接口,Supplyer的get()方法是有返回值的。
用例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
System.out.println("T1:洗⽔壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T1:烧开⽔...");
sleep(15, TimeUnit.SECONDS);
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
System.out.println("T2:洗茶壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T2:洗茶杯...");
sleep(2, TimeUnit.SECONDS);
System.out.println("T2:拿茶叶...");
sleep(1, TimeUnit.SECONDS);
return "⻰井";
});
CompletableFuture<String> f3 = f1.thenCombine(f2, (_, tf) -> {
System.out.println("T1:拿到茶叶:" + tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
});
//等待任务3执⾏结果
System.out.println(f3.join());
⭐CompletionService
有些时候我们会执行多个异步任务,并且需要获取返回值,比较简单的做法是调用线程池的submit()方法,获取Future对象,再调用Future对象的get()方法获取返回值。但是调用Future对象的get()方法会导致当前线程阻塞,直到任务执行完毕。
如果任务1的执行时间比任务2的执行时间长,如果先调用任务1的Future对象的get()方法,就会导致任务2即使执行完毕后也不能及时处理,需要等任务1执行完毕后才能处理,CompletionService就可以解决这样的问题。
CompletionService内部维护了一个阻塞队列,当任务执行结束后,就把Future对象加入到队列中。队列是先进先出的,这样先执行完的任务就可以先加入到队列中,就可以先进行处理。
CompletionService是一个接口,实现类是ExecutorCompletionService,这个实现类的构造方法有两个:
1 | public ExecutorCompletionService(Executor executor){} |
创建ExecutorCompletionService需要传入一个线程池和BlockingQueue,如果不指定BlockingQueue默认使用的是无界的LinkedBlockingQueue。
主要方法:
Future<V> submit(Callable<V> task)
:Future<V> submit(Runnable task, V result)
:执行Future<V> take()
:从队列中获取并移除一个元素,就阻塞当前线程,直到有元素返回。Future<V> pool()
:从队列中获取并移除一个元素,如果队列是空的,会返回null。Future<V> pool(timeout, unit)
:从队列中获取并移除一个元素,如果队列是空的,会等待一定的时间,超时后返回null。
用例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 // 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 异步向电商S1询价
cs.submit(()->getPriceByS1());
// 异步向电商S2询价
cs.submit(()->getPriceByS2());
// 异步向电商S3询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
for (int i=0; i<3; i++) {
Integer r = cs.take().get();
executor.execute( () -> save(r));
}
⭐Fork/Join
Fork/Join是一种分治任务模型,主要分为两个阶段:
- 一个阶段是任务分解,把一个大任务分解成多个小任务。
- 另一个阶段是结果合并,合并所有小任务的执行结果,得到最终的结果。
Java提供了ForkJoinPool和ForkJoinTask两个类来支持分治任务模型。这两个类的关系类似于线程池和线程的关系。
⭐ForkJoinPool
ForkJoinPool是内部有多个任务队列,当我们通过invoke()
或者submit()
方法提交任务时,ForkJoinPool会把任务提交到一个任务队列中。如果任务在执行过程中还会创建子线程,那么子线程会被提交到父线程所在的任务队列中。
如果某一个队列空了,还会从其它队列中 “窃取” 任务。
任务队列采用的是双端队列,正常执行任务和 “窃取任务” 会从不同的端口消费。
⭐ForkJoinTask
ForkJoinTask是一个抽象类,最核心的方法是fork()
和join()
,其中fork()
方法会异步执行一个子任务(执行compute()
方法),join()
方法会阻塞当前线程等待子任务的执行结果。
ForkJoinTask有两个子类,RecursiveAction和RecursiveTask,它们都是用递归的方式来处理分治任务的。这两个子类也是抽象类,内部定义了compute()
抽象方法,需要自己定义子类去拓展。
RecursiveAction没有返回值。
RecursiveTask有返回值。
调用
fork()
方法时,会调用我们实现的compute()
方法。
单词统计:
1
2
3
4
5
6
7 public static void main(String[] args) {
String[] words = {"hello world", "hello me", "hello"};
// 创建ForkJoinPool
ForkJoinPool taskPool = new ForkJoinPool(10);
Map<String, Integer> result = taskPool.invoke(new WordCount(words, 0, words.length));
System.out.println(result);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 public class WordCount extends RecursiveTask<Map<String, Integer>> {
private final String[] words;
private final int start, end;
public WordCount(String[] words, int start, int end) {
this.words = words;
this.start = start;
this.end = end;
}
protected Map<String, Integer> compute() {
if (end - start == 1) {
return count(words[start]);
}
int mid = (start + end) / 2;
WordCount wc1 = new WordCount(words, start, mid);
wc1.fork();
WordCount wc2 = new WordCount(words, mid, end);
return merge(wc1.join(), wc2.compute());
}
private Map<String, Integer> merge(Map<String, Integer> r1, Map<String, Integer> r2) {
r1.forEach((key, value) -> {
r2.merge(key, value, Integer::sum);
});
return r2;
}
private Map<String, Integer> count(String line) {
String[] words = line.split("\\s");
Map<String, Integer> result = new HashMap<>();
for (String word : words) {
result.merge(word, 1, Integer::sum);
}
return result;
}
}