Thread 01


$ java -version
java version “13.0.2” 2020-01-14
Java(TM) SE Runtime Environment (build 13.0.2+8)
Java HotSpot(TM) 64-Bit Server VM (build 13.0.2+8, mixed mode, sharing)

上下文切换

CPU 通过时间片分配算法来循环执行任务, 一个时间片一般为几十毫秒,而时间片是用来给各个线程运行指令的时间。

当线程 A 消耗完时间片后, CPU 需要保存当前线程的状态,这样,当时间片再次分配给线程 A 时,可以知道它之前的执行状态。

这里,从保存到在加载的过程,就是一次上下文切换。

CPU 屏障

  • LoadLoad Barrier

  • LoadStore Barrier

  • StoreStore Barrier

  • StoreLoad Barrier

load1 LoadLoad load2 表示, 保证 load1 指令先于 load2 指令

load1 LoadStore store1 表示, 保证 load1 指令先于 store1 的写入。 同时读写一个值时,保证读先于写

store1 StoreStore store2 表示, 保证 store1 写入且其他 cpu 可见, 先于 store2 指令。 同时写入一个值, 保证 store1 的写入先于 store2

store1 StoreLoad load1 表示, 保证 store1 写入且其他 cpu 可见,先于 load1 指令,即 load1 可以获取到最新的值。

volatile

我们知道, java 中 volatile 关键字的作用有两个:

  • 多线程环境中,保证共享变量的可见性
  • 禁止指令重排

volatile 的可见性保证,是由 JMM 语义保证的, JMM 根据不同 CPU ,制定不同的插入屏障策略。

作为非底层开发者,我认为了解到, volatile 能通过各种 Barrier(或者叫做 Fence) ,来保证变量在多线程环境下的可见性就好了。

synchronized

在 HotSpot 虚拟机中, 一个对象包含

  • 对象头
    • markword 4 字节
    • class 对象指针 4 字节
    • 数组长度(如果是数组对象,非数组对象则无该值) 4字节
  • 实例数据
  • 对其填充

在 markword 中,又存储了如下信息:

  • 锁状态
  • hashcode
  • 分代年龄
  • 是否为偏向锁
  • 锁标志位

synchronized 在从未发生竞争时, 使用偏向锁。即同一个线程访问 synchronized 同步块时, 会在对象头和栈帧中的锁记录里存储偏向锁的线程 id ,之后该线程再次进入此同步块时,只要 markword 中存储了该线程 id ,就表示该线程获得了锁。

出现竞争后, 偏向锁就会膨胀成轻量级锁。轻量级锁通过 CAS + 自旋来获取锁 。

当自旋获取锁失败时,就会进一步膨胀成重量级锁。重量级锁会导致其他尝试获取锁的线程阻塞住。

CAS

compare and swap 。

java 中的原子类型,都是通过 CAS 实现的原子操作。在 java 9 之后, 最终会调用到 VarHandler

假设我们对变量 a 进行 compareAndSwap(1,2) ,1 为 expected , 2 为 new value 。

VarHandler 在访问 a 变量时,会保证对变量 a 的读 –> 改 –> 写 是一个原子操作,也就是说,在这个过程中,其他线程是无法对变量 a 做出更改的。

至于 VarHandler 是怎样保证这一过程的原子性的,主要是通过各型号 CPU 的指令集,进行总线锁定或者缓存锁定,禁止锁定期间,其他 CPU 的访问 [1]

CAS 的 ABA 问题指,线程 A 先将变量 a=0 写成 1, 后又写成 0 , 那么对于线程 B 而言, 这个变量没有发生改变。这个问题可以通过引入版本号来解决。即 a0=0 -> a1=1 -> a2=0 , 此时 a 的值虽然还是 0 , 但版本已经从 0 变成了 2。

java 中,AtomicStampedReference 即为带版本号的实现

Atomic & ThreadLocal

Atomic 类的实现, 主要依赖于 CAS ,在 java 9 之前, CAS 最终调用 Unsafe , 在 java 9 之后, 很多地方都改成使用 VarHandler [2]AtomicReference

ThreadLocal 能够为线程提供一份单独的副本。

var tmp = new ThreadLocal<Integer>();
tmp.set(3); 
new Thread(() -> {
    System.out.println(Thread.currentThread().getName() + ":" + tmp.get());
}).start();
new Thread(() -> {
    System.out.println(Thread.currentThread().getName() + ":" + tmp.get());
}).start();

此时, 两个线程都会输出 null 。

点进去看不难发现, ThreadLocal 的 set 操作, 会把值和当前线程绑定, 放入 一个 k=thread id , v=value 的 ThreadLocalMap 中 。而这个 map , 则是由当前的 Thread 来维护的。整个的类图如下所示:

Thread Local

其中, Entry 里的 key 是 ThreadLocal 的弱引用。这意味着, 如果 ThreadLocal 没有任何强引用指向它,则该 key 会被 GC 掉。

也就是说,只要 Thread 结束,所谓的 ThreadLocal 内存泄露,就不会发生。但是有可能你的 Thread 是由 thread pool 维护的…

所以一般的建议是, 使用完 ThreadLocal 后,执行 remove ,手动释放那些 key 为 null 的 value 对象。

Concurrent

ConcurrentHashMap

高版本 java (version >= 8) 中, 通过 CAS + Synchronized 实现多线程安全。

Map 一文中,我们介绍过 HashMap 的一般结构,ConcurrentHashMap 的结构与其基本一致。

我们看下 putget 两个方法的粗略实现。

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
        break;
}

当插入的数据, 在数组头时,直接通过 cas 进行插入


