0%

Java并发

产生并发安全问题的原因

在计算机架构中,一直有一个核心矛盾,就是CPU、内存、I/O设备这三者的读写速度差异很大,为了平衡读写速度:

  1. 给CPU增加了缓存,用来平衡CPU和内存的速度差异,会导致数据可见性问题
  2. 又增加了进程和线程,来复用CPU缓存,用来平衡CPU和I/O设备的速度差异,线程切换会带来原子性问题
  3. 编译程序也会优化指令执行顺序,让程序更加合理的运用缓存,会带来有序性问题

CPU缓存导致的可见性问题

在单核时代,所有线程都运行在同一个CPU上,所有线程操作的都是同一个CPU缓存,一个线程对缓存的读写,对另外一个线程是可见的,所以不会有可见性问题。

在多核时代,每个CPU都有自己的缓存,线程运行在不同的CPU上,这些线程操作的不是同一个CPU缓存,一个线程对变量的写操作对另一个线程就不具备可见性了。

比如有两个线程对同一个共享变量做+1操作,每个线程循环100次,最终得到的值是100~200之间的随机数,而不是200。

Java中可以通过volatile关键字禁用CPU缓存,解决可见性问题

也就是说,对于volatile字段的写操作其它线程是可以看到的。

volatile关键字解决不了i++操作,因为还会有原子性问题。

线程切换导致的原子性问题

我们写代码时,一行表达式通常会被编译成多条指令,比如i++就需要3条指令才能完成:

  1. i从内存加载到CPU寄存器。
  2. 在寄存器中执行+1操作。
  3. 最后把结果写回内存。

假设有两个线程对共享变量i+1操作:

  1. 线程A把i=0加载到寄存器。
  2. 然后切换线程,线程B执行完+1操作并写回内存。
  3. 线程A会以寄存器中的值为准,+1后并写回内存,最终的结果就是1。

原子性是指:多个操作的执行过程,不被中断的特性被称为原子性

在Java中可以通过Synchronized解决原子性问题

编译优化带来的有序性问题

有一个经典的案例就是,单例模式的双重检测机制。之所以要做双重检测,是因为new关键字经过编译优化后,指令序列可能会发生变化。

new关键字包含三个指令:

  1. 分配一块内存
  2. 在内存上初始化对象
  3. 把内存地址赋值给变量

优化之后的指令序列可能是:

  1. 分配一块内存
  2. 把内存地址赋值给变量
  3. 在内存上初始化对象

假设有两个线程同时获取这个单例对象:

  1. 线程A发现对象为空,就开始加锁并创建对象,线程A执行new操作的时候,先把内存地址赋值给了成员变量,但是内存中的对象还没有初始化。
  2. 然后线程切换,线程B发现对象不为空,就直接返回当前未被初始化的对象,如果调用这个对象的方法,就会出现空指针异常。

对于volatile字段,Java编译器会在该字段的读写操作前后插入内存屏障,来禁止指令重排序

⭐线程池

线程池可以重复利用已创建的线程,降低线程创建和销毁造成的消耗。

当收到请求时,不需要等待线程创建就能立即执行。

线程池的逻辑是这样的:

  1. 线程池内部会维护一个任务队列。
  2. 在收到一个任务后,首先会判断当前线程池中的任务数量是否小于核心线程的数量,如果小于的话,创建一个线程,开始执行任务。
  3. 如果任务数量大于或者等于核心线程数,就会把新的任务放到等待队列中。
  4. 如果等待队列满了,就会开始创建新的线程,如果创建的线程数量达到最大线程数的配置之后,就会触发拒绝策略(默认是抛异常)。
  5. 如果创建线程失败,也会开始执行拒绝策略。

实际上,优先把任务放到队列中,比较适合CPU密集型的任务,因为CPU密集型的任务执行时,CPU占用率会比较高,只需要创建和CPU核心数差不多的线程数量就好,线程多了反而会造成上下文切换,降低执行效率。

但是我们平时开发的Web项目,通常都属于IO密集型,所以Tomcat线程池就是当线程数超过核心线程数之后,会优先创建线程,直到超过最大线程数,才会把请求放到等待队列。

这样是更适合IO密集型场景的。

⭐核心参数

  • corePoolSize:核心线程数,线程池中最少的线程数量。

  • maximumPoolSize:表示线程池最大线程数,如果线程池的核心线程都在运行中,并且等待队列也满了之后,就会开始创建额外的线程,额外线程的数量最多不会超过这个参数指定的数量。

  • workQueue:等待队列,如果线程池收到的任务大于核心线程的数量,额外的任务就会放到这个等待队列中。

  • keepAliveTime:如果一个线程在这个参数指定的时间内都没有收到新的任务,并且线程池中的线程数量大于核心线程数,这些多余的线程就会被回收掉。

  • unit:keepAliveTime 参数的时间单位。

  • threadFactory:ThreadFactory是一个接口,可以重写newThread()方法自定义创建线程的逻辑。

  • handler:如果线程池中核心线程都在运行的时候,会把新的任务放到等待队列中,如果等待队列满了之后,就会开始创建新的线程,如果创建的线程数量达到最大线程数的配置之后,就会触发拒绝策略了。

    线程池提供了4种拒绝策略

    • ThreadPoolExecutor.AbortPolicy:抛出异常来拒绝处理新任务,这是默认的拒绝策略。
    • ThreadPoolExecutor.CallerRunsPolicy:让提交任务的线程来执行这个任务。
    • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。
    • ThreadPoolExecutor.DiscardOldestPolicy:把最早进入等待队列的任务丢弃掉。
    • 也可以通过实现RejectedExecutionHandler接口,重写rejectedExecution方法来自定义拒绝策略。

