yeskery

源码阅读:全方位讲解 LongAdder

高并发下计数功能最好的数据结构就是 LongAdder 与 DoubleAdder,低并发下效率也非常优秀,这是我见过的 java 并发包中设计的最为巧妙的类,从软硬件方面将 java 并发累加操作优化到了极致,所以应该我们应该弄清楚它的每一行代码为什么要这样做,它俩的实现大同小异,下面以 LongAdder 类为例介绍下它的实现。

Striped64类

  1. public class LongAdder extends Striped64 implements Serializable

LongAdder 继承了 Striped64 类,来实现累加功能的,它是实现高并发累加的工具类;
Striped64 的设计核心思路就是通过内部的分散计算来避免竞争。
Striped64 内部包含一个 base 和一个 Cell[] cells 数组,又叫 hash 表。
没有竞争的情况下,要累加的数通过 cas 累加到 base 上;如果有竞争的话,会将要累加的数累加到 Cells 数组中的某个 cell 元素里面。所以整个 Striped64 的值为 sum=base+∑[0~n]cells

Striped64 内部三个重要的成员变量:

  1. /**
  2. * 存放Cell的hash表,大小为2的幂。
  3. */
  4. transient volatile Cell[] cells;
  5. /**
  6. * 基础值,
  7. * 1. 在没有竞争时会更新这个值;
  8. * 2. 在cells初始化的过程中,cells处于不可用的状态,这时候也会尝试将通过cas操作值累加到base。
  9. */
  10. transient volatile long base;
  11. /**
  12. * 自旋锁,通过CAS操作加锁,用于保护创建或者扩展Cell表。
  13. */
  14. transient volatile int cellsBusy;

成员变量cells

cells 数组是 LongAdder 高性能实现的必杀器:
AtomicInteger 只有一个 value,所有线程累加都要通过 cas 竞争 value 这一个变量,高并发下线程争用非常严重;
而 LongAdder 则有两个值用于累加,一个是 base,它的作用类似于 AtomicInteger 里面的 value,在没有竞争的情况不会用到 cells 数组,它为 null,这时使用 base做累加,有了竞争后 cells 数组就上场了,第一次初始化长度为 2,以后每次扩容都是变为原来的两倍,直到 cells 数组的长度大于等于当前服务器 cpu 的数量为止就不在扩容(想下为什么到超过 cpu 数量的时候就不再扩容);每个线程会通过线程对 cells[threadLocalRandomProbe%cells.length] 位置的 Cell 对象中的 value 做累加,这样相当于将线程绑定到了 cells 中的某个 cell 对象上;

成员变量cellsBusy

cellsBusy,它有两个值 01,它的作用是当要修改 cells 数组时加锁,防止多线程同时修改 cells 数组,0 为无锁,1 为加锁,加锁的状况有三种:

  1. cells 数组初始化的时候;
  2. cells 数组扩容的时候;
  3. 如果 cells 数组中某个元素为 null,给这个位置创建新的 Cell 对象的时候;

成员变量base

它有两个作用:

  1. 在开始没有竞争的情况下,将累加值累加到 base;
  2. 在 cells 初始化的过程中,cells 不可用,这时会尝试将值累加到 base 上;

Cell 内部类

  1. //为提高性能,使用注解@sun.misc.Contended,用来避免伪共享,
  2. @sun.misc.Contended static final class Cell {
  3. //用来保存要累加的值
  4. volatile long value;
  5. Cell(long x) { value = x; }
  6. //使用UNSAFE类的cas来更新value值
  7. final boolean cas(long cmp, long val) {
  8. return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
  9. }
  10. private static final sun.misc.Unsafe UNSAFE;
  11. //value在Cell类中存储位置的偏移量;
  12. private static final long valueOffset;
  13. //这个静态方法用于获取偏移量
  14. static {
  15. try {
  16. UNSAFE = sun.misc.Unsafe.getUnsafe();
  17. Class<?> ak = Cell.class;
  18. valueOffset = UNSAFE.objectFieldOffset
  19. (ak.getDeclaredField("value"));
  20. } catch (Exception e) {
  21. throw new Error(e);
  22. }
  23. }
  24. }

这个类很简单,final 类型,内部有一个 value 值,使用 cas 来更新它的值;Cell 类唯一需要注意的地方就是 Cell 类的注解 @sun.misc.Contended

伪共享

要理解 Contended 注解的作用,要先弄清楚什么是伪共享,会有什么影响,如何解决伪共享。

