Apache Griffin + Flink + Kafka: Streaming Data Quality Monitoring in Practice


I. Components and Versions

This article uses the components listed below, following the official example; the exact versions can be checked against the GitHub repository and its pom files. The rest of the article assumes all of these are already installed.

  • JDK (1.8)
  • MySQL (5.6)
  • Hadoop (2.7.2)
  • Hive (2.4)
  • Spark (2.4.1)
  • Kafka (0.11)
  • Griffin (0.6.0)
  • Zookeeper (3.4.1)

The detailed configuration process, and the bugs you may run into along the way, are documented separately.

II. Kafka Data Generation Scripts

Since this is a test case, we simply write a script that generates data and pushes it into the Kafka source topic; in a real scenario data would be written into Kafka continuously (by Flume or some other ingestion tool). The script and data template below follow the official demo data; a Java-based equivalent is sketched after the template.

gen-data.sh

#!/bin/bash

# Current timestamp, in the same yyyy-MM-dd_HH:mm:ss format the Flink job expects
cur_time=`date +%Y-%m-%d_%H:%M:%S`
sed s/TIME/$cur_time/ /opt/module/data/source.temp > /opt/module/data/source.tp

# Create data: pick a random color for each of the 10 template rows
for row in 1 2 3 4 5 6 7 8 9 10
do
  sed -n "${row}p" < /opt/module/data/source.tp > /opt/module/data/sline
  cnt=`shuf -i1-2 -n1`
  clr="red"
  if [ $cnt == 2 ]; then clr="yellow"; fi
  sed s/COLOR/$clr/ /opt/module/data/sline >> /opt/module/data/source.data
done
rm /opt/module/data/sline

rm /opt/module/data/source.tp

# Import the generated records into the "source" topic
kafka-console-producer.sh --broker-list hadoop101:9092 --topic source < /opt/module/data/source.data

rm /opt/module/data/source.data

echo "insert data at ${cur_time}"

streaming-data.sh

#!/bin/bash

#create topics
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic source
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic target

# generate a new batch of records every 90 seconds
set +e
while true
do
  /opt/module/data/gen-data.sh
  sleep 90
done
set -e

source.temp

{"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
{"id": 2, "name": "Banana", "color": "COLOR", "time": "TIME"}
{"id": 3, "name": "Cherry", "color": "COLOR", "time": "TIME"}
{"id": 4, "name": "Durian", "color": "COLOR", "time": "TIME"}
{"id": 5, "name": "Lichee", "color": "COLOR", "time": "TIME"}
{"id": 6, "name": "Peach", "color": "COLOR", "time": "TIME"}
{"id": 7, "name": "Papaya", "color": "COLOR", "time": "TIME"}
{"id": 8, "name": "Lemon", "color": "COLOR", "time": "TIME"}
{"id": 9, "name": "Mango", "color": "COLOR", "time": "TIME"}
{"id": 10, "name": "Pitaya", "color": "COLOR", "time": "TIME"}

III. Flink Stream Processing

The Flink streaming job consists of three parts: reading from Kafka, business processing, and writing back to Kafka.

  1. First, the dependencies declared in my pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xxxx</groupId>
    <artifactId>kafka_Flink_kafka_Test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>



            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- NOTE: point mainClass at your own job's entry point (the flinkProcess class below) -->
                                    <mainClass>com.ink.FlinkLambdaTest.FlinkToLambda</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <relocations>
                                <relocation>
                                    <pattern>org.codehaus.plexus.util</pattern>
                                    <shadedPattern>org.shaded.plexus.util</shadedPattern>
                                    <excludes>
                                        <exclude>org.codehaus.plexus.util.xml.Xpp3Dom</exclude>
                                        <exclude>org.codehaus.plexus.util.xml.pull.*</exclude>
                                    </excludes>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!--<dependency>-->
        <!--<groupId>org.apache.flink</groupId>-->
        <!--<artifactId>flink-table_2.10</artifactId>-->
        <!--<version>1.3.2</version>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20090211</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>



        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>

</project>
  2. Next, a simple bean class used to receive the JSON data
import java.util.Date;

public class Student{
    private int id;
    private String name;
    private String color;
    private Date time;

    public Student(){}

    public Student(int id, String name, String color, Date time) {
        this.id = id;
        this.name = name;
        this.color = color;
        this.time = time;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getColor() {
        return color;
    }

    public void setColor(String color) {
        this.color = color;
    }

    public Date getTime() {
        return time;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", color='" + color + '\'' +
                ", time='" + time + '\'' +
                '}';
    }
}
  3. Reading from Kafka. The Kafka read/write configuration could be factored out into a KafkaUtil helper class; to keep this test simple I just inlined it in the code (a sketch of such a helper follows the snippet below).
// Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Be sure to enable checkpointing in a real job!
        //env.enableCheckpointing(5000);
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Kafka parameters
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop101:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        String inputTopic = "source";
        String outputTopic = "target";

        // Source
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<String>(inputTopic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(consumer);
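
For reference, here is a minimal sketch of what such a KafkaUtil helper might look like. The class and method names are made up for illustration; only the broker address, group id, and connector classes come from the code above.

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

// Hypothetical helper that centralizes the Kafka connection settings
public class KafkaUtil {
    private static final String BROKERS = "hadoop101:9092";

    private static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BROKERS);
        props.setProperty("group.id", "consumer-group");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }

    // Build a string consumer / producer for the given topic
    public static FlinkKafkaConsumer010<String> stringConsumer(String topic) {
        return new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), consumerProps());
    }

    public static FlinkKafkaProducer010<String> stringProducer(String topic) {
        return new FlinkKafkaProducer010<>(BROKERS, topic, new SimpleStringSchema());
    }
}

With that helper in place, the source line above would shrink to: DataStream<String> stream = env.addSource(KafkaUtil.stringConsumer("source"));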
  4. Flink business processing. Since every business is different, this is only a simple demo: with roughly 20% probability a record is altered into bad data, simulating a processing mistake that causes a data quality issue. Note that this example assumes the Flink processing latency is negligible; in a real scenario, processing delay could make the target side look as if it had lost data. I am still digging into the (Griffin) source code on this point and will update later; if you know the details, please share.
//Use a Flink map operator to apply some simple processing to the data
        // Transformations
        // Each input record looks like:
        //{"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
        DataStream<String> outMap = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return handleData(value);
            }
        });
public static String handleData(String line){
        try {
                if (line != null && !line.equals("")){
                    // Lenient Gson instance with a date format matching the generator's timestamps
                    Gson gson = new GsonBuilder().setLenient().setDateFormat("yyyy-MM-dd_HH:mm:ss").create();
                    JsonReader reader = new JsonReader(new StringReader(line));
                    Student student = gson.fromJson(reader, Student.class);
                    // With ~20% probability, corrupt the name to simulate a data quality issue
                    int rand = ra.nextInt(10) + 1;
                    if (rand > 8) student.setName(student.getName() + "_" + ra.nextInt(10));
                    return gson.toJson(student);
                }
                else return "";
        }catch (Exception e){
            return "";
        }
    }

I create the Gson instance this way (lenient, with an explicit date format) because I ran into a few bugs otherwise.
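
As a quick sanity check of the date handling, a minimal sketch (assuming the Student bean above; the test class name is hypothetical) that round-trips one generated record:

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class GsonRoundTripTest {
    public static void main(String[] args) {
        // Same Gson configuration as handleData: lenient parsing plus the
        // date format produced by gen-data.sh (yyyy-MM-dd_HH:mm:ss)
        Gson gson = new GsonBuilder().setLenient().setDateFormat("yyyy-MM-dd_HH:mm:ss").create();

        String line = "{\"id\": 1, \"name\": \"Apple\", \"color\": \"red\", \"time\": \"2022-03-16_10:00:00\"}";
        Student student = gson.fromJson(line, Student.class);
        System.out.println(student);              // the time field is parsed into a java.util.Date
        System.out.println(gson.toJson(student)); // and serialized back in the same format
    }
}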

  5. Writing to Kafka. Here we use the FlinkKafkaProducer010 constructor that takes (brokerList, topicId, serializationSchema); an alternative wiring is sketched after the snippet.
//Sink
        outMap.addSink(new FlinkKafkaProducer010<String>(
                "hadoop101:9092",
                "target",
                new SimpleStringSchema()
        ));
        outMap.print();
        env.execute();
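
If you need more control over the producer (acks, retries, batching and so on), the connector also provides a constructor taking a Properties object. A small sketch, assuming the same topic and broker as above; the specific property values are just examples:

// Alternative sink wiring using the (topicId, serializationSchema, producerConfig) constructor
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "hadoop101:9092");
producerProps.setProperty("acks", "all");     // example tuning values; adjust to your needs
producerProps.setProperty("retries", "3");

outMap.addSink(new FlinkKafkaProducer010<String>(
        "target",
        new SimpleStringSchema(),
        producerProps
));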

IV. Apache Griffin Configuration and Startup

Configuring Griffin's streaming mode comes down to two files: dq.json and env.json.

dq.json

{
  "name": "streaming_accu",
  "process.type": "streaming",
  "data.sources": [
    {
      "name": "src",
      "baseline": true,
      "connector": 
        {
          "type": "kafka",
          "version": "0.10",
          "config": {
            "kafka.config": {
              "bootstrap.servers": "hadoop101:9092",
              "group.id": "griffin",
              "auto.offset.reset": "largest",
              "auto.commit.enable": "false"
            },
            "topics": "source_1",
            "key.type": "java.lang.String",
            "value.type": "java.lang.String"
          },
          "pre.proc": [
            {
              "dsl.type": "df-opr",
              "rule": "from_json"
            }
          ]
        }
      ,
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs://hadoop101:9000/griffin/streaming/dump/source",
        "info.path": "source_1",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-5m", "0"],
        "updatable": true
      }
    }, {
      "name": "tgt",
      "connector": 
        {
          "type": "kafka",
          "version": "0.10",
          "config": {
            "kafka.config": {
              "bootstrap.servers": "hadoop101:9092",
              "group.id": "griffin",
              "auto.offset.reset": "largest",
              "auto.commit.enable": "false"
            },
            "topics": "target_1",
            "key.type": "java.lang.String",
            "value.type": "java.lang.String"
          },
          "pre.proc": [
            {
              "dsl.type": "df-opr",
              "rule": "from_json"
            }
          ]
        }
      ,
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs://hadoop101:9000/griffin/streaming/dump/target",
        "info.path": "target_1",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-1m", "0"]
      }
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "out.dataframe.name": "accu",
        "rule": "src.login_id = tgt.login_id AND src.bussiness_id = tgt.bussiness_id AND src.event_id = tgt.event_id",
        "details": {
          "source": "src",
          "target": "tgt",
          "miss": "miss_count",
          "total": "total_count",
          "matched": "matched_count"
        },
        "out":[
          {
            "type":"metric",
            "name": "accu"
          },
          {
            "type":"record",
            "name": "missRecords"
          }
        ]
      }
    ]
  },
  "sinks": ["HdfsSink"]
}
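
A quick note on reading the accuracy rule: the fields compared in "rule" must exist in both the source and the target records, which is why they match the Student schema here (id, name, color, time). Roughly speaking, for each time window Griffin counts the source records (total_count), the ones that find a matching target record (matched_count), and the ones that do not (miss_count), and the accuracy metric is matched_count / total_count. For example, if 10 source records arrive in a window and the Flink job corrupted the name of 2 of them, the metric would report total_count = 10, matched_count = 8, miss_count = 2, i.e. 80% accuracy.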

env.json

{
  "spark": {
    "log.level": "WARN",
    "checkpoint.dir": "hdfs://hadoop101:9000/griffin/checkpoint",
    "batch.interval": "20s",
    "process.interval": "1m",
    "init.clear": true,
    "config": {
      "spark.default.parallelism": 4,
      "spark.task.maxFailures": 5,
      "spark.streaming.kafkaMaxRatePerPartition": 1000,
      "spark.streaming.concurrentJobs": 4,
      "spark.yarn.maxAppAttempts": 5,
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.max.executor.failures": 120,
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.hadoop.fs.hdfs.impl.disable.cache": true
    }
  },
  "sinks": [
    {
      "name":"ConsoleSink",
      "type": "console"
    },
    {
      "name":"HdfsSink",
      "type": "hdfs",
      "config": {
        "path": "hdfs://hadoop101:9000/griffin/persist"
      }
    },
    {
      "name":"ElasticsearchSink",
      "type": "elasticsearch",
      "config": {
        "method": "post",
        "api": "http://hadoop101:9200/griffin/accuracy"
      }
    }
  ],
  "griffin.checkpoint": [
    {
      "type": "zk",
      "config": {
        "hosts": "hadoop101:2181",
        "namespace": "griffin/infocache",
        "lock.path": "lock",
        "mode": "persist",
        "init.clear": true,
        "close.clear": false
      }
    }
  ]
}

Finally, submit the measure job to Spark to start checking the data. Note that the sink name referenced in dq.json ("HdfsSink") must match one of the sinks defined in env.json, so the metrics end up under hdfs://hadoop101:9000/griffin/persist.

spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 3 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json

V. Complete Code

Create a Maven project locally. Since this is just a simple test project, structure it however you like; I only wrote two classes for the test.

Student.java

import java.util.Date;

public class Student{
    private int id;
    private String name;
    private String color;
    private Date time;

    public Student(){}

    public Student(int id, String name, String color, Date time) {
        this.id = id;
        this.name = name;
        this.color = color;
        this.time = time;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getColor() {
        return color;
    }

    public void setColor(String color) {
        this.color = color;
    }

    public Date getTime() {
        return time;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", color='" + color + '\'' +
                ", time='" + time + '\'' +
                '}';
    }
}

flinkProcess.java

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.io.StringReader;
import java.util.Properties;
import java.util.Random;

public class flinkProcess {
    public static Random ra = new Random();
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Be sure to enable checkpointing in a real job!
        //env.enableCheckpointing(5000);
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Kafka parameters
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop101:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        String inputTopic = "source";
        String outputTopic = "target";

        // Source
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<String>(inputTopic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(consumer);

        //Use a Flink map operator to apply some simple processing to the data
        // Transformations
        // Each input record looks like:
        //{"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
        DataStream<String> outMap = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return handleData(value);
            }
        });

        //Sink
        outMap.addSink(new FlinkKafkaProducer010<String>(
                "hadoop101:9092",
                "target",
                new SimpleStringSchema()
        ));
        outMap.print();
        env.execute();
    }

    public static String handleData(String line){
        try {
                if (line != null && !line.equals("")){
                    // Lenient Gson instance with a date format matching the generator's timestamps
                    Gson gson = new GsonBuilder().setLenient().setDateFormat("yyyy-MM-dd_HH:mm:ss").create();
                    JsonReader reader = new JsonReader(new StringReader(line));
                    Student student = gson.fromJson(reader, Student.class);
                    // With ~20% probability, corrupt the name to simulate a data quality issue
                    int rand = ra.nextInt(10) + 1;
                    if (rand > 8) student.setName(student.getName() + "_" + ra.nextInt(10));
                    return gson.toJson(student);
                }
                else return "";
        }catch (Exception e){
            return "";
        }
    }
}

Tip: if some malformed records end up in Kafka, the job will keep throwing errors. In that case, delete the corresponding Kafka dataDir and the ZooKeeper znode data for the topics, regenerate the data, and run the code again.
