生产者消费者模式

前言

生产者-消费者模式是一个十分经典的多线程并发协作的模式,弄懂生产者-消费者问题能够让我们对并发编程的理解加深。生产者消费者模式,主要探讨的问题是,两个线程(生产者与消费者),一块共享区域,生产者负责生产同时操作共享区域,消费者负责消费同时操作共享区域,在这个过程中,生产者与消费者没有任何耦合,互相不关心对方的行为,那么这种情况下,考虑如下场景:
假如共享区域有商品5个,消费者消耗一个商品后去检查数量时,消费者期待的是还剩下4个,但是由于没有做线程并发协作,在消耗完商品后的时间点,生产者刚好生产好一个商品并要将商品数量+1,在+1之后,消费者才去检查数量,这时候的数量还是5与他的期望不符,这就是问题所在,我们讨论的也都是要解决这个问题。
当然,除了上述假设情况之外,我们还得考虑,共享区域的数据变化如何阻塞及唤醒线程

  1. 如果共享数据区已满的话,阻塞生产者继续生产数据放置入内
  2. 如果共享数据区为空的话,阻塞消费者继续消费数据
    其实说到底,就是要实现线程同步,java中实现线程同步的方法主要有
  3. Sychronized,对应的要使用Object的wait和notify机制
  4. Lock,对应的要使用Lock的Condition的await/signal机制
    另外,使用BlockingQueue也能实现生产者消费者模式

wait notify机制

java的Object类里有这么几个方法至关重要

  1. wait
    该方法用来将当前线程置入休眠状态,直到接到这个对象调用了notify或者notifyAll,或者超过一个指定的超时时长。
    当前线程必须拥有这个对象的monitor,也就是必须获得这个对象的锁才能调用,否则会报java.lang.IllegalMonitorStateException错误。
    该方法会将当前线程(就是获得锁的那个,假设为线程A)放进这个对象的wait set中,并且线程A会放弃所有获取对该对象锁的主张,线程A停止执行并进入休眠,直到下面四个情况之一出现:
    • 其他线程调用该对象的notify,并且线程A刚好被选中唤醒(是否被选中与操作系统实现有关)
    • 其他线程调用该对象的notifyAll
    • 其他线程打断线程A,中断线程会抛出InterruptedException异常,因此,sleep和wait都要catch或者抛出这个异常
    • 指定的超时时长已经过去,如果超时时长为0,那么线程会直接进入休眠
      线程A被唤醒后会和其他被唤醒的线程竞争获得对象锁,一旦成功获得锁,线程A的锁信息就会全部记录下来,然后线程A就可以从wait中返回,并继续执行下去。
      wait只能出现在循环中,因为唤醒的时候可能还是不满足执行条件
      如果在线程wait前或者wait中被打断,那么会抛出一个InterruptedException异常,并且直到对象锁状态被重新存储后才会抛出。
      在调用 wait()之前,线程必须要获得该对象的对象监视器锁,即只能在同步方法或同步块中调用 wait()方法。调用wait()方法之后,当前线程会释放锁。如果调用wait()方法时,线程并未获取到锁的话,则会抛出IllegalMonitorStateException异常,这是以个RuntimeException。如果再次获取到锁的话,当前线程才能从wait()方法处成功返回。
  2. wait(long timeout)
    timeout时间后会唤醒线程,重新竞争锁。timeout是最长时间,具体时长会有误差
  3. wait(long timeout, long nanos)
    文档里说是为了更精确的唤醒时间1000000*timeout+nanos,但是代码里这么写的(java 1.8),看不出来哪里精确了

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public final void wait(long timeout, int nanos) throws InterruptedException {
    if (timeout < 0) {
    throw new IllegalArgumentException("timeout value is negative");
    }
    if (nanos < 0 || nanos > 999999) {
    throw new IllegalArgumentException(
    "nanosecond timeout value out of range");
    }
    if (nanos > 0) {
    timeout++;
    }
    wait(timeout);
    }
  4. notify()
    在等待队列中唤醒一个线程,选择规则由系统具体实现,被唤醒的线程要和其他线程公平竞争,重新获得锁才能继续执行下去。该方法只能在当前线程获得锁的情况下才能调用,也就是说必须在Sychronized代码中。一次只能有一个线程拥有对象的监视器。

  5. notifyAll
    该方法与 notify ()方法的工作方式相同,重要的一点差异是:
    notifyAll 使所有原来在该对象上 wait 的线程统统退出WAITTING状态,使得他们全部从等待队列中移入到同步队列中去,等待下一次能够有机会获取到对象监视器锁。
    wait notify实现的生产者消费者模式
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    package com.zgq.java;

    import com.zgq.Algorithm;
    import com.zgq.Utils;

    import java.util.Objects;
    import java.util.concurrent.locks.Lock;

    /**
    * Created by zgq on 2019/4/6.
    */
    public class ProducerConsumer implements Algorithm{

    public static final Object LOCK = new Object();
    public static int count = 0;
    public static final int FULL = 10;

    @Override
    public void execut() {
    for (int i = 0; i < 5; i++){
    new Thread(new Producer("producer" + i)).start();
    }
    for (int i = 0; i < 5; i++){
    new Thread(new Consumer("consumer" + i)).start();
    }

    }

    public static class Producer implements Runnable {
    private String name;
    public Producer(String name){
    this.name = name;
    }

    @Override
    public void run() {
    for (int i = 0; i < 5; i++) {
    synchronized (LOCK) {
    Utils.printString(name + " lock");
    try {
    while (count >= FULL) {
    Utils.printString(name + " wait");
    LOCK.wait();
    }
    Thread.sleep(500);
    count++;
    Utils.printString(name + " product 1 " + " now is " + count);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    LOCK.notifyAll();
    Utils.printString(name + " unlock");
    }
    }
    }
    }
    }

    public static class Consumer implements Runnable {
    public String name;
    public Consumer(String name){
    this.name = name;
    }
    @Override
    public void run() {
    for (int i = 0; i < 5; i++) {
    synchronized (LOCK) {
    Utils.printString(name + " lock");
    try {
    while (count <= 0) {
    Utils.printString(name + " wait");
    LOCK.wait();
    }
    Thread.sleep(1000);
    count--;
    Utils.printString(name + " consume 1 " + " now is " + count);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    LOCK.notifyAll();
    Utils.printString(name + " unlock");
    }
    }
    }
    }
    }
    }

