并发基础

JMM(Java 内存模型)

原子性(Atomictiy)

原子性是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。

可见性(Visibility)

可见性是指当一个线程修改了某一个共享变量的值时,其他线程能否立即知道这个修改。

缓存优化,硬件优化,指令重排,编辑器的优化。可能导致的可见性问题。

有序性(Ordering)

有序性是指:在本线程内观察,所有操作都是有序的。在一个线程观察另一个线程,所有操作都是无序的,无序是因为发生了指令重排序。在 Java 内存模型中,允许编译器和处理器对指令进行重排序,重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。

指令重排

在汇编语言层面,计算机使用流水线技术来执行命令,为了防止流水线中断,就需要使用指令重排。

那些指令不能重排:Happen-Before 规则

  • 程序顺序原则:一个线程内保证语义的串行性。
  • volatile 规则:volatile 变量的写先于读发生,保证了这个变量的可见性。
  • 一个 unlock 操作先行发生于后面对同一个锁的 lock 操作。
  • Thread 对象的 start() 方法调用先行发生于此线程的每一个动作。
  • 一个对象的初始化完成(构造函数执行结束)先行发生于它的 finalize() 方法的开始。
  • Thread 对象的结束先行发生于 join() 方法返回。
  • 对线程 interrupt() 方法的调用先行发生于被中断线程的代码,因此可以通过 interrupted() 方法检测到是否有中断发生。
  • 如果操作 A 先行发生于操作 B,操作 B 先行发生于操作 C,那么操作 A 先行发生于操作 C。

线程中断

线程中断是一种重要的线程协作机制。线程中断并不会使线程立即退出,而是给线程发送一个通知,告知线程需要退出了。至于目标线程接到通知后如何处理是由它自己决定。这也是和stop方法的根本区别。

有三个方法与线程中断有关:

  • interrupt() // 通知目标线程中断,也就是设置中断标志位
  • isInterrupted() // 通过检查编中断标志位判断线程是否被中断
  • interrupted() // 静态方法,判断是否被中断并清楚当前中断状态

InterruptedException

通过调用一个线程的 interrupt() 来中断该线程,如果该线程处于阻塞、限期等待或者无限期等待状态,那么就会抛出 InterruptedException,从而提前结束该线程。

对于以下代码,在 main() 中启动一个线程之后再中断它,由于线程中调用了 Thread.sleep() 方法,因此会抛出一个 InterruptedException,从而提前结束线程,不执行之后的语句。

public class InterruptExample {

