转载

聊聊原子操作类AtomicInteger

什么需要AtomicInteger原子操作类?

对于Java中的运算操作,例如自增或自减,若没有进行额外的同步操作,在多线程环境下就是线程不安全的。num++解析为num=num+1,明显,这个操作不具备原子性,多线程并发共享这个变量时必然会出现问题。测试代码如下:
public class AtomicIntegerTest {
 
 private static final int THREADS_CONUT = 20;
 public static int count = 0;
 
 public static void increase() {
 count++;
 }
 
 public static void main(String[] args) {
 Thread[] threads = new Thread[THREADS_CONUT];
 for (int i = 0; i < THREADS_CONUT; i++) {
 threads[i] = new Thread(new Runnable() {
 @Override
 public void run() {
 for (int i = 0; i < 1000; i++) {
 increase();
 }
 }
 });
 threads[i].start();
 }
 
 while (Thread.activeCount() > 1) {
 Thread.yield();
 }
 System.out.println(count);
 }
}
这里运行了20个线程,每个线程对count变量进行1000此自增操作,如果上面这段代码能够正常并发的话,最后的结果应该是20000才对,但实际结果却发现每次运行的结果都不相同,都是一个小于20000的数字。这是为什么呢?

要是换成volatile修饰count变量呢?

顺带说下volatile关键字很重要的两个特性:
  1. 保证变量在线程间可见,对volatile变量所有的写操作都能立即反应到其他线程中,换句话说,volatile变量在各个线程中是一致的(得益于java内存模型—"先行发生原则");
  2. 禁止指令的重排序优化;
那么换成volatile修饰count变量后,会有什么效果呢? 试一试:
public class AtomicIntegerTest {
 
 private static final int THREADS_CONUT = 20;
 public static volatile int count = 0;
 
 public static void increase() {
 count++;
 }
 
 public static void main(String[] args) {
 Thread[] threads = new Thread[THREADS_CONUT];
 for (int i = 0; i < THREADS_CONUT; i++) {
 threads[i] = new Thread(new Runnable() {
 @Override
 public void run() {
 for (int i = 0; i < 1000; i++) {
 increase();
 }
 }
 });
 threads[i].start();
 }
 
 while (Thread.activeCount() > 1) {
 Thread.yield();
 }
 System.out.println(count);
 }
}
结果似乎又失望了,测试结果和上面的一致,每次都是输出小于20000的数字。这又是为什么么? 上面的论据是正确的,也就是上面标红的内容,但是这个论据并不能得出"基于volatile变量的运算在并发下是安全的"这个结论,因为核心点在于java里的运算(比如自增)并不是原子性的。

用了AtomicInteger类后会变成什么样子呢?

把上面的代码改造成AtomicInteger原子类型,先看看效果
import java.util.concurrent.atomic.AtomicInteger;
 
public class AtomicIntegerTest {
 
 private static final int THREADS_CONUT = 20;
 public static AtomicInteger count = new AtomicInteger(0);
 
 public static void increase() {
 count.incrementAndGet();
 }
 
 public static void main(String[] args) {
 Thread[] threads = new Thread[THREADS_CONUT];
 for (int i = 0; i < THREADS_CONUT; i++) {
 threads[i] = new Thread(new Runnable() {
 @Override
 public void run() {
 for (int i = 0; i < 1000; i++) {
 increase();
 }
 }
 });
 threads[i].start();
 }
 
 while (Thread.activeCount() > 1) {
 Thread.yield();
 }
 System.out.println(count);
 }
}
结果每次都输出20000,程序输出了正确的结果,这都归功于AtomicInteger.incrementAndGet()方法的原子性。

非阻塞同步

同步:多线程并发访问共享数据时,保证共享数据再同一时刻只被一个或一些线程使用。 我们知道,阻塞同步和非阻塞同步都是实现线程安全的两个保障手段,非阻塞同步对于阻塞同步而言主要解决了阻塞同步中线程阻塞和唤醒带来的性能问题,那什么叫做非阻塞同步呢?在并发环境下,某个线程对共享变量先进行操作,如果没有其他线程争用共享数据那操作就成功;如果存在数据的争用冲突,那就才去补偿措施,比如不断的重试机制,直到成功为止,因为这种乐观的并发策略不需要把线程挂起,也就把这种同步操作称为非阻塞同步(操作和冲突检测具备原子性)。在硬件指令集的发展驱动下,使得 "操作和冲突检测" 这种看起来需要多次操作的行为只需要一条处理器指令便可以完成,这些指令中就包括非常著名的CAS指令(Compare-And-Swap比较并交换)。《深入理解Java虚拟机第二版.周志明》第十三章中这样描述关于CAS机制: 20190111101332407所以再返回来看AtomicInteger.incrementAndGet()方法,它的时间也比较简单
/**
 * Atomically increments by one the current value.
 *
 * @return the updated value
 */
 public final int incrementAndGet() {
 for (;;) {
 int current = get();
 int next = current + 1;
 if (compareAndSet(current, next))
 return next;
 }
 }
incrementAndGet()方法在一个无限循环体内,不断尝试将一个比当前值大1的新值赋给自己,如果失败则说明在执行"获取-设置"操作的时已经被其它线程修改过了,于是便再次进入循环下一次操作,直到成功为止。这个便是AtomicInteger原子性的"诀窍"了,继续进源码看它的compareAndSet方法:
/**
 * Atomically sets the value to the given updated value
 * if the current value {@code ==} the expected value.
 *
 * @param expect the expected value
 * @param update the new value
 * @return true if successful. False return indicates that
 * the actual value was not equal to the expected value.
 */
 public final boolean compareAndSet(int expect, int update) {
 return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
 }
