JAVA高并发 无锁(CAS)



但是如果做重试的操作(比如循环体),除非重试的操作过多,否则一般基本上无锁的操作比有锁的方式要好很多。


1.无锁的原理详解

1.CASCompare And Swap/Set)比较并交换

CAS算法的过程是这样:它包含3个参数CAS(V,E,N)V表示要更新的变量(内存值)E表示预期值(旧的)N表示新值当且仅当V值等于E值时,才会将V的值设为N,如果V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。最后,CAS返回当前V的真实值。CAS操作是抱着乐观的态度进行的(乐观锁),它总是认为自己可以成功完成操作。当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS操作即时没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理。

CAS 整个操作过程是原子操作,是一条CPU指令完成的

2.CPU指令


2.无锁类

JDK1.5的原子包:java.util.concurrent.atomic

这个包里面提供了一组原子类。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解

1.AtomicInteger

可以用原子方式更新的 int 值。

主要用于在高并发环境下的高效程序处理。使用非阻塞算法来实现并发控制。

[java]   view plain  copy
  1. Public class AtomicInteger extends Number   
  2. implements java.io.Serializable {  
  3.   
  4. private static final Unsafe unsafe = Unsafe.getUnsafe();  
  5.     //偏移量  
  6.     private static final long valueOffset;  
  7.   
  8.     static {  
  9.       try {  
  10.         //获取偏移量  指value这个值在类当中的偏移量  
  11.         valueOffset = unsafe.objectFieldOffset  
  12.             (AtomicInteger.class.getDeclaredField("value"));  
  13.       } catch (Exception ex) { throw new Error(ex); }  
  14.     }  
  15.     //AtomicInteger内部对int类型的value进行包装,所有的操作都是对这个value操作  
  16.     private volatile int value;  
  17.   
  18. /** 
  19.  *  返回当前值 
  20.      */  
  21.     public final int get() {  
  22.         return value;  
  23. }  
  24.   
  25.     /** 
  26.      * 设置当前值 
  27.      * @param 当前值 
  28.      */  
  29.     public final void set(int newValue) {  
  30.         value = newValue;  
  31. }  
  32.   
  33.     /** 
  34.      * 设置新值并返回旧值 
  35.      * 
  36.      * @param 新值 
  37.      * @return 旧值(上一个值) 
  38.      */  
  39.     public final int getAndSet(int newValue) {  
  40.         for (;;) {  
  41.             int current = get();  
  42.             if (compareAndSet(current, newValue))  
  43.                 return current;  
  44.         }  
  45. }  
  46.   
  47.     /** 
  48.      * 如果当前值是期望值 expect 则设置为 update 值 
  49.      *  
  50.      * 
  51.      * @param 期望(老的)值 
  52.      * @param 新值 
  53.      * @return 是否设置成功,返回失败代表新值和期望值是不相同的 
  54.      */  
  55. public final boolean compareAndSet(int expect, int update) {  
  56.     //unsafe是个不安全的操作,会提供类似于指针之类的操作  
  57.     //对于这个类(this)的这个偏移量(valueOffset)上的数据查看期望值是多少  
  58.         return unsafe.compareAndSwapInt(this, valueOffset, expect, update);  
  59. }  
  60.   
  61.     /** 
  62.      * 当前值加一,并返回旧值 
  63.      * 
  64.      * @return  旧值 
  65.      */  
  66.     public final int getAndIncrement() {  
  67.         for (;;) {  
  68.             //获取当前值   
  69.             int current = get();  
  70.             int next = current + 1;  
  71.             //比较期望值是否是当前值(current) 目标是+1后的值  
  72.             //如果在做完+1操作后 有线程修改了value值,那么current就和期望值是不相符的,所以设置失败,继续下次执行.如果设置成功,则返回current  
  73.             if (compareAndSet(current, next))  
  74.                 return current;  
  75.         }  
  76. }  
  77.   
  78.     /** 
  79.      * 当前值减一,并返回旧值 
  80.      * 
  81.      * @return 旧值 
  82.      */  
  83.     public final int getAndDecrement() {  
  84.         for (;;) {  
  85.             int current = get();  
  86.             int next = current - 1;  
  87.             if (compareAndSet(current, next))  
  88.                 return current;  
  89.         }  
  90. }  
  91.   
  92.     /** 
  93.      * 当前值加delta,并返回 
  94.      * 
  95.      * @param  
  96.      * @return 旧值 
  97.      */  
  98.     public final int getAndAdd(int delta) {  
  99.         for (;;) {  
  100.             int current = get();  
  101.             int next = current + delta;  
  102.             if (compareAndSet(current, next))  
  103.                 return current;  
  104.         }  
  105. }  
  106.   
  107.     /** 
  108.      * 当前值加一,并返回新值 
  109.      * 
  110.      * @return 新值 
  111.      */  
  112.     public final int incrementAndGet() {  
  113.         for (;;) {  
  114.             int current = get();  
  115.             int next = current + 1;  
  116.             if (compareAndSet(current, next))  
  117.                 return next;  
  118.         }  
  119. }  
  120.   
  121.     /** 
  122.      * 当前值减一,并返回新值 
  123.      * 
  124.      * @return 新值 
  125.      */  
  126.     public final int decrementAndGet() {  
  127.         for (;;) {  
  128.             int current = get();  
  129.             int next = current - 1;  
  130.             if (compareAndSet(current, next))  
  131.                 return next;  
  132.         }  
  133. }  
  134.   
  135.     /** 
  136.      * 当前值加delta,并返回新值 
  137.      * 
  138.      * @param  
  139.      * @return 新值 
  140.      */  
  141.     public final int addAndGet(int delta) {  
  142.         for (;;) {  
  143.             int current = get();  
  144.             int next = current + delta;  
  145.             if (compareAndSet(current, next))  
  146.                 return next;  
  147.         }  
  148. }  
  149.   
  150.   
  151. }  

