这 20 多个高并发编程必备的知识点,你都会吗?

转载自并发编程网 – ifeve.com
http://ifeve.com/%e9%ab%98%e5...

一、前言

借用Java并发编程实践中的话”编写正确的程序并不容易,而编写正常的并发程序就更难了”,相比于顺序执行的情况,多线程的线程安全问题是微妙而且出乎意料的,因为在没有进行适当同步的情况下多线程中各个操作的顺序是不可预期的,本文算是对多线程情况下同步策略的一个简单介绍。

二、 什么是线程安全问题

线程安全问题是指当多个线程同时读写一个状态变量,并且没有任何同步措施时候,导致脏数据或者其他不可预见的结果的问题。Java中首要的同步策略是使用Synchronized关键字,它提供了可重入的独占锁。

三、 什么是共享变量可见性问题

要谈可见性首先需要介绍下多线程处理共享变量时候的Java中内存模型。

Java内存模型规定了所有的变量都存放在主内存中,当线程使用变量时候都是把主内存里面的变量拷贝到了自己的工作空间或者叫做工作内存。

当线程操作一个共享变量时候操作流程为:*

  • 线程首先从主内存拷贝共享变量到自己的工作空间
  • 然后对工作空间里的变量进行处理
  • 处理完后更新变量值到主内存

那么假如线程A和B同时去处理一个共享变量,会出现什么情况那?

首先他们都会去走上面的三个流程,假如线程A拷贝共享变量到了工作内存,并且已经对数据进行了更新但是还没有更新会主内存(结果可能目前存放在当前cpu的寄存器或者高速缓存),这时候线程B拷贝共享变量到了自己的工作内存进行处理,处理后,线程A才把自己的处理结果更更新到主内存或者缓存,可知 线程B处理的并不是线程A处理后的结果,也就是说线程A处理后的变量值对线程B不可见,这就是共享变量的不可见性问题。

构成共享变量内存不可见原因是因为三步流程不是原子性操作,下面知道使用恰当同步就可以解决这个问题。

我们知道ArrayList是线程不安全的,因为他的读写方法没有同步策略,会导致脏数据和不可预期的结果,下面我们就一一讲解如何解决。

这是线程不安全的
public class ArrayList<E> 
{

    public E get(int index) {
        rangeCheck(index);

        return elementData(index);
    }

    public E set(int index, E element) {
        rangeCheck(index);

        E oldValue = elementData(index);
        elementData[index] = element;
        return oldValue;
    }
}

四、原子性

4.1 介绍

假设线程A执行操作Ao和线程B执行操作Bo ,那么从A看,当B线程执行Bo操作时候,那么Bo操作全部执行,要么全部不执行,我们称Ao和Bo操作互为原子性操作,在设计计数器时候一般都是先读取当前值,然后+1,然后更新会变量,是读-改-写的过程,这个过程必须是原子性的操作。

public class ThreadNotSafeCount {

    private  Long value;

    public Long getCount() {
        return value;
    }

    public void inc() {
        ++value;
    }
}

如上代码是线程不安全的,因为不能保证++value是原子性操作。方法一是使用Synchronized进行同步如下:

public class ThreadSafeCount {

    private  Long value;

    public synchronized Long getCount() {
        return value;
    }

    public synchronized void inc() {
        ++value;
    }
}

注意: 这里不能简单的使用volatile修饰value进行同步,因为变量值依赖了当前值

使用Synchronized确实可以实现线程安全,即实现可见性和同步,但是Synchronized是独占锁,没有获取内部锁的线程会被阻塞掉,那么有没有刚好的实现那?答案是肯定的。

4.2 原子变量类

原子变量类比锁更轻巧,比如AtomicLong代表了一个Long值,并提供了get,set方法,get,set方法语义和volatile相同,因为AtomicLong内部就是使用了volatile修饰的真正的Long变量。另外提供了原子性的自增自减操作,所以计数器可以改下为:

public class ThreadSafeCount {

    private  AtomicLong value = new AtomicLong(0L);

    public  Long getCount() {
        return value.get();
    }

    public void inc() {
        value.incrementAndGet();
    }
}

那么相比使用synchronized的好处在于原子类操作不会导致线程的挂起和重新调度,因为他内部使用的是cas的非阻塞算法。

常用的原子类变量为:

  • AtomicLong
  • AtomicInteger
  • AtomicBoolean
  • AtomicReference

五、CAS介绍

CAS 即CompareAndSet,也就是比较并设置,CAS有三个操作数分别为:内存位置,旧的预期值,新的值,操作含义是当内存位置的变量值为旧的预期值时候使用新的值替换旧的值。通俗的说就是看内存位置的变量值是不是我给的旧的预期值,如果是则使用我给的新的值替换他,如果不是返回给我旧值。这个是处理器提供的一个原子性指令。上面介绍的AtomicLong的自增就是使用这种方式实现:

public final long incrementAndGet() {
    for (;;) {
        long current = get();(1)
        long next = current + 1;(2)
        if (compareAndSet(current, next))(3)
            return next;
    }
}

public final boolean compareAndSet(long expect, long update) {
    return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}

