首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >在Apache中对两个流进行左连接的正确方法

在Apache中对两个流进行左连接的正确方法
EN

Stack Overflow用户
提问于 2020-04-06 06:35:09
回答 1查看 65关注 0票数 0

我正在开发一个使用Apache的欺诈检测系统,但我是个初学者,并且在这个问题上陷入困境:

我想从两个流进行左连接,其中一个流包含当前事务,另一个流已经验证了与银行的交易,在那里我可以找到一些错误,如stolen_card等。因此,我需要加入它们,以了解一张卡是否在过去被拒绝。

代码语言:javascript
运行
复制
   DataStream<Card> currentDataStream =  getCardsStream(env, Parameters.CURRENT_SOCKET)
            .keyBy((card) -> card.getCardID);

    DataStream<Card> historicDataStream =  getCardsStream(env, Parameters.HISTORIC_SOCKET)
            .keyBy((card) -> card.getCardID()); 

我现在正在做的是一个RichCoFlatMapFunction,它在每次historicDataStream到达时更新一个名为historicList的列表状态,并返回一个带有当前卡的元组和一个包含该Id的所有已加入事件的列表:

代码语言:javascript
运行
复制
public class LeftJoin extends RichCoFlatMapFunction<Card, Card, Tuple2<Card, List<Card>> > {

    private ValueState<Card> currentValueState;
    private ListState<Card> historicListState;

    @Override
    public void open(Configuration parameters) throws Exception {
        currentValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("Current State", Card.class));
        historicListState = getRuntimeContext().getListState(new ListStateDescriptor<>("historic state", Card.class));
    }

    @Override
    public void flatMap1(Card currentCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
        Iterable<Card> historicCardList =  historicListState.get();

        //If there is a coincidence
        if (Iterables.size(historicCardList) > 0) {
            out.collect(new Tuple2<>(currentCard, Lists.newArrayList(historicCardList) ));
        } else {
            currentValueState.update(currentCard);
            //Returning null if there are no cards for the Id
            out.collect(new Tuple2<>(currentCard, null));
        }
    }

    @Override
    public void flatMap2(Card historicCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
        historicListState.add(historicCard); //Updates the historicListState
    }
}

问题是,当我想要检查包含的卡片时,List<Card>会给我带来很多麻烦,因为它总是会再次获得所有的卡,我需要一种方法来标记我已经处理过的卡片,比如:

代码语言:javascript
运行
复制
  //I don't like this list because it always gets me all the join coincidences
    for (Card card : historicList) {

        //Comparar cada regla del Broadcast state con el error que contiene el elemento card
        if (rule.getBankDecision().equals(card.getErrors())) {


            //Evaluate some rules
            for (Long stateEventTime : windowState.keys()) {
                if (isStateValueInWindow(stateEventTime, windowStartForEvent, System.currentTimeMillis())) {
                    aggregateValuesInState(stateEventTime, aggregator);
                }

            }
    }

是否有更好的方法使联名卡成为一个流?

EN

回答 1

Stack Overflow用户

发布于 2020-04-11 20:58:58

我希望我对你的理解是正确的,如果不是,请把我记下来。

  1. private ValueState<Card> currentValueState是冗余的(在本例中,您只更新它,从未读取它的值)
  2. (如果我正确理解您的话),问题是您在整个historicListState上发出了您的RuleSystem,而且您已经检查了其中的一些。为什么不从已经超过规则的historicListState卡中删除?
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61054190

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档