提示

Java 多线程与并发相关。@ermo

# 3 Java 多线程与并发

# 3.1 进程和线程

# 线程和线程的概念?

进程是一次独立的程序过程,是操作系统分配资源的最小单位,进程与进程之间相互独立,互不干扰,可以理解为一个应用程序就是一个进程。

线程是程序执行流中的最小单元,一个进程有多个线程,每个线程有自己独立的程序计数器、虚拟机栈、本地方法栈。多个线程之间共享当前进程的内存空间。

# 并行和并发的区别是什么?

并行是多个处理器同时处理多个不同的任务,多个任务在同一时刻发生。

并发是一个处理器同时处理多个任务,多个任务在微观上的同一时间段发生。

举例说明,2个售票窗口排着2队人,售票员(处理器)一会去 A 窗口卖票,一会去 B 窗口卖票,这就是并发。

2个售票窗口排着2队人,每个售票窗口都有一名售票员(处理器),购票人(任务)可以同时买票,这就是并行。

# 说一下你对 Java 内存模型的理解?

Java 内存模型(Java Memory Model,JMM)是一种规范,定义 Java 程序中多线程访问共享变量时内存的使用和交互行为。在 Java 之前,编程语言可以直接复用操作系统的内存模型,但是 Java 是一门跨平台语言,面对不同操作系统的差异性,Java 自己定义了一套内存模型用于屏蔽操作系统的差异性。

JMM 的出现是为了按需禁用 CPU 缓存和指令重排带来的可见性问题。JMM 定义了主内存和工作内存:

  1. 主内存:所有变量都必须存到主内存(Main Memory)中,包括成员变量和方法中的局部变量。
  2. 工作内存:每个线程都有一个工作内存(Working Memory)用于存储共享变量的副本,工作内存是私有的,只能被当前线程访问。

上图中,如果2个线程需要对一个共享变量进行交互,就必须执行2个操作:

  1. 线程1将工作内存中修改过的共享变量副本同步大主内存中。
  2. 线程2去主内存中读取共享变量。

针对线程、工作内存和主内存的交互,JMM 定义了8种原子操作:

  1. lock(锁定)
  2. unlock(解锁)
  3. read(读取)
  4. load(载入)
  5. use(使用)
  6. assign(赋值)
  7. store(存储)
  8. write(写入)

共享变量在工作内存与主内存的交互必须使用上述8中原子操作,并且执行顺序必须按照 JMM 的规则。比如,变量从主内存复制到工作内存,必须顺序执行 read 和 load 操作;变量从工作内存回到主内存必须顺序执行 store 和 write 操作。

有了 JMM,Java 可以解决多线程中的三个主要问题:

  1. 原子性:只有基本数据类型的读取和赋值属于原子性操作,更大范围的原子性操作可以使用 synchronizedLock 来解决。
  2. 可见性:当一个线程修改了变量的值,其他线程可以立即获取到最新的修改。为了保证共享变量在多个线程之间可见,Java 使用 volatilesynchronizedfinal 关键字来解决,本质上是使用 happens-before 原则。
  3. 有序性:为防止指令重排,Java 使用了 volatilesynchronized 来保证指令的有序性。防止指令重排的底层原理是内存屏障,加上 volatile 的变量反编译后会出现 lock addl $0x0,(%esp) 的操作,相当于一个 lock 指令,也就是内存屏障指令。

JMM 中规定的一组 happens-before 用于保证两个操作之间的内存可见性,即操作1 happens-before 操作2,那么操作1的执行结果对操作2可见。一共有8条规则,与书写代码相关的规则有:

  1. 程序顺序规则:一个线程内,程序按照代码书写顺序执行,前面的动作 happens-befores 后面的动作。
  2. 监视器锁定规则:监视器的解锁动作 happens-before 后续对这个监视器的锁定动作。
  3. volatile 变量规则:对 volatile 的写入动作 happens-before 后续对这个字段的读取动作。
  4. 传递性:A happens-before B,B happens-before C,可以推导出 A happens-before C。
  5. 线程启动规则:Thread 中的 start 方法 happens-before 线程 run 中的任一个动作。