假如当前值为1,那么线程A和检查B同时执行到了(3)时候各自的next都是2,current=1,假如线程A先执行了3,那么这个是原子性操作,会把档期值更新为2并且返回1,if判断true所以incrementAndGet返回2.这时候线程B执行3,因为current=1而当前变量实际值为2,所以if判断为false,继续循环,如果没有其他线程去自增变量的话,这次线程B就会更新变量为3然后退出。

这里使用了无限循环使用CAS进行轮询检查,虽然一定程度浪费了cpu资源,但是相比锁来说避免的线程上下文切换和调度。

六、什么是可重入锁

当一个线程要获取一个被其他线程占用的锁时候,该线程会被阻塞,那么当一个线程再次获取它自己已经获取的锁时候是否会被阻塞那?如果不需要阻塞那么我们说该锁是可重入锁,也就是说只要该线程获取了该锁,那么可以无限制次数进入被该锁锁住的代码。

先看一个例子如果锁不是可重入的,看看会出现什么问题。

public class Hello{
     public Synchronized void helloA(){
        System.out.println("hello");
     }

     public Synchronized void helloB(){
        System.out.println("hello B");
        helloA();
     }

}

如上面代码当调用helloB函数前会先获取内置锁,然后打印输出,然后调用helloA方法,调用前会先去获取内置锁,如果内置锁不是可重入的那么该调用就会导致死锁了,因为线程持有并等待了锁。

实际上内部锁是可重入锁,例如synchronized关键字管理的方法,可重入锁的原理是在锁内部维护了一个线程标示,标示该锁目前被那个线程占用,然后关联一个计数器,一开始计数器值为0,说明该锁没有被任何线程占用,当一个线程获取了该锁,计数器会变成1,其他线程在获取该锁时候发现锁的所有者不是自己所以被阻塞,但是当获取该锁的线程再次获取锁时候发现锁拥有者是自己会把计数器值+1, 当释放锁后计数器会-1,当计数器为0时候,锁里面的线程标示重置为null,这时候阻塞的线程会获取被唤醒来获取该锁。

七、Synchronized关键字

7.1 Synchronized介绍

synchronized块是Java提供的一种强制性内置锁,每个Java对象都可以隐式的充当一个用于同步的锁的功能,这些内置的锁被称为内部锁或者叫监视器锁,执行代码在进入synchronized代码块前会自动获取内部锁,这时候其他线程访问该同步代码块时候会阻塞掉。拿到内部锁的线程会在正常退出同步代码块或者异常抛出后释放内部锁,这时候阻塞掉的线程才能获取内部锁进入同步代码块。

7.2 Synchronized同步实例

内部锁是一种互斥锁,具体说是同时只有一个线程可以拿到该锁,当一个线程拿到该锁并且没有释放的情况下,其他线程只能等待。

对于上面说的ArrayList可以使用synchronized进行同步来处理可见性问题。

使用synchronized对方法进行同步
public class ArrayList<E>
{

    public synchronized  E get(int index) {
        rangeCheck(index);

        return elementData(index);
    }

    public synchronized E set(int index, E element) {
        rangeCheck(index);

        E oldValue = elementData(index);
        elementData[index] = element;
        return oldValue;
    }
}

如图当线程A获取内部锁进入同步代码块后,线程B也准备要进入同步块,但是由于A还没释放锁,所以B现在进入等待,使用同步可以保证线程A获取锁到释放锁期间的变量值对B获取锁后都可见。也就是说当B开始执行A执行的代码同步块时候可以看到A操作的所有变量值,这里具体说是当线程B获取b的值时候能够保证获取的值是2。这时因为线程A进入同步块修改变量值后,会在退出同步块前把值刷新到主内存,而线程B在进入同步块前会首先清空本地内存内容,从主内存重新获取变量值,所以实现了可见性。但是要注意一点所有线程使用的是同一个锁。

注意: Synchronized关键字会引起线程上下文切换和线程调度。

八、 ReentrantReadWriteLock介绍

使用synchronized可以实现同步,但是缺点是同时只有一个线程可以访问共享变量,但是正常情况下,对于多个读操作操作共享变量时候是不需要同步的,synchronized时候无法实现多个读线程同时执行,而大部分情况下读操作次数多于写操作,所以这大大降低了并发性,所以出现了ReentrantReadWriteLock,它可以实现读写分离,多个线程同时进行读取,但是最多一个写线程存在。

对于上面的方法现在可以修改为:

public class ArrayList<E>
{
  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

  public E get(int index) {

        Lock readLock = readWriteLock.readLock();
        readLock.lock();
        try {
            return list.get(index);
        } finally {
            readLock.unlock();
        }
    }

    public E set(int index, E element) {

        Lock wirteLock = readWriteLock.writeLock();
        wirteLock.lock();
        try {
            return list.set(index, element);
        } finally {
            wirteLock.unlock();
        }
    }
}

如代码在get方法时候通过 readWriteLock.readLock()获取了读锁,多个线程可以同时获取这读锁,set方法通过readWriteLock.writeLock()获取了写锁,同时只有一个线程可以获取写锁,其他线程在获取写锁时候会阻塞直到写锁被释放。假如一个线程已经获取了读锁,这时候如果一个线程要获取写锁时候要等待直到释放了读锁,如果一个线程获取了写锁,那么所有获取读锁的线程需要等待直到写锁被释放。所以相比synchronized来说运行多个读者同时存在,所以提高了并发量。