缓存行 cache line

要理解伪共享先要弄清楚什么是 cache line,cpu 的缓存系统中是以缓存行(cache line)为单位存储的,缓存行是 2 的整数幂个连续字节,一般为 32-256 个字节。最常见的缓存行大小是 64 个字节,cache line 是 cache 和 memory 之间数据传输的最小单元。

大多数现代 cpu 都 one-die 了 L1 和 L2cache。对于 L1 cache,大多是 write though 的;L2 cache 则是 write back 的,不会立即写回 memory,这就会导致 cache 和 memory 的内容的不一致;另外,对于 mp(multi processors) 的环境,由于 cache 是 cpu 私有的,不同 cpu 的 cache 的内容也存在不一致的问题,因此很多 mp 的的计算架构,不论是 ccnuma 还是 smp 都实现了 cache coherence 的机制,即不同 cpu 的 cache 一致性机制。

  • Write-through(直写模式)在数据更新时,同时写入缓存Cache和后端存储。此模式的优点是操作简单;缺点是因为数据修改需要同时写入存储,数据写入速度较慢。

  • Write-back(回写模式)在数据更新时只写入缓存Cache。只在数据被替换出缓存时,被修改的缓存数据才会被写到后端存储。此模式的优点是数据写入速度快,因为不需要写存储;缺点是一旦更新后的数据未被写入存储时出现系统掉电的情况,数据将无法找回。

cache coherence 的一种实现是通过 cache-snooping 协议,每个 cpu 通过对 bus 的 snoop 实现对其它 cpu 读写 cache 的监控:

  1. 当 cpu1 要写 cache 时,其它 cpu 就会检查自己 cache 中对应的 cache line,如果是 dirty 的,就 write back 到 memory,并且会将 cpu1 的相关 cache line 刷新;如果不是 dirty 的,就 invalidate 该 cache line;
  2. 当 cpu1 要读 cache 时,其它 cpu 就会将自己 cache 中对应的 cache line 中标记为 dirty 的部分 write back 到 memory,并且会将 cpu1 的相关 cache line 刷新。

所以,提高 cpu 的 cache hit rate,减少 cache 和 memory 之间的数据传输,将会提高系统的性能。

因此,在程序和二进制对象的内存分配中保持 cache line aligned 就十分重要,如果不保证 cache line 对齐,出现多个 cpu 中并行运行的进程或者线程同时读写同一个 cache line 的情况的概率就会很大。这时 cpu 的 cache 和 memory 之间会反复出现 write back 和 refresh 情况,这种情形就叫做 cache thrashing。

为了有效的避免 cache thrashing,通常有以下两种途径:

  1. 对于 heap 的分配,很多系统在 malloc 调用中实现了强制的 alignment.
  2. 对于 stack 的分配,很多编译器提供了 stack aligned 的选项。

当然,如果在编译器指定了 stack aligned,程序的尺寸将会变大,会占用更多的内存。因此,这中间的取舍需要仔细考虑;

关于伪共享详情请看这里介绍以及这里

为了解决这个问题在 jdk1.6 会采用 long padding 的方式,就是在防止被伪共享的变量的前后加上 7 个 long 类型的变量,如下所示:

  1. public class VolatileLongPadding {
  2. volatile long p0, p1, p2, p3, p4, p5, p6;
  3. volatile long v = 0L;
  4. volatile long q0, q1, q2, q3, q4, q5, q6;
  5. }

jdk1.7 的某个版本后会优化掉 long padding,为了解决这个问题,在 jdk1.8 中加入了 @sun.misc.Contended;

LongAdder

前面说了一大堆,现在终于进入到正题了。

LongAdder –>add 方法

