同步容器。它的原理是将状态封装起来,并对每个公有方法都实行同步,使得每次只有1个线程能够访问容器的状态。
并发容器。java.util.concurrent包里面的一系列实现
分段锁缺陷在于虽然一般情况下只要一个锁,但是遇到需要扩容等类似的事情,只能去获取所有的锁
需要对整个容器中的内容进行计算的方法,比如size、isEmpty、contains等等。由于并发的存在,在计算的过程中可能已进过期了,它实际上就是个估计值,但是在并发的场景下,需要使用的场景是很少的。
以ConcurrentHashMap的size方法为例:
/**
* Returns the number of key-value mappings in this map. If the
* map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
* <tt>Integer.MAX_VALUE</tt>.
*
* @return the number of key-value mappings in this map
*/
public int size() {
//为了能够算准数量,会算2次,如果两次算的不准,就锁住再算
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // 第一轮的计算总数不重试
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
//RETRIES_BEFORE_LOCK 默认是2
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
//第一次计算的时候
if (sum == last)
break; //如果前后两次数数一致,就认为已经算好了
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
复制代码
每次修改都复制底层数组,存在开销,因此使用场景一般是迭代操作远多于修改操作
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return <tt>true</tt> (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
/**
* {@inheritDoc}
*
* @throws IndexOutOfBoundsException {@inheritDoc}
*/
public E get(int index) {
return get(getArray(), index);
}
/**
* Gets the array. Non-private so as to also be accessible
* from CopyOnWriteArraySet class.
*/
final Object[] getArray() {
return array;
}
private E get(Object[] a, int index) {
return (E) a[index];
}
复制代码
阻塞队列,BlockingQueue。它提供了put和take方法,在队列不满足各自条件时将产生阻塞
BlockingQueue使用示例,生产者-消费者
public static void main(String[] args) throws Exception {
BlockingQueue queue = new ArrayBlockingQueue(1024);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
}
}
public class Producer implements Runnable{
protected BlockingQueue queue = null;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(2000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Consumer implements Runnable{
protected BlockingQueue queue = null;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
System.out.println(queue.take());
System.out.println("Wait 1 sec");
System.out.println(queue.take());
System.out.println("Wait 2 sec");
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
复制代码
输出为
1 Wait 1 sec 2 Wait 2 sec 3 复制代码
闭锁
CountDownLatch示例:
public class TestHarness{
public long timeTasks(int nThreads,final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i=0;i<nThreads;i++){
Thread t = new Thread(){
public void run(){
try {
startGate.await();
try {
task.run();
}finally {
endGate.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end=System.nanoTime();
return end-start;
}
}
复制代码
启动门使主线程能够同时释放所有的工作线程,结束门使得主线程能够等待最后一个线程执行完
信号量,Semaphore 。它管理着一组虚拟的许可,许可的数量可通过构造函数指定,在执行操作时首先获得许可,并在使用后释放许可,如果没有,那么accquire将阻塞直到有许可。
Semaphore示例
public class BoundedHashSet<T>{
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
this.sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
}finally {
if (!wasAdded){
sem.release();
}
}
}
public boolean remove(Object o){
boolean wasRemoved = set.remove(o);
if(wasRemoved){
sem.release();
}
return wasRemoved;
}
}
复制代码
栅栏。它能阻塞一组线程直到某个事件发生。 与闭锁的区别:
CyclicBarrier使用示例
public static void main(String[] args) {
//第k步执行完才能执行第k+1步
CyclicBarrier barrier = new CyclicBarrier(3,new StageKPlusOne());
StageK[] stageKs = new StageK[3];
for (int i=0;i<3;i++){
stageKs[i] = new StageK(barrier,"k part "+(i+1));
}
for (int i=0;i<3;i++){
new Thread(stageKs[i]).start();
}
}
class StageKPlusOne implements Runnable{
@Override
public void run() {
System.out.println("stage k over");
System.out.println("stage k+1 start counting");
}
}
class StageK implements Runnable{
private CyclicBarrier barrier;
private String stage;
public StageK(CyclicBarrier barrier, String stage) {
this.barrier = barrier;
this.stage = stage;
}
@Override
public void run() {
System.out.println("stage "+stage+" counting...");
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("stage "+stage+" count over");
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
复制代码
输出为
stage k part 1 counting... stage k part 3 counting... stage k part 2 counting... stage k part 2 count over stage k part 3 count over stage k part 1 count over stage k over stage k+1 start counting 复制代码
Exchanger。它是一种两方栅栏,各方在栅栏位置交换数据
Exchanger 使用示例:
public static void main(String[] args) {
Exchanger exchanger = new Exchanger();
ExchangerRunnable er1 = new ExchangerRunnable(exchanger,"1");
ExchangerRunnable er2 = new ExchangerRunnable(exchanger,"2");
new Thread(er1).start();
new Thread(er2).start();
}
class ExchangerRunnable implements Runnable{
private Exchanger e;
private Object o;
public ExchangerRunnable(Exchanger e, Object o) {
this.e = e;
this.o = o;
}
@Override
public void run() {
Object pre=o;
try {
o=e.exchange(o);
System.out.println("pre:"+pre+" now:"+o);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
复制代码
案例
java并发变成实战