注意: 需要使用者显示调用Lock与unlock操作

九、 Volatile变量

对于避免不可见性问题,Java还提供了一种弱形式的同步,即使用了volatile关键字。该关键字确保了对一个变量的更新对其他线程可见。当一个变量被声明为volatile时候,线程写入时候不会把值缓存在寄存器或者或者在其他地方,当线程读取的时候会从主内存重新获取最新值,而不是使用当前线程的拷贝内存变量值。

volatile虽然提供了可见性保证,但是不能使用他来构建复合的原子性操作,也就是说当一个变量依赖其他变量或者更新变量值时候新值依赖当前老值时候不在适用。

与synchronized相似之处在于如图

如图线程A修改了volatile变量b的值,然后线程B读取了改变量值,那么所有A线程在写入变量b值前可见的变量值,在B读取volatile变量b后对线程B都是可见的,图中线程B对A操作的变量a,b的值都可见的。volatile的内存语义和synchronized有类似之处,具体说是说当线程写入了volatile变量值就等价于线程退出synchronized同步块(会把写入到本地内存的变量值同步到主内存),读取volatile变量值就相当于进入同步块(会先清空本地内存变量值,从主内存获取最新值)。

下面的Integer也是线程不安全的,因为没有进行同步措施:

public class ThreadNotSafeInteger {

    private int value;

    public int get() {
        return value;
    }

    public void set(int value) {
        this.value = value;
    }
}

使用synchronized关键字进行同步如下:

public class ThreadSafeInteger {

    private int value;

    public synchronized int get() {
        return value;
    }

    public synchronized  void set(int value) {
        this.value = value;
    }
}

等价于使用volatile进行同步如下:

public class ThreadSafeInteger {

    private volatile int value;

    public int get() {
        return value;
    }

    public void set(int value) {
        this.value = value;
    }
}

这里使用synchronized和使用volatile是等价的,但是并不是所有情况下都是等价,一般只有满足下面所有条件才能使用volatile

  • 写入变量值时候不依赖变量的当前值,或者能够保证只有一个线程修改变量值。
  • 写入的变量值不依赖其他变量的参与。
  • 读取变量值时候不能因为其他原因进行加锁。

另外 加锁可以同时保证可见性和原子性,而volatile只保证变量值的可见性。

注意: volatile关键字不会引起线程上下文切换和线程调度。另外volatile还用来解决重排序问题,后面会讲到。

十、 乐观锁与悲观锁

10.1 悲观锁

悲观锁,指数据被外界修改持保守态度(悲观),在整个数据处理过程中,将数据处于锁定状态。 悲观锁的实现,往往依靠数据库提供的锁机制 。数据库中实现是对数据记录进行操作前,先给记录加排它锁,如果获取锁失败,则说明数据正在被其他线程修改,则等待或者抛出异常。如果加锁成功,则获取记录,对其修改,然后事务提交后释放排它锁。

一个例子:select * from 表 where .. for update;

悲观锁是先加锁再访问策略,处理加锁会让数据库产生额外的开销,还有增加产生死锁的机会,另外在多个线程只读情况下不会产生数据不一致行问题,没必要使用锁,只会增加系统负载,降低并发性,因为当一个事务锁定了该条记录,其他读该记录的事务只能等待。

10.2 乐观锁

乐观锁是相对悲观锁来说的,它认为数据一般情况下不会造成冲突,所以在访问记录前不会加排他锁,而是在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,具体说根据update返回的行数让用户决定如何去做。乐观锁并不会使用数据库提供的锁机制,一般在表添加version字段或者使用业务状态来做。

乐观锁直到提交的时候才去锁定,所以不会产生任何锁和死锁。

十一、独占锁与共享锁

根据锁能够被单个线程还是多个线程共同持有,锁又分为独占锁和共享锁。独占锁保证任何时候都只有一个线程能读写权限,ReentrantLock就是以独占方式实现的互斥锁。共享锁则可以同时有多个读线程,但最多只能有一个写线程,读和写是互斥的,例如ReadWriteLock读写锁,它允许一个资源可以被多线程同时进行读操作,或者被一个线程 写操作,但两者不能同时进行。

独占锁是一种悲观锁,每次访问资源都先加上互斥锁,这限制了并发性,因为读操作并不会影响数据一致性,而独占锁只允许同时一个线程读取数据,其他线程必须等待当前线程释放锁才能进行读取。

共享锁则是一种乐观锁,它放宽了加锁的条件,允许多个线程同时进行读操作。

十二、公平锁与非公平锁

根据线程获取锁的抢占机制锁可以分为公平锁和非公平锁,公平锁表示线程获取锁的顺序是按照线程加锁的时间多少来决定的,也就是最早加锁的线程将最早获取锁,也就是先来先得的FIFO顺序。而非公平锁则运行闯入,也就是先来不一定先得。

ReentrantLock提供了公平和非公平锁的实现:

  • 公平锁ReentrantLock pairLock = new ReentrantLock(true);
  • 非公平锁 ReentrantLock pairLock = new ReentrantLock(false);

如果构造函数不传递参数,则默认是非公平锁。