add 方法是 LongAdder 累加的方法,传入的参数x为要累加的值;

  1. public void add(long x) {
  2. Cell[] as; long b, v; int m; Cell a;
  3. /**
  4. * 如果一下两种条件则继续执行if内的语句
  5. * 1. cells数组不为null(不存在争用的时候,cells数组一定为null,一旦对base的cas操作失败,才会初始化cells数组)
  6. * 2. 如果cells数组为null,如果casBase执行成功,则直接返回,如果casBase方法执行失败(casBase失败,说明第一次争用冲突产生,需要对cells数组初始化)进入if内;
  7. * casBase方法很简单,就是通过UNSAFE类的cas设置成员变量base的值为base+要累加的值
  8. * casBase执行成功的前提是无竞争,这时候cells数组还没有用到为null,可见在无竞争的情况下是类似于AtomticInteger处理方式,使用cas做累加。
  9. */
  10. if ((as = cells) != null || !casBase(b = base, b + x)) {
  11. //uncontended判断cells数组中,当前线程要做cas累加操作的某个元素是否#不#存在争用,如果cas失败则存在争用;uncontended=false代表存在争用,uncontended=true代表不存在争用。
  12. boolean uncontended = true;
  13. /**
  14. *1. as == null : cells数组未被初始化,成立则直接进入if执行cell初始化
  15. *2. (m = as.length - 1) < 0: cells数组的长度为0
  16. *条件1与2都代表cells数组没有被初始化成功,初始化成功的cells数组长度为2;
  17. *3. (a = as[getProbe() & m]) == null :如果cells被初始化,且它的长度不为0,则通过getProbe方法获取当前线程Thread的threadLocalRandomProbe变量的值,初始为0,然后执行threadLocalRandomProbe&(cells.length-1 ),相当于m%cells.length;如果cells[threadLocalRandomProbe%cells.length]的位置为null,这说明这个位置从来没有线程做过累加,需要进入if继续执行,在这个位置创建一个新的Cell对象;
  18. *4. !(uncontended = a.cas(v = a.value, v + x)):尝试对cells[threadLocalRandomProbe%cells.length]位置的Cell对象中的value值做累加操作,并返回操作结果,如果失败了则进入if,重新计算一个threadLocalRandomProbe;
  19. 如果进入if语句执行longAccumulate方法,有三种情况
  20. 1. 前两个条件代表cells没有初始化,
  21. 2. 第三个条件指当前线程hash到的cells数组中的位置还没有其它线程做过累加操作,
  22. 3. 第四个条件代表产生了冲突,uncontended=false
  23. **/
  24. if (as == null || (m = as.length - 1) < 0 ||
  25. (a = as[getProbe() & m]) == null ||
  26. !(uncontended = a.cas(v = a.value, v + x)))
  27. longAccumulate(x, null, uncontended);
  28. }
  29. }

longAccumulate 方法

