我读过关于Apache风暴的文章,并做了一些基本的教程。我想到了下面的拓扑结构,我想用storm来实现,但不确定如何处理数据分布。业务需求是对客户组合进行实时评估。简化形式涉及: 1)接受市场价格(货币、商品等),2)对每一价格滴滴答答,计算每一头寸的当期利润,并将其转换为客户账户货币;3)分析每个客户的总市盈率和所有头寸的数量,并在必要时产生信号4)在客户一级的计算必须是顺序的,原子的/序列化的。也就是说,所有的仓位都必须用它进入系统的顺序中的每一个滴答来评估,并且必须根据相同的价格计算总计,即使客户有1000多个仓位。5)分析了按符号/客户类型/国家/etc汇总的系统中所有职位的数量/趋势。把它们放在仪表板上。
所有订单都被执行并存储在rdbms中。我的主要问题是如何在每个节点处理自己的部分的不同节点上跨风暴螺栓分配1000多个位置。使用Modulo可以很好地划分客户,但是我如何才能为每一个螺栓实例提供id,使它们只处理自己的部分客户呢?暴风雨里有什么东西可以做吗?另一个问题是如何有效地完成上述聚合?
发布于 2015-05-14 01:19:46
为此您可以使用fieldsGrouping
。您可以声明一个元组分组的字段(在您的例子中是id
)。
我假设您的输入流是带有id和body字段的JSON对象,类似
{"id":"1234","body":"some body"}
另外,假设您的拓扑有一个喷口,两个螺栓,即BoltA和BoltB。
在BoltB中,重写declareOutputFields方法并填写详细信息。
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","log"));
}
您可以像下面这样声明拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout, 1);
builder.setBolt("boltA", new BoltA(), 1)
.shuffleGrouping("spout");
builder.setBolt("counterBolt", new BoltB(), 1).fieldsGrouping("boltB", new Fields("id"));
在本例中,具有来自boltA
的相同id的元组将被传递给boltB
的同一个实例。
https://stackoverflow.com/questions/30130926
复制相似问题