在没有公平性需求的前提下尽量使用非公平锁,因为公平锁会带来性能开销。
假设线程A已经持有了锁,这时候线程B请求该锁将会被挂起,当线程A释放锁后,假如当前有线程C也需要获取该锁,如果采用非公平锁方式,则根据线程调度策略线程B和C两者之一可能获取锁,这时候不需要任何其他干涉,如果使用公平锁则需要把C挂起,让B获取当前锁。

十三、 AbstractQueuedSynchronizer介绍

AbstractQueuedSynchronizer提供了一个队列,大多数开发者可能从来不会直接用到AQS,AQS有个变量用来存放状态信息 state,可以通过protected的getState,setState,compareAndSetState函数进行调用。对于ReentrantLock来说,state可以用来表示该线程获可重入锁的次数,semaphore来说state用来表示当前可用信号的个数,FutuerTask用来表示任务状态(例如还没开始,运行,完成,取消)。

十四、CountDownLatch原理

14.1 一个例子

public class Test {

    private static final int ThreadNum = 10;

    public static void main(String[] args)  {

        //创建一个CountDownLatch实例,管理计数为ThreadNum
        CountDownLatch countDownLatch = new CountDownLatch(ThreadNum);

        //创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(ThreadNum);

        //添加线程到线程池
        for(int i =0;i<ThreadNum;++i){
            executor.execute(new Person(countDownLatch, i+1));
        }

        System.out.println("开始等待全员签到...");

        try {
            //等待所有线程执行完毕
            countDownLatch.await();
            System.out.println("签到完毕,开始吃饭");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            executor.shutdown();
        }

    }

    static class Person implements Runnable{

        private CountDownLatch countDownLatch;
        private int index;

        public Person(CountDownLatch cdl,int index){
            this.countDownLatch = cdl;
            this.index = index;
        }

        @Override
        public void run() {

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println("person " + index +"签到");

            //线程执行完毕,计数器减一
            countDownLatch.countDown();

        }

    }
}

如上代码,创建一个线程池和CountDownLatch实例,每个线程通过构造函数传入CountDownLatch的实例,主线程通过await等待线程池里面线程任务全部执行完毕,子线程则执行完毕后调用countDown计数器减一,等所有子线程执行完毕后,主线程的await才会返回。

14.2 原理

先看下类图:

可知CountDownLatch内部还是使用AQS实现的。
首先通过构造函数初始化AQS的状态值

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
Sync(int count) {
    setState(count);
}

然后看下await方法:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //如果线程被中断则抛异常
        if (Thread.interrupted())
            throw new InterruptedException();
        //尝试看当前是否计数值为0,为0则直接返回,否者进入队列等待
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

如果tryAcquireShared返回-1则 进入doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
       //加入队列状态为共享节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                       //如果多个线程调用了await被放入队列则一个个返回。
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //shouldParkAfterFailedAcquire会把当前节点状态变为SIGNAL类型,然后调用park方法把当先线程挂起,
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

调用await后,当前线程会被阻塞,直到所有子线程调用了countdown方法,并在计数为0时候调用该线程unpark方法激活线程,然后该线程重新tryAcquireShared会返回1。

然后看下 countDown方法:

委托给sync
public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

首先看下tryReleaseShared

