前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >轻装上阵Flink--在IDEA上开发基于Flink的实时数据流程序

轻装上阵Flink--在IDEA上开发基于Flink的实时数据流程序

作者头像
壮壮熊
修改2023-01-17 14:53:36
4720
修改2023-01-17 14:53:36
举报
文章被收录于专栏:程序猿牧场程序猿牧场

前言

代码语言:txt
复制
  本文介绍如何在IDEA上快速开发基于Flink框架的DataStream程序。先直接上手!

环境清单

代码语言:txt
复制
  案例是在win7运行。安装VirtualBox,在VirtualBox上安装Centos操作系统。所有资源都在百度云上,有需要请直接下载。安装教程基本都是傻瓜式,文章不做讲述,有需要直接网上搜索。

资源

版本

VirtualBox

5.2.16

Centos

6.5

Maven

3.6.3

JDK

8u241

IDEA

2019.3.2

Flink

1.10.0

链接:https://pan.baidu.com/s/12rXlY_z_Fck8-NRXdZ5row

提取码:qt2p

轻装上阵

1、IP设置

代码语言:txt
复制
  Centos的设置静态IP为192.168.2.20,关闭防火墙
代码语言:javascript
复制
vi /etc/sysconfig/network-scripts/ifcfg-eth0
DEVICE=eth0
TYPE=Ethernet
ONBOOT=yes #开机启动eth0网卡
NM_CONTROLLED=yes
BOOTPROTO=static
IPADDR=192.168.2.20
GATEWAY=192.168.2.1
NETMASK=255.255.255.0
代码语言:txt
复制
  如果此时ping www.baidu.com等不通,需要我们添加dns服务器。
代码语言:javascript
复制
[root@localhost network-scripts]# vi /etc/resolv.conf
nameserver 192.168.2.1
代码语言:txt
复制
  重新启动网络服务
代码语言:javascript
复制
[root@localhost network-scripts]# service network restart                   
正在关闭接口 eth0:[确定]
关闭环回接口:[确定]
弹出环回接口:[确定]
弹出界面 eth0:Determining if ip address 192.168.2.20 is already in use for device eth0...
                                                           [确定]
代码语言:txt
复制
  关闭防火墙
代码语言:javascript
复制
[root@localhost network-scripts]# service iptables stop

2、创建项目

代码语言:txt
复制
  在win7的命令行下,用mvn命令创建开发模板
代码语言:javascript
复制
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.0
代码语言:txt
复制
  这种方式允许你为新项目命名。它将以交互式的方式询问你项目的 groupId、artifactId 和 package 名称。用tree命令看下,如下结构。项目是一个 Maven project,它包含了两个类:StreamingJob 和 BatchJob 分别是 DataStream and DataSet 程序的基础骨架程序。main 方法是程序的入口,既可用于IDE测试/执行,也可用于部署。
代码语言:javascript
复制
│  pom.xml
└─src
    └─main
        ├─java
        │  └─com
        │      └─ryan
        │              BatchJob.java
        │              StreamingJob.java
        └─resources
                log4j.properties

3、写一个自己的DataStream的程序

功能介绍:WindowWordCount.java,5s为一个时间窗口,摄取数据源的数据,计算单词出现的次数。

实时数据流计算简易架构图:

代码语言:txt
复制
  为了演示方便,这里我们只演示消息队列和Flink Job两个模块,利用nc工具来替代消息队列作为Flink Job摄取的数据源。

代码:

代码语言:javascript
复制
package com.ryan;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("192.168.2.20", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);
        dataStream.print();
        env.execute("Window WordCount");
    }
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
代码语言:txt
复制
  在centos机器上,命令行启动nc
代码语言:javascript
复制
nc -lk 9999
代码语言:txt
复制
  IDEA上直接run main方法,然后在centos机器上,不断输入单词。
代码语言:javascript
复制
[ryan@localhost ~]$ nc -lk 9999
java
java
shen
深圳 深圳
代码语言:txt
复制
  IDEA控制台上输出如下:

注意:第一次在IDEA上运行这个程序,可能会报如下异常

代码语言:javascript
复制
java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/datastream/DataStream

原因是IDEA没有导入flink 的lib下的jar包。导入即可。

4、打包发布到centos平台上的Flink集群

代码语言:txt
复制
  修改pom.xml文件的mainclass的值为com.ryan.WindowWordCount
代码语言:javascript
复制
<mainClass>com.ryan.WindowWordCount</mainClass>
代码语言:txt
复制
  执行mvn clean install,得到flink-demo-1.0-SNAPSHOT.jar,并上传到centos机器上。
代码语言:javascript
复制
mvn clean install
代码语言:txt
复制
  打开两个centos的控制台,一个用于打开nc,一个用于运行我们打包好的Flink jar包。
代码语言:javascript
复制
[ryan@localhost ~]$ nc -lk 9999
java
shen
深圳 深圳 深圳
代码语言:javascript
复制
[root@localhost flink-1.10.0]# bin/flink run flink-demo/flink-demo-1.0-SNAPSHOT.jar 
Job has been submitted with JobID 9931a9dfc2eddeb2d0b5ed15578bd488
代码语言:txt
复制
  回到win7上,用浏览器打开http://192.168.2.20:8081/,在Running Jobs上,可以看到一条记录。
代码语言:txt
复制
  在Task Managers上,Stdout模块看到程序输出的结果。
代码语言:txt
复制
  所有代码都上传到github上,有需要的朋友可以下载

https://github.com/qinxiongzhou/flink-demo

代码语言:txt
复制
  至此,我们完成了开发编译调试到最终上线生产运行。喜欢请关注,谢谢!
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-03-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序猿牧场 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档