所有示例代码,请见/下载于
https://github.com/Wasabi1234/concurrency
 
 
   
 
   
 
 #1 基本概念
##1.1 并发
同时拥有两个或者多个线程,如果程序在单核处理器上运行多个线程将交替地换入或者换出内存,这些线程是同时“存在"的,每个线程都处于执行过程中的某个状态,如果运行在多核处理器上,此时,程序中的每个线程都将分配到一个处理器核上,因此可以同时运行.
##1.2 高并发( High Concurrency)
互联网分布式系统架构设计中必须考虑的因素之一,通常是指,通过设计保证系统能够同时并行处理很多请求.
##1.3 区别与联系
 
  
  #3 项目准备
 #3 项目准备  
  
  ##3.2 并发模拟-Jmeter压测
 ##3.2 并发模拟-Jmeter压测  
  
  
  
  ##3.3 并发模拟-代码
 ##3.3 并发模拟-代码  ###Semaphore(信号量)
 ###Semaphore(信号量)  以上二者通常和线程池搭配
 以上二者通常和线程池搭配 下面开始做并发模拟
package com.mmall.concurrency;
import com.mmall.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * @author shishusheng
 * @date 18/4/1
 */
@Slf4j
@NotThreadSafe
public class ConcurrencyTest {
    /**
     * 请求总数
     */
    public static int clientTotal = 5000;
    /**
     * 同时并发执行的线程数
     */
    public static int threadTotal = 200;
    public static int count = 0;
    public static void main(String[] args) throws Exception {
        //定义线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定义信号量,给出允许并发的线程数目
        final Semaphore semaphore = new Semaphore(threadTotal);
        //统计计数结果
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        //将请求放入线程池
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    //信号量的获取
                    semaphore.acquire();
                    add();
                    //释放
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        //关闭线程池
        executorService.shutdown();
        log.info("count:{}", count);
    }
    /**
     * 统计方法
     */
    private static void add() {
        count++;
    }
} 
 运行发现结果随机,所以非线程安全
#4线程安全性
##4.1 线程安全性
 当多个线程访问某个类时,不管运行时环境采用 何种调度方式 或者这些进程将如何交替执行,并且在主调代码中 不需要任何额外的同步或协同 ,这个类都能表现出 正确的行为 ,那么就称这个类是线程安全的 
##4.2 原子性
###4.2.1 Atomic 包
AtomicXXX:CAS,Unsafe.compareAndSwapInt
提供了互斥访问,同一时刻只能有一个线程来对它进行操作
package com.mmall.concurrency.example.atomic;
import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
/**
br/>*/
@Slf4j
@ThreadSafepublic class AtomicExample2 {
/**
请求总数
*/
public static int clientTotal = 5000;
/**
同时并发执行的线程数
*/
public static int threadTotal = 200;
/**
工作内存
*/
public static AtomicLong count = new AtomicLong(0);
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
System.out.println();
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
//主内存
log.info("count:{}", count.get());
}
private static void add() {
count.incrementAndGet();
// count.getAndIncrement();
}
}
package com.mmall.concurrency.example.atomic;
import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;
/**
 * @author shishusheng
 * @date 18/4/3
 */
@Slf4j
@ThreadSafe
public class AtomicExample4 {
    private static AtomicReference<Integer> count = new AtomicReference<>(0);
    public static void main(String[] args) {
        // 2
        count.compareAndSet(0, 2);
        // no
        count.compareAndSet(0, 1);
        // no
        count.compareAndSet(1, 3);
        // 4
        count.compareAndSet(2, 4);
        // no
        count.compareAndSet(3, 5); 
        log.info("count:{}", count.get());
    }
} 
   
 
 
 AtomicBoolean
 
 
    
  
 package com.mmall.concurrency.example.count;
