Java 的源代码学习(4)——ReentrantLock 相关类(上)

“Java 的源代码学习”系列

(1)基本类型和对应的类

(2)HashMap 和 ConcurrentHashMap

(3)ConcurrentSkipListMap

(4)ReentrantLock 相关类(上)(本文)

经过前两篇与各种基础类库的 CAS 的“战斗”,大部分人估计已经晕了。在大多数情况,我们并不会直接参与 CAS 相关的实现细节,而是使用 JDK 提供的各种并发控制的类包。

例如我们常见的锁机制。这里老夫就要来研究一下 ReentrantLock 这个类相关的源代码实现。

老夫把此篇拆为上、下两篇,上篇(本文)是在看 ReentrantLock 相关代码前的一些知识准备(包括对 ThreadLocal 类的代码学习)。在下篇再开始读 ReentrantLock 相关的代码。

现在博客用的垃圾代码高亮插件貌似有问题,如果遇到下面的代码混乱一片,通常刷新下页面就行了。等老夫有空再把这个插件换掉。

一、Object 对象提供的同步机制

先回忆一下 Object 这个类提供的并发控制机制。比如我们要写一个循环队列,当队列满的时候,入队线程被挂起;当有元素出队时,唤醒被挂起的线程。出队的示例代码如下(这些代码抄自这里):

      synchronized (taskQueue)
      {
         while (taskQueue.isEmpty())
         {
            taskQueue.wait();
         }
         int i = (Integer) taskQueue.remove(0);
         taskQueue.notifyAll();
      }

而入队的代码如下:

      synchronized (taskQueue)
      {
         while (taskQueue.size() == MAX_CAPACITY)
         {
            taskQueue.wait();
         }
         taskQueue.add(i);
         taskQueue.notifyAll();
      }

首先要明确两点:

  • 在调用 wait 或 notifyAll 方法时,必须要获得该对象的监视器(monitor),也就是上面加的 synchronized 关键字。在没有 monitor 的情况下调用 wait 或 notifyAll,Java 会抛出 IllegalMonitorStateException 的异常。在 wait 返回后,会自动获得 monitor。
  • 线程被唤醒后,必须要再次判断条件是否成立,也就是要写到一个 while 循环里面。这个应该是没啥好说的。
  • 对于上面的代码,不能用 notify 代替 notifyAll。因为使用 notify 后被唤醒的线程是随机选取的,比如入队后我们调用 notify,此时被唤醒的线程可能是另一个入队的线程,而不是我们所期望的出队线程。因此我们只能 notifyAll 让这些线程去竞争,以“期待”出队线程能够被选中。

至于一个线程何时被 notify(也就是调用的 wait 方法返回),Java 的文档中指出,当满足下面四种条件之一时,wait 方法会返回:

  • 其他线程调用了此对象的 notify 方法,并且当前线程被选择为唤醒线程。
  • 其他线程调用了此对象的 notifyAll 方法。
  • 当前线程被中断(调用了 interrupt 方法)。
  • 如果在调用 wait 时指定了超时时间,并且时限已过。

此外要特别注意,上面四种情况之一并不是线程被唤醒的充分必要条件,即使上述条件都不满足,线程还是【有可能】被唤醒,这种情况叫“spurious wakeup”,此时无论如何用 while 循环判断条件是必要的(虽然概率很低,但是一旦发生,便会让人找不着头脑)。

可以看出直接使用 Object 类的这些方法在唤醒时会比较混乱,这样导致的后果就是很多不必要的竞争。所以现在大多数情况都是使用的其他类包,比如上述队列可以改用 Lock 和 Condition 来实现,这也就是老夫需要分析的重点内容。老夫以前做过的一道题目,就是使用其他类库代替 Object 类的这些方法来控制同步,这也是 Google 的一道面试题,具体参见这里

二、ThreadLocal 和 ThreadLocalMap

JDK 提供了 ThreadLocal<T> 类用于保存线程的私有变量。例如乃想为每一个线程分配一个 id,但这个 id 仅仅是为了不重复而设定的,那使用 ThreadLocal 就很好啦,JDK 的示例代码如下:

 import java.util.concurrent.atomic.AtomicInteger;

 public class ThreadId {
     // Atomic integer containing the next thread ID to be assigned
     private static final AtomicInteger nextId = new AtomicInteger(0);

     // Thread local variable containing each thread's ID
     private static final ThreadLocal<Integer> threadId =
         new ThreadLocal<Integer>() {
             @Override protected Integer initialValue() {
                 return nextId.getAndIncrement();
         }
     };

     // Returns the current thread's unique ID, assigning it if necessary
     public static int get() {
         return threadId.get();
     }
 }

