前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >这次来整个高端的API实时QPS流计算

这次来整个高端的API实时QPS流计算

作者头像
老李秀
发布2021-01-05 10:06:05
1.6K0
发布2021-01-05 10:06:05
举报

大家好,泥腿子安尼特又和大家见面了。转眼一年又要过去了,我也跌跌撞撞的算是翻完了这本。

就像读书的时候周日晚上补作业一样,我也想在2020年再写一篇文章。前段时间倒腾了下配置中心,但是因为学艺不精,再加上连个CAP都不懂的人,我感觉写了可能也没卵用,不能提升姿势。那我整点啥,那就来简介下今年火到爆的Flink。

开篇

那Flink到底是个啥,来我们来看下它官网的介绍。

是不是和我第一眼看到的一样,不知所云,先不用管,主要这个东西前面带个Apache就很牛逼。(扯个题外话

,几年前,我刚入行PHP的时候,我清晰的记得有个面试题,web服务器,nginx与apache比,然后为啥nginx牛逼,那时候我记得就百度到的答案默念一遍,然后apache在我心中一直是个拉胯的存在= =)

那Flink又有多牛逼呢!我来上个图,最近股价猛跌的福报厂双11的时候用Flink进行实时计算是这样的

是不是很牛逼!

统计关键词

好的 废话不多说,我们基于官网的demo 开始进入Flink的旅程

我们先不管什么是流什么是批,对着代码就是干

代码语言:javascript
复制
package org.myorg.quickstart;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchJob {

public static void main(String[] args) throws Exception {
// set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.readTextFile("/path/xxxx.log");

        DataSet<Tuple2<String, Integer>> counts =
                text.flatMap(new LogLevelFilter())
                        .groupBy(0)
                        .sum(1);

        counts.print();

        env.execute("Flink Batch Java API Skeleton");
    }

// 自定义函数
public static class LogLevelFilter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// json decode每一行的log
            JSONObject jsonObject = JSON.parseObject(value);
// 统计不同level的log
if (jsonObject.containsKey("level")) {
String level = jsonObject.getString("level");
                out.collect(new Tuple2<>(level, 1));
            }
        }
    }
}

那输出结果如下

这时候有小伙伴要问了,这就是大数据,实时流计算???

差不多一行linux命令可以搞定

这只是个demo,能统计关键词了 那我们再扩展一下,基于nginx的access.log 我们搞个实时统计网站qps

实时QPS统计

我们先开启nginx access log 顺便把每一行的log记录成json串 比如这样

代码语言:javascript
复制
{
"@timestamp":"2020-12-27T18:58:38+08:00",
"remote_addr":"127.0.0.1",
"referer":"-",
"request":"GET /wechat/config HTTP/1.1",
"status":200,
"bytes":64,
"user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 11_0_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36",
"http_x_forwarded":"-",
"request_time":"0.145"
}

然后 上代码 实时读取文件流 算qps

flink 读取文件流有两种模式 一种是直接一次性读完 一种是持续性检测,因为nginx access log是会不断增加的 所以我们选择第二种 来实时统计网站请求状态码的count

代码语言:javascript
复制
package org.myorg.quickstart;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingJob {

public static final Logger logger = LoggerFactory.getLogger(StreamingJob.class);

public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

TextInputFormat textInputFormat = new TextInputFormat(null);

DataStream<String> nginxAccessLog = env.readFile(textInputFormat, "path/to/nginx_log/access.log", FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

DataStream<StatusCount> nginxAccessLogStream = nginxAccessLog.flatMap((FlatMapFunction<String, StatusCount>) (s, collector) -> {
try {
JSONObject jsonObject = JSON.parseObject(s);
if (jsonObject.containsKey("status")) {
Integer status = jsonObject.getInteger("status");
if (status.compareTo(500) < 0) {
                        collector.collect(StatusCount.of(status + "", 1L));
                    }
                }
            } catch (Exception e) {
                logger.error("解析nginx access log 错误 {}", e.getMessage());
            }
        });
DataStream<StatusCount> windowsCounts = nginxAccessLogStream.
                keyBy("status").
                timeWindow(Time.seconds(1)).
                sum("count");
        windowsCounts.print().setParallelism(1);

        env.execute("Flink Streaming Java API Skeleton");
    }

public static class StatusCount {

public String status;
public Long count;

public static StatusCount of(String word, Long count) {
StatusCount statusCount = new StatusCount();
            statusCount.status = word;
            statusCount.count = count;
return statusCount;
        }

        @Override
public String toString() {
return "status:" + status + " count:" + count;
        }
    }
}