⭐线程池大小如何定义

线程池的大小,要看具体的应用场景。

CPU密集型:

如果大部分功能都是纯CPU计算,那就是CPU密集型程序。比如对内存中的数据做大量运算。

对于CPU密集型应用,一个4核的CPU,每个核心对应一个线程,理论上创建4个线程就可以了,创建再多的线程只会增加线程切换的成本。所以线程的数量 = CPU核心数就是合适的。

不过线程有些时候可能会因为一些原因进入阻塞状态,所以可以多设置一个线程,保证CPU的利用率。线程数量可以设置为CPU核心数+1

I/O密集型:

CPU计算和I/O操作交叉执行,但是因为I/O操作耗时比较长,所以这种场景一般称为I/O密集型计算。

(网络、磁盘、数据库都属于I/O操作)

一个单核计算机,假设我们需要从表1中查询数据,计算完成后,再写入到表2中。假设有A、B、C三个线程:

  1. 线程A查询数据(I/O操作),线程B和线程C阻塞;(I/O设备利用率100%,CPU利用率0%)
  2. 线程A获取到数据后,在内存中计算(CPU操作),线程B开始查询数据(I/O操作)。(I/O设备利用率100%,CPU利用率100%)
  3. 线程A计算完成后,继续执行最后的写表操作(I/O操作),线程B获取到数据后开始计算(CPU操作),线程C查询数据(I/O操作)。(I/O设备利用率100%,CPU利用率100%)
  4. 线程数量可以设置为3.

**对于多核CPU最佳线程数 = CPU核心数 * [1 + (I/O耗时 / CPU耗时)]**。

对于4核8线程这种超线程技术,是指一个CPU物理核心包含两个逻辑核心,这两个逻辑核心也还是会存在竞争,所以这两个逻辑核心并不能完全并行运行。所以不管是4还是8都不一定是最合适的,还需要根据具体的测试结果进行调整。

⭐如何获取执行结果

ThreadPoolExecutor提供了3个submit()方法配合FutureTask工具类来支持获得线程池的返回值。

这三个submit()方法的返回值都是Future接口,Future接口提供了2个get()方法用来获取返回值(一个包含超时机制,一个没有超时机制)。

这两个get()方法都是阻塞的,如果任务没有执行完,会导致调用方被阻塞,直到任务完成或者超时才会被唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 创建线程池
ExecutorService executor = ThreadPoolExecutor.newFixedThreadPool(1);

public static void main(String[] args) {
// 创建结果引用
Result result = new Result();
result.setData("zhangsan");

// 提交任务
Future<Result> future = executor.submit(new Task(result), result);
// 获取线程池返回值
Result res = future.get();
res.getData(); // lisi
}

// 自定义线程类
public static class Task implements Runnable {
private final Result result;

public Task(Result result) {
this.result = result;
}

@Override
public void run() {
result.setData("lisi");
}
}

线程的生命周期

Java中的线程一共有5种状态,分别是:

  1. 初始化状态:指的是线程已经被创建了,但是还没有被分配CPU执行,是在编程语言层面被创建,在操作系统层面还没被创建。

  2. 可运行状态:指的是线程在操作系统层面线程已经被创建出来了,可以分配给CPU执行。

  3. 运行状态:当有空闲的CPU时,操作系统就会把CPU分配给可运行状态的线程,被分配的线程就会变成运行状态。

  4. 阻塞状态:运行状态中的线程,如果调用了一个阻塞API,线程就会变为阻塞状态,并且让出CPU的使用权。

  5. 终止状态:线程执行完run()方法,会自动进入终止状态。也可以手动调用stop() 或者 interrupt() 方法终止一个线程。

    stop()interrupt() 方法的区别:

    • stop()方法会立刻杀死线程,线程持有的锁也会不释放,那其它线程就再也没有机会获得这个锁了。

      • 类似的方法还有suspend()resume()
    • interrupt()只会通知线程,线程还可以继续执行后续的操作。