在实现上,Thread 类有两个 ThreadLocal.ThreadLocalMap 类型的变量 threadLocals 和 inheritableThreadLocals。ThreadLocalMap 是一个内部的静态类,其成员变量的定义如下:

    static class ThreadLocalMap {
        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

        private static final int INITIAL_CAPACITY = 16;
        private Entry[] table;
        private int size = 0;
        private int threshold; // Default to 0

        ......
    }

这里的 Entry 是一个 WeakReference,同时注意 super(k) 这句话,弱引用的是 ThreadLocal<?> 这个对象。这样做的好处是,当 ThreadLocal 被回收后,相关的 Entry 也就可以扔掉了,这样使 ThreadLocal 被回收后,Entry 能够“自动”被回收,而不用把处理 Entry 的相关代码糅杂在其他类中。

这个 Map 实际上是一个单线程访问(废话!)的哈希表,在有冲突时,直接放到下一格去:

        private static int nextIndex(int i, int len) {
            return ((i + 1 < len) ? i + 1 : 0);
        }

        private static int prevIndex(int i, int len) {
            return ((i - 1 >= 0) ? i - 1 : len - 1);
        }

很显然,冲突是一个麻烦事。先看一下 set 方法:

        private void set(ThreadLocal<?> key, Object value) {

            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);

            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                ThreadLocal<?> k = e.get();

                if (k == key) {
                    e.value = value;
                    return;
                }

                if (k == null) {
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }

            tab[i] = new Entry(key, value);
            int sz = ++size;
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }

如果相关区域已经存在元素,并且 key 相同,直接替换 value,这是最简答的情况。如果 key 为 null,就麻烦了,因为相关的 ThreadLocal 已经被回收(注意,如果此区域不存在元素,e 就为 null,而不是 e.get() 为 null),此时调用 replaceStaleEntry 方法。

        private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                       int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;
            Entry e;

            int slotToExpunge = staleSlot;
            for (int i = prevIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = prevIndex(i, len))
                if (e.get() == null)
                    slotToExpunge = i;

看上去像是向前找一个被回收的 Entry(直到遇到空格子前)。这是在干啥呢?

            for (int i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();

                if (k == key) {
                    e.value = value;

                    tab[i] = tab[staleSlot];
                    tab[staleSlot] = e;

                    if (slotToExpunge == staleSlot)
                        slotToExpunge = i;
                    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                    return;
                }

                if (k == null && slotToExpunge == staleSlot)
                    slotToExpunge = i;
            }

接着向后遍历每一个格子,如果遇到 key 与待插入的相同的 Entry,则将该 Entry 与待插入的位置的元素进行交换。然后调用 expungeStaleEntry 方法对这个格子进行回收。

        private int expungeStaleEntry(int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;

            // expunge entry at staleSlot
            tab[staleSlot].value = null;
            tab[staleSlot] = null;
            size--;

            // Rehash until we encounter null
            Entry e;
            int i;
            for (i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();
                if (k == null) {
                    e.value = null;
                    tab[i] = null;
                    size--;
                } else {
                    int h = k.threadLocalHashCode & (len - 1);
                    if (h != i) {
                        tab[i] = null;

                        // Unlike Knuth 6.4 Algorithm R, we must scan until
                        // null because multiple entries could have been stale.
                        while (tab[h] != null)
                            h = nextIndex(h, len);
                        tab[h] = e;
                    }
                }
            }
            return i;
        }

这样子在遇到下一个 null 之前,把所有 key 为 null 的(也就是被回收的 ThreadLocal 相关的值)回收掉。如果不是 null,则尝试把它放到“正确”的位置上(指直接映射的位置),以提高第一次命中的概率。

而 cleanSomeSlots,则是扫描 log n 次元素把可以回收的干掉。至于为啥是 log n 次,看似是随便估计的,使其能够回收一定量的格子,又不至于耗时太久。

        private boolean cleanSomeSlots(int i, int n) {
            boolean removed = false;
            Entry[] tab = table;
            int len = tab.length;
            do {
                i = nextIndex(i, len);
                Entry e = tab[i];
                if (e != null && e.get() == null) {
                    n = len;
                    removed = true;
                    i = expungeStaleEntry(i);
                }
            } while ( (n >>>= 1) != 0);
            return removed;
        }

所以可以看出,在插入元素的时候,ThreadLocalMap 的复杂度是 O(n),因为它要负责清理一些陈旧的元素。因此,使用 WeakReference 的实现模式带来的负面影响就是实现起来很麻烦,而且时间复杂度稍高。不过带来的好处就是 Thread 类不用负责 ThreadLocal 里元素的生命周期管理,大大降低了耦合性。