protected boolean tryReleaseShared(int releases) {
    //循环进行cas,直到当前线程成功完成cas使计数值(状态值state)减一更新到state
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

该函数一直返回false直到当前计数器为0时候才返回true。
返回true后会调用doReleaseShared,该函数主要作用是调用uppark方法激活调用await的线程,代码如下:

private void doReleaseShared() {

    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //节点类型为SIGNAL,把类型在通过cas设置回去,然后调用unpark激活调用await的线程
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

激活主线程后,主线程会在调用tryAcquireShared获取锁。

十五、ReentrantLock独占锁原理

15.1 ReentrantLock结构

可知ReentrantLock最终还是使用AQS来实现,并且根据参数决定内部是公平还是非公平锁,默认是非公平锁

 public ReentrantLock() {
        sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

加锁代码:

public void lock() {
        sync.lock();
}

15.2 公平锁原理

lock方法最终调用FairSync重写的tryAcquire方法

protected final boolean tryAcquire(int acquires) {
            //获取当前线程和状态值
            final Thread current = Thread.currentThread();
            int c = getState();
           //状态为0说明该锁未被任何线程持有
            if (c == 0) {
             //为了实现公平,首先看队列里面是否有节点,有的话再看节点所属线程是不是当前线程,是的话hasQueuedPredecessors返回false,然后使用原子操作compareAndSetState保证一个线程更新状态为1,设置排他锁归属为当前线程。其他线程通过cass则返回false.
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
//状态不为0说明该锁已经被线程持有,则看是否是当前线程持有,是则重入锁次数+1.
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");

                setState(nextc);
                return true;
            }
            return false;
        }
    }

公平性保证代码:

    public final boolean hasQueuedPredecessors() {

        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

再看看unLock方法,最终调用了Sync的tryRelease方法:

protected final boolean tryRelease(int releases) {
           //如果不是锁持有者调用UNlock则抛出异常。
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
           //如果当前可重入次数为0,则清空锁持有线程
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            //设置可重入次数为原始值-1
            setState(c);
            return free;
        }

15.3 非公平锁原理

final void lock() {

   //如果当前锁空闲0,则设置状态为1,并且设置当前线程为锁持有者
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);//调用重写的tryAcquire方法->nonfairTryAcquire方法
}
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {//状态为0说明没有线程持有该锁
                if (compareAndSetState(0, acquires)) {//cas原子性操作,保证只有一个线程可以设置状态
                    setExclusiveOwnerThread(current);//设置锁所有者
                    return true;
                }
            }//如果当前线程是锁持有者则可重入锁计数+1
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

15.3 总结

可知公平与非公平都是先执行tryAcquire尝试获取锁,如果成功则直接获取锁,如果不成功则把当前线程放入队列。对于放入队列里面的第一个线程A在unpark后会进行自旋调用tryAcquire尝试获取锁,假如这时候有一个线程B执行了lock操作,那么也会调用tryAcquire方法尝试获取锁,但是线程B并不在队列里面,但是线程B有可能比线程A优先获取到锁,也就是说虽然线程A先请求的锁,但是却有可能没有B先获取锁,这是非公平锁实现。而公平锁要保证线程A要比线程B先获取锁。所以公平锁相比非公平锁在tryAcquire里面添加了hasQueuedPredecessors方法用来保证公平性。

十六、ReentrantReadWriteLock原理

如图读写锁内部维护了一个ReadLock和WriteLock,并且也提供了公平和非公平的实现,下面只介绍下非公平的读写锁实现。我们知道AQS里面只维护了一个state状态,而ReentrantReadWriteLock则需要维护读状态和写状态,一个state是无法表示写和读状态的。所以ReentrantReadWriteLock使用state的高16位表示读状态也就是读线程的个数,低16位表示写锁可重入量。

static final int SHARED_SHIFT   = 16;

共享锁(读锁)状态单位值65536 
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
共享锁线程最大个数65535
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

排它锁(写锁)掩码 二进制 15个1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** 返回读锁线程数  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** 返回写锁可重入个数 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

16.1 WriteLock

lock 获取锁


protected final boolean tryAcquire(int acquires) {

    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    //c!=0说明读锁或者写锁已经被某线程获取
    if (c != 0) {
        //w=0说明已经有线程获取了读锁或者w!=0并且当前线程不是写锁拥有者,则返回false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
       //说明某线程获取了写锁,判断可重入个数
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");

       // 设置可重入数量(1)
        setState(c + acquires);
        return true;
    }

   //写线程获取写锁
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
    

unlock 释放锁:

protected final boolean tryRelease(int releases) {
// 看是否是写锁拥有者调用的unlock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
//获取可重入值,这里没有考虑高16位,因为写锁时候读锁状态值肯定为0
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
//如果写锁可重入值为0则释放锁,否者只是简单更新状态值。
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

16.2 ReadLock

对应读锁只需要分析下Sync的tryAcquireShared和tryReleaseShared

lock 获取锁:

protected final int tryAcquireShared(int unused) {

 //获取当前状态值
  Thread current = Thread.currentThread();
  int c = getState();

  //如果写锁计数不为0说明已经有线程获取了写锁,然后看是不是当前线程获取的写锁。
  if (exclusiveCount(c) != 0 &&
      getExclusiveOwnerThread() != current)
      return -1;

  //获取读锁计数
  int r = sharedCount(c);
  //尝试获取锁,多个读线程只有一个会成功,不成功的进入下面fullTryAcquireShared进行重试
  if (!readerShouldBlock() &&
      r < MAX_COUNT &&
      compareAndSetState(c, c + SHARED_UNIT)) {
      if (r == 0) {
          firstReader = current;
          firstReaderHoldCount = 1;
      } else if (firstReader == current) {
          firstReaderHoldCount++;
      } else {
          HoldCounter rh = cachedHoldCounter;
          if (rh == null || rh.tid != current.getId())
              cachedHoldCounter = rh = readHolds.get();
          else if (rh.count == 0)
              readHolds.set(rh);
          rh.count++;
      }
      return 1;
  }
  return fullTryAcquireShared(current);
}

unlock 释放锁:

protected final boolean tryReleaseShared(int unused) {
  Thread current = Thread.currentThread();
  if (firstReader == current) {
      // assert firstReaderHoldCount > 0;
      if (firstReaderHoldCount == 1)
          firstReader = null;
      else
          firstReaderHoldCount--;
  } else {
      HoldCounter rh = cachedHoldCounter;
      if (rh == null || rh.tid != current.getId())
          rh = readHolds.get();
      int count = rh.count;
      if (count <= 1) {
          readHolds.remove();
          if (count <= 0)
              throw unmatchedUnlockException();
      }
      --rh.count;
  }

  //循环直到自己的读计数-1 cas更新成功
  for (;;) {
      int c = getState();
      int nextc = c - SHARED_UNIT;
      if (compareAndSetState(c, nextc))

          return nextc == 0;
  }
}

十七、 什么是重排序问题

Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序可以保证最终执行的结果是与程序顺序执行的结果一致,并且只会对不存在数据依赖性的指令进行重排序,这个重排序在单线程下对最终执行结果是没有影响的,但是在多线程下就会存在问题。

一个例子

int a = 1;(1)
int b = 2;(2)
int c= a + b;(3)

如上c的值依赖a和b的值,所以重排序后能够保证(3)的操作在(2)(1)之后,但是(1)(2)谁先执行就不一定了,这在单线程下不会存在问题,因为并不影响最终结果。

一个多线程例子

public static class ReadThread extends Thread {
        public void run() {

            while(!Thread.currentThread().isInterrupted()){
                if(ready){(1)
                    System.out.println(num+num);(2)
                }
                System.out.println("read thread....");
            }

        }
    }

    public static class Writethread extends Thread {
        public void run() {
             num = 2;(3)
             ready = true;(4)
             System.out.println("writeThread set over...");
        }
    }

    private static int num =0;
    private static boolean ready = false;

    public static void main(String[] args) throws InterruptedException {

        ReadThread rt = new ReadThread();
        rt.start();

        Writethread  wt = new Writethread();
        wt.start();

        Thread.sleep(10);
        rt.interrupt();
        System.out.println("main exit");
    }

如代码由于(1)(2)(3)(4) 之间不存在依赖,所以写线程(3)(4)可能被重排序为先执行(4)在执行(3),那么执行(4)后,读线程可能已经执行了(1)操作,并且在(3)执行前开始执行(2)操作,这时候打印结果为0而不是4.

解决:使用volatile 修饰ready可以避免重排序。

十八、 什么是中断

Java中断机制是一种线程间协作模式,通过中断并不能直接终止另一个线程,而是需要被中断的线程根据中断状态自行处理。

例如当线程A运行时,线程B可以调用A的 interrupt()方法来设置中断标志为true,并立即返回。设置标志仅仅是设置标志,线程A并没有实际被中断,会继续往下执行的,然后线程A可以调用isInterrupted方法来看自己是不是被中断了,返回true说明自己被别的线程中断了,然后根据状态来决定是否终止自己活或者干些其他事情。

Interrupted经典使用代码

public void run(){    
    try{    
         ....    
         //线程退出条件
         while(!Thread.currentThread().isInterrupted()&& more work to do){    
                // do more work;    
         }    
    }catch(InterruptedException e){    
                // thread was interrupted during sleep or wait    
    }    
    finally{    
               // cleanup, if required    
    }    
}

使用场景

故意调用interrupt()设置中断标志,作为线程退出条件:

  public static class MyThread extends Thread {
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {

                System.out.println("do Someing....");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {

        MyThread t = new MyThread();
        t.start();
        Thread.sleep(1000);
        t.interrupt();
    }

当线程中为了等待一些特定条件的到来时候,一般会调用Thread.sleep(),wait,join方法在阻塞当前线程,比如sleep(3000);那么到3s后才会从阻塞下变为激活状态,但是有可能在在3s内条件已经满足了,这时候可以调用该线程的interrupt方法,sleep方法会抛出InterruptedException异常,线程恢复激活状态:

 public static class SleepInterrupt extends Object implements Runnable{  
        public void run(){  
            try{  
                System.out.println("thread-sleep for 2000 seconds"); 

                Thread.sleep(2000000);  
                System.out.println("thread -waked up");  
            }catch(InterruptedException e){  
                System.out.println("thread-interrupted while sleeping");  

                return;    
            }  
            System.out.println("thread-leaving normally");  
        }  
    }

    public static void main(String[] args) throws InterruptedException {

        SleepInterrupt si = new SleepInterrupt();  
        Thread t = new Thread(si);  
        t.start();  

        //主线程休眠2秒,从而确保刚才启动的线程有机会执行一段时间  
        try {  
            Thread.sleep(2000);   
        }catch(InterruptedException e){  
            e.printStackTrace();  
        }  
        System.out.println("main() - interrupting other thread");  
        //中断线程t  
        t.interrupt();  

        System.out.println("main() - leaving");  

    }

InterruptedException的处理
如果抛出 InterruptedException那么就意味着抛出异常的方法是阻塞方法,比如Thread.sleep,wait,join。
那么接受到异常后如何处理的,醉简单的是直接catch掉,不做任何处理,但是中断发生一般是为了取消任务或者退出线程来使用的,所以如果直接catch掉那么就会失去做这些处理的时机,出发你能确定不需要根据中断条件做其他事情。

  • 第一种方式 catch后做一些清理工作,然后在throw出去
  • 第二种方式 catch后,重新设置中断标示

十九、FutureTask 原理

19.1 一个例子

static class Task implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("子线程在进行计算");
            Thread.sleep(1000);
            int sum = 0;
            for (int i = 0; i < 100; i++)
                sum += i;
            return sum;
        }
    }

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);

        System.out.println("主线程在执行任务");

        try {
            System.out.println("task运行结果" + futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println("所有任务执行完毕");
        executor.shutdown();

    }

如上代码主线程会在futureTask.get()出阻塞直到task任务执行完毕,并且会返回结果。

19.2原理

FutureTask 内部有一个state用来展示任务的状态,并且是volatile修饰的:

/** Possible state transitions:
 * NEW -> COMPLETING -> NORMAL 正常的状态转移
 * NEW -> COMPLETING -> EXCEPTIONAL 异常
 * NEW -> CANCELLED 取消
 * NEW -> INTERRUPTING -> INTERRUPTED 中断
 */

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

其中构造FutureTask实例时候状态为new

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;      
}

把FutureTask提交到线程池或者线程执行start时候会调用run方法:

public void run() {

    //如果当前不是new状态,或者当前cas设置当前线程失败则返回,只有一个线程可以成功。
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        //当前状态为new 则调用任务的call方法执行任务
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);完成NEW -> COMPLETING -> EXCEPTIONAL 状态转移
            }

            //执行任务成功则保存结果更新状态,unpark所有等待线程。
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

protected void set(V v) {
    //状态从new->COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        //状态从COMPLETING-》NORMAL
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        //unpark所有等待线程。
        finishCompletion();
    }
}

任务提交后,会调用 get方法获取结果,这个get方法是阻塞的。

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //如果当前状态是new或者COMPLETING则等待,因为位normal或者exceptional时候才说明数据计算完成了。
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
}
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {

        //如果被中断,则抛异常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        //组建单列表
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {

            nanos = deadline - System.nanoTime();
            //超时则返回
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            //否者设置park超时时间
            LockSupport.parkNanos(this, nanos);
        }
        else
            //直接挂起当前线程
            LockSupport.park(this);
    }
}
private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
}