阻塞线程的方式

  • **sleep()**:
    • sleep方法可以指定一个以毫秒为单位的时间,线程在这个时间内会进入阻塞状态。
    • 不会释放锁,会让出CPU时间片,等待再次调度。
  • **yield()**:
    • yield方法会让出cpu时间片,但是线程还是会处于可执行的状态,当下次获得时间片之后还是会正常执行。
    • 不会释放锁
  • **wait()**:
    • wait方法会使线程进入阻塞状态,wait方法可以指定一个以毫秒为单位的时间,时间到了之后会自动唤醒。如果不指定时间则需要通过notify()或者notifyAll()方法来唤醒,否则会一直阻塞。
    • 会释放锁,会让出CPU时间片,等待再次调度。
  • **join()**:
    • 当线程A调用线程B的join方法,线程A就会进入阻塞状态,直到线程B运行结束,线程A才会进入可执行状态。
    • 底层还是调用wait方法,会释放锁
  • **notify()和notifyAll()**:
    • notify()会随机唤醒一个等待中的线程,notifyAll()会唤醒所有等待中的线程。但是notify()有可能导致一些线程无法直接被唤醒。
    • 假设有A、B两个资源,线程1申请到了A、线程2申请到了B、线程3再次申请A会进入等待队列,线程4申请B会进入等待队列。这个时候等待队列中有3、4两个线程。线程1归还资源后,如果用notify()来通知等待队列中的线程,有可能被通知的线程是4,但是线程4申请的是资源B,所以还会继续等待。

死锁

产生死锁的4个条件

  1. 互斥:共享资源只能被一个线程占用。
  2. 占有且等待:线程A占有资源A,在等待资源B的时候,不释放资源A。
  3. 不可抢占:其它线程不能抢占线程A已经占有的资源。
  4. 循环等待:线程A等待线程B占有的资源,线程B等待线程A占有的资源。

预防死锁

对资源进行加锁时,可以先对资源排序,然后按照顺序加锁。这样就不会出现线程1占用资源A,线程2占用资源B,双方都等待对方释放资源了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Account {
private int id;
private int balance;
// 转账
public void transfer(Account target, int money) {
Account left = this;
Account right = target;
// 根据id对账户进行排序
if (this.getId() > target.getId()) {
right = target;
left = this;
}

synchronized(left) {
synchronized(right) {
if (this.balance > money) {
this.balance -= money;
target.balance += money;
}
}
}
}
}

等待-通知优化循环等待

可以通过synchronized关键字配合wait()、notify()、notifyAll()这三个方法实现等待通知机制

就是在synchronized代码块中,判断条件如果不满足,就调用wait()方法让当前线程进入等待状态。其他线程执行完毕之后,调用notifyAll()方法唤醒等待队列中的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Allocator {
private List<Object> als = new ArrayList<>();
// 一次性申请所有资源
public synchronized void apply(Object from, Object to){
// 经典写法
while(als.contains(from) || als.contains(to)){
try {
wait();
} catch (Exception e){

}
}
als.add(from);
als.add(to);
}
// 归还资源
public synchronized void free(Object from, Object to){
als.remove(from);
als.remove(to);
notifyAll();
}
}

public static void main(String[] args) {
Allocator allocator = new Allocator();
try {
allocator.apply(o1, o2);
// do something...
} finally {
allocator.free(o1, o2);
}
}

⭐锁定义

乐观锁 / 悲观锁

乐观锁:乐观锁认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断数据是否发生变更。如果数据已经被其他线程更新,则报错或者重试。乐观锁适合读多写少操作

乐观锁在Java中是通过使用无锁编程来实现,通常采用的是CAS算法,Java原子类中的递增操作就通过CAS实现的。

悲观锁:悲观锁认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改。

悲观锁适合写操作多的场景

synchronized关键字和Lock接口的实现类都是悲观锁。

偏向锁

偏向锁认为,同一把锁只会被一个线程多次获得,不存在多线程竞争。

在JVM中,当一个线程获取偏向锁时,会在对象头中的标记字段中存储当前线程的ID,在加锁和解锁时,只需要判断标记字段中是否保存着当前线程的ID就可以了。

轻量级锁

偏向锁被多个线程访问时,就会升级为轻量级锁。

加锁时,JVM会在当前线程的栈帧分配一块空间,用来保存锁记录(Lock Record),然后把锁对象的标记字段拷贝到锁记录中。

然后通过CAS操作,把标记字段替换为锁记录的内存地址:

  • 如果更新成功,就说明加锁成功。
  • 如果更新失败,就说明有其它线程获取当前锁,Java虚拟机就会把这把锁升级为重量级锁,并阻塞当前线程。

解锁时,Java虚拟机会通过CAS操作,比较锁对象的标记字段是否是锁记录的地址。

  • 如果是,就把标记字段替换为锁记录中的值,也就是锁对象原本的标记字段。这样就成功释放锁了。
  • 如果不是,说明锁已经升级为重量级锁了。

重量级锁

重量级锁会阻塞所有加锁失败的线程,解锁时再唤醒这些线程。

自旋锁 / 适应性自旋锁

自旋锁:因为阻塞或者唤醒一个Java线程,需要操作系统切换CPU状态来完成,如果代码中的逻辑比较简单,状态转换消耗的时间有可能比代码执行的时间还要长。

所以为了减少减少线程切换带来的消耗,可以让没有获取到锁的线程去执行一个无意义的循环。如果循环结束后锁已经被释放了,当前线程就可以避免切换线程的开销。

自旋锁也是有缺点的,自旋虽然可以避免切换线程的开销,但是会占用处理器时间。如果锁被占用的时间很短,自旋锁的效果就很好,否则自旋只会白白浪费处理器资源。所以自旋一定要有次数限制(默认是10次,可以通过-XX:PreBlockSpin修改)。