可以看到,compareAndSet()调用的就是Unsafe.compareAndSwapInt()方法,即Unsafe类的CAS操作。 使用示例如下图,用于标识程序执行过程中是否发生了异常,使用quartz实现高级定制化定时任务(包含管理界面)实现中: 20190202100809150

补充如下内容:

原子类相比于普通的锁,粒度更细、效率更高(除了高度竞争的情况下) 如果对于上面的示例代码中使用了thread.yield()之类的方法不清晰的,可以直接看下面的代码压测:
public class AtomicIntegerTest implements Runnable {
 
 static AtomicInteger atomicInteger = new AtomicInteger(0);
 
 static int commonInteger = 0;
 
 public void addAtomicInteger() {
 atomicInteger.getAndIncrement();
 }
 
 public void addCommonInteger() {
 commonInteger++;
 }
 
 @Override
 public void run() {
 //可以调大10000看效果更明显
 for (int i = 0; i < 10000; i++) {
 addAtomicInteger();
 addCommonInteger();
 }
 }
 
 public static void main(String[] args) throws InterruptedException {
 AtomicIntegerTest atomicIntegerTest = new AtomicIntegerTest();
 Thread thread1 = new Thread(atomicIntegerTest);
 Thread thread2 = new Thread(atomicIntegerTest);
 thread1.start();
 thread2.start();
 //join()方法是为了让main主线程等待thread1、thread2两个子线程执行完毕
 thread1.join();
 thread2.join();
 System.out.println("AtomicInteger add result = " + atomicInteger.get());
 System.out.println("CommonInteger add result = " + commonInteger);
 }
}
原子类一览图参考如下: 20200212180741122 如何把普通变量升级为原子变量?主要是AtomicIntegerFieldUpdater<T>类,参考如下代码:
/**
 * @description 将普通变量升级为原子变量
 **/
public class AtomicIntegerFieldUpdaterTest implements Runnable {
 
 static Goods phone;
 static Goods computer;
 
 AtomicIntegerFieldUpdater<Goods> atomicIntegerFieldUpdater =
 AtomicIntegerFieldUpdater.newUpdater(Goods.class, "price");
 
 @Override
 public void run() {
 for (int i = 0; i < 10000; i++) {
 phone.price++;
 atomicIntegerFieldUpdater.getAndIncrement(computer);
 }
 }
 
 static class Goods {
 //商品定价
 volatile int price;
 }
 
 public static void main(String[] args) throws InterruptedException {
 phone = new Goods();
 computer = new Goods();
 AtomicIntegerFieldUpdaterTest atomicIntegerFieldUpdaterTest = new AtomicIntegerFieldUpdaterTest();
 Thread thread1 = new Thread(atomicIntegerFieldUpdaterTest);
 Thread thread2 = new Thread(atomicIntegerFieldUpdaterTest);
 thread1.start();
 thread2.start();
 //join()方法是为了让main主线程等待thread1、thread2两个子线程执行完毕
 thread1.join();
 thread2.join();
 System.out.println("CommonInteger price = " + phone.price);
 System.out.println("AtomicInteger price = " + computer.price);
 }
}
在高并发情况下,LongAdder(累加器)比AtomicLong原子操作效率更高,LongAdder累加器是java8新加入的,参考以下压测代码:
/**
 * @description 压测AtomicLong的原子操作性能
 **/
public class AtomicLongTest implements Runnable {
 
 private static AtomicLong atomicLong = new AtomicLong(0);
 
 @Override
 public void run() {
 for (int i = 0; i < 10000; i++) {
 atomicLong.incrementAndGet();
 }
 }
 
 public static void main(String[] args) {
 ExecutorService es = Executors.newFixedThreadPool(30);
 long start = System.currentTimeMillis();
 for (int i = 0; i < 10000; i++) {
 es.submit(new AtomicLongTest());
 }
 es.shutdown();
 //保证任务全部执行完
 while (!es.isTerminated()) { }
 long end = System.currentTimeMillis();
 System.out.println("AtomicLong add 耗时=" + (end - start));
 System.out.println("AtomicLong add result=" + atomicLong.get());
 }
}
/**
 * @description 压测LongAdder的原子操作性能
 **/
public class LongAdderTest implements Runnable {
 
 private static LongAdder longAdder = new LongAdder();
 
 @Override
 public void run() {
 for (int i = 0; i < 10000; i++) {
 longAdder.increment();
 }
 }
 
 public static void main(String[] args) {
 ExecutorService es = Executors.newFixedThreadPool(30);
 long start = System.currentTimeMillis();
 for (int i = 0; i < 10000; i++) {
 es.submit(new LongAdderTest());
 }
 es.shutdown();
 //保证任务全部执行完
 while (!es.isTerminated()) {
 }
 long end = System.currentTimeMillis();
 System.out.println("LongAdder add 耗时=" + (end - start));
 System.out.println("LongAdder add result=" + longAdder.sum());
 }
}
在高度并发竞争情形下,AtomicLong每次进行add都需要flush和refresh(这一块涉及到java内存模型中的工作内存和主内存的,所有变量操作只能在工作内存中进行,然后写回主内存,其它线程再次读取新值),每次add()都需要同步,在高并发时会有比较多冲突,比较耗时导致效率低;而LongAdder中每个线程会维护自己的一个计数器,在最后执行LongAdder.sum()方法时候才需要同步,把所有计数器全部加起来,不需要flush和refresh操作。
正文到此结束
Loading...