效果就是这样的

这里就实时的打印出了每秒中nginx access log中状态小于500的所有status

这样岂不是完成了实时统计QPS 而且还可以按状态分组。

这时候又有小伙伴要问了

我实时cat nginx log也差不多也行啊!

假如你公司有50台api服务器 每台每天产生500G的日志 而且日志按小时或者文件名分割 你cat给我一个看看!

实际生产环境,现在主流都是ELK一套来管理log(我之前也大致介绍过),运维也不会直接把log往ES插,因为高峰期的时候 ES的写入速度并不快 可能会插崩它。所以,运维一般还是把log收集到kafka,然后消费kafka的方式插入ES,flink也可以消费kafka,只要把这里的文件流换成消费kakfa就可以做到算出API整体的QPS了。

如果你看到了这里,实操之后,我们再回过头来解释下刚才的代码,再了解下flink是个啥。因为如果开篇就大肆介绍名词 简介,我感觉你们也不会看,因为感觉跟自己没啥关系。

我们看上述两个例子的代码,都是先读取一个文件流,然后用自定义的类来解析每行文本,然后第一个例子group就像你们sql中groupby 因为我把每行文本的level提取出来了,然后还有个计数,所以有个Tuple2。

第二个例子稍微麻烦点,可能也难以理解点,因为用到了时间窗口。就是我把每秒读取文本里的内容当做一个独立的时间窗口,这样每秒access log里各种status都打印出来了。而且他是可以一直在不断运行并且一直打印下去的。

那我还是不明白flink牛逼在哪啊!我再来介绍一个概念,是什么是有界流,什么是无界流

假如李老某年某月开了个网站,

那么网站的数据的开始时间就是他第一次网站发布的时候。现在这个网站也还开着,每天都不断有人陆陆续续的访问,数据一致在积累,假如50年后李老嗝屁了,但是小李还继续维护着这个网站,100年后,小李也嗝屁了,小小李说不定还继续维护着这个网站。所以你不知道这个数据的边界在哪,数据从现在到未来一直会源源不断的流进来,这就是无界的数据流。就像我上面两个demo,第一个我一次性读了这个文本,那么数据是有界限的,第二个例子,因为我nginx access log就可以类比李老的网站,没有界限,所以可以叫它无界流。而flink就是非常方便能处理这些无界流的数据。

我们再来看官网那句话 ——

Stateful Computations Over Streams

在流上进行有状态的计算,是不是有点觉得牛逼了呢。当然我只是单机随便演示下demo。flink可以稳定的运行在大数据成熟的yarn集群上,一个flink job可以消费多个流 而且可以保存多个状态。flink集成了消费kafka、rabbit MQ 等等之类的数据源,所以用起来也很方便。比如你可以消费kafka里的上报数据,kafka里的binlog数据,来实时计算比如一分钟的订单数啊,一分钟内的GVM啊等等之类。至于其它一些高端的概念,比如什么滑动窗口、滚动窗口、什么水印、什么反压机制,我也不懂。


本文主要基于一些简单的demo简介了flink 里面很多概念跟代码都没解释清楚,主要是不想让大家入门的时候接收太多的名词概念

本泥腿子在线上刚发布过一个flink job 目前也有很多不懂的,所以可能表述的不是那么好 大家见谅

如果大家有兴趣学这个玩意,真的学好了真的高薪,现在这东西火的一逼

如果有兴趣的话 可以对着官网看几眼,官网介绍的挺全的。

https://flink.apache.org/zh/flink-architecture.html

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 高性能API社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档