适应性自旋锁

JDK1.6引入了适应性自旋锁,适应性自旋表示自旋的次数不固定,而是根据以往能否通过自旋获取到锁来决定。

如果通过自旋获取到锁的概率非常高,Java虚拟机就会允许自旋更长的时间。

如果某个锁只有很少的次数能通过自旋获取到,Java虚拟机就会减少自旋时间或者跳过自旋,直接阻塞线程。

可重入锁 / 不可重入锁

如果一个类中有多个synchronized方法,这些方法之间相互调用,就会对同一把锁重复加锁。

如果是可重入锁,就可以正常加锁。

如果是不可重入锁,就会产生死锁。

ReentrantLock 和 synchronized 都是可重入锁。

公平锁 / 非公平锁

公平锁

公平锁是指多个线程直接在队列中排队,队列中第一个线程才能获得锁。

公平锁的优点是每个线程都能获取到锁,不会饿死。

缺点是效率比非公平锁低,队列中除了第一个线程都会阻塞,CPU唤醒线程的开销会很大。

ReentrantLock在构造函数中提供了是否公平锁的初始化方式,默认是非公平锁。传入true就是公平锁。

非公平锁

非公平锁是指多个线程加锁时,直接尝试获取锁,获取不到的才会进入等待队列。如果某一个线程尝试获取锁时,锁刚好可用,这个线程可以直接获取到锁。

非公平锁的优点是可以减少唤醒线程的开销,因为线程有几率不阻塞直接获取到锁。

缺点是等待队列中的线程可能会饿死,或者要等很久才能获取到锁。

synchronized是非公平锁,ReentantLock默认也是非公平锁。

独占锁 / 共享锁

独占锁:独占锁只能被一个线程持有。

synchronized和JUC中Lock接口的实现类都是互斥锁。

共享锁:共享锁可以被多个线程持有,共享锁与独占锁之间是互斥的。获得共享锁的线程只能读数据,不能修改数据。

ReentrantReadWriteLock类中有两把锁:ReadLockWriteLock,可以分别加读锁和写锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Cache<K, V> {
private final Map<K, V> cache = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// 读锁
private final Lock readLock = lock.readLock();
// 写锁
private final Lock writeLock = lock.writeLock();

// 如果缓存不存在,就加载数据到缓存中
public V get(K key) {
V value = null;
try {
// 加读锁
readLock.lock();
value = cache.get(key);
} finally {
readLock.unlock();
}
if (value != null) {
return value;
}
try {
// 加写锁
writeLock.lock();
// 查询数据库
value = fromDB(key);
cache.put(key, value);
} finally {
writeLock.unlock();
}
return value;
}
}

CAS

CAS全称 Compare And Swap(比较并替换),是一种无锁算法。

单核 CPU 和多核 CPU 下都能够保证的原子性。

在多核CPU环境下:

  1. 通过总线锁,保证修改操作的互斥性
  2. 通过缓存一致性协议,保证CPU缓存中的值对其它核心可见
  3. 通过内存屏障,保证多线程下的有序性

CAS算法需要三个参数:

  • 需要修改的原值
  • 要比较的期望值
  • 要写入的新值

当 “原值” 等于 “期望值” 时,通过原子操作用 “新值” 替换 “原值”,否则就重新执行CAS流程。

CAS虽然很高效,但是也有两个问题:

  • ABA问题:CAS会检查 “原值” 是否发生变化,如果原来是A,后来变成了B,然后又变成了A,那么CAS进行检查的时候会发现 “原值” 没有变化过,但实际上是有变化的。

    解决思路就是给数据添加版本号,每次更新数据都把版本号加一。

    JDK1.5提供了AtomicStampedReference类来解决ABA问题,具体的逻辑封装在compareAndSet()方法中:

    1
    2
    3
    4
    5
    // 第一个参数是原值,第二个参数是版本号
    AtomicStampedReference<String> asf = new AtomicStampedReference<String>("zhangsan", 1);

    // 第一个参数是期望值,第二个参数是新值,第三个参数是期望版本号,第四个参数是新版本号
    boolean b = asf.compareAndSet("zhangsan", "lisi", 1, 2);
  • 只能保证单个变量的原子操作,多个共享变量无法保证原子操作。

    JDK1.5提供了AtomicReference类来保证对象之间的原子性,可以把多个变量放在一个对象里面进行原子操作。

⭐JUC常用原子类

基本类型

AtomicInteger、AtomicLong、AtomicBoolean。

主要方法:

  • getAndIncrement():原子化的i++
  • getAndDecrement():原子化的i--
  • incrementAndGet():原子化的++i
  • decrementAndGet():原子化的--i
  • compareAndSet():CAS操作,返回是否成功。

引用类型

AtomicReference、AtomicStampedReference、AtomicMarkableReference。可以实现引用类型对象属性的原子化更新。

主要方法:

  • compareAndSet():CAS操作,返回是否成功。

数组

AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray。可以实现原子化更新数组中的每一个元素。

主要方法:

  • compareAndSet():CAS操作,返回是否成功。

累加器

DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder。只能用来执行累加操作,相比原子化的基本类型,计算速度更快,但是不支持compareAndSet()方法。

