Required environment
Software download
As of 2019-07-24, the latest stable Flink release is 1.8.1. Download the flink-1.8.1-bin-scala_2.11.tgz binary from the Apache Flink download page, then extract it:
tar -zxvf flink-1.8.1-bin-scala_2.11.tgz
Start a local standalone cluster:
[hadoop@beh07 flink-1.8.1]$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host beh07.
Starting taskexecutor daemon on host beh07.
[hadoop@beh07 flink-1.8.1]$ jps
10497 TaskManagerRunner
10053 StandaloneSessionClusterEntrypoint
10522 Jps
When finished, stop the cluster:
[hadoop@beh07 flink-1.8.1]$ bin/stop-cluster.sh
Because the Flink installation directory on the server already contains Flink's core jars, those core dependencies should be declared with scope provided when packaging; when running the code directly in IDEA, their scope should instead be compile.
<properties>
    <flink.version>1.8.1</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
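To switch the scope between packaging and IDEA runs without editing every dependency, one option (a sketch, not from the original setup — the property name and profile id are assumptions) is to drive the scope with a property and override it in a Maven profile:

```xml
<!-- default: package for the cluster with provided scope -->
<properties>
    <flink.scope>provided</flink.scope>
</properties>

<!-- each Flink dependency then declares: <scope>${flink.scope}</scope> -->

<!-- run inside the IDE with this profile active, e.g. `mvn -Pidea ...` -->
<profiles>
    <profile>
        <id>idea</id>
        <properties>
            <flink.scope>compile</flink.scope>
        </properties>
    </profile>
</profiles>
```

Recent IntelliJ IDEA versions also offer an "Include dependencies with 'Provided' scope" checkbox in the run configuration, which avoids the profile entirely.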
The application jar that is uploaded should be the one bundled with all of its (non-provided) dependencies.
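The jar-with-dependencies suffix in the jar name used below comes from the Maven assembly plugin; a minimal build section that produces such a fat jar looks like this (the mainClass matches the class submitted below):

```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <!-- produces <artifactId>-<version>-jar-with-dependencies.jar -->
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.bairong.flink.java.SocketWindowWordCountJava</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

With this, `mvn package` builds both the plain jar and the fat jar; dependencies in provided scope are excluded from the assembly, so the Flink core jars are not bundled.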
First start a netcat server on the target host as the socket source:
nc -lk 9999
[hadoop@beh07 flink-1.8.1]$ bin/flink run \
>     --class com.bairong.flink.java.SocketWindowWordCountJava \
>     /tmp/app/learnning-flink-1.0-jar-with-dependencies.jar \
>     --host beh07 --port 9999
Here --class specifies the main class, followed by the path to the jar and then the program's own arguments (--host, --port). Note that inline comments cannot follow the trailing backslash, or the line continuation breaks.
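The submitted job follows the standard socket window word-count pattern from the Flink documentation; a sketch of what SocketWindowWordCountJava might look like (the class body here is an assumption based on that pattern, not the original source):

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCountJava {
    public static void main(String[] args) throws Exception {
        // --host and --port are parsed exactly as passed after the jar path
        final ParameterTool params = ParameterTool.fromArgs(args);
        final String host = params.get("host");
        final int port = params.getInt("port");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // read lines from the netcat socket started above
        DataStream<String> text = env.socketTextStream(host, port, "\n");

        DataStream<WordWithCount> counts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String line, Collector<WordWithCount> out) {
                    for (String word : line.split("\\s+")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")                    // key by the POJO's public "word" field
            .timeWindow(Time.seconds(5))      // tumbling 5-second processing-time window
            .reduce(new ReduceFunction<WordWithCount>() {
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });

        counts.print().setParallelism(1);     // results go to the TaskManager's stdout
        env.execute("Socket Window WordCount");
    }

    // public fields and a no-arg constructor make this a Flink POJO, so keyBy("word") works
    public static class WordWithCount {
        public String word;
        public long count;
        public WordWithCount() {}
        public WordWithCount(String word, long count) { this.word = word; this.count = count; }
        @Override public String toString() { return word + " : " + count; }
    }
}
```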
There are two ways to view the results: in the Web UI (default http://beh07:8081, under the TaskManager's Stdout tab, since the job prints to stdout), or by tailing the TaskManager's out file, e.g. tail -f log/flink-*-taskexecutor-*.out.
There are also two ways to stop the job: cancel it from the job's page in the Web UI, or from the command line. For the latter, first find the job's job-id in the Web UI (or with bin/flink list), then stop it with:
bin/flink cancel <job-id>