# 线程的生命周期和状态包括哪些?

线程的状态:新建(New),就绪(Runable),运行(Running),阻塞(Blocking),无限期等待(Waiting),限期等待(Timed Waiting),结束(Terminated)。

关于进入每种状态的节点:

  • 新建(New)

线程创建后没有调用 start() 方法。

  • 就绪(Runnable)

线程调用 start() 方法后处于就绪状态,当前状态的线程可能正在运行,也可能等待 CPU 分配资源。

  • 阻塞(Blocked)

阻塞状态,线程因为某种原因放弃 CPU 使用权,暂时停止运行。直到线程进入就绪状态,才会有机会转入运行状态。2个线程竞争 synchronized 关键字描述的代码块执行机会,如果线程1成功获取到锁,线程2就会进入 blocked 状态。

  • 无限期等待(Waiting)

只能等待其他线程显示调用 Object.notify() 或者 Object.notifyAll() 方法,或者被调用的线程执行完毕。

  • 限期等待(Timed Waiting)

无需等待其他线程显示唤醒,在一定时间后会被系统自动唤醒。

阻塞 blocked 和等待 waiting/timed waiting 的区别是,阻塞是被动的,获取排他锁失败的时候会进入阻塞状态,而等待是主动的,可以通过调用 Thread.sleep()Object.wait() 主动进入等待状态。

  • 结束(Terminated)

线程执行完毕或者是执行异常退出 run() 方法,当前线程结束生命周期。

# 有几种创建线程的方式?

  • 继承 Thread
public class ThreadTest extends Thread {
    @Override
    public void run() {
        System.out.println("I'm a test thread.");
    }
    public static void main(String[] args) throws InterruptedException {
        // 创建线程
        ThreadTest threadTest = new ThreadTest();
        // 启动线程
        threadTest.start();

        System.out.println("main 线程结束");
    }
}

执行 main() 方法输出:

main 线程结束
I'm a test thread.
  • 实现 Runnable 接口,重写 run 方法
public class RunnableTest implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " execute");
    }

    public static void main(String[] args) {
        Thread test = new Thread(new RunnableTest());
        test.start();
        System.out.println("main thread execute");
    }
}

执行 main() 方法,输出:

main thread execute
Thread-0 execute
  • 实现 Callable 接口,覆写 run() 方法
public class CallableTest implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        System.out.println(Thread.currentThread().getName() + " execute");
        return 2;
    }

    public static void main(String[] args) {
        FutureTask<Integer> futureTask = new FutureTask<>(new CallableTest());
        Thread thread = new Thread(futureTask);
        thread.start();
        System.out.println("main thread execute");
    }
}

执行 main() 方法,输出:

main thread execute
Thread-0 execute

# 用户线程和守护线程的区别是什么?

Java 中分为两种线程:用户线程(user)和守护线程(daemon)。

可以通过 thread.setDaemon(true) 来设置守护线程。

最后一个非守护线程结束 JVM 才会退出,守护线程不影响 JVM 退出。

# 线程的中断有哪些方式?

  • interrupt() 方法

设置线程中断标志后线程并不会直接终止,被中断的线程会根据自身状态自行处理。

interrupt() 方法,中断线程,set 操作。设置中断标志为 true,此时线程不会真正退出,如果当前线程处于阻塞状态,再被调用 interrupt() 方法后,线程会抛出 InterruptedException 异常然后终止。

Thread 类中还有一个 isInterrupted() 方法,判断当前线程是否被中断,get 操作。

public class ThreadInterruptTest {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread() {
            @Override
            public void run() {
                while(!Thread.currentThread().isInterrupted()) {
                    System.out.println(Thread.currentThread() + " hello");
                }
            }
        };

        thread.start();

        // 主线程休眠1s
        Thread.sleep(1000);

        // 子线程设置中断标志
        System.out.println("main thread interrupt thread");
        thread.interrupt();

        // 等待子线程执行结束
        thread.join();
        System.out.println("main is over");
    }
}
  • interrupted() 方法

