首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Apache Flink CEP,模式不匹配

Apache Flink CEP,模式不匹配
EN

Stack Overflow用户
提问于 2022-08-09 19:21:09
回答 1查看 90关注 0票数 0

我是新的Flink CEP,并试图测试基本的东西-在下面的代码,我的期望是所有的输入应该匹配模式,并应打印为匹配的结果。但不知何故,没有什么比('matechedStream.print()')更能说明原因吗?

任何建议/帮助都将不胜感激。

代码语言:javascript
复制
package com.o9.flink;
import com.o9.flink.asyncio.DemandSupply;
import org.apache.flink.cep.CEP;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


public class DemandSupplyPattern {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> keyedInputStream = env.fromElements("AAA","BBB","CCC");

        Pattern<String, ?> dspattern = Pattern.<String>begin("start");

        PatternStream<String> patternStream = CEP.pattern(keyedInputStream, dspattern);
        DataStream<String> matechedStream =  patternStream.process(new PatternProcessFunction<String, String>() {
            @Override
            public void processMatch(Map<String, List<String>> map, Context context, Collector<String> collector) throws Exception {
                collector.collect(map.get("start").toString());
            }
        });

        matechedStream.print();

        env.execute("DemandSupply-CEP");
    }
}

Maven依赖关系:

代码语言:javascript
复制
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime</artifactId>
        <version>${flink.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb</artifactId>
        <version>${flink.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <dependency>
        <groupId>org.asynchttpclient</groupId>
        <artifactId>async-http-client</artifactId>
        <version>2.2.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>


</dependencies>

谢谢马亨德拉

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-08-19 09:38:36

默认情况下,FlinkCEP使用EventTime,它依赖于WaterMark来帮助这些事件继续进行。如果您将其切换到ProcessingTime,可能会有所帮助。

代码语言:javascript
复制
PatternStream<String> patternStream = CEP.pattern(keyedInputStream, dspattern).inProcessingTime();

如果您不知道差异和时间语义,可以查看时间概念的官方文档。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73297054

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档