前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【基于Flink的城市交通实时监控平台】需求二:卡口的实时拥堵情况-滑动窗口

【基于Flink的城市交通实时监控平台】需求二:卡口的实时拥堵情况-滑动窗口

作者头像
火之高兴
发布2024-07-25 15:45:49
490
发布2024-07-25 15:45:49
举报
文章被收录于专栏:大数据应用技术
需求分析

卡口的实时拥堵情况,其实就是通过卡口的车辆平均车速和通过的车辆的数量,为了统计实时的平均车速,我设定一个滑动窗口,窗口长度是为5分钟,滑动步长为1分钟。

代码语言:javascript
复制
平均车速=当前窗口内通过车辆的车速之和 / 当前窗口内通过的车辆数量
滑动窗口: 窗口长度是为5分钟,滑动步长为1分钟(为了测试方便,设置为10秒)
MySQL建表语句
代码语言:javascript
复制
DROP TABLE IF EXISTS `t_average_speed`;
CREATE TABLE `t_average_speed` (
 `id` int(11) NOT NULL AUTO_INCREMENT,
 `start_time` bigint(20) DEFAULT NULL,
 `end_time` bigint(20) DEFAULT NULL,
 `monitor_id` varchar(255) DEFAULT NULL,
 `avg_speed` double DEFAULT NULL,
 `car_count` int(11) DEFAULT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
项目代码
代码语言:javascript
复制
package car;

import bean.MonitorInfo;
import com.mysql.jdbc.PreparedStatement;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

public class JamWindow {
//day110612\SourceMySQL.java是需求一
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStreamSource<String> dss =
                env.socketTextStream("hadoop10", 9999);

        SingleOutputStreamOperator<MonitorInfo> carDss = dss.map(new MapFunction<String, MonitorInfo>() {
            @Override
            public MonitorInfo map(String s) throws Exception {
                String[] arr = s.split(",");
                return new MonitorInfo(
                        Long.parseLong(arr[0]),
                        arr[1],
                        arr[2],
                        arr[3],
                        Double.parseDouble(arr[4]), arr[5], arr[6]);
            }
        });

        //获取路口的ID,因为要按照路口分组
        KeyedStream<MonitorInfo, String> keyedDS = carDss.keyBy(c -> c.getMonitorId());
        //1> MonitorInfo{actionTime = 1686647521, monitorId = 0002, cameraId = 1, car = 豫A39396, speed = 90.0, roadId = 01, areaId = 20, speedLimit = null}

        // 路口通过汽车的数量--基于时间的滚动窗口  10miao
        SingleOutputStreamOperator<Tuple2<String, String>> avg = keyedDS
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))  // 设置滚动窗口长度为10秒
                .apply(new WindowFunction<MonitorInfo, Tuple2<String, String>, String, TimeWindow>() { //IN OUT KEY W
                    @Override
                    public void apply(String key,
                                      TimeWindow window,
                                      Iterable<MonitorInfo> input,
                                      Collector<Tuple2<String, String>> out)
                            throws Exception {
                        //累加窗口内通过车辆的车速之和和计算路口通过汽车的数量。
                        double speedSum = 0.0;
                        int count = 0;

                        for (MonitorInfo info : input) {
                            speedSum += info.getSpeed(); //速度累加
                            count++;                     //计数器
                        }
                        double averageSpeed = count > 0 ? speedSum / count : 0.0;
                        //如果 count 大于 0,则计算速度总和(speedSum)除以 count 得到平均速度;否则,将平均速度设置为 0.0。

                        String start = new Timestamp(window.getStart()).toString();
                        String end = new Timestamp(window.getEnd()).toString();

                        out.collect(Tuple2.of(key, averageSpeed + "," + count + "," + start + "," + end));
                    }
                });


        avg.print();
        keyedDS.print();
        //TODO 5.execute-执行

        avg.addSink(
                JdbcSink.sink(
                        "insert into t_average_speed (id,start_time,end_time,monitor_id,avg_speed,car_count) values (null, ?, ?, ?, ?, ?)",
                        (preparedStatement, tuple) -> {
                            // 解析二元组的字段
                            String key = tuple.f0;
                            String[] fields = tuple.f1.split(",");
                            double averageSpeed = Double.parseDouble(fields[0]);
                            int carCount = Integer.parseInt(fields[1]);
                            String startTime = fields[2];
                            String endTime = fields[3];

                            // 设置参数
                            preparedStatement.setString(1, startTime);
                            preparedStatement.setString(2, endTime);
                            preparedStatement.setString(3, key);
                            preparedStatement.setDouble(4, averageSpeed);
                            preparedStatement.setInt(5, carCount);
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1) //设置批处理大小,表示每次向数据库提交的批处理数据量。在这里设置为 1,表示每次只提交一条数据。
                                .withBatchIntervalMs(200)//设置批处理间隔时间,表示两次提交批处理之间的时间间隔。在这里设置为 200 毫秒,表示每隔 200 毫秒提交一次批处理。
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://hadoop10:3306/yangyulin?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .withUsername("root")
                                .withPassword("0000")
                                .build()
                )
        );


        env.execute();

    }
}
代码语言:javascript
复制
这段代码实现了对车辆通过卡口的实时拥堵情况进行统计和存储。

代码解释如下:

导入所需的类和包。

创建StreamExecutionEnvironment实例,并设置运行模式为AUTOMATIC。

创建一个socketTextStream,从指定的主机和端口接收实时数据流。

使用map函数将接收到的文本数据转换为MonitorInfo对象。

使用keyBy操作将数据流按照卡口ID进行分区。

创建一个滚动窗口,窗口长度为10秒,对每个窗口内的数据进行处理。

在窗口函数apply中,累加窗口内通过车辆的车速之和和计算路口通过的车辆数量。

计算窗口内平均车速,如果有通过的车辆,则计算速度总和除以车辆数量得到平均速度;否则,平均速度设置为0.0。

获取窗口的起始时间和结束时间,并将结果以元组形式输出。

使用print()函数打印计算得到的平均车速。

使用print()函数打印分区后的数据流。

将结果写入到MySQL数据库中。

使用JdbcSink.sink()方法创建JDBC sink。
设置插入数据的SQL语句,使用占位符表示待填充的参数。
使用lambda表达式定义参数填充逻辑,将元组中的字段值设置到预编译语句中的对应位置。
使用JdbcExecutionOptions设置批处理大小和间隔时间。
使用JdbcConnectionOptions设置数据库连接信息。
将JDBC sink添加到数据流中,用于将数据写入MySQL数据库。
调用env.execute()方法启动Flink程序的执行。

总体来说,该代码通过对车辆数据流的处理,统计每个卡口窗口内的平均车速和通过的车辆数量,并将结果写入到MySQL数据库中。同时,通过print()函数打印中间结果,方便调试和观察程序执行过程。
测试流程

任意从端口发送对应格式的数据即可

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-07-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 需求分析
  • MySQL建表语句
  • 项目代码
  • 测试流程
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档