顺便一提,网上很多讨论 ThreadLocal 带来的内存泄露,其本质上是使用方式有问题,而不是 ThreadLocal 类本身的 bug。这就好比在开发 Android 时一大堆内存泄露的问题都是由于没有好好看官方文档所引起的,面对这种情况,能有啥办法……

总结一下,ThreadLocalMap 其实就是存储于 Thread 对象里的,其中使用的 WeakReference 使乃不用去刻意去 remove 相关的元素,只要 ThreadLocal 被回收了就行了(当然前提是正确使用)。

三、自旋锁和 CLH 锁

乃会说,说了一大堆,这和 CLH 有啥关系?有了 ThreadLocal,我们就可以“轻易”实现 CLH。

CLH 是由 Craig、Landin 和 Hagersten 这几个哥们在九十年代提出的。以前老夫在分析 M$ 的 ConurrentQueue 的时候,见到过自旋锁这个东东。当临界区非常小时,正在占用临界区的线程可能只需要少数几条指令就能够释放锁,而如果此时将进程睡眠(使用操作系统内核提供的 mutex 等机制)再等待唤醒,那至少是几十毫秒(从用户态到内核态的相互切换)的开销,相较于几天指令就能完事的情况,浪费的时间太多。

而自旋锁使用一个循环来检查临界资源是否已经被释放,例如:

   public void lock() {
       Thread currentThread = Thread.currentThread();
       while (!owner.compareAndSet(null, currentThread)) {
   }

可以看到,自旋锁在多处理器(此时,需要 lock 指令锁总线)或者内存可抢占时使用才有效。此外,由于一直在 while 占用 CPU 资源,如果尝试了几次还是得不到资源,最好是使用 CPU 提供的 PAUSE 等待一段时间(如果没有这条指令其实除了耗电以外,倒没啥别的影响)。如果长时间得不到,择可以使用 sched_yield 短时间出让一下 CPU 资源。由此我们可以看出:

  • CAS 需要硬件的相关指令(不过目前不支持这个指令的处理器应该很少见了吧?);
  • 需要保存 CPU 各级缓存和内存的数据一致性,通讯开销比较大,而且在多处理器时还要跨 CPU Socket;
  • 没法保证公平性,调用此方法的线程只能互相碰运气竞争资源。

为了改进公平性,我们为每一个来排队的线程分配一个编号,另外设置一个临界区正在使用的线程编号。当某个线程使用完后,把编号加一,此时其它正在轮询的线程检查是否是自己所等待的编号,如果是,则进入临界区。

很容易发现这样只会使情况更糟。为此有两个哥们提出了 MCS 锁(也是以他们名字的首字母命名的),使用链表来实现自旋锁,线程只在本地变量上自选,上一结点通知后一个结点结束自选,从而减少了缓存同步的次数。

之后 CLH 等人提出了一种改进(之后称之为 CLH 自旋锁)。比如下面这种实现(代码抄自文末参见的第三篇)。

class ClhSpinLock {
	private final ThreadLocal<Node> prev;
	private final ThreadLocal<Node> node;
	private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());

	public ClhSpinLock() {
		this.node = new ThreadLocal<Node>() {
			protected Node initialValue() {
				return new Node();
			}
		};

		this.prev = new ThreadLocal<Node>() {
			protected Node initialValue() {
				return null;
			}
		};
	}

	public void lock() {
		final Node node = this.node.get();
		node.locked = true;
		Node pred = this.tail.getAndSet(node);
		this.prev.set(pred);
		while (pred.locked) {
		}
	}

	public void unlock() {
		final Node node = this.node.get();
		node.locked = false;
		this.node.set(this.prev.get());
	}

	private static class Node {
		private volatile boolean locked;
	}
}

其实上述代码有问题,getAndSet 这段逻辑应该在一个 while 循环里,或者直接使用 AtomicReferenceFieldUpdater 这个类提供的 getAndSet,因为可能会出现多个线程竞争更新 tail 的情况。

对于每一个线程,都保存了队列中前面一个结点的引用(直接使用 ThreadLocal,这样我们就不用搞另外一个结构来管理这些线程私有的变量了)。每次调用 lock 方法,都会在当前的 tail 后添加一个新结点(使用 AtomicReference.getAndSet 方法保证操作的原子性),之后使用 while 循环等待前一个结点释放(locked 为 true)。

此时,对于每一个线程,只等待其前一个结点释放,避免了多个线程竞争同一个变量的情况,同时可以实现公平性。

四、LockSupport

LockSupport 是实现锁(和相关同步类)的基本线程阻塞原语。