    private static class MyThread1 extends Thread {
        @Override
        public void run() {
            try {
                Thread.sleep(2000);
                System.out.println("Thread run");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public static void main(String[] args) throws InterruptedException {
    Thread thread1 = new MyThread1();
    thread1.start();
    thread1.interrupt();
    System.out.println("Main run");
}

interrupted()

如果一个线程的 run() 方法执行一个无限循环,并且没有执行 sleep() 等会抛出 InterruptedException 的操作,那么调用线程的 interrupt() 方法就无法使线程提前结束。

但是调用 interrupt() 方法会设置线程的中断标记,此时调用 interrupted() 方法会返回 true。因此可以在循环体中使用 interrupted() 方法来判断线程是否处于中断状态,从而提前结束线程。

public class InterruptExample {

    private static class MyThread2 extends Thread {
        @Override
        public void run() {
            while (!interrupted()) {
                // ..
            }
            System.out.println("Thread end");
        }
    }
}
public static void main(String[] args) throws InterruptedException {
    Thread thread2 = new MyThread2();
    thread2.start();
    thread2.interrupt();
}

等待(wait)和通知(notify)

这两个方法是Object类中的,任何对象都可以调用这两个方法。

wait()方法只能在synchronized方法或synchronized块中使用(原因:wait方法会释放锁,只有在syn中才有锁)

notifyAll会让所有处于等待池的线程全部进入锁池去竞争获取锁的机会

notify只会随机选取一个处于等待池中的线程进入锁池去竞争获取锁的机会。

锁池EntiyList:当一个线程需要调用调用此方法时必须获得该对象的锁,而该对象的锁被其他线程占用,该线程就需要在一个地方等待锁释放,这个地方就是锁池。(准备抢锁的池子)
等待池WaitSet:调用了wait方法的线程会释放锁并进入等待池,在等待池的线程不会竞争锁。(休息的池子)

值得注意的是无论是wait方法还是notify方法都需要首先获得目标对象的一个监视锁(monitor lock),得到锁后才可以执行方法。

wait 和 sleep 的区别:

Thread.sleep只会让出CPU ,不会释放任何资源;
Object.wait不仅让出CPU , 还会释放已经占有的同步资源锁。

volatile

JVM提供的轻量级同步机制
含义:
它是保证可见性的有效手段,还可以禁止指令重排序。但不能保证线程安全性
原因:因为 Java 里面的运算并非是原子操作(处理器要么把这组操作全部执行完,中间不允许被其他操作所打断,要么这组操作不要执行。)
如:int a = b + 1; 需要三步:取 a 的值,计算 a=b+1,然后把 a 的值写回内存
但是! 这三个操作处理器是不一定就会连续执行的,有可能执行了第一个操作之后,处理器就跑去执行别的操作了 。

当你使用 volatile 声明一个变量时就等于告诉虚拟机这个变量很有可能被修改,为了确保这个变量被修改后,所有线程都可以看到这个改动,虚拟机就必须采用一些措施保证这个变量的可见性。

volatile 关键字也可以保证数据的可见性和有序性。

volatile的作用

对 long、double 提供原子性操作
有序性:提供内存屏障,避免指令重排序
保证可见性

synchronized

synchronized的作用是实现线程间的同步。它的工作是对同步的代码加锁,使得每一次只有一个线程能进入同步块,从而保证线程间的安全性。

synchronized可以保证方法或者代码块在运行时,同一时刻只有一个方法可以进入到临界区,同时它还可以保证共享变量的内存可见性。

synchronized 的用法

  • 作用于对象:对给定的对象加锁,线程进入同步代码块之前要获得给定对象的锁。
  • 作用于实例方法:相当于对当前实例加锁,线程进入同步代码前要获得当前实例的锁
  • 作用于静态方法:相当于对当前类加锁,线程进入同步代码前要获得当前类的锁

synchronized 和 volatile 的区别是什么?

volatile本质是在告诉jvm当前变量在寄存器(工作内存)中的值是不确定的,需要从主存中读取; synchronized则是锁定当前变量,只有当前线程可以访问该变量,其他线程被阻塞住。

volatile仅能使用在变量级别;synchronized则可以使用在变量、方法、和类级别的。

volatile仅能实现变量的修改可见性,不能保证原子性;而synchronized则可以保证变量的修改可见性和原子性。

volatile不会造成线程的阻塞;synchronized可能会造成线程的阻塞。

volatile标记的变量不会被编译器优化;synchronized标记的变量可以被编译器优化。

JDK 并发包(JUC)

重入锁 ReentrantLock

ReentrantLock 是 java.util.concurrent(J.U.C)包中的锁。两者性能差距不大, 完全可以使用 ReentrantLock替代 synchronized。

public class LockExample {

    private Lock lock = new ReentrantLock();

    public void func() {
        lock.lock();
        try {
            for (int i = 0; i < 10; i++) {
                System.out.print(i + " ");
            }
        } finally {
            lock.unlock(); // 确保释放锁,从而避免发生死锁。
        }
    }
}

与 synchronized 相比,重入锁有着显式的操作过程,必须由开发人员指定何时加锁,何时释放。

对于一个线程来说,连续获得两次同一把锁是允许的,但要记得释放同样的次数。这也是为什么叫重入锁的原因。

ReentrantLock的高级功能:

1.中断响应

ReentrantLock 提供了在等待锁的过程中,取消对锁的请求的方法 lockInterruptibly() 方法。可以对线程中断进行响应的锁申请动作,即在等待锁的过程中可以响应线程的中断。

2.锁申请等待限时

ReentrantLock.tryLock()方法可以实现等待超时后自动放弃锁请求。

3.公平锁

在大多数情况下,锁的申请都是非公平的。儿而重入锁可以对公平性进行控制,从而避免了线程饥饿现象。然而公平锁需要系统维护一个有序队列,成本较高,性能低下,因此非必要情况不要使用公平锁。

ReentrantLock 的实现,包含三要素:

  • 原子状态。原子状态使用 CAS 操作来存储当前锁的状态,判断锁是否被其他线程持有。
  • 等待队列(等待池)。
  • 阻塞原语 park(),unpark(),用来挂起和恢复线程。有关 park(),unpark() 的介绍,见下文线程阻塞工具类: LockSupport。

重入锁的搭档:Condition

ReentrantLock 和 Condition 的关系类似于synchronized 与 wait(),notify() 的关系。

Condition 接口提供基本方法如下:

  • void await() throws InterruptedException; 类似 wait 方法。
  • void awaitUninterruptibly(); 与 await() 方法基本相同。但不会在等待过程中响应线程中断。
  • void signal(); 与 notify() 方法类似。

信号量(Semaphore)

无论是内部锁还是重入锁,一次都只允许一个线程访问一个资源,而信号量可以制定多个线程访问同一个资源。

读写锁(ReadWriteLock)

ReadWriteLock是读写分离锁们可以有限帮助减少锁竞争,提升系统性能。

如果在系统中,读操作次数远远大于写操作,则可以使用读写锁。

倒计数器(CountDownLatch)

这个工具常用来控制线程等待,可以让一个线程等待到倒计数结束再执行。

public class CountDownLatchDemo implements Runnable {
    static final CountDownLatch end = new CountDownLatch(10);
    static final CountDownLatchDemo demo=new CountDownLatchDemo();
    @Override
    public void run() {
        try {
            //模拟检查任务
            Thread.sleep(new Random().nextInt(10)*1000);
            System.out.println("check complete");
            end.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for(int i=0;i<10;i++){
            exec.submit(demo);
        }
        //等待检查
        end.await();
        //发射火箭
        System.out.println("Fire!");
        exec.shutdown();
    }
}

countDown 方法通知计数器一个线程已经完成,倒计数减一。主线程在CountDownLatch 上等待,所有检查任务完成后继续执行。

循环栅栏(CyclicBarrier)

用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。

和 CountdownLatch 相似,都是通过维护计数器来实现的。线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await() 方法而在等待的线程才能继续执行。

CyclicBarrier 和 CountdownLatch 的一个区别是,CyclicBarrier 的计数器通过调用 reset() 方法可以循环使用,所以它才叫做循环栅栏。

CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会执行一次。

线程阻塞工具类(LockSupport)

LockSupport 可以在线程内任意位置让线程阻塞,与 suspend 方法相比,弥补了由于 resume 方法发生导致线程无法继续执行的情况,和 wait 方法相比,他不需要获得锁也不会抛出中断异常。

LockSupport 包含静态方法 park 可以阻塞当前线程,unpark 方法可以解开。

LockSupport 使用类似信号量的机制,为每个线程准备了一个许可,如果这个许可可用,park 方法会返回,并把这个许可变为不可用,如果许可不可用,就阻塞线程。unpark 则相反。这个机制使得即使 unpark 方法操作发生在 park 之前,也可以使下一次的 park方法操作立即返回。

线程池

创建线程池的方式

①. newFixedThreadPool(int nThreads)
创建一个固定线程数量的线程池,每当提交一个任务就创建一个线程,直到达到线程池的最大数量,这时线程规模将不再变化,新的任务会暂存在任务队列中,待有线程空闲时便处理任务。

②. newCachedThreadPool()
创建一个可缓存的线程池,如果线程池的规模超过了处理需求,将自动回收空闲线程,而当需求增加时,则可以自动添加新线程,线程池的规模不存在任何限制。

③. newSingleThreadExecutor()
这是一个单线程的Executor,它创建单个工作线程来执行任务,如果这个线程异常结束,会创建一个新的来替代它;它的特点是能确保依照任务在队列中的顺序来串行执行。

④. newScheduledThreadPool(int corePoolSize)(推荐)
创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer。他也有Single版本。

线程池实现原理

FixedThreadPoolSingleThreadExecutorCachedThreadPool 三个方法内部均使用了 ThreadPoolExecutor 类。

  public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

参数含义如下:

  • corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize : 最大线程数量。
  • workQueue: 任务队列,当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
  • keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
  • unit : keepAliveTime 参数的时间单位。
  • threadFactory :线程工厂,用于创建线程,一般用默认即可。
  • handler :拒绝策略。当任务太多来不及处理时,如何拒绝任务。

这里给出 ThreadPoolExecutor 的核心调度方法 execute() 方法:

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
     		//保存的线程池当前的一些状态信息
    		int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //如果再次检查线程池的状态不在运行,那就移除请求,然后采取拒绝策略
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                 // 如果线程池在运行,并且当前线程池为空就新创建一个线程并执行。
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
     		//如果不为空就直接添加执行
            else if (!addWorker(command, false))
                reject(command);
        }
  1. 如果 workerCountOf© 函数取得的线程池的线程总数小于 corePoolSize 核心线程数时,将任务通过 addWorker(command, true) 直接调度执行。
  2. 否则,会在 workQueue.offer(command) 准备进入等待队列,如果进入成功就等待。
  3. 如果进入等待队列失败,就判断最大线程池是否满,如果没有就直接把任务提交给线程池,如果满了就执行拒绝策略。

addWorker() 的第二个参数true表示核心线程数为限制,false 表示最大线程数为限制。

拒绝策略

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任时,ThreadPoolTaskExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务。您不会任务请求。但是这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy 直接丢弃新进来无法处理的任务。
  • ThreadPoolExecutor.DiscardOldestPolicy 此策略将丢弃最早的未处理的任务请求。

ThreadFactory

自定义线程创建,ThreadFactory 是一个接口,只有一个创建线程的方法,我们可以通过这个自定义线程池。

下面这个例子就是自定义线程池:

public static void main(String[] args) throws InterruptedException {
		MyTask task = new MyTask();
		ExecutorService es = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactory(){
					@Override
					public Thread newThread(Runnable r) {
						Thread t= new Thread(r);
						t.setDaemon(true);
						System.out.println("create "+t);
						return t;
					}
				}
               );
		for (int i = 0; i < 5; i++) {
			es.submit(task);
		}
		Thread.sleep(2000);
	}
}

ForkJoin 框架

ForkJoin 使用 ForkJoinPool 来启动,它是一个特殊的线程池,线程数量取决于 CPU 核数。

ForkJoinPool 实现了工作窃取算法来提高 CPU 的利用率。每个线程都维护了一个双端队列,用来存储需要执行的任务。工作窃取算法允许空闲的线程从其它线程的双端队列中窃取一个任务来执行。窃取的任务必须是最晚的任务,避免和队列所属线程发生竞争。

并发集合

JUC包中包含了并发集合类有:ConcurrentHashMap,CopyOnWriteArrayList,ConcurrentLinkedQueue,

BlockingQueue,ConcurrentSkipListMap.

CAS(乐观锁)

CAS(Compare And Swap),比较交换。与锁相比,使用 CAS 会使程序看起来更复杂一些,但天生避免死锁,线程间的互相影响也小于锁的方式,拥有比锁更好的性能。

CAS 的算法过程:它包含三个参数V,E,N,其中 V(value)表示要更新的变量,E(expect)表示预期值,N(new)表示新值。仅当 V 值 等于 E 时,才会将 V 设置成 N,如果V 和 E 不同,说明已经有其他线程做了更新,则当前线程什么都不做。最后,CAS 返回当前 V 的真实值。

CAS 是抱着乐观的态度进行的,他总认为自己可以完成操作,当多个线程同时使用 CAS 操作以操作一个变量时,只有一个会胜出,其余会失败。失败的线程不会被挂起,仅是被告知失败,允许再次尝试或者放弃操作。基于这样的原理,CAS 操作即使没有锁也可以发现各个线程之间的干扰,进行恰当的处理。

高效读写的队列:ConcurrentLinkedQueue

ConcurrentLinkedQueue 这个队列使用链表作为其数据结构.ConcurrentLinkedQueue 应该算是在高并发环境中性能最好的队列了。它之所有能有很好的性能,是因为其内部复杂的实现。

ConcurrentLinkedQueue 内部代码我们就不分析了,大家知道 ConcurrentLinkedQueue 主要使用 CAS 非阻塞算法来实现线程安全就好了。

ConcurrentLinkedQueue 适合在对性能要求相对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的成本较高则适合使用无锁的 ConcurrentLinkedQueue 来替代。

高效读取:CopyOnWriteArrayList

读取完全不加锁,写入也不会阻塞读取,当写入时进行一次自我赋值进行操作,不修改原本的内容,详解请看 Java 容器分析。

数据共享通道:BlockingQueue

Java 提供的线程安全的 Queue 可以分为阻塞队列非阻塞队列,其中阻塞队列的典型例子就是 BlockingQueue。

阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。

BlockingQueue是一个接口而不是一个具体实现,主要实现包括: ArrayBlockingQueue、LinkedBlockingQueue。

锁优化

提高锁性能的几点建议

1.减少锁持有时间,只在必要的时候进行同步

2.减小锁粒度,就是缩小锁定对象的范围,降低锁冲突的可能性。

3.在读多写少的场合使用读写分离锁替换独占锁

4.锁分离

5.锁粗化, 如果一系列的连续操作都对同一个对象反复加锁和解锁,频繁的加锁操作就会导致性能损耗。 锁粗化就是把加锁的范围扩展(粗化)到整个操作序列的外部。

JVM对锁的优化

这里的锁优化主要是指 JVM 对 synchronized 的优化。

偏向锁

偏向锁的思想是偏向于让第一个获取锁对象的线程,这个线程在之后获取该锁就不再需要进行同步操作,甚至连 CAS 操作也不再需要。

当锁对象第一次被线程获得的时候,进入偏向状态,标记为 1 01。同时使用 CAS 操作将线程 ID 记录到 Mark Word 中,如果 CAS 操作成功,这个线程以后每次进入这个锁相关的同步块就不需要再进行任何同步操作。

当有另外一个线程去尝试获取这个锁对象时,偏向状态就宣告结束,此时撤销偏向(Revoke Bias)后恢复到未锁定状态或者轻量级锁状态。如果锁竞争比较大的情况就不要使用了。

轻量级锁

如果偏向锁失败,虚拟机不会立即挂起线程,还会使用一种轻量级锁的优化手段, 轻量级锁是相对于传统的重量级锁而言,它使用 CAS 操作来避免重量级锁使用互斥量的开销。对于绝大部分的锁,在整个同步周期内都是不存在竞争的,因此也就不需要都使用互斥量进行同步,可以先采用 CAS 操作进行同步,如果 CAS 失败了再改用互斥量进行同步。

如果 CAS 操作失败了,虚拟机首先会检查对象的 Mark Word 是否指向当前线程的虚拟机栈,如果是的话说明当前线程已经拥有了这个锁对象,那就可以直接进入同步块继续执行,否则说明这个锁对象已经被其他线程线程抢占了。如果有两条以上的线程争用同一个锁,那轻量级锁就不再有效,要膨胀为重量级锁。

自旋锁

自旋锁的思想是让一个线程在请求一个共享数据的锁时执行忙循环(自旋)一段时间,如果在这段时间内能获得锁,就可以避免进入阻塞状态。

自旋锁虽然能避免进入阻塞状态从而减少开销,但是它需要进行忙循环操作占用 CPU 时间,它只适用于共享数据的锁定状态很短的场景。

锁消除

锁消除是指对于被检测出不可能存在竞争的共享数据的锁进行消除。

锁消除主要是通过逃逸分析来支持,如果堆上的共享数据不可能逃逸出去被其它线程访问到,那么就可以把它们当成私有数据对待,也就可以将它们的锁进行消除。

ThreadLocal (待办)

死锁

多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止。

如何避免线程死锁?

破坏互斥条件

这个条件我们没有办法破坏,因为我们用锁本来就是想让他们互斥的(临界资源需要互斥访问)。

破坏请求与保持条件

一次性申请所有的资源。

破坏不剥夺条件

占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源。

破坏循环等待条件

靠按序申请资源来预防。按某一顺序申请资源,释放资源则反序释放。破坏循环等待条件。