主要方法:

  • increment():累加。
  • decrement():减少。

⭐AQS

AbstractQueuedSynchronizer(AQS):它是AQS体系的核心基类,实现了锁的基本结构,内部的数据结构是链表,它继承自AOS。

AbstractOwnableSynchronizer(AOS):用来记录锁当前的持有者。

ReentrantLock是锁的具体实现,它继承自Lock抽象类,用内部类Sync继承AQS,所以ReentrantLock具体的锁实现都是委托Sync完成的。

Sync有两个子类,分别是FairSync(公平锁)和NonfairSync(非公平锁)

获取锁

调用tryLock()方法获取锁时,会委托给Sync来处理。

首先会判断当前的锁状态,state如果等于0,表示当前锁没有被占用,公平锁和非公平锁会有不同的处理逻辑:

  • 如果是公平锁:需要先判断等待队列中有没有其它线程在排队。
    • 如果有,就返回false并进入等待队列。
    • 如果没有,就返回true,然后通过CAS操作更新state,如果更新成功,说明成功获取锁,就设置锁的拥有者为当前线程。否则就进入等待队列。
  • 如果是非公平锁:通过CAS操作更新state。
    • 如果更新成功,说明成功获取锁,就设置锁的拥有者为当前线程。
    • 如果更新失败,就直接进入等待队列。

如果state不等于0,说明锁已经被占用,会判断当前线程是不是锁的持有者:

  • 如果是,就通过CAS更新state。
  • 如果不是,就进入等待队列。

释放锁

调用unlock()方法释放锁时,会委托给Sync来处理。

释放锁主要包含这样几个步骤:

  1. 判断当前线程释放是锁的持有者,如果不是则抛异常。
  2. 判断state是否为0,如果为0,表示锁已释放,需要唤醒等待队列中的线程。

ThreadLocal

比如MVC架构的Web应用,在Service层无法直接获取HttpServletRequest,需要通过Controller以参数的形式传递进来。可以用ThreadLocal解决这种过度传参的问题。

可以在拦截器中把HttpServletRequest放到ThreadLocal中,Service层就可以直接获取了。需要注意的是要在拦截器中请求结束的回调方法中清除ThreadLocal,不然可能会导致内存泄漏

工作原理

Thread内部维护了一个ThreadLocalMap对象,ThreadLocalMap是ThreadLocal中的内部类,

ThreadLocal只是一个代理类,内部并不保存任何数据。

ThreadLocalMap内部用一个Entry数组保存数据,Entry对ThreadLocal是弱引用。

  • ThreadLocalMap保存数据的时候,会根据ThreadLocal对象的hash值定位数组的位置,所以也会有Hash冲突:

    • 在遇到Hash冲突时,如果是同一个ThreadLocal则替换Entry中的value,否则就直接寻找下一个空闲的位置。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      private void set(ThreadLocal<?> key, Object value) {
      ThreadLocal.ThreadLocalMap.Entry[] tab = table;
      int len = tab.length;
      int i = key.threadLocalHashCode & (len - 1);
      for (ThreadLocal.ThreadLocalMap.Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
      ThreadLocal<?> k = e.get(); // 这里的get()方法是Reference类中的
      if (k == key) { // 如果是同一个ThreadLocal,就直接替换Value
      e.value = value;
      return;
      }
      if (k == null) { // 如果目标位置为空,则插入数据,否则继续找下一个位置
      replaceStaleEntry(key, value, i);
      return;
      }
      }
      }
  • ThreadLocalMap获取数据的时候,也是根据ThreadLocal对象的hash值获取数组中对应的数据,获取到数据之后还需要判断是否是同一个ThreadLocal,如果不是就继续找下一个。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    private Entry getEntry(ThreadLocal<?> key) {
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    if (e != null && e.get() == key) { // 判断是否是同一个ThreadLocal
    return e;
    } else {
    // getEntryAfterMiss(): 会遍历ThreadLocalMap中所有Entry,直到找到ThreadLocal相同的元素
    return getEntryAfterMiss(key, i, e);
    }
    }

之所以要用Entry数组保存数据,是因为一个Thread可以创建多个ThreadLocal,这些ThreadLocal保存的数据只会放在同一个ThreadLocalMap中。

内存泄漏

线程池中的线程可能会一直存活下去,这就意味着Thread持有的ThreadLocalMap不会被回收,然后ThreadLocalMap中的Entry对ThreadLocal是弱引用,所以ThreadLocal是可以被回收掉的。但是Entry对Value是强引用的,所以Value无法被回收也无法被访问到,就会导致内存泄漏。

所以在写代码时,使用完ThreadLocal一定要在finally代码块中调用remove()方法。

ReentrantLock

ReentrantLock继承自Lock接口,它是可重入锁。

可以通过构造函数设置为公平锁,默认是非公平锁。

  • lock():获取锁,如果锁被其它线程持有,则阻塞当前线程,直到获取到锁。
  • tryLock():尝试获取锁,如果获取到锁则返回true,否则返回false。不会阻塞当前线程。
1
2
3
4
5
6
7
8
9
10
11
public class T {
Lock lock = new ReentrantLock();
public void foo(int value) {
try {
lock.lock();
value++;
} finally {
lock.unlock();
}
}
}