synchronized (f) {
    if (tabAt(tab, i) == f) {
        if (fh >= 0) {
           // 链表操作 ...
        }
        else if (f instanceof TreeBin) {
            // 红黑树操作 ...
        }
    }
}

如果插入的数据, 需要插入链表或者红黑树的时候, 则需要先用 synchronized 锁住头,再操作。

get 的时候:

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0) // 红黑树查找
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) { // 链表查找
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

可以看到, get 操作全程无锁。这是因为它的 tablevolatile 关键字修饰,这就保证了多线程下的可见性, 这也是和 HashMap 的区别之一。

transient volatile Node<K,V>[] table;

哦, 还有一个地方, 在 table 初始话的时候,有一个变量:

private transient volatile int sizeCtl;

当需要对 table 初始化的时候, 需要先看 sizeCtl 的值:

  • < 0 , 表示有其他线程正在初始化 table ,此时需要通过 CAS + DCL + 自旋来等待
  • >= 0 , 表示可以对 table 初始化

ConcurrentLinkedQueue

非阻塞式队列。

通过 VarHandler 的 CAS + 自旋实现。虽然代码只有几行,可是内容却很多… 具体分析可参考 这篇文章

这里需要注意的一个点是, 它的 tail 节点,并不总是指向真正的尾节点。

BlockingQueue

阻塞式队列

在 zk 中, 很多地方都用到了它的子类, 如选举时,通过 LinkedBlockingQueue 来接收选票,在处理客户端请求时, 整个的处理链流程,也是将 Request 放入到 LinkedBlockingQueue 中。其他的线程通过监听该队列,来处理不断添加进来的数据。

无论是 ArrayBlockingQueue 还是 LinkedBlockingQueue , 其内部实现,都是通过 ReentrantLock 来实现的。

ArrayBlockingQueue 提供公平锁实现,有界,即大小固定,如果队列满后,仍尝试添加元素,则抛出 Queue full exception

LinkedBlockingQueue 不提供公平锁,可扩容,如果实例化时,没有提供 capacity,则默认容量为 Integer.Max_Value

Thread Pool

ThreadPool Class Flow

使用 threadpool , 有两个 task 类型可选:

  • runnable : void, 无返回值, 不能 throw 一个异常
  • callable : 带返回值, 可 throw 异常

事实上, 在真正提交给 Thread 的时候, callable 需要先包装成 RunnableFuture 的 task 。

ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

一个 ThreadPoolExecutor 完整的构造函数如上所示

  • corePoolSize :池中的线程数量,即使无 task , 也不会被销毁。当然, 如果你设置 allowCoreThreadTimeOuttrue , 则会在 keepAliveTime 时间后被销毁
  • maximumPoolSize :最大线程数,如果无 task ,则 在 keepAliveTime 时间后, 被销毁
  • keepAliveTime :最大线程大于 core 线程时,如果无 task ,则 在 keepAliveTime 时间后, 被销毁
  • unit :keepAliveTime 的单位
  • workQueue : 当 task 无线程处理时, 会先放到 queue 中。
  • threadFactory :ThreadFactory 是一个 interface , 你可以实现自己的 factory ,或者使用 defaultThreadFactory(实例化的时候, 该参数不填即使用默认 factory)
  • handler :当 pool 负载时(线程和 queue 都满了) ,再添加 task 时的策略。 可以实现 RejectedExecutionHandler 接口来定制自己的策略, ThreadPoolExecutor 内置了四种策略 :
    • AbortPolicy : 直接丢弃
    • CallerRunsPolicy : 由执行 execute 方法的线程(一般为调用 submit 的线程) 执行
    • DiscardOldestPolicy : 会丢弃 queue 中最老的线程
    • DiscardPolicy : 会偷偷把你想加进来的 task 丢弃掉。静默丢弃

ForkJoin

Future & CompeletableFuture

Kinds of Locks

AQS

TODO

Reference

  • [1] CAS-seg & CAS-zhihu & CAS-juejin

  • [2] VarHandler : 主要是为底层开发人员提供便利。在此之前, 可以通过 JNI + C 或者通过 Unsafe ,现在可以通过 VarHandler 来做一些底层开发。

    Developers of Java language initially created the sun.misc.Unsafe to use it internally in APIs of JVM. But many third party softwares like Hazelcast, JMock, Netty, Hibernate, Spring, etc started using it as it enabled them to do powerful low level operations. The Unsafe API did these operations with better performance than JNI. Critical internal APIs like sun.misc.Unsafe are still available but deprecated in Java 9. They may be eventually encapsulated or removed in Java 10. Therefore, it is a better idea for developers of such softwares to adopt VarHandle API early in their code as an alternative to sun.misc.Unsafe class.

  • [3] ThreadLocal

  • Cache L1 L2 L3 究竟在哪里

  • CPU 内存结构


文章作者: peifeng
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC-ND 4.0 许可协议。转载请注明来源 peifeng !
 上一篇
一趣二三句 一趣二三句
所谓越界,就是到那里为止的意思。守住那界限,就是继续现在所熟悉的感觉。那意味着维持那里的世界、那里的规则和关系。那也代表着,如果不越界,就无法预见另一个世界的规则和关系。 ---- 请回答 1988
2020-05-05
下一篇 
用微信公众号给你的 Blog 鉴权 用微信公众号给你的 Blog 鉴权
通过 Nginx 代理的静态 blog , 虽然只是静态 blog , 可也希望有些页面能做权限管理。 Nginx 提供了 Basic Auth 功能。不过我更喜欢通过微信来做简单的验证功能。下面就是我个人的实现过程。
2020-05-02
  目录