生产者-消费者模式是一种非常常见的软件设计模式。编程语言的一些重要特性可以通过该模式很好的体现出来。本文分别用Java语言和Go语言为例对比实现了生产者-消费者模式,从一个侧面揭示了Java和Go的一些区别。
生产者-消费者模式:Java实现
用Java实现生产者-消费者模式的关键在于对synchronized/wait/notify/notifyAll以及循环条件判断的正确使用。先附上完整的示例代码如下:
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
public class ProducerConsumerTest {
public static void main(String[] args) throws Exception {
Queue queue = new LinkedList();
Producer producer = new Producer(queue, 10);
Consumer consumer = new Consumer(queue);
producer.start();
consumer.start();
Thread.sleep(10);
producer.Terminate();
consumer.Terminate();
producer.join();
consumer.join();
}
}
class Producer extends Thread {
private Queue queue = null;
private int capacity = 0;
private volatile boolean active = true;
public Producer (Queue queue, int capacity) {
this.queue = queue;
this.capacity = capacity;
}
public void Terminate() {
this.active = false;
}
public void run() {
while (active) {
synchronized (queue) {
while (queue.size() == capacity && active) {
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (active) {
int val = new Random().nextInt(100) + 1;
queue.add(val);
queue.notifyAll();
}
}
}
}
}
class Consumer extends Thread {
private Queue queue = null;
private volatile boolean active = true;
public Consumer(Queue queue) {
this.queue = queue;
}
public void Terminate() {
this.active = false;
}
public void run() {
while (true) {
synchronized (queue) {
while (queue.isEmpty() && active) {
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// It should finish consuming the existing objects after being terminated.
if (queue.size() > 0) {
int val = queue.remove();
queue.notifyAll();
} else if (!active) {
return;
上面的代码加上空行,总共93行,本身逻辑也很简单,所以没有必要详细解释。这里只对其中Java语言层面的几个关键点说明如下:
1、wait、notify和notifyAll这几个函数是定义在类Object中的,所以可以用在任何对象上面;
2、wait、notify和notifyAll这几个函数必须和synchronized配合使用。更准确的说,这几个函数必须在synchronized代码块中调用,否则会抛出java.lang.IllegalMonitorStateException异常。
3、wait函数必须在synchronized中调用很好理解,因为wait()会先释放锁,然后再进行等待;一旦被唤醒之后,会立马尝试重新获取锁。而对于notify和notifyAll,我个人认为其实没有必要强制要求必须在synchronized中调用,Java之所以有这个强制要求,可能是为了防止开发者误用而已。
4、notify只会唤醒一个线程,而notifyAll会唤醒所有线程。通常情况下,尽量都使用notifyAll,因为在有多个生产者或消费者的情况下使用notify理论上有出现死锁的可能。
5、wait必须与循环判断配合使用,因为有时会出现“假”唤醒。
生产者-消费者模式:Go实现(一)
Go语言实现生产者-消费者模式有两种典型的方式,分别是通过条件变量和channel实现。首先给出通过条件变量的实现方式的代码,该方式与Java实现方式的逻辑几乎完全一样,从语言层面来看,概念也很类似。不过这里有一个生产者和两个消费者。
package main
import (
"sync"
"fmt"
"math/rand"
"time"
)
type Factory struct {
active bool
capacity int
queue []int
}
func (f *Factory)produce(c *sync.Cond) {
for f.active {
c.L.Lock()
for len(f.queue) == f.capacity && f.active {
c.Wait()
}
if f.active {
n := rand.Intn(100)
f.queue = append(f.queue, n)
fmt.Printf("Generating %d\n", n)
c.Broadcast()
}
c.L.Unlock()
}
}
func (f *Factory)consume(id int, c *sync.Cond, wg *sync.WaitGroup) {
for {
c.L.Lock()
for len(f.queue) == 0 && f.active {
c.Wait()
}
if len(f.queue) > 0 {
n := f.queue[0]
f.queue = f.queue[1:]
fmt.Printf("Consumer: %d, Received: %d\n", id, n)
c.Broadcast()
} else if !f.active {
c.L.Unlock()
wg.Done()
break
}
c.L.Unlock()
}
}
func main() {
factory := Factory
c := sync.NewCond(&sync.Mutex{})
var consumerCount int = 2
var wg sync.WaitGroup
go factory.produce(c)
for i:=1; i
wg.Add(1)
go factory.consume(i, c, &wg)
}
time.Sleep(20*time.Millisecond)
factory.active = false
c.Broadcast()
wg.Wait()
}
这段代码的逻辑与Java实现的逻辑几乎完全一样,这里只从Go语言的角度做如下几点说明:
1、该实现用到的Go语言的关键技术就是条件变量sync.Cond。其中有三个方法:Wait、Signal和Broadcast,分别对应Java中Object类中的Wait、notify和notifyAll。
2、Wait方法必须在Lock之后调用,因为Wait方法首先会释放锁,然后再等待;被唤醒之后,会立马重新尝试获取锁。这点与Java中的wait一样。但是Go语言中并没有强制要求Signal和Broadcast也必须在Lock之后调用,这点与Java中的notify和notifyAll则有所不同。
3、Go语言中没有while循环,而是通过for循环来实现的。下面列出了go和Java中对等的实现。
Golang
for len(f.queue) == f.capacity && f.active {
....
}
Java
while (queue.size() == capacity && active) {
....
}
4、Goroutine是一种比线程更轻量级的并发执行单元,Go语言中并没有类似于Java中的Thread.join。但是Go中提供的sync.WaitGroup可以实现Gorountine之间的同步,而且个人感觉易用性更好。sync.WaitGroup包含三个方法:Add、Done和Wait。Done实际上就是调用Add(-1)。Wait会阻塞,直到数量减到0。
生产者-消费者模式:Go实现(二)
用Go语言实现生产者-消费者模式的第二种方式,就是channel,这也是更常用的一种方式。同样,先贴出一个完整的代码示例,然后再解释。
package main
import (
"fmt"
"sync"
)
func produce(c chan int) {
for i := 0; i
}
close(c)
}
func consume(id int, c chan int, wg *sync.WaitGroup) {
defer wg.Done()
for {
val, ok :=
if ok {
fmt.Printf("Consumer:%d, val:%d\n", id, val)
} else {
return
}
}
}
func main() {
var c = make(chan int, 10)
var wg sync.WaitGroup
go produce(c)
consumeCount := 2
for i:=1; i
wg.Add(1)
go consume(i, c, &wg)
}
wg.Wait()
}
这段代码逻辑虽然与以上两个例子稍有不同,但大同小异。看到这段代码,第一反应是不是感觉更简洁?这是因为使用channel使得开发者不用考虑生产者与消费者之间的数据同步以及相互通知的问题。这就是channel带来的好处。Channel的设计理念就是使用通信来共享内存(Share by communicating)。《Effective Go》中有一句口号就是:不要通过共享内存来通信,而应该使用通信来共享内存。原话如下:
Do not communicate by sharing memory; instead, share memory by communicating.
但是要注意Channel虽然简化了并发编程中数据同步问题的复杂度,但代价就是性能的下降。 使用Channel涉及到两次拷贝,而且内部实现也有数据同步的问题。概念上来说,Go语言中的channel就类似于Unix中的pipe。所以当有大量的数据在生产者与消费者之间传递的话,那么channel可能就不是太合适。
另外,值得一提的是,上面的consume函数其实可以做如下简化:
func consume(id int, c chan int, wg *sync.WaitGroup) {
defer wg.Done()
for val := range c {
fmt.Printf("Consumer:%d: val:%d\n", id, val)
}
}
还要注意channel关闭后,可以继续从channel中读取数据,但不能再往里写数据,否则会发生panic。而且同一个channel不能关闭多次,所以如果有多个生产者,那么就不能在生产者内部来关闭channel了。
后注:如果您对Go毫无经验,建议先熟悉一下Go的基本语法再阅读本文。可以阅读我以前写的一篇文章“Go语言语法及特性总结”。
--END--
领取专属 10元无门槛券
私享最新 技术干货