ReentrantReadWriteLock

ReentrantReadWriteLock实现了ReadWriteLock接口,它是支持可重入的读写锁。

可以通过构造函数设置为公平锁,默认是非公平锁。

ReadWriteLock不支持锁升级:不释放读锁的前提下,无法再获取写锁。

ReadWriteLock支持锁降级:不释放写锁的前提下,可以再次获取读锁。

常用写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Cache<K, V> {
private final Map<K, V> cache = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// 读锁
private final Lock readLock = lock.readLock();
// 写锁
private final Lock writeLock = lock.writeLock();

// 如果缓存不存在,就加载数据到缓存中
public V get(K key) {
V value = null;
try {
// 加读锁
readLock.lock();
value = cache.get(key);
} finally {
readLock.unlock();
}
if (value != null) {
return value;
}
try {
// 加写锁
writeLock.lock();
// 查询数据库
value = fromDB(key);
cache.put(key, value);
} finally {
writeLock.unlock();
}
return value;
}
}

StampedLock

StampedLock是JDK1.8提供的,它的性能比ReadWriteLock(读写锁)更好。但是StampedLock不支持重入。

StampedLock支持三种锁模式:写锁悲观读锁乐观读

  • 它允许多个线程同时获取悲观读锁,只允许一个线程获取写锁,写锁和悲观读锁是互斥的。
  • 当多个线程同时进行乐观读时,是允许一个线程获取写锁的,并不是所有的写操作都会阻塞。

StampedLock之所以比ReadWriteLock性能好,就是因为StampedLock支持乐观读,乐观读不会加锁的,所以性能上会好很多。

StampedLock进行写锁、读锁、乐观读的时候,都会返回一个long类型的stamp,解锁时需要传入stamp。

需要注意的是,StampedLock是是支持锁升级和降级的,可以通过tryConvertToReadLock()和tryConvertToWriteLock()方法,但是这两个会返回新的stamp,在解锁时一定要使用这个新的stamp。

读模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private final StampedLock sl = new StampedLock();
// 乐观读
long stamp = sl.tryOptimisticRead();
// 判断乐观读期间是否发生写操作,如果有则返回则返回false
if (!sl.validate(stamp)) {
// 如果发生了写操作,则升级为悲观读锁
stamp = sl.readLock();
try {
// 开始读操作....
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}

写模板:

1
2
3
4
5
6
7
8
9
private final StampedLock sl = new StampedLock();
// 加写锁
long stamp = sl.writeLock();
try {
// 进行写操作
} finally {
// 释放写锁
sl.unlockWrite(stamp);
}

Condition

Condition是条件变量,可以用来做异步转同步。

Condition中主要有三个方法:await()signal()signalAll(),对应的是Object类的wait()notify()notifyAll()

  • await():进入等待状态并释放锁。
  • signal():唤醒一个等待线程。
  • signalAll():唤醒所有等待线程。

比如需要等待某一个异步接口返回结果之后才能执行后续的逻辑,就可以在调用接口之前执行await()方法阻塞当前线程,获取到返回值之后执行signal()方法唤醒当前线程。

异步转同步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Object response;

public Object get(int timeout) {
long start = System.nanoTime();
try {
lock.lock();
while (response == null) {
// 等待结果返回
condition.await(timeout);
long cur = System.nanoTime();
if (response != null || cur - start > timeout) {
break;
}
}
} finally {
lock.unlock();
}
if (response == null) {
throw new TimeoutException();
}
return response;
}

// 发送异步请求
public void send() {
new Thread(() -> {
Response res = sendRequest();
doReceived(res);
}).start();
}

// 返回结果时调用该方法
private void doRecived(Response res) {
try {
lock.lock();
response = res;
// 获取到返回值后,唤醒线程
condition.signal();
} finally {
lock.unlock();
}
}

Semaphore

Semaphore可以让多个线程同时访问某一个资源,可以用来实现简单的限流器。

  • acquire():Semaphore中的计数器+1,当计数器达到阈值,会阻塞当前线程,直到计数器减少。
  • release():Semaphore计数器-1。

使用Semaphore时需要注意线程安全问题,因为Semaphore可以让多个线程进入临界区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Counter {
private int value;
// 初始化信号量
private Semaphore semaphore = new Semaphore(5);

public void add() {
try {
semaphore.acquire();
value++;
} finally {
semaphore.release();
}
}
}

CountDownLatch

CountDownLatch是一个线程同步工具,可以让父线程等待子线程执行的执行结果。

CountDownLatch可以理解为是一个计数器,在初始化时可以指定计数器的值,通常是子线程或者任务的数量,让主线程调用await()方法进入等待状态,子线程执行完毕就调用countDown()方法让计数器减一,当计数器为0时CountDownLatch会唤醒主线程。

CountDownLatch的计数器是不能循环利用的,计数器的值一旦减到0,再次调用await()方法会直接通过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CountDownLatch latch = new CountDownLatch(2);
P p = null;
D d = null;
new Thread(() -> {
p = getP();
latch.countDown();
}).start();

new Thread(() -> {
d = getD();
latch.countDown();
}).start();

latch.await();
Diff diff = check(p, d);
save(diff);

CyclicBarrier

CyclicBarrier是一个线程同步工具,可以让一组线程之间相互等待。

在初始化时可以指定计数器的值,子线程执行完之后调用await()方法,进入等待状态并让计数器减一。计数器为0时会重置计数器的值,并让最后一个调用await()方法的线程触发回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Vector<P> pos;
Vector<D> dos;
Executor executor = Executors.newFixedThreadPool(1);
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
// 将回调函数提交到线程池中执行
executor.execute(() -> check())
});