在submit任务后还可以调用futuretask的cancel来取消任务:

public boolean cancel(boolean mayInterruptIfRunning) {
        //只有任务是new的才能取消
        if (state != NEW)
            return false;
       //运行时允许中断
        if (mayInterruptIfRunning) {
           //完成new->INTERRUPTING
            if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
                return false;
            Thread t = runner;
            if (t != null)
                t.interrupt();
            //完成INTERRUPTING->INTERRUPTED
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
        }
       //不允许中断则直接new->CANCELLED
        else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
            return false;
        finishCompletion();
        return true;
    }

二十、ConcurrentHashMap原理简述

翻看ConcurrentHashMap的源码知道ConcurrentHashMap使用分离锁,整个map分段segment,每个segments是继承了ReentrantLock,使用ReentrantLock的独占锁用来控制同一个段只能有一个线程进行写,但是不同段可以多个线程同时写。另外无论是段内还是段外多个线程都可以同时读取,因为他使用了volatile语义的读,并没加锁。并且当前段有写线程时候,该段也允许多个读线程存在。

put的大概逻辑,首先计算key的hash值,然后根据一定算法(位移和与操作)计算出该元素应该放到那个segment,然后调用segment.put方法,该方法里面使用ReentrantLock进行写控制,第一个线程tryLock获取锁进行写入,其他写线程则自旋调用tryLock 循环尝试。