关于偏移量:例如C里面的结构体

Java :所以unsafe可以通过偏移量对变量进行操作

2.Unsafe 

[java]   view plain  copy
  1. /** 
  2.  * 非安全的操作,比如: 
  3.  * 根据偏移量设置值 
  4.  * park() 
  5.  * 底层的CAS操作 
  6.  * 非公开API,在不同版本的JDK中,可能有较大差异 
  7.  */  
  8. public final class Unsafe {  
  9.   
  10.     private static native void registerNatives();  
  11.     static {  
  12.         registerNatives();  
  13.         sun.reflect.Reflection.registerMethodsToFilter(Unsafe.class"getUnsafe");  
  14.     }  
  15.   
  16.     private Unsafe() {}  
  17.   
  18.     private static final Unsafe theUnsafe = new Unsafe();  
  19.      
  20.     /** 
  21.      * 获取给定对象偏移量上的int值 
  22.      * @param 对象 
  23.      * @param 偏移量 
  24.      *         
  25.      * @return 该偏移量上的整数 
  26.      * @throws RuntimeException No defined exceptions are thrown, not even 
  27.      *         {@link NullPointerException} 
  28.      */  
  29. public native int getInt(Object o, long offset);  
  30.   
  31.     /** 
  32.      * 设置给定对象偏移量上面的值(int类型) 
  33.      * @param 对象 
  34.      * @param 偏移量 
  35.      * @param 整数 
  36.      * @throws RuntimeException No defined exceptions are thrown, not even 
  37.      *         {@link NullPointerException} 
  38.      */  
  39.  public native void putInt(Object o, long offset, int x);  
  40.   
  41.     /** 
  42.      * 获取字段在对象中的偏移量 
  43.      * @param 对象中的字段 
  44.      */  
  45.  public native long objectFieldOffset(Field f);  
  46.   
  47.   
  48. /**  
  49.   * 设置给定对象偏移量上面的值(int 类型) volatile语义 
  50.   */  
  51.  public native void putIntVolatile(Object o, long offset, int x);  
  52.   
  53.   
  54. /**  
  55.   * 获取给定对象偏移量上的int值 volatile语义 
  56.   */  
  57. public native int getIntVolatile(Object o, long offset);  
  58.   
  59. /**  
  60.   * 和putIntVolatile()一样,但它要求被操作的字段就是volatile类型的 
  61.   */  
  62. public native void    putOrderedInt(Object o, long offset, int x);  
  63. }  