ReentrantLock 内部使用的 Sync 继承自 AbstractQueuedSynchronizer(下一篇文章会讲到),其实现是基于 LockSupport 的。所以是 Java 中各种锁的“始祖”,因此有必要好好理解一下这个类。

LockSupport 是一个“静态类”。这里的静态类指其构造方法是私有的,所有方法和字段都是 static 的。

偶们来看一下它的用法(JDK 里面的示例代码):

class FIFOMutex {
   private final AtomicBoolean locked = new AtomicBoolean(false);
   private final Queue<Thread> waiters
     = new ConcurrentLinkedQueue<Thread>();

   public void lock() {
     boolean wasInterrupted = false;
     Thread current = Thread.currentThread();
     waiters.add(current);

     // Block while not first in queue or cannot acquire lock
     while (waiters.peek() != current ||
            !locked.compareAndSet(false, true)) {
        LockSupport.park(this);
        if (Thread.interrupted()) // ignore interrupts while waiting
          wasInterrupted = true;
     }

     waiters.remove();
     if (wasInterrupted)          // reassert interrupt status on exit
        current.interrupt();
   }

   public void unlock() {
     locked.set(false);
     LockSupport.unpark(waiters.peek());
   }
 }

初看起来,这个东西貌似和 Object 的 wait、notify 没啥区别嘛?但是注意,LockSupport 是基于线程的;而 Object 的 wait 和 notify 是被动的。具体说来:

  • LockSupport 为每个线程提供两种状态:许可、不许可。
  • LockSupport.park(Object blocker):如果当前线程处于“许可”状态,该方法立即返回;否则将会等到其他线程为当前线程调用 unpark 重新回到许可状态或者中断当前线程。
  • LockSpport.unpark(Thread thread):解除 thread 线程的阻塞状态,如果未被阻塞则在下次调用 park 时不会阻塞。如果指定线程没有启动,则没有效果。
  • 连续调用 park 时,将会一直被阻塞。
  • park 有虚假唤醒的情况,因此要放在 while 循环里面重复检查条件是否满足。

简单来说,LockSupport 和 Object 类提供的方法相比:

  • 面向的对象不同。LockSupport 针对线程来设置其许可、不许可状态,Object 类针对某个对象。
  • LockSupport 与监视器(monitor)无关,而 Object 类操作前必须要有监视器。
  • LockSupport 是实现各种锁的基本原语,一般的项目中应使用更高层次的实现,不建议直接使用。(Object 类的 wait、notify 等实际上最好也不要直接在项目中使用。)
  • LockSupport 在线程中断时不会抛出异常。需要自己去判断线程的 Thread.interupted (如上面代码所示)。

LockSupport 的大部分代码都是直接调用的 Unsafe 类中的相关方法,这些方法都是 native 的,与操作系统提供的 API 有关。例如 park 方法:

public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
private static void setBlocker(Thread t, Object arg) {
    // Even though volatile, hotspot doesn't need a write barrier here.
    UNSAFE.putObject(t, parkBlockerOffset, arg);
}

LockSupport 类直接在 Thread 对象里的 parkBlocker 字段里写入一个 Object 对象用于标记使用(和 ThreadLocalMap 类似),主要是为了方便调试。当然你也可以不提供或随便给一个东西。

在 LockSupport 中实现了一个我们经常使用的 nextSecondarySeed 方法(ThreadLocalRandom 也提供了这个方法),用于生成每个线程独立的随机数种子。

    static final int nextSecondarySeed() {
        int r;
        Thread t = Thread.currentThread();
        if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
            r ^= r << 13;   // xorshift
            r ^= r >>> 17;
            r ^= r << 5;
        }
        else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
            r = 1; // avoid zero
        UNSAFE.putInt(t, SECONDARY, r);
        return r;
    }

这个信息存储于 Thread 对象的 threadLocalRandomSeed 等字段。无论是 LockSupport 还是 ThreadLocal,实际上都是封装了对 Thread 类不同功能的访问,实际上所有内容都是塞在 Thread 类里面。

AbstractQueuedSynchronized 里面主要是用了 LockSupport 进行并发控制。

知道了上面这些乱七八糟的基础知识,我们就能够比较方便地分析 ReentrantLock 了。具体老夫会在下一篇中具体分析。

参见:

[1] 自旋锁、排队自旋锁、MCS锁、CLH锁(http://coderbee.net/index.php/concurrent/20131115/577);

[2] 【linux】spinlock 的实现(http://www.cnblogs.com/chenpingzhao/p/5043746.html)

[3] Java并发包源码学习之AQS框架(二)(http://zhanjindong.com/2015/03/11/java-concurrent-package-aqs-clh-and-spin-lock)

✏️ 有任何想法?欢迎发邮件告诉老夫:daozhihun@outlook.com