一个进程可以有多个线程,一个线程可以有多个协程。
先说说线程跟进程:协程平时听的比较少,所以这里引用维基百科的解释:
start()方法是运行多线程的主方法,在start()方法内部,通过调用run()方法启动这个线程。run()方法用以启动一个线程,然后主线程立刻返回。该启动的线程不会马上执行,而是在等待队列中等待CPU的调度。
我们知道在操作系统中,进程有五种状态: 新建、运行、就绪、阻塞、终止。 注意这只是比较广泛的说法,不同的操作系统对进程状态定义也有不同。
但是在Java中,Thread类定义了六种线程状态,千万不要混淆了:
补充: 进入Thread.getState()方法可以发现在JVM中也定义了六种线程的状态:
public State getState() {
// get current thread state
return sun.misc.VM.toThreadState(threadStatus);
}
复制代码
sun.misc包中的VM类中的线程状态:
private static final int JVMTI_THREAD_STATE_ALIVE = 1;
private static final int JVMTI_THREAD_STATE_TERMINATED = 2;
private static final int JVMTI_THREAD_STATE_RUNNABLE = 4;
private static final int JVMTI_THREAD_STATE_BLOCKED_ON_MONITOR_ENTER = 1024;
private static final int JVMTI_THREAD_STATE_WAITING_INDEFINITELY = 16;
private static final int JVMTI_THREAD_STATE_WAITING_WITH_TIMEOUT = 32;
复制代码
synchronized({Object})可以用以锁住一个对象,同样synchronized还可以用在方法签名上。那么问题来了,对于非静态方法,synchronized很明显是锁住调用这个方法的实体,那么对于静态方法,锁住的是什么?答案是锁住的是类在JVM中所存储的Class对象。
private static List<Integer> list = Collections.synchronizedList(new ArrayList<>()); 复制代码
首先,他们都是Object类中的方法,接下来看下面这个例子:
public static void main(String[] args) {
new Thread(new Thread1()).start();
new Thread(new Thread2()).start();
new Thread(new Thread3()).start();
}
static class Thread1 extends Thread {
@Override
public void run() {
System.out.println("Thread1");
}
}
static class Thread2 extends Thread {
@Override
public void run() {
System.out.println("Thread2");
}
}
static class Thread3 extends Thread {
@Override
public void run() {
System.out.println("Thread3");
}
}
复制代码
这里我开辟了三个线程,分别去实现自己的方法。结果很明显,就是按顺序打印结果:
Thread1 Thread2 Thread3 复制代码
但是当我对Thread1使用wait()方法时,
static class Thread1 extends Thread {
@Override
public void run() {
synchronized (lock){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Thread1");
}
}
复制代码
Thread会交出自己所持有的锁,让其他进程去争夺这个锁,而自己就一直等待着其他线程的唤醒。
他就一直在这等啊,直到我们在其他线程中使用notify()方法将其唤醒:
static class Thread3 extends Thread {
@Override
public void run() {
synchronized (lock){
lock.notify();
}
System.out.println("Thread3");
}
}
复制代码
这时候,执行的顺序也改变了:
Thread2 Thread3 Thread1 复制代码
那么notifyAll()就是唤醒其他正在等待的线程,然后让他们之间重新去争夺锁:
static class Thread1 extends Thread {
@Override
public void run() {
synchronized (lock){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Thread1");
}
}
static class Thread2 extends Thread {
@Override
public void run() {
synchronized (lock){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Thread2");
}
}
static class Thread3 extends Thread {
@Override
public void run() {
synchronized (lock){
lock.notifyAll();
}
System.out.println("Thread3");
}
}
复制代码
结果:
Thread1 Thread2 Thread3 复制代码
注意:使用Object的wait(),notify(),notifyAll()方法都需要持有这个对象的监视器,就是代码中的synchronized()。java doc中的原话:
* This method should only be called by a thread that is the owner * of this object's monitor. 复制代码
否则会报出 IllegalMonitorStateException 异常。
由于Java的线程调度完全依赖于操作系统,所以每个线程都会占用一定的资源,进而我们不可能随心所欲的去开辟线程。所以,这时候就需要线程池了,线程池就是预先在内存中开辟一块资源,专门用于线程的调度。当需要线程执行任务时,就通过线程池管理器来实现线程的分配。
java.util.concurrent包中的Executors类中,封装了四种开辟线程池的方法。
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1;
}
});
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(" ");
}
});
System.out.println(submit.get());
//线程池使用完毕后,必须关闭
executorService.shutdown();
复制代码
程序允许完毕,jvm会等待非守护线程完成后关闭,但是jvm不会等待守护线程关闭,守护线程最典型的例子就是GC进程。
像下面这种情况锁与锁之间相互竞争的情况:
public class MultiplyThreadTest {
private static final Object lock = new Object();
private static final Object lock1 = new Object();
public static void main(String[] args) {
new ThreadClass1().start();
new ThreadClass2().start();
}
static class ThreadClass1 extends Thread {
@Override
public void run() {
synchronized (lock) {
try {
Thread.sleep(0);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock1) {
System.out.println("Thread1");
}
}
}
}
static class ThreadClass2 extends Thread {
@Override
public void run() {
synchronized (lock1) {
synchronized (lock) {
System.out.println("Thread2");
}
}
}
}
}
复制代码
其实我刚开始写出的来的时候,我自己都有疑问:
其实这地方,涉及到一个冷门的知识点,可以参考stackoverflow的 回答 ,那就是绝大部分的操作系统的时间精度都在10ms,所以看上去是sleep(0),但由于精度问题,实际上不是睡0ms。另外一个,不加 Thread.sleep(0) 就不会产生死锁的原因在于,线程执行这两个简单的方法的过程其实是非常短暂的,所以锁之间根本来不及相互竞争。
jsp 命令查看哪些进程正在执行:
jstack 17424 > ~/Desktop/1.txt 复制代码
可以看到日志中,有详细的死锁信息。
public class ProducerConsumer1 {
private static final Object lock = new Object();
private static Optional<Integer> optional = Optional.empty();
public static void main(String[] args) throws InterruptedException {
Container container = new Container(optional);
Producer producer = new Producer(container);
Consumer consumer = new Consumer(container);
producer.start();
consumer.start();
producer.join();
producer.join();
}
public static class Producer extends Thread {
Container container;
public Producer(Container container) {
this.container = container;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
synchronized (lock) {
while (container.getOptional().isPresent()) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Integer integer = new Random().nextInt();
System.out.println("Product:" + integer);
container.setOptional(Optional.of(integer));
lock.notify();
}
}
}
}
public static class Consumer extends Thread {
Container container;
public Consumer(Container container) {
this.container = container;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
synchronized (lock) {
while (!container.getOptional().isPresent()) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Consume:" + container.getOptional().get());
container.setOptional(Optional.empty());
lock.notify();
}
}
}
}
}
复制代码
虽然看起来代码挺多的,但其实就是一个思想,就是生产者在生产之前,先检查容器中是否为空,如果不为空就等待,为空才生产。同样,消费者也需要在容器中有值的情况下才会消费,否则就等待。
借鉴Condition接口中的提示:
* public void put(Object x) throws InterruptedException {
* <b>lock.lock();
* try {</b>
* while (count == items.length)
* <b>notFull.await();</b>
* items[putptr] = x;
* if (++putptr == items.length) putptr = 0;
* ++count;
* <b>notEmpty.signal();</b>
* <b>} finally {
* lock.unlock();
* }</b>
* }
复制代码
代码如下:
public class ProducerConsumer2 {
private static final Lock lock = new ReentrantLock();
private static final Condition notConsumer = lock.newCondition();
private static final Condition notProduct = lock.newCondition();
private static Optional<Integer> optional = Optional.empty();
public static void main(String[] args) throws InterruptedException {
Container container = new Container(optional);
Producer producer = new Producer(container);
Consumer consumer = new Consumer(container);
producer.start();
consumer.start();
producer.join();
producer.join();
}
public static class Producer extends Thread {
private Container container;
public Producer(Container container) {
this.container = container;
}
@Override
public void run() {
lock.lock();
try {
while (container.getOptional().isPresent()) {
try {
//注意:是await()而不是wait()
notConsumer.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int random = new Random().nextInt();
System.out.println("Product" + random);
container.setOptional(Optional.of(random));
notProduct.signal();
} finally {
lock.unlock();
}
}
}
public static class Consumer extends Thread {
private Container container;
public Consumer(Container container) {
this.container = container;
}
@Override
public void run() {
try {
lock.lock();
while (!container.getOptional().isPresent()) {
try {
notProduct.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(container.getOptional().get());
container.setOptional(Optional.empty());
notConsumer.notify();
} finally {
lock.unlock();
}
}
}
}
复制代码
与第一种方法类似,这里增加了Condition条件判断,并用到了ReentrantLock。
public class ProducerConsumer3 {
private static final BlockingQueue<Integer> queue = new LinkedBlockingDeque<>(1);
private static final BlockingQueue<Integer> signal = new LinkedBlockingDeque<>(1);
public static void main(String[] args) throws InterruptedException {
Producer producer = new Producer(queue, signal);
Consumer consumer = new Consumer(queue, signal);
producer.start();
consumer.start();
producer.join();
producer.join();
}
public static class Producer extends Thread {
BlockingQueue<Integer> queue;
BlockingQueue<Integer> signal;
public Producer(BlockingQueue<Integer> queue, BlockingQueue<Integer> signal) {
this.queue = queue;
this.signal = signal;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
int random = new Random().nextInt();
System.out.println("Product:" + random);
try {
queue.put(random);
signal.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static class Consumer extends Thread {
BlockingQueue<Integer> queue;
BlockingQueue<Integer> signal;
public Consumer(BlockingQueue<Integer> queue, BlockingQueue<Integer> signal) {
this.queue = queue;
this.signal = signal;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("Consumer:" + queue.take());
signal.put(0);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
复制代码
这种方法就比较简单了,不用判断容器是否为空,因为BlockingQueue在内部为我们实现了判断,直接拿来用就可以了。这里需要注意一点,由于take()与put()方法几乎是同时执行的,所有需要加一个signal标志信息,避免乱序。
Container容器:
public class Container {
Optional<Integer> optional;
public Container(Optional<Integer> optional) {
this.optional = optional;
}
public Optional<Integer> getOptional() {
return optional;
}
public void setOptional(Optional<Integer> optional) {
this.optional = optional;
}
}
复制代码