我正在寻找关于Apache的hello-world体验的最简单的例子。
假设我刚刚在一个干净的盒子上安装了flink,我需要做的最起码的事情是什么?我意识到这是相当模糊的,以下是一些例子。
来自终端的三个python示例:
python -c "print('hello world')"
python hello_world.py
python python -c "print(1+1)"
当然,流应用程序要复杂一些,但是这里有一些类似的东西,我之前在星火流中做过这样的事情:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example
正如您所看到的,这些示例具有一些很好的特性:
它们是最小的tools/resources
所以我的问题是:
Flink最简单的hello world示例是什么?
到目前为止,我发现的是需要编译的50行代码的示例。
如果由于第3点而无法避免,那么满足第1点和第2点并使用(仅) jars (仅)的jars在默认情况下提供,或者可以很容易地从有信誉的来源获得,也可以。
发布于 2019-12-20 10:05:18
在大多数大数据和相关框架中,我们给出了单词计数程序作为Hello示例。是Flink中单词计数的代码:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.fromCollection(Arrays.asList("This is line one. This is my line number 2. Third line is here".split(". ")));
DataSet<Tuple2<String, Integer>> wordCounts = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : line.split(" ")) {
out.collect(new Tuple2<>(word, 1));
}
}
})
.groupBy(0)
.sum(1);
wordCounts.print();
从文件读取
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//The path of the file, as a URI
//(e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
DataSet<String> text = env.readTextFile("/path/to/file");
DataSet<Tuple2<String, Integer>> wordCounts = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : line.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
})
.groupBy(0)
.sum(1);
wordCounts.print();
不要使用signature处理在wordCounts.print()上引发的异常,而是将抛出添加到方法签名中。
将以下依赖项添加到pom.xml。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.8.0</version>
</dependency>
阅读有关flatMap、groupBy、sum和其他flink操作的文章:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/
Flink流文档和示例:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html
发布于 2019-12-16 20:24:52
好吧,这个怎么样:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3, 4, 5)
.map(i -> 2 * i)
.print();
env.execute();
}
发布于 2019-12-22 12:45:31
使用标准资源的最小步骤
我不确定这是否会是最终的答案,但已经发现,flink通常附带的例子,允许一些简单的互动,以最小的努力。
下面是一个包含flink 1.9.1标准资源的hello示例,它基于默认的单词计数:
中打开了三个终端。
终端1中的
的连接
nc -l 9000
Hello World
终端2中的
./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
终端3中的
的结果
tail -f log/flink-*-taskexecutor-*.out
你现在应该看到:
Hello : 1
World : 1
就是这样,在这里,您可以在终端1中键入更多内容,当您再次检查日志时,您将看到更新的more计数。
如果您以前已经这样做过一次,并且希望重新开始,您可以使用rm log/flink-*-taskexecutor-*.out
清除日志(假设是沙箱环境)。
https://stackoverflow.com/questions/59347209
复制相似问题