首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >交换机的实际用例是什么?

交换机的实际用例是什么?
EN

Stack Overflow用户
提问于 2021-10-03 10:32:12
回答 1查看 97关注 0票数 0

什么是现实的用例,其中java.util.Exchanger将是最好的选择同步器?

我见过来自GitHub和教程网站的片段,但它们似乎总是人为的,最好用TransferQueue来解决。

EN

Stack Overflow用户

回答已采纳

发布于 2021-10-03 12:01:40

ExchangerTransferQueue是非常不同的,如果使用者非常慢,TransferQueue基本上可以填充所有内存,而Exchanger使用常量内存,在配对线程准备就绪时产生同步。

显然,使用其中一种或另一种会对同步和工作流的效率产生严重影响(但也会影响资源使用的确定性)。

注意:生产者和使用者请求同步的原因可能是不同的,不仅仅是因为他们有一个完整的缓冲区,例如,其中一个已经在等待另一个。而且,对于Exchange生产者/消费者是混淆的(使用者可以向生产者交付结果),而使用TransferQueue,您应该创建额外的结构。

例如,将并发过程看作管道基础结构,节点上下移动(节点包含水泵)。

另外,抽水效应会移动到所有基础设施(如wave),认为您的并发进程使用一个线程结果在另一个线程上使用,等等。

(代码中的注释)

代码语言:javascript
运行
复制
package com.computermind.sandbox.concurrent;

import lombok.AllArgsConstructor;
import lombok.SneakyThrows;

import java.util.concurrent.Exchanger;
import java.util.concurrent.ThreadLocalRandom;

public class ExchangerTransferQueue {
    
    // helper for wait
    @SneakyThrows
    public static void _wait() {
        Thread.sleep(ThreadLocalRandom.current().nextInt(1_000, 5_000));
    }

    // the pumping capacity is the work a node must to do, this work
    // is moving across threads then, the `PumpingCapacity` "P1" will
    // be used by the producer but later (when water move) "P1" will be
    // used by consumer and so (the `exchange` sync move PumpingCapacity
    // instances)
    @AllArgsConstructor
    static
    class PumpingCapacity {
        String name;
        int from;
        int to;

        // do "up" work (if any)
        boolean up(String node) {
            if(from < 1) return false;
            System.out.printf("~ node %s pump up %s%n", node, name);
            from -= 1; to += 1;
            return true;
        }
        // do "down" work (if any)
        boolean down(String node) {
            if(to < 1) return false;
            System.out.printf("~ node %s pump down %s%n", node, name);
            from += 1; to -= 1;
            return true;
        }
    }

    // the producer have a initial PumpingCapacity and
    // create the exchange to the next node
    static class WaterPumpProducer implements Runnable {
        PumpingCapacity p;
        final Exchanger<PumpingCapacity> b = new Exchanger<>();

        WaterPumpProducer(PumpingCapacity p) {
            this.p = p;
        }

        @SneakyThrows
        @Override
        public void run() {
            // for ever
            while(true) {
                // do work
                while (p.up("Producer")) _wait();
                // and exchange
                System.out.println("Producer need change");
                p = b.exchange(p);
            }
        }
    }

    // an interemediate node have two PumpingCapacity one working
    // with the predecessor and other with the successor
    static class WaterPumpNode implements Runnable {
        PumpingCapacity p, q;
        final Exchanger<PumpingCapacity> a;
        final Exchanger<PumpingCapacity> b = new Exchanger<>();

        WaterPumpNode(PumpingCapacity p, PumpingCapacity q, Exchanger<PumpingCapacity> a) {
            this.p = p;
            this.q = q;
            this.a = a;
        }

        @SneakyThrows
        @Override
        public void run() {
            while(true) {
                while (p.down("Node")) _wait();
                while (q.up("Node")) _wait();
                System.out.println("Node need change");
                p = a.exchange(p);
                q = b.exchange(q);
            }
        }
    }

    static class WaterPumpConsumer implements Runnable {
        PumpingCapacity p;
        final Exchanger<PumpingCapacity> a;

        WaterPumpConsumer(PumpingCapacity initialCapacity, Exchanger<PumpingCapacity> a) {
            p = initialCapacity;
            this.a = a;
        }

        @SneakyThrows
        @Override
        public void run() {
            while(true) {
                while (p.down("Consumer")) _wait();
                System.out.println("Consumer need change");
                p = a.exchange(p);
            }
        }
    }

