前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【基于Flink的城市交通实时监控平台】需求三:实时车辆分布情况-滚动窗口-JSON解析为对象

【基于Flink的城市交通实时监控平台】需求三:实时车辆分布情况-滚动窗口-JSON解析为对象

作者头像
火之高兴
发布2024-07-25 15:45:40
590
发布2024-07-25 15:45:40
举报
文章被收录于专栏:大数据应用技术
需求分析

实时车辆分布情况,是指在一段时间内(比如:10分钟)整个城市中每个区分布多少量车。这里要注意车辆的去重,因为在10分钟内一定会有很多的车,经过不同的卡口。这些车牌相同的车,我们只统计一次。其实就是根据车牌号去重。

具体需求以及技术选型:

  • 使用Flink读取kafka中发送的Json会话;
  • 使用反序列化工具解析Json会话字符串为JavaBean对象MonitorInfo.java(详见需求一);
  • getAreaId作为key,而后设置滚动窗口;
  • apply中编写业务代码统计车量;
  • 写入MySQL数据库t_area_control
测试数据
代码语言:javascript
复制
{"actionTime":1686647522,"monitorId":"0003","cameraId":"1","car":"豫A99999","speed":60,"roadId":"01","areaId":"20"}
{"actionTime":1686647523,"monitorId":"0004","cameraId":"1","car":"豫A99999","speed":80,"roadId":"01","areaId":"20"}
{"actionTime":1686647523,"monitorId":"0004","cameraId":"1","car":"豫A99999","speed":80,"roadId":"01","areaId":"30"}
{"actionTime":1686647524,"monitorId":"0005","cameraId":"1","car":"豫A99998","speed":60,"roadId":"01","areaId":"30"}
{"actionTime":1686647524,"monitorId":"0005","cameraId":"1","car":"豫A99997","speed":60,"roadId":"01","areaId":"10"}
测试结果查询样例:
代码语言:javascript
复制
id   区域编号    车的数量      窗口的开始时间   		    窗口的结束时间
1     20           1           20230-06-19 18:30:00         20230-06-19 18:40:00
2     30           2           20230-06-19 18:30:00         20230-06-19 18:40:00
3     10           1           20230-06-19 18:30:00         20230-06-19 18:40:00
建表语句
代码语言:javascript
复制
create table t_area_control(
	id 			int 	primary key auto_increment,
	area_id     varchar(50),
	car_count   int,
	window_start  varchar(50),
	window_end    varchar(50)
)
需求代码
代码语言:javascript
复制
package car;

import bean.MonitorInfo;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import util.Constants;
import util.JSONDeserializationSchema;

import java.util.HashSet;
import java.util.Properties;

/**
 * 实时车辆分布情况
 * 实时车辆分布情况,是指在一段时间内(比如:10分钟)整个城市中每个区分布多少量车。
 * 这里要注意车辆的去重,因为在10分钟内一定会有很多的车,经过不同的卡口。这些车牌相同的车,我们只统计一次。其实就是根据车牌号去重。
 * 统计结果如下
 * id   区域编号    车的数量      窗口的开始时间   		    窗口的结束时间
 * 1     20           1           20230-06-19 18:30:00         20230-06-19 18:40:00
 * 2     30           2           20230-06-19 18:30:00         20230-06-19 18:40:00
 * 3     10           1           20230-06-19 18:30:00         20230-06-19 18:40:00
 */
public class Test5_RealTimeDistribution {
    public static void main(String[] args) throws Exception {
        //TODO 1.env-准备环境
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        //TODO 2.source-加载数据
        //从kafka的topic1消费数据
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop10:9092");
        properties.setProperty("group.id", "flinkgroup3");
        //使用反序列化工具获取JSON字符串内容,将其解析为javaBean对象
        FlinkKafkaConsumer<MonitorInfo> consumer = new FlinkKafkaConsumer<MonitorInfo>("topic-car",
                new JSONDeserializationSchema<>(MonitorInfo.class), properties);
        DataStreamSource<MonitorInfo> ds1 = env.addSource(consumer);
        ds1.print();//MonitorInfo{actionTime = 1686647524, monitorId = 0005, cameraId = 1, car = 豫A99997, speed = 60.0, roadId = 01, areaId = 10, speedLimit = null}
        KeyedStream<MonitorInfo, String> dsKey = ds1.keyBy(v -> v.getAreaId());//获取areaId为key
        WindowedStream<MonitorInfo, String, TimeWindow> window =
                dsKey.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));//设置滚动窗口


        SingleOutputStreamOperator<Tuple4<String, Integer, String, String>> apply = window.apply(new WindowFunction<MonitorInfo, Tuple4<String, Integer, String, String>, String, TimeWindow>() {

            @Override
            public void apply(String s, TimeWindow window, Iterable<MonitorInfo> iterable, Collector<Tuple4<String, Integer, String, String>> collector) throws Exception {
                //将车牌放到集合中去重
                HashSet<String> cars = new HashSet<>();

                for (MonitorInfo car : iterable) {
                    cars.add(car.getCar());
                }
                String start = DateFormatUtils.format(window.getStart(), Constants.D1);
                String end = DateFormatUtils.format(window.getEnd(), Constants.D1);
                collector.collect(Tuple4.of(s, cars.size(), start, end));
            }
        });


        apply.addSink(
                JdbcSink.sink(
                        "insert into t_area_control values (null, ?, ?, ?,? )",
                        (ps, value) -> {
                            ps.setString(1, value.f0);
                            ps.setLong(2, value.f1);
                            ps.setString(3, value.f2);
                            ps.setString(4, value.f3);

                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1)
                                .withBatchIntervalMs(200)
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://hadoop10:3306/yangyulin?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .withUsername("root")
                                .withPassword("0000")
                                .build()
                ));


        env.execute();
    }
}

import util.Constants;该工具为解析时间戳工具:

代码语言:javascript
复制
package util;

public class Constants {
    public static final String  D1 = "yyyy-MM-dd HH:mm:ss";
}

import util.JSONDeserializationSchema;该工具为解析JSON为JavbaBean对象:

代码语言:javascript
复制
package util;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

public class JSONDeserializationSchema<T> implements DeserializationSchema<T> {
    private Class<T> clz;

    public JSONDeserializationSchema(Class<T> clz) {
        this.clz = clz;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        return JSON.parseObject(new String(message),clz);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(clz);
    }

}
实验截图
在这里插入图片描述
在这里插入图片描述

通过Kafka发送测试数据

查询MySQL表中结果

扩展内容

解析JSON为Bean对象使用了alibaba的maven依赖工具:

代码语言:javascript
复制
    <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.32</version>
    </dependency>

测试案例:

代码语言:javascript
复制
package test;

import bean.MonitorInfo;
import com.alibaba.fastjson.JSON;
//json测试
public class testJson {
    public static void main(String[] args) {
        String s1 = "{\"actionTime\":1686647524,\"monitorId\":\"0005\",\"cameraId\":\"1\",\"car\":\"豫A99997\",\"speed\":60,\"roadId\":\"01\",\"areaId\":\"10\"}\n";
        MonitorInfo monitorInfo = JSON.parseObject(s1, MonitorInfo.class);
        System.out.println(monitorInfo);
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-07-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 需求分析
  • 测试数据
  • 测试结果查询样例:
  • 建表语句
  • 需求代码
  • 实验截图
  • 扩展内容
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档