前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >从零开始快速构建自己的Flink应用

从零开始快速构建自己的Flink应用

原创
作者头像
程序员白总
发布于 2024-02-19 13:55:59
发布于 2024-02-19 13:55:59
2520
举报
文章被收录于专栏:FlinkFlink

本文介绍如何在 mac 下快速构建属于自己的 Flink 应用。

1. 本地安装 flink

在 mac 上使用homebrew安装 flink:

代码语言:bash
AI代码解释
复制
brew install apache-flink

查看安装的位置:

代码语言:bash
AI代码解释
复制
brew info apache-flink

进入安装目录,启动 flink 集群:

代码语言:bash
AI代码解释
复制
cd /usr/local/Cellar/apache-flink/1.18.0
./libexec/bin/start-cluster.sh

进入 web 页面:http://localhost:8081/

2. 构建项目

基于模板直接构建一个项目:

代码语言:bash
AI代码解释
复制
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.18.0
cd quickstart

在项目的 DataStreamJob 类实现如下计数的功能:

代码语言:java
AI代码解释
复制
package org.myorg.quickstart;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DataStreamJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("127.0.0.1", 9000)
        .flatMap(new LineSplitter())
        .keyBy(0)
        .sum(1)
        .print();

        env.execute("WordCount");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");

            for (String token : tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

在上面的例子中,我们使用 DataStream API 构建了一个 Flink 应用,数据源(source)为本地的 socket 9000 端口,经过 flatMap、keyBy、sum 三个转换操作之后,最后打印到标准输出流。整体流程如下图:

3. 运行

启动 socket 连接,监听 9000 端口:

代码语言:bash
AI代码解释
复制
nc -l 9000

打包,上传(可以使用 Web UI 界面上传,也可以使用命令行上传)。

上传后,就可以在 WebUI 看到正在运行的 job 了。

此时通过在 socket 输入内容,

就可以在 task manager 的 stdout 看到打印结果了。

4. 总结

本文从零开始在本地构建运行了一个 Flink 应用,包括 Flink 集群的安装、Flink 应用的构建,以及 Flink 应用的运行。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
1 条评论
热度
最新
1
1
回复回复点赞举报
推荐阅读
Java&Go高性能队列之Disruptor性能测试
之前写过Java&Go高性能队列之LinkedBlockingQueue性能测试之后,就一直准备这这篇文章,作为准备内容的过程中也写过一些Disruptor高性能消息队列的应用文章:高性能队列Disruptor在测试中应用和千万级日志回放引擎设计稿。
FunTester
2022/04/01
8950
Java&Go高性能队列之LinkedBlockingQueue性能测试
在写完高性能队列Disruptor在测试中应用和千万级日志回放引擎设计稿视频版之后,我就一直在准备Java & Go 语言几种高性能消息队列的性能测试,其中选取了几种基准测试场景以及在性能测试中的应用场景。
FunTester
2022/02/08
1.4K3
基于docker的分布式性能测试框架功能验证(三)
本文是DCS_FunTester测试框架分布式性能测试功能拓展实践,是一种比较粗略的技术验证实践,技术方案采用
FunTester
2021/07/23
3510
基于docker的分布式性能测试框架功能验证(三)
分布式性能测试框架用例方案设想(一)
在近期工作规划中,分布式压测框架提上日程,目前「FunTester」已经具备了一些分布式压测中用到的功能。
FunTester
2021/06/23
7150
分布式性能测试框架用例方案设想(三)
性能测试脚本基于FunTester性能测试框架,在之前的方案二中,我们需要将用例写进去基础的jar包中,然后通过反射调用,灵活之处就是可以将用例的主要变量参数化,但是死板之处就是用例整体的设计已经完成了,参数化能力有限。如果用例场景需要增添,这种方式也就无能为力了,又时候甚至一点点的用例改动都会造成用例失效的情况。
FunTester
2021/07/23
5220
自动化项目基类实践--视频演示
接口测试中业务验证 http://mpvideo.qpic.cn/0bf2raaagaaaauaafxudrvpfbcgdaoeaaaya.f10002.mp4?dis_k=c999f00b893c9
FunTester
2020/04/08
3110
基于时间戳的日志回放引擎
之前写过一个日志回放引擎的第一代千万级日志回放引擎设计稿,当时理解的日志回放就是把日志记录的请求重新发出去,这就是回放线上用户的流量了。可是在我最近看goreplay的过程中,重新刷新了我的认知。
FunTester
2022/12/09
3120
高性能队列Disruptor在测试中应用
最近在研究goreplay的源码的过程中,感觉有些思路还是很值得借鉴。所以自己立了一个flag,实现一个千万级日志回放功能。但是在这个实现的过程中遇到一个棘手的问题:Java自带的LinkedBlockingQueue比较难以直接满足需求场景和性能要求。
FunTester
2022/02/08
8350
Java&Go三种HTTP客户端性能测试
在学完Golang语言HTTP客户端实践、Go语言HTTPServer开发的六种实现之后,我自然开始了Java&Go两种语言的HTTP客户端性能测试。
FunTester
2021/12/02
1K0
单机12万QPS——FunTester复仇记
在文章10万QPS,K6、Gatling和FunTester终极对决!中,最后测试结果FunTester除了在在CPU方面有一丁点优势以外,内存和QPS均略逊一筹,特别是内存方面劣势尤为明显。当时立了一个flag:
FunTester
2021/08/18
4150
性能测试中记录每一个耗时请求
在之前的文章性能测试中标记每个请求中提到,把每一个接口都进行requestID的标记,接下来的工作就简单了,就是设置各种超时配置,然后进行压测,会记录超时的请求ID和响应时间(采取“响应时间_requestID”形式),结果如图:
FunTester
2020/02/17
4640
基于docker的分布式性能测试框架功能验证(二)
本文是FunTester测试框架分布式性能测试功能拓展实践,是一种比较粗略的技术验证实践,技术方案采用分布式性能测试框架用例方案设想(二)中所设想场景,基于jar包内函数的,这个方案需要将测试用例写到更新到jar包中或者classpath目录下。
FunTester
2021/06/23
3270
下单延迟10s撤单性能测试
研发提出了一个下单之后延迟10s撤单的压测需求,着实让我迷糊了一会儿,最后参考了Java的延迟队列java.util.concurrent.DelayQueue实现了这个需求。
FunTester
2021/12/02
4290
下单延迟10s撤单性能测试
单点登录性能测试方案
项目登录系统升级,改为单点登录:英文全称Single Sign On。SSO是在多个应用系统中,用户只需要登录一次就可以访问所有相互信任的应用系统。 之前有的统一登录方式被废弃,由于单点登录比较之前的登录系统复杂很多。之前的方案请求一个接口即可获得用户校验令牌。 先分享一下单点登录的技术方案的时序图:
FunTester
2019/10/08
1.6K0
单点登录性能测试方案
链路压测中如何记录每一个耗时的请求
前文回顾:性能测试中记录每一个耗时请求,做完了单接口耗时请求的记录功能,近期又迎来了一批多接口链路压测的需求。刚好趁着这个机会,多实现一些不同场景的链路压测需求,锻炼一波,也能提高自己写的「FunTester」测试框架的兼容性,可谓一石多鸟,何乐而不为。
FunTester
2020/12/24
8510
DCS_FunTester分布式压测框架更新(一)
在经过了一些人生的思考和积累,终于开始第一轮的更新,除了修复BUG以外,还进行不少功能性更新。另外本地部署已经完成了。有兴趣的可以自己多体验一下。分布式性能测试框架单节点内测
FunTester
2021/07/23
3980
性能测试中标记每个请求
在做性能测试过程中,遇到一个棘手的问题,开发让我们复现几个请求时间较长的请求,他们看日志进行链路追踪,查找瓶颈所在。
FunTester
2020/01/17
4140
拷贝HttpRequestBase对象
在实践性能测试框架第二版的过程中,我实现了一个单个HttpRequestBase对象的concurrent对象创建,单之前都是用使用唯一的HttpRequestBase对象进行多线程请求,目前来看是没有问题的,但为了防止以后出现意外BUG和统一concurrent的构造方法使用,故尝试拷贝了一个HttpRequestBase对象。原因是因为之前封装的深拷贝方法对于HttpRequestBase对象的实现类如:httpget和httppost并不适用,因为没有实现Serializable接口。所以单独写了一个HttpRequestBase对象的拷贝方法,供大家参考。
FunTester
2019/12/31
5740
HTTP接口测试基础【FunTester框架教程】
今天继续编写FunTester测试框架的教程,主要内容是HTTP接口测试基础,分为请求(GET、POST、PUT等)、请求头、cookie、响应、JSON以及资源释放。
FunTester
2021/09/14
5560
基于docker的分布式性能测试框架功能验证(一)
本文是「FunTester」测试框架分布式性能测试功能拓展实践,是一种比较粗略的技术验证实践,技术方案采用分布式性能测试框架用例方案设想(一)中所设想场景。
FunTester
2021/06/23
4270
推荐阅读
相关推荐
Java&Go高性能队列之Disruptor性能测试
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文