    @SneakyThrows
    public static void main(String... args) {

        WaterPumpProducer producer = new WaterPumpProducer(new PumpingCapacity("P1", 5, 0));
        WaterPumpNode node = new WaterPumpNode(new PumpingCapacity("P2", 0, 3), new PumpingCapacity("P3", 3, 0), producer.b);
        WaterPumpConsumer consumer = new WaterPumpConsumer(new PumpingCapacity("P4", 0, 2), node.b);

        // consumer run first, the consumer do job!
        new Thread(consumer).start();

        // wait to see consumer wait
        Thread.sleep(15_000);

        new Thread(node).start();

        // wait to see node wait
        Thread.sleep(15_000);

        new Thread(producer).start();

        // see how PumpingCapacities up and down across all pipe infrastructure
    }

}

带输出(带注释)

代码语言:javascript
运行
复制
~ node Consumer pump down P4  <-- only consumer is working
~ node Consumer pump down P4
Consumer need change          <-- and stop since need change
~ node Node pump down P2      <-- when node start do job
~ node Node pump down P2
~ node Node pump down P2
~ node Node pump up P3
~ node Node pump up P3
~ node Node pump up P3
Node need change              <-- and need change and wait producer
~ node Producer pump up P1    <-- when producer start do job
~ node Producer pump up P1
~ node Producer pump up P1
~ node Producer pump up P1
~ node Producer pump up P1
Producer need change          <-- here all nodes work and
~ node Producer pump up P2        PumpingCapacities go up and down
~ node Consumer pump down P3      moving water
~ node Node pump down P1
~ node Node pump down P1
~ node Consumer pump down P3
~ node Producer pump up P2
~ node Node pump down P1
~ node Consumer pump down P3
~ node Producer pump up P2
~ node Node pump down P1
Consumer need change
Producer need change
~ node Node pump down P1
~ node Node pump up P4
~ node Node pump up P4
Node need change
~ node Node pump down P2
~ node Consumer pump down P4
~ node Producer pump up P1
...

内存使用是常量(如果要使用TransferQueue,则不会)。

然后

,什么是现实的用例,其中java.util.Exchanger将是同步器的最佳选择?

两个进程相互同步以执行一个任务,其中一个进程的工作对另一个进程产生影响,反之亦然。生产者/消费者模式是一个很好的例子,其中生产者必须等待消费者(即,因此不积累过多的工作)。

而不是将它移动到另一个线程以进行进一步的操作?

因为两个线程都共享信息,并且希望并发执行操作,所以不能将其从一个线程传递到另一个线程。

假设您有以下递归函数

代码语言:javascript
运行
复制
G{i+1} = g( H{i}, G{i} )
H{i+1} = h( H{i}, G{i} )

您可以为每一步启动,两个线程并行地运行gh,也可以使用Exchanger只启动两个线程一次。

当然,您可以使用许多其他结构,但是您将看到,在这些结构中,您必须考虑死锁的可能性,而Exchanger使这种交换变得简单和安全。

例如,假设我们想要执行某种生物模拟(即https://en.wikipedia.org/wiki/Nicholson%E2%80%93Bailey_model)

代码语言:javascript
运行
复制
static void NicholsonBailey(double H, double P, double k, double a, double c, AtomicBoolean stop) {
    // H and P exchange their values (H get p and P get H)
    final Exchanger<Double> e = new Exchanger<>();

    // H function
    new Thread(() -> { try {
        double h = H, p = P;
        while(!stop.get()) {
            h = k * h * Math.exp(-a * p); // expensive
            p = e.exchange(h);
        }
        e.exchange(0., 1, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | TimeoutException ex) { /* end */ }}).start();

    // P function
    new Thread(() -> { try {
        double h = H, p = P;
        while(!stop.get()) {
            System.out.printf("(H, P) := (%e, %e)%n", h, p);
            p = c * h * (1 - Math.exp(-a * p)); // expensive
            h = e.exchange(p);
        }
        e.exchange(0., 1, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | TimeoutException ex) { /* end */ }}).start();
}

@SneakyThrows
public static void main(String... args) {
    AtomicBoolean stop = new AtomicBoolean(false);
    double k = 2, a = 0.02, c = 1;
    NicholsonBailey(Math.log(k) / a + 0.3, (k * Math.log(k)) / ((k - 1) * a * c) + 0.3, k, a, c, stop);

    // run simulation until stop
    Thread.sleep(100);
    stop.set(true);
}

只要使用Exchanger,我们就可以轻松地同步这两种计算。

票数 2
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69423990

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档