3.AtomicReference

[java]   view plain  copy
  1. /** 
  2.  * 对引用进行修改 
  3.  * AtomicInteger 是对整数进行封装,而AtomicReference封装的其实是对象的引用 
  4.  * 是一个模板类,抽象化了数据类型 
  5.  */  
  6. public class AtomicReference<V>  implements java.io.Serializable {  
  7.   
  8.     private static final long serialVersionUID = -1848883965231344442L;  
  9. //偏移量  
  10. private static final Unsafe unsafe = Unsafe.getUnsafe();  
  11.   
  12.     private static final long valueOffset;  
  13.   
  14.     static {  
  15.       try {  
  16.         //获取偏移量  指value这个值在类当中的偏移量  
  17.         valueOffset = unsafe.objectFieldOffset  
  18.             (AtomicReference.class.getDeclaredField("value"));  
  19.       } catch (Exception ex) { throw new Error(ex); }  
  20.     }  
  21.   
  22.       
  23.     private volatile V value;  
  24.       
  25.     /** 
  26.      * 获取当前值 
  27.      * 
  28.      * @return 当前值 
  29.      */  
  30.     public final V get() {  
  31.         return value;  
  32. }  
  33.   
  34.     /** 
  35.      * 设置当前值 
  36.      * 
  37.      * @param 当前值 
  38.      */  
  39.     public final void set(V newValue) {  
  40.         value = newValue;  
  41. }  
  42.       
  43.      /** 
  44.      * 如果当前值是期望值 expect 则设置为 update 值 
  45.      * @param 期望(老的)值 
  46.      * @param 新值 
  47.      * @return 是否设置成功,返回失败代表新值和期望值是不相同的 
  48.      */  
  49.     public final boolean compareAndSet(V expect, V update) {  
  50.         return unsafe.compareAndSwapObject(this, valueOffset, expect, update);  
  51.     }  
  52.       
  53.     /** 
  54.      * 设置新值并返回旧值 
  55.      * 
  56.      * @param 新值 
  57.      * @return 旧值(上一个值) 
  58.      */  
  59.     public final V getAndSet(V newValue) {  
  60.         while (true) {  
  61.             V x = get();  
  62.             if (compareAndSet(x, newValue))  
  63.                 return x;  
  64.         }  
  65.     }  
  66. }  

AtomicReferencevolatile的区别

 

首先volatilejava中关键字用于修饰变量,AtomicReference是并发包java.util.concurrent.atomic下的类。

首先volatile作用,当一个变量被定义为volatile之后,看做“程度较轻的 synchronized”,具备两个特性:

1.保证此变量对所有线程的可见性(当一条线程修改这个变量值时,新值其他线程立即得知)

2.禁止指令重新排序

注意volatile修饰变量不能保证在并发条件下是线程安全的,因为java里面的运算并非原子操作。

volatile说明

java.util.concurrent.atomic工具包,支持在单个变量上解除锁的线程安全编程。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解


4.AtomicStampedReference

在运用CAS做操作中有一个经典的ABA问题:


如果场景是和过程状态无关的,只跟结果有关系,那么影响不大,但是有些情况之下,场景可能和过程有关的.当你对数据变化过程是敏感的时候,普通的CAS操作是无法辨别上图2A的区别的.