三个参数第一个为要累加的值,第二个为 null,第三个为 wasUncontended 表示调用方法之前的add方法是否未发生竞争;

  1. final void longAccumulate(long x, LongBinaryOperator fn,
  2. boolean wasUncontended) {
  3. //获取当前线程的threadLocalRandomProbe值作为hash值,如果当前线程的threadLocalRandomProbe为0,说明当前线程是第一次进入该方法,则强制设置线程的threadLocalRandomProbe为ThreadLocalRandom类的成员静态私有变量probeGenerator的值,后面会详细将hash值的生成;
  4. //另外需要注意,如果threadLocalRandomProbe=0,代表新的线程开始参与cell争用的情况
  5. //1.当前线程之前还没有参与过cells争用(也许cells数组还没初始化,进到当前方法来就是为了初始化cells数组后争用的),是第一次执行base的cas累加操作失败;
  6. //2.或者是在执行add方法时,对cells某个位置的Cell的cas操作第一次失败,则将wasUncontended设置为false,那么这里会将其重新置为true;第一次执行操作失败;
  7. //凡是参与了cell争用操作的线程threadLocalRandomProbe都不为0;
  8. int h;
  9. if ((h = getProbe()) == 0) {
  10. //初始化ThreadLocalRandom;
  11. ThreadLocalRandom.current(); // force initialization
  12. //将h设置为0x9e3779b9
  13. h = getProbe();
  14. //设置未竞争标记为true
  15. wasUncontended = true;
  16. }
  17. //cas冲突标志,表示当前线程hash到的Cells数组的位置,做cas累加操作时与其它线程发生了冲突,cas失败;collide=true代表有冲突,collide=false代表无冲突
  18. boolean collide = false;
  19. for (;;) {
  20. Cell[] as; Cell a; int n; long v;
  21. //这个主干if有三个分支
  22. //1.主分支一:处理cells数组已经正常初始化了的情况(这个if分支处理add方法的四个条件中的3和4)
  23. //2.主分支二:处理cells数组没有初始化或者长度为0的情况;(这个分支处理add方法的四个条件中的1和2)
  24. //3.主分支三:处理如果cell数组没有初始化,并且其它线程正在执行对cells数组初始化的操作,及cellbusy=1;则尝试将累加值通过cas累加到base上
  25. //先看主分支一
  26. if ((as = cells) != null && (n = as.length) > 0) {
  27. /**
  28. *内部小分支一:这个是处理add方法内部if分支的条件3:如果被hash到的位置为null,说明没有线程在这个位置设置过值,没有竞争,可以直接使用,则用x值作为初始值创建一个新的Cell对象,对cells数组使用cellsBusy加锁,然后将这个Cell对象放到cells[m%cells.length]位置上
  29. */
  30. if ((a = as[(n - 1) & h]) == null) {
  31. //cellsBusy == 0 代表当前没有线程cells数组做修改
  32. if (cellsBusy == 0) {
  33. //将要累加的x值作为初始值创建一个新的Cell对象,
  34. Cell r = new Cell(x);
  35. //如果cellsBusy=0无锁,则通过cas将cellsBusy设置为1加锁
  36. if (cellsBusy == 0 && casCellsBusy()) {
  37. //标记Cell是否创建成功并放入到cells数组被hash的位置上
  38. boolean created = false;
  39. try {
  40. Cell[] rs; int m, j;
  41. //再次检查cells数组不为null,且长度不为空,且hash到的位置的Cell为null
  42. if ((rs = cells) != null &&
  43. (m = rs.length) > 0 &&
  44. rs[j = (m - 1) & h] == null) {
  45. //将新的cell设置到该位置
  46. rs[j] = r;
  47. created = true;
  48. }
  49. } finally {
  50. //去掉锁
  51. cellsBusy = 0;
  52. }
  53. //生成成功,跳出循环
  54. if (created)
  55. break;
  56. //如果created为false,说明上面指定的cells数组的位置cells[m%cells.length]已经有其它线程设置了cell了,继续执行循环。
  57. continue;
  58. }
  59. }
  60. //如果执行的当前行,代表cellsBusy=1,有线程正在更改cells数组,代表产生了冲突,将collide设置为false
  61. collide = false;
  62. /**
  63. *内部小分支二:如果add方法中条件4的通过cas设置cells[m%cells.length]位置的Cell对象中的value值设置为v+x失败,说明已经发生竞争,将wasUncontended设置为true,跳出内部的if判断,最后重新计算一个新的probe,然后重新执行循环;
  64. */
  65. } else if (!wasUncontended)
  66. //设置未竞争标志位true,继续执行,后面会算一个新的probe值,然后重新执行循环。
  67. wasUncontended = true;
  68. /**
  69. *内部小分支三:新的争用线程参与争用的情况:处理刚进入当前方法时threadLocalRandomProbe=0的情况,也就是当前线程第一次参与cell争用的cas失败,这里会尝试将x值加到cells[m%cells.length]的value ,如果成功直接退出
  70. */
  71. else if (a.cas(v = a.value, ((fn == null) ? v + x :
  72. fn.applyAsLong(v, x))))
  73. break;
  74. /**
  75. *内部小分支四:分支3处理新的线程争用执行失败了,这时如果cells数组的长度已经到了最大值(大于等于cup数量),或者是当前cells已经做了扩容,则将collide设置为false,后面重新计算prob的值
  76. else if (n >= NCPU || cells != as)
  77. collide = false;
  78. /**
  79. *内部小分支五:如果发生了冲突collide=false,则设置其为true;会在最后重新计算hash值后,进入下一次for循环
  80. */
  81. else if (!collide)
  82. //设置冲突标志,表示发生了冲突,需要再次生成hash,重试。 如果下次重试任然走到了改分支此时collide=true,!collide条件不成立,则走后一个分支
  83. collide = true;
  84. /**
  85. *内部小分支六:扩容cells数组,新参与cell争用的线程两次均失败,且符合库容条件,会执行该分支
  86. */
  87. else if (cellsBusy == 0 && casCellsBusy()) {
  88. try {
  89. //检查cells是否已经被扩容
  90. if (cells == as) { // Expand table unless stale
  91. Cell[] rs = new Cell[n << 1];
  92. for (int i = 0; i < n; ++i)
  93. rs[i] = as[i];
  94. cells = rs;
  95. }
  96. } finally {
  97. cellsBusy = 0;
  98. }
  99. collide = false;
  100. continue; // Retry with expanded table
  101. }
  102. //为当前线程重新计算hash值
  103. h = advanceProbe(h);
  104. //这个大的分支处理add方法中的条件1与条件2成立的情况,如果cell表还未初始化或者长度为0,先尝试获取cellsBusy锁。
  105. }else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
  106. boolean init = false;
  107. try { // Initialize table
  108. //初始化cells数组,初始容量为2,并将x值通过hash&1,放到0个或第1个位置上
  109. if (cells == as) {
  110. Cell[] rs = new Cell[2];
  111. rs[h & 1] = new Cell(x);
  112. cells = rs;
  113. init = true;
  114. }
  115. } finally {
  116. //解锁
  117. cellsBusy = 0;
  118. }
  119. //如果init为true说明初始化成功,跳出循环
  120. if (init)
  121. break;
  122. }
  123. /**
  124. *如果以上操作都失败了,则尝试将值累加到base上;
  125. */
  126. else if (casBase(v = base, ((fn == null) ? v + x :
  127. fn.applyAsLong(v, x))))
  128. break; // Fall back on using base
  129. }
  130. }