interrupted() 方法,静态方法,检测当前线程是否被中断,内部调用当前线程的 isInterrupted() 方法。如果发现当前线程中断标志为 true,就会清楚中断标志。属于 getandset 操作。

  • 自定义中断标识符

定义一个使用 volatile 修饰的静态变量作为中断标识符,通过标识符的值来决定是否要中断线程,这种方法不会很及时。

public class InterruptFlagTest {

    private static volatile boolean isInterrupt = false;

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread() {
            @Override
            public void run() {
                while (!isInterrupt) {
                    System.out.println(Thread.currentThread().getName() + " start run");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " end run");
                }
            }
        };

        thread.start();
        Thread.sleep(1000);
        System.out.println(Thread.currentThread().getName() + " set isInterrupt flag=true");
        isInterrupt = true;
    }
}

输出

Thread-0 start run
Thread-0 end run
Thread-0 start run
main set isInterrupt flag=true
Thread-0 end run

# 线程池有哪几种状态?

TODO 待完成

running,shutdown,stop,Tidying,TERMINATED。

# 线程之间有哪些协作方式?

  • wait()notify()/notifyAll() 方法

wait()notify()/ notifyAll() 方法用于指定 object 的锁,而不是自己的。这些方法用于实现等待和通知机制,通过调用 wait() 方法,一个线程可以主动释放对象的锁并且进入等待状态,直到其他线程调用 notify() 来唤醒它。

  • join() 方法

用于等待一个线程的结束,在线程 A 内调用线程 B 的 join() 方法,线程 A 会进入阻塞状态,直到线程 B 的执行完毕,线程 A 才会继续执行。

/**
 * join() 示例,在线程 t2 中调用线程 t1.join(),t2 阻塞,直到 t1 执行完成。
 */
public class JointTest {

    private static class Thread1 extends Thread {
        @Override
        public void run() {
            System.out.println("thread1 run.");
        }
    }

    private static class Thread2 extends Thread {
        private final Thread1 thread1;
        Thread2(Thread1 thread1) {
            this.thread1 = thread1;
        }
        @Override
        public void run() {
            try {
                // t2 等待 t1 执行结束
                thread1.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("thread2 run.");
        }
    }

    public static void main(String[] args) {
        Thread1 t1 = new Thread1();
        Thread2 t2 = new Thread2(t1);
        t2.start();
        t1.start();
    }
}
  • sleep() 方法

sleep() 方法会使线程暂停执行一段时间。线程在调用 sleep() 方法会进入阻塞状态,并且不会释放锁,常用于模拟一段时间间隔或定时任务。

# sleep() 方法和 wait() 方法的区别?

  • wait()Object 类的方法,sleep()Thread 类的静态本地方法
  • wait() 释放锁,sleep() 没有释放锁
  • wait() 配合 notify() 或者 notifyAll() 用于线程通信,sleep() 用于暂停执行

# 调用 Thread 的 run() 和 start() 方法有什么区别?

run() 方法定义了线程的执行逻辑,并在当前线程的上下文中执行,不会创建新的线程。当通过调用Thread 对象的 run() 方法时,实际上是在当前线程中按顺序执行 run() 方法的代码。这相当于普通的方法调用,没有并发执行的效果。

start() 方法用于启动一个新线程,并执行其中的线程代码。当通过调用 Thread 对象的start() 方法时,会创建一个新的线程,并在新线程的上下文中执行 run() 方法的代码。start() 方法会触发线程调度,使得新线程可以与其他线程并发执行。

# 手写一个生产者和消费者案例?

生产者:

public class Producer implements Runnable {
    private LinkedList<Integer> buffer;
    private int capacity;
    private int num;

    public Producer(LinkedList<Integer> buffer, int capacity, int num) {
        this.buffer = buffer;
        this.capacity = capacity;
        this.num = num;
    }