Lock

使用Lock实现消费者生产者模式与wait-notify相差不多,Lock中Condition的await/signalAll可以达到相同的效果,Lock具体用法这里不深究,具体可以看下篇博客。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.zgq.java;

import com.zgq.Algorithm;
import com.zgq.Utils;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created by zgq on 2019/4/6.
*/
public class ProducerConsumer1 implements Algorithm {

public static final Lock LOCK = new ReentrantLock();
public static Condition NOTFULL = LOCK.newCondition();
public static Condition NOTEMPTY = LOCK.newCondition();
public static int count = 0;
public static final int FULL = 10;

@Override
public void execut() {
for (int i = 0; i < 5; i++) {
new Thread(new Producer("producer" + i)).start();
}
for (int i = 0; i < 5; i++) {
new Thread(new Consumer("consumer" + i)).start();
}
}

public static class Producer implements Runnable {
private String name;

public Producer(String name) {
this.name = name;
}

@Override
public void run() {
for (int i = 0; i < 5; i++) {
LOCK.lock();
Utils.printString(name + " lock");
try {
while (count >= FULL) {
Utils.printString(name + " wait");
NOTFULL.await();
}
Thread.sleep(20);
count++;
Utils.printString(name + " product 1 " + " now is " + count);
NOTEMPTY.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
Utils.printString(name + " unlock");
}
}
}
}


public static class Consumer implements Runnable {
public String name;

public Consumer(String name) {
this.name = name;
}

@Override
public void run() {
for (int i = 0; i < 4; i++) {
LOCK.lock();
Utils.printString(name + " lock");
try {
while (count <= 0) {
Utils.printString(name + " wait");
NOTEMPTY.await();
}
Thread.sleep(50);
count--;
Utils.printString(name + " consume 1 " + " now is " + count);
NOTFULL.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
Utils.printString(name + " unlock");
}
}
}
}
}

BlockingQueue实现

BlockingQueue是一个接口,LinkedBlockingDeque是他的一个实现类,我们看下部分实现代码

1
2
3
4
5
6
7
8
9
10
11
12
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}

其实就是lock实现

实现

这里用了BlockingQueue的具体实现类LinkedBlockingDeque。链表实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.zgq.java;

import com.zgq.Algorithm;
import com.zgq.Utils;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created by zgq on 2019/4/6.
*/
public class ProducerConsumer2 implements Algorithm {

public static BlockingQueue storage = new LinkedBlockingDeque<>();
public static final int FULL = 10;

@Override
public void execut() {
for (int i = 0; i < 5; i++) {
new Thread(new Producer("producer" + i)).start();
}
for (int i = 0; i < 5; i++) {
new Thread(new Consumer("consumer" + i)).start();
}
}

public static class Producer implements Runnable {
private String name;

public Producer(String name) {
this.name = name;
}

@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(20);
if (storage.size() < FULL) {
Random random = new Random();
int k = random.nextInt();
storage.add(k);
}
Utils.printString(name + " product 1 " + " now is " + storage.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static class Consumer implements Runnable {
public String name;

public Consumer(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 4; i++) {
try {
Thread.sleep(50);
if (storage.size() > 0) {
storage.take();
}
Utils.printString(name + " consume 1 " + " now is " + storage.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

可以看出,使用BlockingQueue来实现生产者-消费者很简洁,这正是利用了BlockingQueue插入和获取数据附加阻塞操作的特性。