关于 hash 的生成

hash 是 LongAdder 定位当前线程应该将值累加到 cells 数组哪个位置上的,所以hash的算法是非常重要的,下面就来看看它的实现。
java 的 Thread 类里面有一个成员变量

  1. @sun.misc.Contended("tlr")
  2. int threadLocalRandomProbe;

threadLocalRandomProbe 这个变量的值就是 LongAdder 用来 hash 定位 Cells 数组位置的,平时线程的这个变量一般用不到,它的值一直都是 0。

在 LongAdder 的父类 Striped64 里通过 getProbe 方法获取当前线程 threadLocalRandomProbe 的值:

  1. static final int getProbe() {
  2. //PROBE是threadLocalRandomProbe变量在Thread类里面的偏移量,所以下面语句获取的就是threadLocalRandomProbe的值;
  3. return UNSAFE.getInt(Thread.currentThread(), PROBE);
  4. }

threadLocalRandomProbe 的初始化

线程对 LongAdder 的累加操作,在没有进入 longAccumulate 方法前,threadLocalRandomProbe 一直都是 0,当发生争用后才会进入 longAccumulate 方法中,进入该方法第一件事就是判断 threadLocalRandomProbe 是否为 0,如果为 0,则将其设置为 0x9e3779b9

  1. int h;
  2. if ((h = getProbe()) == 0) {
  3. ThreadLocalRandom.current();
  4. h = getProbe();
  5. //设置未竞争标记为true
  6. wasUncontended = true;
  7. }

重点在这行 ThreadLocalRandom.current();

  1. public static ThreadLocalRandom current() {
  2. if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
  3. localInit();
  4. return instance;
  5. }

在 current 方法中判断如果 probe 的值为 0,则执行 locaInit() 方法,将当前线程的 probe 设置为非 0 的值,该方法实现如下:

  1. static final void localInit() {
  2. //private static final AtomicInteger probeGenerator =
  3. new AtomicInteger();
  4. //private static final int PROBE_INCREMENT = 0x9e3779b9;
  5. int p = probeGenerator.addAndGet(PROBE_INCREMENT);
  6. //prob不能为0
  7. int probe = (p == 0) ? 1 : p; // skip 0
  8. long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
  9. //获取当前线程
  10. Thread t = Thread.currentThread();
  11. UNSAFE.putLong(t, SEED, seed);
  12. //将probe的值更新为probeGenerator的值
  13. UNSAFE.putInt(t, PROBE, probe);
  14. }

probeGenerator 是 static 类型的 AtomicInteger 类,每执行一次 localInit() 方法,都会将 probeGenerator 累加一次 0x9e3779b9 这个值;,0x9e3779b9 这个数字的得来是 2^32 除以一个常数,这个常数就是传说中的黄金比例 1.6180339887;然后将当前线程的 threadLocalRandomProbe 设置为 probeGenerator 的值,如果 probeGenerator 为 0,这取1

threadLocalRandomProbe 重新生成

就是将 prob 的值左右移位 、异或操作三次

  1. static final int advanceProbe(int probe) {
  2. probe ^= probe << 13; // xorshift
  3. probe ^= probe >>> 17;
  4. probe ^= probe << 5;
  5. UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
  6. return probe;
  7. }

probe 从 =1 开始反复执行 10 次,结果如下:

  1. 1
  2. 270369
  3. 67634689
  4. -1647531835
  5. 307599695
  6. -1896278063
  7. 745495504
  8. 632435482
  9. 435756210
  10. 2005365029
  11. -1378868364

本文转载自:https://blog.csdn.net/zqz_zqz/article/details/70665941

评论

发表评论 点击刷新验证码

提示

该功能暂未开放