[java]   view plain  copy
  1. /** 
  2.  * 原子更新带有版本号的引用类型。 
  3.  * 该类将整数值与引用关联起来,可用于原子的更数据和数据的版本号。 
  4.  * 可以解决使用CAS进行原子更新时,可能出现的ABA问题。 
  5.  */  
  6. public class AtomicStampedReference<V> {  
  7.   
  8.     private static class Pair<T> {  
  9.         final T reference;  
  10.         //最好不要重复的一个数据,决定数据是否能设置成功  
  11.         final int stamp;  
  12.         private Pair(T reference, int stamp) {  
  13.             this.reference = reference;  
  14.             this.stamp = stamp;  
  15.         }  
  16.         //根据reference和stamp来生成一个Pair的实例  
  17.         static <T> Pair<T> of(T reference, int stamp) {  
  18.             return new Pair<T>(reference, stamp);  
  19.         }  
  20.     }  
  21.     //对Value 进行封装,放入Pair里面  
  22. private volatile Pair<V> pair;  
  23.   
  24.     /** 
  25.      * 返回当前的对象 
  26.      */  
  27.     public V getReference() {  
  28.         return pair.reference;  
  29.     }  
  30.   
  31.     /** 
  32.      * 返回当前的stamp 
  33.      */  
  34.     public int getStamp() {  
  35.         return pair.stamp;  
  36. }  
  37.   
  38.     /** 
  39.      * 当当前的引用数据和期望的引用数据相等并且当前stamp和期望的stamp也相等 
  40.      * 并且 
  41.      * (当前的引用数据和新的引用数据相等并且当前stamp和新的stamp也相等 
  42.      * 或者cas操作成功 
  43.      * ) 
  44.      * @param 期望(老的)的引用数据 
  45.      * @param 新的引用数据 
  46.      * @param 期望(老的)的stamp值 
  47.      * @param 新的stamp值 
  48.      * @return  
  49.      */  
  50.     public boolean compareAndSet(V   expectedReference,  
  51.                                  V   newReference,  
  52.                                  int expectedStamp,  
  53.                                  int newStamp) {  
  54.         Pair<V> current = pair;  
  55.         return  
  56.             expectedReference == current.reference &&  
  57.             expectedStamp == current.stamp &&  
  58.             ((newReference == current.reference &&  
  59.               newStamp == current.stamp) ||  
  60.              casPair(current, Pair.of(newReference, newStamp)));  
  61. }  
  62.   
  63.     /** 
  64.      * 当前值和期望值比较设置 
  65.      */  
  66.     private boolean casPair(Pair<V> cmp, Pair<V> val) {  
  67.         return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);  
  68. }  
  69. }  

使用方法:

[java]   view plain  copy
  1. AtomicStampedReference<Integer> num = new AtomicStampedReference<Integer>(10);  
  2.     public void test () {  
  3.         Integer i = num.getReference();  
  4.         int stamped = num.getStamp();  
  5.         if (num.compareAndSet(i, i+1, stamped, stamped+1 )) {  
  6.             System.out.println("设置成功");  
  7.         }  
  8.     }  

5.AtomicIntegerArray