get的大概逻辑,使用UNSAFE.getObjectVolatile 在不加锁情况下获取volatile语义的值。

关注公众号《架构文摘》,每天一篇架构领域重磅好文,涉及一线互联网公司应用架构(高可用、高性能、高稳定)、大数据、机器学习、Java架构等各个热门领域。

Image placeholder
xuyongkaijs
未设置
  69人点赞

没有讨论,发表一下自己的看法吧

推荐文章
云原生时代,分布式系统设计必备知识图谱(内含22个知识点)

作者|杨泽强(竹涧)阿里云技术专家我们身处于一个充斥着分布式系统解决方案的计算机时代,无论是支付宝、微信这样顶级流量产品、还是区块链、IOT等热门概念、抑或如火如荼的容器生态技术如Kubernetes

专业人士必备的10个渗透测试工具

渗透测试,也被称为穿透测试或道德黑客攻击,就像电影《Sneakers》中那样,黑客顾问在攻击者之前侵入你的公司网络,找出弱点。这是一个模拟的网络攻击,pentester使用恶意黑客可用的工具和技术。在

Java并发编程,深入理解ReentrantLock

ReentrantLock简介ReentrantLock重入锁, 是实现Lock接口的一个类 ,也是在实际编程中使用频率很高的一个锁,支持重入性,表示能够对共享资源能够重复加锁,即当前线程获取该锁再次

一些JavaScript 类(class)中需要了解的知识

JavaScript使用原型继承:每个对象都从原型对象继承属性和方法。在Java或Swift等语言中使用的传统类作为创建对象的蓝图,在JavaScript中不存在,原型继承仅处理对象。原型继承可以模拟

可能是全网最好的MySQL重要知识点

什么是MySQL?MySQL是一种关系型数据库,在Java企业级开发中非常常用,因为MySQL是开源免费的,并且方便扩展。阿里巴巴数据库系统也大量用到了MySQL,因此它的稳定性是有保障的。MySQL

面试题总结:可能是全网最好的MySQL重要知识点

作者:Snailclimb 整理编辑:SegmentFault本文原载于SegmentFault专栏JavaGuide,如侵删。标题有点标题党的意思,但希望你在看了文章之后不会有这个想法——这篇文章是

Vuex的一些常用知识点介绍

一、为什么要使用Vuex1、多个组件依赖同一个状态,使用组件之间通信方法会非常繁琐,例如多层嵌套组件。2、需要全局保存的数据,例如用户、权限信息,全局系统设置二、Vuex的五个核心属性1、state:

css 常用知识点全在这里了。哪位大侠还能补充?

努力学习与总结是对自己能力的提升,也希望能帮助到同学们。BFC块状格式化上下文(blockformattingcontext)简称BFC:是页面上的一个隔离的独立容器,容器里面的子元素不会影响到外面的

3种可靠的物联网开发编程语言