public void check() {
P p = pos.remove(0);
D d = dos.remove(0);
Diff diff = check(p,d);
save(diff);
}

public void checkAll() {
new Thread(() -> {
while (存在未对账的数据) {
P p = getP();
pos.add(p);
cyclicBarrier.await();
}
}).start();
new Thread(() -> {
while (存在未对账的数据) {
D d = getD();
dos.add(d);
cyclicBarrier.await();
}
}).start();
}

FutureTask

FutureTask实现了Runnable接口和Future接口,所以可以把FutureTask作为任务提交给线程池去执行,也可以用来获取任务的执行结果

它有两个构造函数:

1
2
public FutureTask(Callable<V> callable) {}
public FutureTask(Runnable runnable, V result) {}
  • 第一个构造函数的参数是Callable接口,Callable接口中的call()方法是有返回值的,调用FutureTask的get()方法得到的就是call()方法的返回值。
  • 第二个构造函数的参数是Runnable接口和结果引用,调用FutureTask的get()方法得到的就是结果引用。

CompletableFuture

CompletableFuture继承了Future接口和CompletionStage接口。

跟传统的Future相比,CompletableFuture可以更方便的处理任务之间的关联关系。

默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认的线程数是CPU的核心数。

在开发时尽量使用自定义的线程池,因为如果所有的CompletableFuture都使用同一个线程池,如果有一些任务执行了一些很慢的I/O操作,就会导致线程池中所有的任务都阻塞,会影响整个系统的性能,所以尽量根据不同的业务类型创建不同的线程池。

主要方法:

  1. 描述串行关系:

    • thenApply(function):把前一个任务的执行结果,交给后面的Function处理(有返回值)。

    • thenAccept(consumer):把前一个任务的执行结果,交给后面的Consume处理(没有返回值)。

    • thenRun(runnable):忽略前一个任务的结果,执行额外的逻辑(没有返回值)。

    • thenCompose(function):会把前一个CompletableFuture的结果传递给内部的CompletableFuture,最后返回一个新的CompletableFuture对象。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      CompletableFuture<Integer> future = CompletableFuture
      .supplyAsync(new Supplier<Integer>() {
      @Override
      public Integer get() {
      int number = new Random().nextInt(30);
      System.out.println("第一次运算:" + number);
      return number;
      }
      }).thenCompose(new Function<Integer, CompletionStage<Integer>>() {
      @Override
      public CompletionStage<Integer> apply(Integer param) {
      return CompletableFuture.supplyAsync(new Supplier<Integer>() {
      @Override
      public Integer get() {
      int number = param * 2;
      System.out.println("第二次运算:" + number);
      return number;
      }
      });
      }
      });
  2. 描述AND聚合关系:

    • thenCombine(other, function):两个任务都要有返回值,可以对两个任务的结果做一些额外的处理(有返回值)。
    • thenAcceptBoth(other, consumer):两个任务都要有返回值,可以对两个任务的结果做一些额外的处理(没有返回值)。
    • runAfterBoth(other, runnable):不需要前面的任务有返回值,可以执行一些额外的逻辑(没有返回值)。
  3. 描述OR聚合关系:

    • applyToEither(other, function):两个任务都要有返回值,任意一个任务先结束,就会执行(有返回值)。
    • acceptEither(other, consumer):两个任务都要有返回值,任意一个任务先结束,就会执行(没有返回值)。
    • runAfterEither(other, runnable):不需要前面的任务有返回值,任意一个任务先结束,就会执行(没有返回值)。
  4. 异常处理:

    • exceptionally(function):前面的任务报错时就会执行,类似于catch{}代码块。
    • whenComplete(consumer):无论是否发生异常都会执行,相当于finally{}代码块,可以配合exceptionally使用(没有返回值)。
    • handle(function):无论是否发生异常都会执行,相当于finally{}代码块(有返回值)。

方法名带有Async表示会异步执行function、consumer或者runnable。

CompletableFuture提供4个静态方法来创建对象:

1
2
3
4
5
6
// 使用默认线程池
static CompletableFuture<Void> runAsync(Runnable runnable)
static <U> CompletableFuture<U> supplyAsync(Supplyer<U> supplyer);
// 指定线程池
static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplyer<U> supplyer, Executor executor);

runAsync()方法接收的是Runnable接口,所以它是没有返回值的。

supplyAsync()方法接收的是Supplyer函数式接口,Supplyer的get()方法是有返回值的。

用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
System.out.println("T1:洗⽔壶...");
sleep(1, TimeUnit.SECONDS);

