首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Windows flink开发环境构建-新手

1. JDK 说明

安装(不会请百度)—版本在1.8及之上(如果有多版本的话,在path中将JAVA_HOME放在最前面)

2. Scala安装

下载地址:https://www.scala-lang.org/download/ 在页面的最底部

安装过程几乎没什么注意事项,网上说的安装路径不能有空格(如: Program Files),否则安装后使用会报错 ;

仅参考,此处安装未使用含有空格的文件名称

环境变量的配置

新增环境变量: SCALA_HOME 值:E:\soft\dev\install\scala

Path地址配置:

3. Flink的下载(JDK1.8及之上)

下载地址: https://flink.apache.org/downloads.html

此处使用版本的是1.9.0

下载后直接解压可使用;

进入解压目录的bin目录,运行start-cluster.bat,启动成功后本地访问http://localhost:8081

flink-web(ui)界面

idea开发配置

创建MAVEN项目(另行百度)

Pom.xml文件:

pom文件的依赖

只需要依赖flink-java和flink-streaming-java_$;其他的暂时不需要;

Scala插件安装

a>在线安装:

File->Settings->Plugins->Install JetBrains plugins 然后输入Scala;然后安装(时间可能持久较长,视网络情况而定)—;

b>离线安装:

下载插件:https://plugins.jetbrains.com/plugin/1347-scala 选择相应的版本(idea 、JDK) –>可用在线安装的方式查看版本;下载后将其解压,将解压的文件夹Scala放入idea安装目录的plugins目录下

安装完成后(在线安装):

idea的scala插件安装

项目依赖配置

选择项目结构:(如下图)

左侧选中项目,然后右上角如图

文件结构

添加sdk

(如果没有Ivy则表示Scala插件安装不成功)

然后直接点击OK即可;最后会多一个scala-sdk

添加scala-sdk成功

不要以为到此就可用运行了,我就在此处掉坑里了,获取运行环境的适合总是出错:

获取运行环境出差

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

避免上面的代码出错,还相应运行环境的配置

Flink在Idea的运行环境配置

选择项目结构:

选择运行环境lib

lib文件夹

点击OK,然后依赖中会多一个lib依赖

flink加载lib多的lib文件

到此,idea可用运行flink了;

运行验证

Java代码例子(网上找的),网上是监听端口读取;偷懒一下直接读取文本

flink执行代码

内部类

启动执行结果

代码:

public static void main(String[] args) throws Exception {

//获取运行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//从文件中读取

DataStreamSource text = env.readTextFile(“D:\source\idea\test\project\flink\flink-parent\flink-demo1\src\main\resources\demo.txt”);

//计算数据

DataStream windowCount = text.flatMap(new FlatMapFunction() {

@Override

public void flatMap(String value, Collector out) throws Exception {

String[] splits = value.split("\s");

for (String word : splits) {

out.collect(new WordWithCount(word, 1L));

}

}

})//打平操作,把每行的单词转为类型的数据

.keyBy(“word”)//针对相同的word数据进行分组

.timeWindow(Time.seconds(2), Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小

.sum(“count”);

//把数据打印到控制台

windowCount.print().setParallelism(1);//使用一个并行度

//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行

env.execute("streaming word count");

}

/**

* 主要为了存储单词以及单词出现的次数

*/

public static class WordWithCount {

public String word;

public long count;

public WordWithCount() {

}

public WordWithCount(String word, long count) {

this.word = word;

this.count = count;

}

@Override

public String toString() {

return "WordWithCount{" +

"word='" + word + '\'' +

", count=" + count +

'}';

}

}

//把数据打印到控制台

windowCount.print().setParallelism(1);//使用一个并行度

//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行

env.execute("streaming word count");

}

/**

* 主要为了存储单词以及单词出现的次数

*/

public static class WordWithCount {

public String word;

public long count;

public WordWithCount() {

}

public WordWithCount(String word, long count) {

this.word = word;

this.count = count;

}

@Override

public String toString() {

return "WordWithCount{" +

"word='" + word + '\'' +

", count=" + count +

'}';

}

}

Demo.txt

demo.txt文件-内容随意

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190907A0HAD900?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券