物联网设备的普及程度持续上升,人们与物联网的联系愈加紧密。物联网为结合虚拟和现实世界提供了最大的平台。大多数支持IoT设备的命令都可以通过智能手机上的一个图标来实现。物联网的发展和成长不能归结为某一方

高并发下的接口幂等性解决方案!

一、背景我们实际系统中有很多操作,是不管做多少次,都应该产生一样的效果或返回一样的结果。例如:前端重复提交选中的数据,应该后台只产生对应这个数据的一个反应结果。我们发起一笔付款请求,应该只扣用户账户一

96秒100亿!如何抗住双11高并发流量?

今年双11全民购物狂欢节进入第十一个年头,1分36秒,交易额冲到100 亿 !比2018年快了近30 秒,比2017年快了近1分半!这个速度再次刷新天猫双11成交总额破100亿的纪录。那么如何抗住双1

慌了,居然被问到怎么做高并发系统的限流

来源:uee.me/cDuRD在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。本文结合作者的一些经验介绍限流的相关概念、算法和常规的实现方式。缓存缓存比较好理解,在大型高并发系统中,如果没

1000亿文本信息,高并发MD5查询,这么大数据量的业务怎么弄?

==提问== 沈老师,你好,想请教一个身份证信息检索的问题。公司有一个每秒5万并发查询的业务,(假设)根据身份证MD5查询身份证信息,目前有1000亿条数据,纯文本存储,前几天看你写LevelDB,请

阿里支付宝架构师:谈谈我眼中的高并发架构【好文】

来源:my.oschina.net/u/3772106/blog/1793561前言高并发经常会发生在有大活跃用户量,用户高聚集的业务场景中,如:秒杀活动,定时领取红包等。为了让业务可以流畅的运行并且

架构师眼中的高并发架构

前言高并发经常发生在有大活跃用户量和用户高聚集的业务场景中,如:秒杀活动、定时领取红包等。为了让业务可以流畅的运行并且给用户一个好的交互体验,我们需要根据业务场景预估达到的并发量等因素,来设计适合自己

阿里达摩院 vs Gartner:2020 科技趋势预测,你更信谁?

信息革命、移动互联网革命尚未落幕,智能革命又像一头大象一样撞进人类的生活,激荡着整个世界。任何足够先进的科技,初看都与魔法无异,但魔法背后是对规律和趋势的洞悉。2020年初,阿里巴巴旗下达摩院发布了2

2020年,你是否更关注个人数据隐私了?

日前,阿里达摩院发布了2020年十大科技趋势,其中趋势九指出保护数据隐私的AI技术将加速落地。“数据流通所产生的合规成本越来越高。使用AI技术保护数据隐私正在成为新的技术热点,其能够在保证各方数据安全

揭秘!一个高准确率的Flutter埋点框架如何设计

背景用户行为埋点是用来记录用户在操作时的一系列行为,也是业务做判断的核心数据依据,如果缺失或者不准确将会给业务带来不可恢复的损失。闲鱼将业务代码从Native迁移到Flutter上过程中,发现原先Na

传统灾备的痛大家都懂,阿里云定义企业级云灾备

灾备,并不是一个新鲜词,它起源于70年代。灾备,顾名思义,即容灾+备份。容灾,通常是在相隔较远的两地(同城或者异地)建立两套或多套功能相同的IT系统,目的是为了在遭遇灾害时,能保证业务连续性。而备份是

挑重点,第五届甲骨文数据库云技术大会小记

今年,是笔者参加甲骨文数据库云技术大会的第5个年头,从第一届到第五届,一届没落下。今年,也是特殊的一年,从1989年进入中国市场,甲骨文在中国走过了30年,今年上半年,甲骨文中国刚经历了一轮大裁员,而

挑重点,详解华为最新 “一奖两组”

继开源GaussDB,成立鲲鹏智能数据产业联盟数据库产业推进组等之后,华为围绕数据基础设施,构建数据产业格局和生态方面再爆大动作。11月19日,“以引领数据基础设施,携手迈入智能时代”为主题,由华为主

Spring Boot到底是怎么运行的,你知道吗?

导读SpringBoot方式的项目开发已经逐步成为Java应用开发领域的主流框架,它不仅可以方便地创建生产级的Spring应用程序,还能轻松地通过一些注解配置与目前比较流行的微服务框架SpringCl

DTCC | 云数据库时代已来,你准备好了吗?

作为基础软件之一,数据库一直是企业IT系统的核心,过去数十年,数据库技术发展缓慢。而随着云计算的到来及相关技术的不断成熟推动了数据库行业的快速发展,传统数据库铁打的防线也正在被撕裂。截至目前,全球主流

BAT大牛推荐开发人员必备Spring源码剖析文档,深度剖析Spring

为什么学习读源码我们每天都和代码打交道。经过数年的基础教育和职业培训,大部分程序员都会「写」代码,或者至少会抄代码和改代码。但是,会读代码的并不在多数,会读代码又真正读懂一些大项目的源码的,少之又少。

Python程序员进阶必备:从新手到高手的100个模块

在知乎和CSDN的圈子里,经常看到、听到一些python初学者说,学完基础语法后,不知道该学什么,学了也不知道怎么用,一脸的茫然。近日,CSDN的公众号推送了一篇博客,题目叫做《迷思:Python学到