System.out.println("T1:烧开⽔...");
sleep(15, TimeUnit.SECONDS);
});

CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
System.out.println("T2:洗茶壶...");
sleep(1, TimeUnit.SECONDS);

System.out.println("T2:洗茶杯...");
sleep(2, TimeUnit.SECONDS);

System.out.println("T2:拿茶叶...");
sleep(1, TimeUnit.SECONDS);
return "⻰井";
});

CompletableFuture<String> f3 = f1.thenCombine(f2, (_, tf) -> {
System.out.println("T1:拿到茶叶:" + tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
});

//等待任务3执⾏结果
System.out.println(f3.join());

CompletionService

有些时候我们会执行多个异步任务,并且需要获取返回值,比较简单的做法是调用线程池的submit()方法,获取Future对象,再调用Future对象的get()方法获取返回值。但是调用Future对象的get()方法会导致当前线程阻塞,直到任务执行完毕。

如果任务1的执行时间比任务2的执行时间长,如果先调用任务1的Future对象的get()方法,就会导致任务2即使执行完毕后也不能及时处理,需要等任务1执行完毕后才能处理,CompletionService就可以解决这样的问题。

CompletionService内部维护了一个阻塞队列,当任务执行结束后,就把Future对象加入到队列中。队列是先进先出的,这样先执行完的任务就可以先加入到队列中,就可以先进行处理。

CompletionService是一个接口,实现类是ExecutorCompletionService,这个实现类的构造方法有两个:

1
2
public ExecutorCompletionService(Executor executor){}
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) {}

创建ExecutorCompletionService需要传入一个线程池和BlockingQueue,如果不指定BlockingQueue默认使用的是无界的LinkedBlockingQueue。

主要方法:

  • Future<V> submit(Callable<V> task)
  • Future<V> submit(Runnable task, V result):执行
  • Future<V> take():从队列中获取并移除一个元素,就阻塞当前线程,直到有元素返回。
  • Future<V> pool():从队列中获取并移除一个元素,如果队列是空的,会返回null。
  • Future<V> pool(timeout, unit):从队列中获取并移除一个元素,如果队列是空的,会等待一定的时间,超时后返回null。

用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 异步向电商S1询价
cs.submit(()->getPriceByS1());
// 异步向电商S2询价
cs.submit(()->getPriceByS2());
// 异步向电商S3询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
for (int i=0; i<3; i++) {
Integer r = cs.take().get();
executor.execute( () -> save(r));
}

Fork/Join

Fork/Join是一种分治任务模型,主要分为两个阶段:

  • 一个阶段是任务分解,把一个大任务分解成多个小任务。
  • 另一个阶段是结果合并,合并所有小任务的执行结果,得到最终的结果。

Java提供了ForkJoinPool和ForkJoinTask两个类来支持分治任务模型。这两个类的关系类似于线程池和线程的关系。

ForkJoinPool

ForkJoinPool是内部有多个任务队列,当我们通过invoke()或者submit()方法提交任务时,ForkJoinPool会把任务提交到一个任务队列中。如果任务在执行过程中还会创建子线程,那么子线程会被提交到父线程所在的任务队列中。

如果某一个队列空了,还会从其它队列中 “窃取” 任务。

任务队列采用的是双端队列,正常执行任务和 “窃取任务” 会从不同的端口消费。

ForkJoinTask

ForkJoinTask是一个抽象类,最核心的方法是fork()join(),其中fork()方法会异步执行一个子任务(执行compute()方法),join()方法会阻塞当前线程等待子任务的执行结果。

ForkJoinTask有两个子类,RecursiveAction和RecursiveTask,它们都是用递归的方式来处理分治任务的。这两个子类也是抽象类,内部定义了compute()抽象方法,需要自己定义子类去拓展。

RecursiveAction没有返回值。

RecursiveTask有返回值。

调用fork()方法时,会调用我们实现的compute()方法。

单词统计:

1
2
3
4
5
6
7
public static void main(String[] args) {
String[] words = {"hello world", "hello me", "hello"};
// 创建ForkJoinPool
ForkJoinPool taskPool = new ForkJoinPool(10);
Map<String, Integer> result = taskPool.invoke(new WordCount(words, 0, words.length));
System.out.println(result);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class WordCount extends RecursiveTask<Map<String, Integer>> {

private final String[] words;
private final int start, end;

public WordCount(String[] words, int start, int end) {
this.words = words;
this.start = start;
this.end = end;
}

@Override
protected Map<String, Integer> compute() {
if (end - start == 1) {
return count(words[start]);
}
int mid = (start + end) / 2;
WordCount wc1 = new WordCount(words, start, mid);
wc1.fork();
WordCount wc2 = new WordCount(words, mid, end);
return merge(wc1.join(), wc2.compute());
}

private Map<String, Integer> merge(Map<String, Integer> r1, Map<String, Integer> r2) {
r1.forEach((key, value) -> {
r2.merge(key, value, Integer::sum);
});
return r2;
}

private Map<String, Integer> count(String line) {
String[] words = line.split("\\s");
Map<String, Integer> result = new HashMap<>();
for (String word : words) {
result.merge(word, 1, Integer::sum);
}
return result;
}
}