什么是现实的用例,其中java.util.Exchanger将是最好的选择同步器?
我见过来自GitHub和教程网站的片段,但它们似乎总是人为的,最好用TransferQueue来解决。
发布于 2021-10-03 12:01:40
Exchanger和TransferQueue是非常不同的,如果使用者非常慢,TransferQueue基本上可以填充所有内存,而Exchanger使用常量内存,在配对线程准备就绪时产生同步。
显然,使用其中一种或另一种会对同步和工作流的效率产生严重影响(但也会影响资源使用的确定性)。
注意:生产者和使用者请求同步的原因可能是不同的,不仅仅是因为他们有一个完整的缓冲区,例如,其中一个已经在等待另一个。而且,对于Exchange生产者/消费者是混淆的(使用者可以向生产者交付结果),而使用TransferQueue,您应该创建额外的结构。
例如,将并发过程看作管道基础结构,节点上下移动(节点包含水泵)。
另外,抽水效应会移动到所有基础设施(如wave),认为您的并发进程使用一个线程结果在另一个线程上使用,等等。
(代码中的注释)
package com.computermind.sandbox.concurrent;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ThreadLocalRandom;
public class ExchangerTransferQueue {
// helper for wait
@SneakyThrows
public static void _wait() {
Thread.sleep(ThreadLocalRandom.current().nextInt(1_000, 5_000));
}
// the pumping capacity is the work a node must to do, this work
// is moving across threads then, the `PumpingCapacity` "P1" will
// be used by the producer but later (when water move) "P1" will be
// used by consumer and so (the `exchange` sync move PumpingCapacity
// instances)
@AllArgsConstructor
static
class PumpingCapacity {
String name;
int from;
int to;
// do "up" work (if any)
boolean up(String node) {
if(from < 1) return false;
System.out.printf("~ node %s pump up %s%n", node, name);
from -= 1; to += 1;
return true;
}
// do "down" work (if any)
boolean down(String node) {
if(to < 1) return false;
System.out.printf("~ node %s pump down %s%n", node, name);
from += 1; to -= 1;
return true;
}
}
// the producer have a initial PumpingCapacity and
// create the exchange to the next node
static class WaterPumpProducer implements Runnable {
PumpingCapacity p;
final Exchanger<PumpingCapacity> b = new Exchanger<>();
WaterPumpProducer(PumpingCapacity p) {
this.p = p;
}
@SneakyThrows
@Override
public void run() {
// for ever
while(true) {
// do work
while (p.up("Producer")) _wait();
// and exchange
System.out.println("Producer need change");
p = b.exchange(p);
}
}
}
// an interemediate node have two PumpingCapacity one working
// with the predecessor and other with the successor
static class WaterPumpNode implements Runnable {
PumpingCapacity p, q;
final Exchanger<PumpingCapacity> a;
final Exchanger<PumpingCapacity> b = new Exchanger<>();
WaterPumpNode(PumpingCapacity p, PumpingCapacity q, Exchanger<PumpingCapacity> a) {
this.p = p;
this.q = q;
this.a = a;
}
@SneakyThrows
@Override
public void run() {
while(true) {
while (p.down("Node")) _wait();
while (q.up("Node")) _wait();
System.out.println("Node need change");
p = a.exchange(p);
q = b.exchange(q);
}
}
}
static class WaterPumpConsumer implements Runnable {
PumpingCapacity p;
final Exchanger<PumpingCapacity> a;
WaterPumpConsumer(PumpingCapacity initialCapacity, Exchanger<PumpingCapacity> a) {
p = initialCapacity;
this.a = a;
}
@SneakyThrows
@Override
public void run() {
while(true) {
while (p.down("Consumer")) _wait();
System.out.println("Consumer need change");
p = a.exchange(p);
}
}
}
@SneakyThrows
public static void main(String... args) {
WaterPumpProducer producer = new WaterPumpProducer(new PumpingCapacity("P1", 5, 0));
WaterPumpNode node = new WaterPumpNode(new PumpingCapacity("P2", 0, 3), new PumpingCapacity("P3", 3, 0), producer.b);
WaterPumpConsumer consumer = new WaterPumpConsumer(new PumpingCapacity("P4", 0, 2), node.b);
// consumer run first, the consumer do job!
new Thread(consumer).start();
// wait to see consumer wait
Thread.sleep(15_000);
new Thread(node).start();
// wait to see node wait
Thread.sleep(15_000);
new Thread(producer).start();
// see how PumpingCapacities up and down across all pipe infrastructure
}
}带输出(带注释)
~ node Consumer pump down P4 <-- only consumer is working
~ node Consumer pump down P4
Consumer need change <-- and stop since need change
~ node Node pump down P2 <-- when node start do job
~ node Node pump down P2
~ node Node pump down P2
~ node Node pump up P3
~ node Node pump up P3
~ node Node pump up P3
Node need change <-- and need change and wait producer
~ node Producer pump up P1 <-- when producer start do job
~ node Producer pump up P1
~ node Producer pump up P1
~ node Producer pump up P1
~ node Producer pump up P1
Producer need change <-- here all nodes work and
~ node Producer pump up P2 PumpingCapacities go up and down
~ node Consumer pump down P3 moving water
~ node Node pump down P1
~ node Node pump down P1
~ node Consumer pump down P3
~ node Producer pump up P2
~ node Node pump down P1
~ node Consumer pump down P3
~ node Producer pump up P2
~ node Node pump down P1
Consumer need change
Producer need change
~ node Node pump down P1
~ node Node pump up P4
~ node Node pump up P4
Node need change
~ node Node pump down P2
~ node Consumer pump down P4
~ node Producer pump up P1
...内存使用是常量(如果要使用TransferQueue,则不会)。
然后
,什么是现实的用例,其中java.util.Exchanger将是同步器的最佳选择?
两个进程相互同步以执行一个任务,其中一个进程的工作对另一个进程产生影响,反之亦然。生产者/消费者模式是一个很好的例子,其中生产者必须等待消费者(即,因此不积累过多的工作)。
而不是将它移动到另一个线程以进行进一步的操作?
因为两个线程都共享信息,并且希望并发执行操作,所以不能将其从一个线程传递到另一个线程。
假设您有以下递归函数
G{i+1} = g( H{i}, G{i} )
H{i+1} = h( H{i}, G{i} )您可以为每一步启动,两个线程并行地运行g和h,也可以使用Exchanger只启动两个线程一次。
当然,您可以使用许多其他结构,但是您将看到,在这些结构中,您必须考虑死锁的可能性,而Exchanger使这种交换变得简单和安全。
例如,假设我们想要执行某种生物模拟(即https://en.wikipedia.org/wiki/Nicholson%E2%80%93Bailey_model)
static void NicholsonBailey(double H, double P, double k, double a, double c, AtomicBoolean stop) {
// H and P exchange their values (H get p and P get H)
final Exchanger<Double> e = new Exchanger<>();
// H function
new Thread(() -> { try {
double h = H, p = P;
while(!stop.get()) {
h = k * h * Math.exp(-a * p); // expensive
p = e.exchange(h);
}
e.exchange(0., 1, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException ex) { /* end */ }}).start();
// P function
new Thread(() -> { try {
double h = H, p = P;
while(!stop.get()) {
System.out.printf("(H, P) := (%e, %e)%n", h, p);
p = c * h * (1 - Math.exp(-a * p)); // expensive
h = e.exchange(p);
}
e.exchange(0., 1, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException ex) { /* end */ }}).start();
}
@SneakyThrows
public static void main(String... args) {
AtomicBoolean stop = new AtomicBoolean(false);
double k = 2, a = 0.02, c = 1;
NicholsonBailey(Math.log(k) / a + 0.3, (k * Math.log(k)) / ((k - 1) * a * c) + 0.3, k, a, c, stop);
// run simulation until stop
Thread.sleep(100);
stop.set(true);
}只要使用Exchanger,我们就可以轻松地同步这两种计算。
https://stackoverflow.com/questions/69423990
复制相似问题