[java]   view plain  copy
  1. /** 
  2.  * 原子更新整型数组里的元素。 
  3.  */  
  4. public class AtomicIntegerArray implements java.io.Serializable {  
  5.     private static final long serialVersionUID = 2862133569453604235L;  
  6.   
  7. private static final Unsafe unsafe = Unsafe.getUnsafe();  
  8. //数组所在的基地址  
  9.     private static final int base = unsafe.arrayBaseOffset(int[].class);  
  10.     private static final int shift;  
  11.     private final int[] array;  
  12.   
  13. static {  
  14.     //数组中元素的宽度,比如int占4个byte,所以此处scale应该是4  
  15.         int scale = unsafe.arrayIndexScale(int[].class);  
  16.         if ((scale & (scale - 1)) != 0)  
  17.             throw new Error("data type scale not a power of two");  
  18.         //31减去前导零(数字变成二进制后前面0的个数)这里shift 为2  
  19.         shift = 31 - Integer.numberOfLeadingZeros(scale);  
  20.     }  
  21.   
  22.     /** 
  23.      * 获取第i个元素在数组中的偏移量 
  24.      */  
  25.     private long checkedByteOffset(int i) {  
  26.         if (i < 0 || i >= array.length)  
  27.             throw new IndexOutOfBoundsException("index " + i);  
  28.   
  29.         return byteOffset(i);  
  30.     }  
  31.   
  32.     /** 
  33.      * 获取第i个元素在数组中的偏移量 
  34.      */  
  35. private static long byteOffset(int i) {  
  36.     //因为shift=2 所以相当于  i*4+base  
  37.         return ((long) i << shift) + base;  
  38. }  
  39.   
  40.     /** 
  41.      * 获取数组的大小 
  42.      */  
  43.     public final int length() {  
  44.         return array.length;  
  45. }  
  46.   
  47.     /** 
  48.      * 获取数组中第i个下标的元素 
  49.      */  
  50.     public final int get(int i) {  
  51.         return getRaw(checkedByteOffset(i));  
  52.     }  
  53.   
  54.     /** 
  55.      * 通过偏移量获取数组中的元素 
  56.      */  
  57.     private int getRaw(long offset) {  
  58.         return unsafe.getIntVolatile(array, offset);  
  59. }  
  60.   
  61.     /** 
  62.      * 设置数组中第i个下标的元素 
  63.      */  
  64.     public final void set(int i, int newValue) {  
  65.         unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);  
  66. }  
  67.    /** 
  68.      * 将数组中第i个下标设置为新值,并返回旧值 
  69.      */  
  70.     public final int getAndSet(int i, int newValue) {  
  71.         long offset = checkedByteOffset(i);  
  72.         while (true) {  
  73.             int current = getRaw(offset);  
  74.             if (compareAndSetRaw(offset, current, newValue))  
  75.                 return current;  
  76.         }  
  77. }  
  78.     /** 
  79.      *如果第i个下标的元素等于expect,则设置为update,设置成功返回true 
  80.      */  
  81.     public final boolean compareAndSet(int i, int expect, int update) {  
  82.         return compareAndSetRaw(checkedByteOffset(i), expect, update);  
  83. }  
  84.   
  85.     private boolean compareAndSetRaw(long offset, int expect, int update) {  
  86.         return unsafe.compareAndSwapInt(array, offset, expect, update);  
  87. }  
  88.   
  89.     /** 
  90.      * 数组中第i个下标的元素+1,返回旧值 
  91.      */  
  92.     public final int getAndIncrement(int i) {  
  93.         return getAndAdd(i, 1);  
  94.     }  
  95.   
  96.     /** 
  97.      * 数组中第i个下标的元素-1,返回旧值 
  98.      */  
  99.     public final int getAndDecrement(int i) {  
  100.         return getAndAdd(i, -1);  
  101. }  
  102.   
  103.     /** 
  104.      * 数组中第i个下标元素+delta 并返回旧值 
  105.      */  
  106.     public final int getAndAdd(int i, int delta) {  
  107.         long offset = checkedByteOffset(i);  
  108.         while (true) {  
  109.             int current = getRaw(offset);  
  110.             if (compareAndSetRaw(offset, current, current + delta))  
  111.                 return current;  
  112.         }  
  113.     }  
  114.   
  115.     /** 
  116.      * 数组中第i个下标的元素+1,返回新值 
  117.      */  
  118.     public final int incrementAndGet(int i) {  
  119.         return addAndGet(i, 1);  
  120.     }  
  121.   
  122.     /** 
  123.      * 数组中第i个下标的元素-1,返回新值 
  124.      */  
  125.     public final int decrementAndGet(int i) {  
  126.         return addAndGet(i, -1);  
  127.     }  
  128.   
  129.     /** 
  130.      * 数组中第i个下标元素+delta 并返回新值 
  131.      */  
  132.     public final int addAndGet(int i, int delta) {  
  133.         long offset = checkedByteOffset(i);  
  134.         while (true) {  
  135.             int current = getRaw(offset);  
  136.             int next = current + delta;  
  137.             if (compareAndSetRaw(offset, current, next))  
  138.                 return next;  
  139.         }  
  140. }  
  141. }  
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值