前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink 简单入门

flink 简单入门

作者头像
zeekling
发布2022-06-17 17:12:29
2390
发布2022-06-17 17:12:29
举报

安装

代码语言:javascript
复制
./start-cluster.sh
5cbd8c62e3fd2381f92695825eaf0c98.png
5cbd8c62e3fd2381f92695825eaf0c98.png

demo

  • 新建maven 项目
  • pom 文件配置如下:
代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.thinker</groupId>
  <artifactId>flink-test</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>flink-test</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>1.10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.11</artifactId>
          <version>1.10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_2.11</artifactId>
          <version>1.10.0</version>
      </dependency>

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>statefun-sdk</artifactId>
          <version>2.0.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>statefun-flink-harness</artifactId>
          <version>2.0.0</version>
      </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>

        <!-- 该插件用于将Scala代码编译成class文件 -->
        <plugin>
          <groupId>net.alchim31.maven</groupId>
          <artifactId>scala-maven-plugin</artifactId>
          <version>3.4.6</version>
          <executions>
            <execution>
              <!-- 声明绑定到maven的compile阶段 -->
              <goals>
                <goal>compile</goal>
                <goal>testCompile</goal>
              </goals>
            </execution>
          </executions>
        </plugin>

        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>3.0.0</version>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
          </configuration>
          <executions>
            <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>

      </plugins>
    </pluginManagement>
  </build>
</project>

创建下面文件:

代码语言:javascript
复制
package com.thinker;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Hello world!
 */
public class App {

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] tokens = s.toLowerCase().split("\\W+");
            for (String token: tokens){
                if (token.length() > 0){
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("H   ello World!");
        if (args.length != 2) {
            System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }
        String hostname = args[0];
        Integer port = Integer.parseInt(args[1]);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> streamSource = env.socketTextStream(hostname, port);

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = streamSource.flatMap(new LineSplitter())
                .keyBy(0).sum(1);
        sum.print();
        env.execute("Java WordCount from SocketTextStream Example");
    }
}

打包:

代码语言:javascript
复制
mvn install -Dmaven.test.skip=true

监听9000端口:

代码语言:javascript
复制
ncat -l 9000

在flink的安装目录下面执行:

代码语言:javascript
复制
./flink run -c com.thinker.App /home/zeek/project/flink-test/target/flink-test-1.0-SNAPSHOT.jar 127.0.0.1 9000

demo 测试

flink界面上可以显示出当前的任务:

8ba2b7057d07c42ff9e7545560c628de.png
8ba2b7057d07c42ff9e7545560c628de.png

监听的位置输入文字:

12de5867b16652f2fd4c12607eb32f00.png
12de5867b16652f2fd4c12607eb32f00.png

则可以在输出的位置看到结果:

0e88a484676a12748e965e99713d900f.png
0e88a484676a12748e965e99713d900f.png
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020.04.29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 安装
  • demo
  • demo 测试
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档