Flink Study Notes: 1. Flink Quick Start

Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html

Flink's streaming design is cleaner than Spark's. As most readers know, Spark's streaming is micro-batching, so it is not strictly a stream at heart. Flink splits datasets into batch and stream, which I think is a very good approach, because the two are fundamentally not the same thing.
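To make the batch/stream split concrete, here is a minimal Scala sketch of the two APIs side by side (my own illustration against the Flink 1.3 API, not taken from the quickstart; the sample words, host, and port are arbitrary):

// Batch: the DataSet API treats input as a finite collection and the job runs to completion.
object BatchSide {
  import org.apache.flink.api.scala._

  def run(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val words: DataSet[String] = env.fromElements("hello", "flink", "hello")
    // classic batch word count: finishes once the input is exhausted
    words.map((_, 1)).groupBy(0).sum(1).print()
  }
}

// Streaming: the DataStream API treats input as an unbounded flow of records.
object StreamSide {
  import org.apache.flink.streaming.api.scala._

  def run(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines: DataStream[String] = env.socketTextStream("localhost", 9000)
    lines.print()
    env.execute("stream sketch") // keeps running until the job is cancelled
  }
}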

1.1 Download and Installation

http://flink.apache.org/downloads.html

http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.3.2/flink-1.3.2-bin-hadoop27-scala_2.10.tgz

[root@centos ~]# wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.3.2/flink-1.3.2-bin-hadoop27-scala_2.10.tgz
--2017-11-19 08:46:47--  http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.3.2/flink-1.3.2-bin-hadoop27-scala_2.10.tgz
Resolving mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)... 101.6.6.178, 2402:f000:1:416:101:6:6:178
Connecting to mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)|101.6.6.178|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 146881599 (140M) [application/octet-stream]
Saving to: ‘flink-1.3.2-bin-hadoop27-scala_2.10.tgz.1’

100%[=======================================================================================>] 146,881,599 1.40MB/s   in 1m 44s 

2017-11-19 08:48:30 (1.35 MB/s) - ‘flink-1.3.2-bin-hadoop27-scala_2.10.tgz.1’ saved [146881599/146881599]

[root@centos ~]# ls
anaconda-ks.cfg                          
flink-1.3.2-bin-hadoop27-scala_2.10.tgz 
[root@centos ~]# 
[root@node1 ~]# tar -zxvf flink-1.3.2-bin-hadoop27-scala_2.10.tgz -C /opt
[root@node1 ~]# cd /opt/flink-1.3.2/
[root@node1 flink-1.3.2]# bin/start-local.sh
Starting jobmanager daemon on host node1.
[root@node1 flink-1.3.2]#

Open http://192.168.80.131:8081 to check that the JobManager and the other components are up. The web frontend should show exactly one available TaskManager.

View the logs:

[root@node1 flink-1.3.2]# tail log/flink-*-jobmanager-*.log
2017-11-19 08:59:40,788 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager data connection information: b1c3d16b52d0245feac840080ee1f387 @ localhost (dataPort=37405)
2017-11-19 08:59:40,788 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager has 1 task slot(s).
2017-11-19 08:59:40,790 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 262/1963/1963 MB, NON HEAP: 36/37/-1 MB (used/committed/max)]
2017-11-19 08:59:40,820 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka.tcp://flink@localhost:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds)
2017-11-19 08:59:40,832 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - TaskManager b1c3d16b52d0245feac840080ee1f387 has started.
2017-11-19 08:59:40,840 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager#235294181] - leader session 00000000-0000-0000-0000-000000000000
2017-11-19 08:59:40,845 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at localhost (akka://flink/user/taskmanager) as a3cda67b9b60ece3c5da03afb128c2d8. Current number of registered hosts is 1. Current number of alive task slots is 1.
2017-11-19 08:59:40,854 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Successful registration at JobManager (akka://flink/user/jobmanager), starting network stack and library cache.
2017-11-19 08:59:40,862 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Determined BLOB server address to be localhost/127.0.0.1:39996. Starting BLOB cache.
2017-11-19 08:59:40,865 INFO  org.apache.flink.runtime.blob.BlobCache                       - Created BLOB cache storage directory /tmp/blobStore-5bb19dd1-7cb5-45fc-8007-8eb6f6908868
[root@node1 flink-1.3.2]#

1.2 Running an Example

Now let's run the SocketWindowWordCount example, which reads text from a socket and counts how many times each word occurs. The steps are as follows:

(1) First, use netcat to start a local server:

[root@node1 flink-1.3.2]# nc -l 9000

(nc now blocks, waiting for a connection)

(2) Open another terminal, from which we can submit the Flink program:

[root@node1 flink-1.3.2]# bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8081
Starting execution of program
Submitting job with JobID: 7250da9a40999a17046c65c68c164e97. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#235294181] with leader session id 00000000-0000-0000-0000-000000000000.
11/19/2017 09:07:13 Job execution switched to status RUNNING.
11/19/2017 09:07:13 Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED 
11/19/2017 09:07:13 TriggerWindow(TumblingProcessingTimeWindows(5000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@ab644855, reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@60f00693}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Sink: Unnamed(1/1) switched to SCHEDULED 
11/19/2017 09:07:13 Source: Socket Stream -> Flat Map(1/1) switched to DEPLOYING 
11/19/2017 09:07:13 TriggerWindow(TumblingProcessingTimeWindows(5000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@ab644855, reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@60f00693}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Sink: Unnamed(1/1) switched to DEPLOYING 
11/19/2017 09:07:13 Source: Socket Stream -> Flat Map(1/1) switched to RUNNING 
11/19/2017 09:07:13 TriggerWindow(TumblingProcessingTimeWindows(5000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@ab644855, reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@60f00693}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Sink: Unnamed(1/1) switched to RUNNING 
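The two plan lines above show that Flink chained the socket source and the flat map into one task, while the window operator and the sink form a second task. For debugging it can help to see every operator separately; operator chaining can be disabled on the environment (a one-line sketch against the same StreamExecutionEnvironment, my own addition):

env.disableOperatorChaining() // each operator then runs as its own task in the plan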

(3) Check the Web UI. The program connects to the socket and waits for input. You can check the web interface to verify that the job is running as expected.

(4) Enter some data. Words are counted in time windows of 5 seconds (processing time, tumbling windows) and are printed to stdout. Monitor the JobManager's output file (the .out file) and type some words into nc (input is sent to Flink line by line after hitting Enter):

[root@node1 flink-1.3.2]# nc -l 9000
hello
hello,flink
bye

(5) Watch the results in real time. The .out file will print the counts at the end of each time window as long as words are flowing in. Open another terminal window:

[root@node1 flink-1.3.2]# tail -f log/flink-*-jobmanager-*.out
hello : 1
hello,flink : 1
bye : 1
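
Note that hello,flink is counted as a single word: the example splits each line on whitespace only (the regex "\\s"), so a comma does not act as a delimiter. A quick standalone Scala check of that split behavior (my own snippet, not part of the example):

"hello,flink".split("\\s") // Array("hello,flink") -- the comma stays inside the token
"hello flink".split("\\s") // Array("hello", "flink")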

(6) Stop Flink:

[root@node1 flink-1.3.2]# bin/stop-local.sh
Stopping jobmanager daemon (pid: 2568) on host node1.
[root@node1 flink-1.3.2]#

1.3 Reading the Example Source Code

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala

package org.apache.flink.streaming.scala.examples.socket

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Implements a streaming windowed version of the "WordCount" program.
 * 
 * This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text server (at port 12345)
 * using the ''netcat'' tool via
 * {{{
 * nc -l 12345
 * }}}
 * and run this example with the hostname and the port as arguments.
 */
object SocketWindowWordCount {

  /** Main program method */
  def main(args: Array[String]) : Unit = {

    // the host and the port to connect to
    var hostname: String = "localhost"
    var port: Int = 0

    try {
      val params = ParameterTool.fromArgs(args)
      hostname = if (params.has("hostname")) params.get("hostname") else "localhost"
      port = params.getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount " +
          "--hostname <hostname> --port <port>', where hostname (localhost by default) and port " +
          "is the address of the text server")
        System.err.println("To start a simple text server, run 'netcat -l <port>' " +
          "and type the input text into the command line")
        return
      }
    }

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')

    // parse the data, group it, window it, and aggregate the counts 
    val windowCounts = text
          .flatMap { w => w.split("\\s") }
          .map { w => WordWithCount(w, 1) }
          .keyBy("word")
          .timeWindow(Time.seconds(5))
          .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  /** Data type for words with count */
  case class WordWithCount(word: String, count: Long)
}
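
Two notes on the Scala version above. First, setParallelism(1) applies only to the print sink, so output is written by a single thread while the rest of the pipeline keeps its default parallelism. Second, keyBy("word") and sum("count") address the case-class fields by name; an equivalent form (my own variant, assuming the same text stream and WordWithCount case class) keys with an extractor function, so a typo in the key is caught at compile time rather than at job submission:

    val windowCounts = text
          .flatMap { w => w.split("\\s") }
          .map { w => WordWithCount(w, 1) }
          .keyBy(_.word)                  // instead of keyBy("word")
          .timeWindow(Time.seconds(5))
          .sum("count")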

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java

package org.apache.flink.streaming.examples.socket;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * <p>This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text server (at port 12345)
 * using the <i>netcat</i> tool via
 * <pre>
 * nc -l 12345
 * </pre>
 * and run this example with the hostname and the port as arguments.
 */
@SuppressWarnings("serial")
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text

                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })

                .keyBy("word")
                .timeWindow(Time.seconds(5))

                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala

package org.apache.flink.streaming.scala.examples.wordcount

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.examples.wordcount.util.WordCountData

/**
 * Implements the "WordCount" program that computes a simple word occurrence
 * histogram over text files in a streaming fashion.
 *
 * The input is a plain text file with lines separated by newline characters.
 *
 * Usage:
 * {{{
 * WordCount --input <path> --output <path>
 * }}}
 *
 * If no parameters are provided, the program is run with default data from
 * {@link WordCountData}.
 *
 * This example shows how to:
 *
 *  - write a simple Flink Streaming program,
 *  - use tuple data types,
 *  - write and use transformation functions.
 *
 */
object WordCount {

  def main(args: Array[String]) {

    // Checking input parameters
    val params = ParameterTool.fromArgs(args)

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // make parameters available in the web interface
    env.getConfig.setGlobalJobParameters(params)

    // get input data
    val text =
    // read the text file from given input path
    if (params.has("input")) {
      env.readTextFile(params.get("input"))
    } else {
      println("Executing WordCount example with default inputs data set.")
      println("Use --input to specify file input.")
      // get default test text data
      env.fromElements(WordCountData.WORDS: _*)
    }

    val counts: DataStream[(String, Int)] = text
      // split up the lines in pairs (2-tuples) containing: (word,1)
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      // group by the tuple field "0" and sum up tuple field "1"
      .keyBy(0)
      .sum(1)

    // emit result
    if (params.has("output")) {
      counts.writeAsText(params.get("output"))
    } else {
      println("Printing result to stdout. Use --output to specify output path.")
      counts.print()
    }

    // execute program
    env.execute("Streaming WordCount")
  }
}
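
Here keyBy(0) and sum(1) address the tuple fields by position: field 0 is the word and field 1 is the running count. An equivalent form (my own variant, under the same types) keys with an extractor function instead. Also note that the pipeline is lazy: print() and writeAsText(...) only declare sinks, and nothing runs until env.execute(...) is called.

    val counts: DataStream[(String, Int)] = text
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(_._1)   // key by the word itself, instead of the positional keyBy(0)
      .sum(1)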

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java

package org.apache.flink.streaming.examples.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;

/**
 * Implements the "WordCount" program that computes a simple word occurrence
 * histogram over text files in a streaming fashion.
 *
 * <p>The input is a plain text file with lines separated by newline characters.
 *
 * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
 * If no parameters are provided, the program is run with default data from
 * {@link WordCountData}.
 *
 * <p>This example shows how to:
 * <ul>
 * <li>write a simple Flink Streaming program,
 * <li>use tuple data types,
 * <li>write and use user-defined functions.
 * </ul>
 */
public class WordCount {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            text = env.fromElements(WordCountData.WORDS);
        }

        DataStream<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
                .keyBy(0).sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

        // execute program
        env.execute("Streaming WordCount");
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a
     * user-defined FlatMapFunction. The function takes a line (String) and
     * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
     * Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
                throws Exception {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

}