    @Override
    public void run() {
        for (int i = 0; i < num; i++) {
            try {
                synchronized (buffer) {
                    if (buffer.size() == capacity) {
                        buffer.wait();
                    }

                    int value = i + 1;
                    buffer.addLast(value);
                    System.out.println("生产者生产数据:" + value);
                    buffer.notifyAll();
                }

                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者:

public class Consumer implements Runnable {

    private LinkedList<Integer> buffer;
    private int capacity;

    public Consumer(LinkedList<Integer> buffer, int capacity) {
        this.buffer = buffer;
        this.capacity = capacity;
    }

    @Override
    public void run() {
        while (true) {
            try {
                synchronized (buffer) {
                    while (buffer.size() == 0) {
                        buffer.wait();
                    }

                    int value = buffer.removeFirst();
                    System.out.println("消费者消费数据:" + value);
                    buffer.notifyAll();
                }
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

启动类:

public class MainDemo {

    public static void main(String[] args) {
        LinkedList<Integer> buffer = new LinkedList<>();
        int capacity = 10;

        Producer producer = new Producer(buffer, capacity, 20);
        Consumer consumer = new Consumer(buffer, capacity);

        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(consumer);

        t1.start();
        t2.start();
    }
}

输出:

生产者生产数据:1
消费者消费数据:1
生产者生产数据:2
消费者消费数据:2
生产者生产数据:3
消费者消费数据:3
生产者生产数据:4
消费者消费数据:4
生产者生产数据:5
消费者消费数据:5
生产者生产数据:6
消费者消费数据:6
生产者生产数据:7
消费者消费数据:7
生产者生产数据:8
消费者消费数据:8
生产者生产数据:9
消费者消费数据:9
...

# 3.2 锁

# 什么是死锁?

程序在执行过程中,两个或多个线程因争夺资源而造成相互等待,程序无法执行下去的一种状态称为死锁。

线程1占有资源 B 的同时还想要占有资源 A,线程2占有资源 A 的同时还想要占有资源 B,这种情况就是死锁。

# 产生死锁的四个必要条件是什么?

  • 互斥:资源 A 在同一时刻只能被一个线程占用
  • 占有且等待:线程1占有资源 A 的同时又对资源 B 提出请求
  • 不可抢占:资源 A 一旦被线程1占有,其他线程不能抢占资源 A
  • 环路等待:线程1持有资源 A 在等待资源 B,线程2持有资源 B 在等待资源 A

# 自己手写一个死锁?

public class DeadLockDemo {
    private static Object lock1 = new Object();
    private static Object lock2 = new Object();
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            synchronized(lock1) {
                System.out.println("t1 get lock1.");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("t1 wait to get lock2.");
                synchronized (lock2) {
                    System.out.println("t1 get lock2.");
                }
            }
        });

        Thread t2 = new Thread(() -> {
            synchronized (lock2) {
                System.out.println("t2 get lock2.");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("t2 wait to get lock1.");
                synchronized (lock1) {
                    System.out.println("t2 get lock1.");
                }
            }
        });

        t1.start();
        t2.start();
    }
}

输出

t1 get lock1.
t2 get lock2.
t1 wait to get lock2.
t2 wait to get lock1.

可以看出线程 t1 拿到资源 lock1 正在等待 lock2 资源,线程 t2 拿到资源 lock2 正在等待 lock1 资源,程序出现死锁状态。

# 锁池和等待池的区别?

锁池(Lock Pool),所有需要竞争同步锁的线程都会放到锁池中,锁池由操作系统管理,当一个线程想要获取一个被其他线程占用的锁时,该线程会被放到锁池中,一旦同步锁被释放,操作系统会从锁池中选取一个或者多个线程,这些线程会进入就绪(Runnable)状态,准备竞争同步锁的获取权。

等待池(Wait Pool),是由 Java 虚拟机管理的一组线程,当一个线程调用 wait() 方法,该线程将被放置到等待池中,并且释放锁。在等待池中的线程只有当其他线程调用 notify()notifyAll() 方法才会被唤醒,等待池中的线程被唤醒后,就会被放置到锁池中,准备重新竞争锁的获取权。

# 悲观锁和乐观锁的区别?

悲观锁,假设多个事务同时访问共享资源时会发生冲突,因此在访问共享资源时将其锁住,同一时刻只能一个事务访问共享资源。比如上卫生间的例子,每个人就是一个独立的事务,每个隔间都有一把锁,一个人上锁的时候,别的用户无法访问。

常见的悲观锁有 synchronized 关键字,ReentrantLock 类以及数据库锁。

乐观锁,假设多个事务访问共享资源时不会发生冲突,因此在访问共享资源时不会上锁。

乐观锁的实现一般使用版本号机制和 CAS 算法实现。

版本号机制实现方式,以 MySQL 举例,数据库表通常添加版本字段,或者状态字段,更新数据时使用版本进行判断。

例:id=1 的账户余额设置为1000,$version 代表上次从数据库中读取的变量。

update balance set amount = 1000, version = $version + 1 where id = 1 and version = $version;

CAS 全称为 Compare And Swap,属于原子操作,是一种进行多线程安全的比较和变更操作的算法,用预期值和要更新的变量值比较,一致才会更新为新值。

Java 中 CAS 具体实现在 Unsafe 类中的 CompareAndSwapXxx() 方法。

// java 源代码
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

# CAS 有哪些问题?

  1. ABA 问题
  • 线程A读取共享变量的值为A。
  • 在线程A读取共享变量的值后,线程B将共享变量的值改为B,然后又改回A,形成了一个“ABA”序列。
  • 线程A再次使用CAS操作将共享变量的值由A改为C,由于共享变量的值在CAS操作之前和之后都是A,所以线程A并不知道共享变量的值被其他线程改变过。
  1. 循环时间长开销大,如果 CAS 失败,自旋会服务器带来压力
  2. 只能保证对一个变量的原子性操作,无法保证像 i++ 这样操作的原子性

# 公平锁和非公平锁的区别?

公平锁,先到先得,多个线程按照排队的顺序来获取锁,ReentrantLock 类可以创建公平锁:

Lock lock = new ReentrantLock(Boolean.TRUE);

非公平锁,多个线程不会根据先到先得的顺序获取锁,并发场景下会造成饥饿状态,有的线程有可能一直无法获取锁。常见的非公平锁有 synchronized 关键字和 new ReentrantLock()

# 独占锁和共享锁的区别?

独占锁,也称为排他锁,一个线程对共享数据加上独占锁后,其他线程不能对该共享数据再加任何类型的锁,获得独占锁的线程拥有读写共享数据的权利。常用的独占锁有 synchronized 关键字和 Lock 类。

共享锁,可以被多个线程持有,一个线程对共享数据加上共享锁后,其他线程只能对共享数据再加共享锁,不能加独占锁,获得共享锁的线程只能读共享数据,不能执行写操作。常用的共享锁有 ReentrantReadWriteLock

# ReentrantLock 是什么?

ReentrantLockLock 的一个实现类,是 juc(java.util.concurrent)包中的锁。

看一个 ReentrantLock 的简单使用:

public class LockExample {

    private Lock lock = new ReentrantLock();

    public void func() {
        lock.lock();

        try {
            for (int i = 0; i < 5; i++) {
                System.out.println(i + " ");
            }
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        LockExample lockExample = new LockExample();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> lockExample.func());
        executorService.execute(() -> lockExample.func());
    }
}

输出:

0 
1 
2 
3 
4 
0 
1 
2 
3 
4 

# ReentrantLock 和 synchronized 的区别是什么?

  1. synchronized 是 JVM 实现的,ReentrantLock 是 JDK 实现的。
  2. 新版本对 synchronized 做了很多性能优化,比如有自旋锁,性能方面两者大致相同。
  3. ReentrantLock 可以绑定多个 Condition 条件。
  4. synchronized 是非公平的,ReentrantLock 默认也是非公平的,可以通过构造参数设置为公平锁。
  5. 持有锁的线程长期不放弃的锁的时候,ReentrantLock 可以中断线程放弃等待,synchronized 不行。

# 简单说下你对 AQS 的理解?

AQS(AbstractQueuedSynchronizer)是 JUC 包中的一个抽象类,是锁和同步器的一些通用实现,有很多锁和同步器都是使用 AQS 实现的,比如有 ReentrantLock(可重入锁)、ReentrantReadWriteLock(可重入读写锁)、Sempaphore(信号量)和 CountDownLatch 等。

AQS 的底层其实是使用 CLH(Craig, Landin, and Hagersten)队列锁,这是一种自旋锁的变体,通过 FIFO 队列和自旋锁来管理等待线程。当线程尝试获取锁时,如果发现当前共享资源被占用,则会将当前线程以节点的形式添加到队列尾部,并进行自旋尝试获取锁资源。每个 CLH 队列节点保存着线程的引用 thread、当前节点的状态 waitStatus、前驱节点 prev、后继节点 next。

AQS 的资源共享模式分为2种:独占模式和共享模式。

独占模式是指同一时刻只允许有一个线程可以获取到锁,典型的类有 ReentrantLock

共享模式是值同一时刻允许多个线程同时获取锁,常见的实现有 SemaphoreCountDownLatchCyclicBarrier 等。

ReentrantLock 使用 AQS 的大致流程:内部有一个 state 字段,初始化为0,表示未锁定状态。线程 t1 调用 lock() 后,内部调用 tryAcquire() 会使用 CAS 将 state 加1,其他线程调用 tryAcquire() 直接返回 false,直到调用 unlock()state 降到0,即释放资源,其他线程才有机会获取到锁。

在线程 t1 释放锁前,线程 t1 可以重复获取锁,state 字段会累加,这就是可重入概念。state 获取(累加)多少次就要释放(减少)多少次,最终保证 state 为0,这样其他线程才可以重新获取锁。

下面是 ReentrantLocktryAcquire() 源代码:

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                // 状态+1
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

参考:

  • https://www.cnblogs.com/waterystone/p/4920797.html
  • https://mp.weixin.qq.com/s/jEx-4XhNGOFdCo4Nou5tqg

# 3.3 Atomic

TODO Atuomic

# 3.4 阻塞队列

TODO 阻塞队列

# 3.5 工具类和容器类

TODO 工具类和容器类

# 3.6 Executor

TODO Executor

# 3.7 Fork&Join 框架

TODO Fork and Join

# 3.8 线程池

# 线程池的参数有哪些?

这个问题问的是线程池创建类 ThreadPoolExecutor 创建线程池的构造器中各个参数的含义。

核心参数有7个,对应源代码为:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // 省略具体实现代码                                
    }
    1. int corePoolSize:线程池核心线程数。

该参数用于设置线程池维护的最小核心线程数,即便这些线程处于空闲状态也不会被销毁。除非设置了 allowCoreThreadTimeOut。一般将 corePoolSize 设置为 CPU 数量的1到2倍,最大线程数设置为 CPU 的2到4倍即可。

    1. int maximumPoolSize:最大线程数。

一个任务创建由线程池中的空闲线程执行,如果没有空闲线程,就将任务放到工作队列缓存,工作队列满了才会创建新线程。之后从工作队列头部取出任务,交给新线程处理,新线程不会无限制创建,maximumPoolSize 就是指定创建新线程的最大的数量。

    1. long keepAliveTime:空闲线程存活时长。

大于 corePoolSize 的线程如果处于空闲状态,经过一段时间后将被销毁,这个参数就是设置的空闲线程的存活时间。

    1. TimeUnit:空闲线程存活时间单位。
    1. BlockingQueue:任务队列。

线程池存放任务的队列,用于存储线程池中待执行的任务。当线程池中的线程数达到核心线程数 corePoolSize 时,剩余任务将放到任务队列中等待执行。

    1. ThreadFactory:线程工厂。

用于创建新的线程、可以定制线程名字、线程组和线程优先级等。一般使用默认的线程工厂即可。

    1. RejectedExecutionHandler:拒绝策略,当线程队列满了并且工作线程大于等于线程池的最大数量 maximumPoolSize 时如果拒绝请求执行的策略。

常用的策略有:

  • AbortPolicy:直接抛出异常处理,为默认策略;
  • DiscardPolicy:直接抛弃任务不处理;
  • DiscardOldestPolicy:丢弃队列中最老的任务;
  • CallerRunsPolicy:将任务分配给当前执行 execute 方法线程处理。

整个线程池的执行流程是这样:

  • (1)线程池中线程数量小于核心线程数 corePoolSize 时,新任务进来将创建一个新的线程进行处理,不论此时线程池中是否存在空闲线程;
  • (2)线程池中的线程数大于 corePoolSize,新任务将放置到 workQueue 任务队列中,等待线程池中任务调度执行;
  • (3)工作队列 workQueue 已满,且最大线程数 maximumPoolSzie 大于核心线程数 corePoolSzie 时,新任务会创建新的线程执行;
  • (4)工作队列 workQueue 已满,且任务提交数大于 maximumPoolSize 时,任务交由 RejectedExecutionHandler 执行具体的拒绝策略;
  • (5)当线程池中的线程数量大于 corePoolSize 时,并且空闲时间大于 keepAliveTime,回收当前线程;
  • (6)如果参数 allowCoreThreadTimeOuttrue,线程池中小于 corePoolSize 的线程池空闲并且达到空闲时间 keepAliveTime,这些核心线程也将被回收。

放一张执行流程图供参考:

# 你在项目中是如何使用线程池的?

直接写一个使用线程池的 demo:

定义一个配置类,用于创建 ThreadPoolExecutor 并交给 Spring 管理。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Configuration
public class ThreadPoolConfig {

    @Bean("threadPoolExecutor")
    public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties properties) {
        return new ThreadPoolExecutor(properties.getCorePoolSize(),
                properties.getMaximumPoolSize(),
                properties.getKeepAliveTime(),
                TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(properties.getQueueSize()),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy());
    }
}

然后定义一个配置类 ThreadPoolConfigProperties 映射到 application.yml 配置文件:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@ConfigurationProperties("thread.pool")
@Component
public class ThreadPoolConfigProperties {
    /**
     * 核心线程数
     */
    private Integer corePoolSize;
    /**
     * 最大线程数
     */
    private Integer maximumPoolSize;
    /**
     * 队列长度
     */
    private Integer queueSize;
    /**
     * 空闲线程存活时间,毫秒
     */
    private Integer keepAliveTime;

    // 省略 getter/setter
}

记得在 pom.xml 文件中映入配置相关的包依赖:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

application.properties 中定义线程池参数:

server.port=8899
thread.pool.corePoolSize=5
thread.pool.maximumPoolSize=10
thread.pool.queueSize=1000
thread.pool.keepAliveTime=5000

定义一个 controller 类,使用创建好的线程池:

@SpringBootApplication
@RestController
public class JavaJucDemoApplication {

    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;

    public static void main(String[] args) {
        SpringApplication.run(JavaJucDemoApplication.class, args);
    }

    @GetMapping("/thread")
    public void thread() {
        for (int i = 0; i < 20; i++) {
            threadPoolExecutor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + ":执行任务");
            });
        }
    }
}

项目启动后,访问 http://localhost:8899/thread,查看输出:

pool-1-thread-1:执行任务
pool-1-thread-2:执行任务
pool-1-thread-3:执行任务
pool-1-thread-4:执行任务
pool-1-thread-5:执行任务
pool-1-thread-1:执行任务
pool-1-thread-3:执行任务
pool-1-thread-2:执行任务
pool-1-thread-5:执行任务
pool-1-thread-5:执行任务
pool-1-thread-3:执行任务
pool-1-thread-3:执行任务
pool-1-thread-3:执行任务
pool-1-thread-3:执行任务
pool-1-thread-3:执行任务
pool-1-thread-3:执行任务
pool-1-thread-1:执行任务
pool-1-thread-4:执行任务
pool-1-thread-5:执行任务
pool-1-thread-2:执行任务
Disconnected from the target VM, address: '127.0.0.1:56391', transport: 'socket'

Process finished with exit code 130 (interrupted by signal 2: SIGINT)