前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >相信我,你也能成为大数据开发工程师(一)

相信我,你也能成为大数据开发工程师(一)

作者头像
老李秀
发布2021-06-17 20:26:40
4550
发布2021-06-17 20:26:40
举报

大家好啊,老李最近高产如母猪,我也来凑个热闹。说起来挺魔幻的,去年这时候,我还是一个连java curd都不会的菜鸡,今天却在这里大谈大数据开发- -。我也没想到,等以后有机会可以讲讲写java的心路历程,目前还是一个java菜鸟,也因为目前的公司部门里没有足够的数据开发,我自己硬着头皮写了几个Flink应用,没想到这东西上手还是挺简单的,所以就很想分享给大家。

都2021年了,我们看看现在的大数据开发什么东西火,毫无疑问,Flink这个新兴之子,占了很大一块。随便一搜某招聘网站

是不是心动了

简介&&准备

回过头来,

我们看看Flink到底是个啥,其实我之前有一篇文章讲过 这次来整个高端的API实时QPS流计算 。还是引用官网的那句话——Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. 简而言之,就是一个可以对流进行有状态的计算的这么个东西。之前的那篇文章可能对没接触过的人来说比较不友好,所以我想开个系列来让各位也可以“精通”Flink。

准备工作

  • 开发工具,IntelliJ IDEA
  • 安装java java的安装比php还简单,下载好jdk,设置好JAVA_HOME就OK,目前主流还是jdk8。可以参考 https://juejin.cn/post/6844903895504797710
  • 安装maven maven的话,也一样,下载下来直接配个环境变量就行。可以参考 https://cloud.tencent.com/developer/article/1680711

直接开干

我们先来安装运行Flink job的集群(反正这些运维会做好的,Flink还可以运行在yarn集群上,还有Flink on k8s,当然,它也可以单独以jar包的形式运行)

代码语言:javascript
复制
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.13.0/flink-1.13.0-bin-scala_2.11.tgz
代码语言:javascript
复制
tar -xzf flink-1.13.0-bin-scala_2.11.tgz
代码语言:javascript
复制
cd flink-1.13.0
./bin/start-cluster.sh

这些都没什么坑,装好了之后是这样的。

然后执行

代码语言:javascript
复制
./bin/flink run examples/streaming/WordCount.jar

然后我们打开 http://127.0.0.1:8081 可以看到

这上面就是控制台了,我们提交的Flink任务都能在这上面看到。刚才执行的WordCount.jar就是我们刚才运行的那个job。

能运行到这一步说明你已经成功了百分之99了,马上你就要“精通”了

“手动编写”个Flink任务

我们在一个目录下执行如下命令

代码语言:javascript
复制
 mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.13.0 \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false

然后我们就可以看到一个maven生成的项目叫frauddetection

这里我们需要把pom.xml的这两行删掉。为什么呢,这里就涉及到了maven打包的知识点了,如果有这个provided的,那么项目依赖的包就不会被打包到jar包里。具体可以参考 https://segmentfault.com/a/1190000038594247

为什么这里官方的demo这么设计呢,因为我们刚才装好了可以运行Flink任务的集群,集群里面已经有这些了,所以我们打成的jar包可以不用这些,这样可以大大减少jar包的大小,因为我们要在idea里面运行(不去掉idea里打出来的jar会找不到该class,会报错运行不起来),所以我这里还是建议大家可以去掉。

然后我们再用idea自带的maven插件执行clean跟install,就可以跑了。

这个demo任务的输出是这样的。

当你完成这一步的时候,Congratulations,You are Big Data开发工程师!!!你已经编写了一个反欺诈实时流计算应用了,是不是有点成就感。

代码解析

因为有些观众只喜欢听我扯淡,应该很少有人去手动实践,我这里就把这个frauddetection项目的代码贴下,顺便把代码的含义也用注释的方式写在下面,方便大家阅读

主要两个类 一个FraudDetectionJob

代码语言:javascript
复制
package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;


public class FraudDetectionJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    /*
      所有的flink任务都要有一个数据源
      这里的TransactionSource是flink包里自带的,
      里面无限循环生成信用卡模拟交易数据
      每条交易数据包括了信用卡 ID (accountId),
      交易发生的时间 (timestamp) 以及交易的金额(amount)
      name就是给这个环节起个名字的意思,不参与实际业务,下面也一样。
     */
    DataStream<Transaction> transactions = env
      .addSource(new TransactionSource())
      .name("transactions");

    /*
      这里的keyBy我们以后可以展开来学习,
      先简单的理解为就是把数据流按accountId相同的一个个分开去处理了
      process就是真正的处理类,
      就是一个自定义叫做FraudDetector的类,下面会讲到这个类里做了啥
     */
    DataStream<Alert> alerts = transactions
      .keyBy(Transaction::getAccountId)
      .process(new FraudDetector())
      .name("fraud-detector");

    /*
      这个sink可以暂时也不同管他是干嘛的,
      我们只要明白这里就是把数据处理结果通过终端输出就OK了
     */
    alerts
      .addSink(new AlertSink())
      .name("send-alerts");

    env.execute("Fraud Detection");
  }
}

另一个类FraudDetector

代码语言:javascript
复制
package spendreport;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        /*
          这里实例化了一个Alert类,
          把前面数据流里的accountId设置到了Alert类里的Id
          然后通过一个Collector类来收集并输出
         */
        Alert alert = new Alert();
        alert.setId(transaction.getAccountId());
        collector.collect(alert);
    }
}

这时候,如果产品来了个小需求,对于一个账户,如果出现小于 1 的交易后紧跟着一个大于 500 的交易,就可能疑似在洗钱,需要输出告警一下。

按照业务开发思维,我们需要保存一下上一笔的交易状态,当然,flink里面也自带一些能存储状态的State。

我们只要稍微改下之前的FraudDetector处理类

代码语言:javascript
复制
package spendreport;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

/**
 * Skeleton code for implementing a fraud detector.
 */
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;
    
    /**
     * 定义小额交易的金额
     */
    private static final double SMALL_AMOUNT = 1.00;

    /**
     * 定义大额交易的金额
     */
    private static final double LARGE_AMOUNT = 500.00;

    /**
     * 定义一个存储布尔值的状态类
     */
    private transient ValueState<Boolean> flagState;

    @Override
    public void open(Configuration parameters) throws Exception {
        //重写类的open方法,初始化我们的状态类
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);
        super.open(parameters);
    }

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        //先从状态类里获取上一笔交易是否小额交易
        Boolean lastTransactionWasSmall = flagState.value();

        //如果上一笔交易是小额交易,并且当前这笔是大额交易
        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                //告警输出
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());

                collector.collect(alert);
            }
            //清空State
            flagState.clear();
        }
        //如果当前笔是小额交易,设置State
        if (transaction.getAmount() < SMALL_AMOUNT) {
            flagState.update(true);
        }
    }
}

然后我们再看运行结果

完美,我们抓到了这个账户3的疑似欺诈交易的人。

是不是flink还是有点有趣的,而且是容易上手的,你们脑海里是不是也有一个flink到底是个啥的大致轮廓了。可能这个东西平时大家后端开发可能都接触不到,可能大数据也没大家想的那么高端,如果老哥们感兴趣的话,可以自己玩玩,我感觉是可以扩宽后端业务开发思维的,说不定能帮你解决一些你的疑难杂症需求。

感觉写的还行的话,各位老哥点个赞可好


本篇是是在官方教程文档来展开讲解

这一篇文章也算给我自己开了个新坑,后面应该还会有三四篇

参考资料

https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-06-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 高性能API社区 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档