import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
br/>*/
@Slf4j
@ThreadSafepublic class CountExample3 {
/**
请求总数
*/
public static int clientTotal = 5000;
/**
同时并发执行的线程数
*/
public static int threadTotal = 200;
public static int count = 0;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private synchronized static void add() {
count++;
}
}
synchronized 修正计数类方法 - 修饰类:括号括起来的部分,作用于所有对象 子类继承父类的被 synchronized 修饰方法时,是没有 synchronized 修饰的!!!
Lock: 依赖特殊的 CPU 指令,代码实现
###4.2.3 对比
加锁与解锁是同一把锁 )  
  
  
 volatile boolean inited = false;
//线程1:
context = loadContext();
inited= true;
// 线程2:
while( !inited ){
sleep();
}
doSomethingWithConfig(context)
##4.4 有序性 一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序 JMM允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性 ###4.4.1 happens-before 规则 #5发布对象    ##5.1 安全发布对象    
package com.mmall.concurrency.example.singleton;
import com.mmall.concurrency.annoations.NotThreadSafe;
/**
br/>*/
@NotThreadSafe
public class SingletonExample4 {/**
私有构造函数
*/
private SingletonExample4() {
}
// 1、memory = allocate() 分配对象的内存空间
// 2、ctorInstance() 初始化对象
// 3、instance = memory 设置instance指向刚分配的内存
// JVM和cpu优化,发生了指令重排
// 1、memory = allocate() 分配对象的内存空间
// 3、instance = memory 设置instance指向刚分配的内存
// 2、ctorInstance() 初始化对象
/**
单例对象
*/
private static SingletonExample4 instance = null;
/**
}


#7 AQS
##7.1 介绍

- 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
- 利用了一个int类型表示状态
- 使用方法是继承
- 子类通过继承并通过实现它的方法管理其状态{acquire 和release} 的方法操纵状态
- 可以同时实现排它锁和共享锁模式(独占、共享)
同步组件
###CountDownLatch package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
br/>*/
@Slf4j
public class CountDownLatchExample1 {private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
    final int threadNum = i;
    exec.execute(() -> {
        try {
            test(threadNum);
        } catch (Exception e) {
            log.error("exception", e);
        } finally {
            countDownLatch.countDown();
        }
    });
}
countDownLatch.await();
log.info("finish");
exec.shutdown(); }
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}
package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
br/>@Slf4j
public class CountDownLatchExample2 {private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
    final int threadNum = i;
    exec.execute(() -> {
        try {
            test(threadNum);
        } catch (Exception e) {
            log.error("exception", e);
        } finally {
            countDownLatch.countDown();
        }
    });
}
countDownLatch.await(10, TimeUnit.MILLISECONDS);
log.info("finish");
exec.shutdown(); }
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
}
}
##Semaphore用法    ##CycliBarrier
package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
br/>*/
@Slf4j
public class CyclicBarrierExample1 {private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
    final int threadNum = i;
    Thread.sleep(1000);
    executor.execute(() -> {
        try {
            race(threadNum);
        } catch (Exception e) {
            log.error("exception", e);
        }
    });
}
executor.shutdown(); }
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}

package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
br/>*/
@Slf4j
public class CyclicBarrierExample2 {private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
    final int threadNum = i;
    Thread.sleep(1000);
    executor.execute(() -> {
        try {
            race(threadNum);
        } catch (Exception e) {
            log.error("exception", e);
        }
    });
}
executor.shutdown(); }
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("BarrierException", e);
}
log.info("{} continue", threadNum);
}
}

package com.mmall.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
br/>*/
@Slf4j
public class SemaphoreExample3 {private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
    final int threadNum = i;
    exec.execute(() -> {
        try {
            // 尝试获取一个许可
            if (semaphore.tryAcquire()) {
                test(threadNum);
                // 释放一个许可
                semaphore.release();
            }
        } catch (Exception e) {
            log.error("exception", e);
        }
    });
}
exec.shutdown(); }
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
#9 线程池 ##9.1 newCachedThreadPool  ##9.2 newFixedThreadPool  ##9.3 newSingleThreadExecutor 看出是顺序执行的  ##9.4 newScheduledThreadPool   #10 死锁  