前言
生产者-消费者模式是一个十分经典的多线程并发协作的模式,弄懂生产者-消费者问题能够让我们对并发编程的理解加深。生产者消费者模式,主要探讨的问题是,两个线程(生产者与消费者),一块共享区域,生产者负责生产同时操作共享区域,消费者负责消费同时操作共享区域,在这个过程中,生产者与消费者没有任何耦合,互相不关心对方的行为,那么这种情况下,考虑如下场景:
假如共享区域有商品5个,消费者消耗一个商品后去检查数量时,消费者期待的是还剩下4个,但是由于没有做线程并发协作,在消耗完商品后的时间点,生产者刚好生产好一个商品并要将商品数量+1,在+1之后,消费者才去检查数量,这时候的数量还是5与他的期望不符,这就是问题所在,我们讨论的也都是要解决这个问题。
当然,除了上述假设情况之外,我们还得考虑,共享区域的数据变化如何阻塞及唤醒线程
- 如果共享数据区已满的话,阻塞生产者继续生产数据放置入内
- 如果共享数据区为空的话,阻塞消费者继续消费数据
其实说到底,就是要实现线程同步,java中实现线程同步的方法主要有 Sychronized,对应的要使用Object的wait和notify机制Lock,对应的要使用Lock的Condition的await/signal机制
另外,使用BlockingQueue也能实现生产者消费者模式
wait notify机制
java的Object类里有这么几个方法至关重要
- wait
该方法用来将当前线程置入休眠状态,直到接到这个对象调用了notify或者notifyAll,或者超过一个指定的超时时长。
当前线程必须拥有这个对象的monitor,也就是必须获得这个对象的锁才能调用,否则会报java.lang.IllegalMonitorStateException错误。
该方法会将当前线程(就是获得锁的那个,假设为线程A)放进这个对象的wait set中,并且线程A会放弃所有获取对该对象锁的主张,线程A停止执行并进入休眠,直到下面四个情况之一出现:- 其他线程调用该对象的notify,并且线程A刚好被选中唤醒(是否被选中与操作系统实现有关)
- 其他线程调用该对象的notifyAll
- 其他线程打断线程A,中断线程会抛出InterruptedException异常,因此,sleep和wait都要catch或者抛出这个异常
- 指定的超时时长已经过去,如果超时时长为0,那么线程会直接进入休眠
线程A被唤醒后会和其他被唤醒的线程竞争获得对象锁,一旦成功获得锁,线程A的锁信息就会全部记录下来,然后线程A就可以从wait中返回,并继续执行下去。
wait只能出现在循环中,因为唤醒的时候可能还是不满足执行条件
如果在线程wait前或者wait中被打断,那么会抛出一个InterruptedException异常,并且直到对象锁状态被重新存储后才会抛出。
在调用 wait()之前,线程必须要获得该对象的对象监视器锁,即只能在同步方法或同步块中调用 wait()方法。调用wait()方法之后,当前线程会释放锁。如果调用wait()方法时,线程并未获取到锁的话,则会抛出IllegalMonitorStateException异常,这是以个RuntimeException。如果再次获取到锁的话,当前线程才能从wait()方法处成功返回。
- wait(long timeout)
timeout时间后会唤醒线程,重新竞争锁。timeout是最长时间,具体时长会有误差 wait(long timeout, long nanos)
文档里说是为了更精确的唤醒时间1000000*timeout+nanos,但是代码里这么写的(java 1.8),看不出来哪里精确了1
2
3
4
5
6
7
8
9
10
11
12
13public final void wait(long timeout, int nanos) throws InterruptedException {
if (timeout < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (nanos < 0 || nanos > 999999) {
throw new IllegalArgumentException(
"nanosecond timeout value out of range");
}
if (nanos > 0) {
timeout++;
}
wait(timeout);
}notify()
在等待队列中唤醒一个线程,选择规则由系统具体实现,被唤醒的线程要和其他线程公平竞争,重新获得锁才能继续执行下去。该方法只能在当前线程获得锁的情况下才能调用,也就是说必须在Sychronized代码中。一次只能有一个线程拥有对象的监视器。- notifyAll
该方法与 notify ()方法的工作方式相同,重要的一点差异是:
notifyAll 使所有原来在该对象上 wait 的线程统统退出WAITTING状态,使得他们全部从等待队列中移入到同步队列中去,等待下一次能够有机会获取到对象监视器锁。wait notify实现的生产者消费者模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87package com.zgq.java;
import com.zgq.Algorithm;
import com.zgq.Utils;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
/**
* Created by zgq on 2019/4/6.
*/
public class ProducerConsumer implements Algorithm{
public static final Object LOCK = new Object();
public static int count = 0;
public static final int FULL = 10;
@Override
public void execut() {
for (int i = 0; i < 5; i++){
new Thread(new Producer("producer" + i)).start();
}
for (int i = 0; i < 5; i++){
new Thread(new Consumer("consumer" + i)).start();
}
}
public static class Producer implements Runnable {
private String name;
public Producer(String name){
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
synchronized (LOCK) {
Utils.printString(name + " lock");
try {
while (count >= FULL) {
Utils.printString(name + " wait");
LOCK.wait();
}
Thread.sleep(500);
count++;
Utils.printString(name + " product 1 " + " now is " + count);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.notifyAll();
Utils.printString(name + " unlock");
}
}
}
}
}
public static class Consumer implements Runnable {
public String name;
public Consumer(String name){
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
synchronized (LOCK) {
Utils.printString(name + " lock");
try {
while (count <= 0) {
Utils.printString(name + " wait");
LOCK.wait();
}
Thread.sleep(1000);
count--;
Utils.printString(name + " consume 1 " + " now is " + count);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.notifyAll();
Utils.printString(name + " unlock");
}
}
}
}
}
}
Lock
使用Lock实现消费者生产者模式与wait-notify相差不多,Lock中Condition的await/signalAll可以达到相同的效果,Lock具体用法这里不深究,具体可以看下篇博客。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93package com.zgq.java;
import com.zgq.Algorithm;
import com.zgq.Utils;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by zgq on 2019/4/6.
*/
public class ProducerConsumer1 implements Algorithm {
public static final Lock LOCK = new ReentrantLock();
public static Condition NOTFULL = LOCK.newCondition();
public static Condition NOTEMPTY = LOCK.newCondition();
public static int count = 0;
public static final int FULL = 10;
@Override
public void execut() {
for (int i = 0; i < 5; i++) {
new Thread(new Producer("producer" + i)).start();
}
for (int i = 0; i < 5; i++) {
new Thread(new Consumer("consumer" + i)).start();
}
}
public static class Producer implements Runnable {
private String name;
public Producer(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
LOCK.lock();
Utils.printString(name + " lock");
try {
while (count >= FULL) {
Utils.printString(name + " wait");
NOTFULL.await();
}
Thread.sleep(20);
count++;
Utils.printString(name + " product 1 " + " now is " + count);
NOTEMPTY.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
Utils.printString(name + " unlock");
}
}
}
}
public static class Consumer implements Runnable {
public String name;
public Consumer(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 4; i++) {
LOCK.lock();
Utils.printString(name + " lock");
try {
while (count <= 0) {
Utils.printString(name + " wait");
NOTEMPTY.await();
}
Thread.sleep(50);
count--;
Utils.printString(name + " consume 1 " + " now is " + count);
NOTFULL.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
Utils.printString(name + " unlock");
}
}
}
}
}
BlockingQueue实现
BlockingQueue是一个接口,LinkedBlockingDeque是他的一个实现类,我们看下部分实现代码1
2
3
4
5
6
7
8
9
10
11
12public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
其实就是lock实现
实现
这里用了BlockingQueue的具体实现类LinkedBlockingDeque。链表实现。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76package com.zgq.java;
import com.zgq.Algorithm;
import com.zgq.Utils;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by zgq on 2019/4/6.
*/
public class ProducerConsumer2 implements Algorithm {
public static BlockingQueue storage = new LinkedBlockingDeque<>();
public static final int FULL = 10;
@Override
public void execut() {
for (int i = 0; i < 5; i++) {
new Thread(new Producer("producer" + i)).start();
}
for (int i = 0; i < 5; i++) {
new Thread(new Consumer("consumer" + i)).start();
}
}
public static class Producer implements Runnable {
private String name;
public Producer(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(20);
if (storage.size() < FULL) {
Random random = new Random();
int k = random.nextInt();
storage.add(k);
}
Utils.printString(name + " product 1 " + " now is " + storage.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static class Consumer implements Runnable {
public String name;
public Consumer(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 4; i++) {
try {
Thread.sleep(50);
if (storage.size() > 0) {
storage.take();
}
Utils.printString(name + " consume 1 " + " now is " + storage.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
可以看出,使用BlockingQueue来实现生产者-消费者很简洁,这正是利用了BlockingQueue插入和获取数据附加阻塞操作的特性。