前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >2021年大数据Flink(四十一):​​​​​​​Flink实现订单自动好评

2021年大数据Flink(四十一):​​​​​​​Flink实现订单自动好评

作者头像
Lansonli
发布2021-10-11 14:41:59
6840
发布2021-10-11 14:41:59
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客

Flink实现订单自动好评

需求

在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后,一定时间之内没有做出评价,系统自动给与五星好评,我们今天主要使用Flink的定时器来简单实现这一功能。

数据

自定义source模拟生成一些订单数据. 在这里,我们生了一个最简单的二元组Tuple3,包含用户id,订单id和订单完成时间三个字段.

代码语言:javascript
复制
/**
 * 自定义source实时产生订单数据Tuple3<用户id,订单id, 订单生成时间>
 */
public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
    private boolean flag = true;
    @Override
    public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
        Random random = new Random();
        while (flag) {
            String userId = random.nextInt(5) + "";
            String orderId = UUID.randomUUID().toString();
            long currentTimeMillis = System.currentTimeMillis();
            ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
            Thread.sleep(500);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}

​​​​​​​编码步骤

1.env

2.source

3.transformation

设置经过interval毫秒用户未对订单做出评价,自动给与好评.为了演示方便,设置5s的时间

long interval = 5000L;

分组后使用自定义KeyedProcessFunction完成定时判断超时订单并自动好评

dataStream.keyBy(0).process(new TimerProcessFuntion(interval));

3.1定义MapState类型的状态,key是订单号,value是订单完成时间

3.2创建MapState

MapStateDescriptor<String, Long> mapStateDesc =

            new MapStateDescriptor<>("mapStateDesc", String.class, Long.class);

            mapState = getRuntimeContext().getMapState(mapStateDesc);

3.3注册定时器

mapState.put(value.f0, value.f1);

ctx.timerService().registerProcessingTimeTimer(value.f1 + interval);

3.4定时器被触发时执行并输出结果

4.sink

5.execute

参考代码

代码语言:javascript
复制
package cn.lanson.action;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

/**
 * Author lanson
 * Desc
 */
public class OrderAutomaticFavorableComments {
    public static void main(String[] args) throws Exception {
        //TODO 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //TODO 2.source
        //Tuple3<用户id,订单id,订单生成时间>
        DataStream<Tuple3<String, String, Long>> orderDS = env.addSource(new MySource());

        //TODO 3.transformation
        //设置经过interval毫秒用户未对订单做出评价,自动给与好评.为了演示方便,设置5s的时间
        long interval = 5000L;//5s
        //分组后使用自定义KeyedProcessFunction完成定时判断超时订单并自动好评
        orderDS.keyBy(t -> t.f0)
                .process(new TimerProcessFunction(interval));

        //TODO 4.sink

        //TODO 5.execute
        env.execute();
    }

    /**
     * 自定义source实时产生订单数据Tuple3<用户id,订单id, 订单生成时间>
     */
    public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
        private boolean flag = true;

        @Override
        public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                String userId = random.nextInt(5) + "";
                String orderId = UUID.randomUUID().toString();
                long currentTimeMillis = System.currentTimeMillis();
                ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }

    /**
     * 自定义ProcessFunction完成订单自动好评
     * 进来一条数据应该在interval时间后进行判断该订单是否超时是否需要自动好评
     * abstract class KeyedProcessFunction<K, I, O>
     */
    private static class TimerProcessFunction extends KeyedProcessFunction<String, Tuple3<String, String, Long>, Object> {
        private long interval;//订单超时时间 传进来的是5000ms/5s
        public TimerProcessFunction(long interval) {
            this.interval = interval;
        }

        //-0.准备一个State来存储订单id和订单生成时间
        private MapState<String, Long> mapState = null;

        //-1.初始化
        @Override
        public void open(Configuration parameters) throws Exception {
            MapStateDescriptor<String, Long> mapStateDescriptor = new MapStateDescriptor<>("mapState", String.class, Long.class);
            mapState = getRuntimeContext().getMapState(mapStateDescriptor);
        }

        //-2.处理每一条数据并存入状态并注册定时器
        @Override
        public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Object> out) throws Exception {
            //Tuple3<用户id,订单id, 订单生成时间> value里面是当前进来的数据里面有订单生成时间
            //把订单数据保存到状态中
            mapState.put(value.f1, value.f2);//xxx,2020-11-11 00:00:00 || xx,2020-11-11 00:00:01
            //该订单在value.f2 + interval时过期/到期,这时如果没有评价的话需要系统给与默认好评
            //注册一个定时器在value.f2 + interval时检查是否需要默认好评
            ctx.timerService().registerProcessingTimeTimer(value.f2 + interval);//2020-11-11 00:00:05  || 2020-11-11 00:00:06
        }

        //-3.执行定时任务
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
            //检查历史订单数据(在状态中存储着)
            //遍历取出状态中的订单数据
            Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, Long> map = iterator.next();
                String orderId = map.getKey();
                Long orderTime = map.getValue();
                //先判断是否好评--实际中应该去调用订单评价系统看是否好评了,我们这里写个方法模拟一下
                if (!isFavorable(orderId)) {//该订单没有给好评
                    //判断是否超时--不用考虑进来的数据是否过期,统一判断是否超时更保险!
                    if (System.currentTimeMillis() - orderTime >= interval) {
                        System.out.println("orderId:" + orderId + "该订单已经超时未评价,系统自动给与好评!....");
                        //移除状态中的数据,避免后续重复判断
                        iterator.remove();
                        mapState.remove(orderId);
                    }
                } else {
                    System.out.println("orderId:" + orderId + "该订单已经评价....");
                    //移除状态中的数据,避免后续重复判断
                    iterator.remove();
                    mapState.remove(orderId);
                }
            }
        }

        //自定义一个方法模拟订单系统返回该订单是否已经好评
        public boolean isFavorable(String orderId) {
            return orderId.hashCode() % 2 == 0;
        }
    }
}

​​​​​​​参考效果

实现代码:

代码语言:javascript
复制
package cn.lanson.action;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

/**
 * Author lanson
 * Desc
 * 在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后,一定时间之内没有做出评价,系统自动给与五星好评(或者下单之后在一定时间内没有付款, 就触发站内信/短信提醒/取消...)
 * 我们今天主要使用Flink的定时器来简单实现这一功能。
 * 注意: 这个需求不使用大数据的技术,就是用Web的定时器也可以做
 * 课后可以用你熟悉的编程语言/工具/框架去实现
 */
public class OrderAutomaticFavorite {
    public static void main(String[] args) throws Exception {
        //TODO 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO 2.source
        //Tuple3<用户id,订单id,订单生成时间>
        DataStream<Tuple3<String, String, Long>> orderDS = env.addSource(new MySource());
        //TODO 3.transformation
        //设置经过interval毫秒用户未对订单做出评价就自动给予好评,为了方便测试,设置5000ms/5s(实际中可以长一点)
        long interval = 5000L;

        //实现这个功能原本不需要分组,但是为了后面使用keyedState状态,所以这里分下组
        orderDS.keyBy(t->t.f0)
                .process(new MyKeyedProcessFunction(interval));
        //TODO 4.sink
        //TODO 5.execute
        env.execute();
    }

    /**
     * public abstract class KeyedProcessFunction<K, I, O>
     */

    public static class MyKeyedProcessFunction extends KeyedProcessFunction<String,Tuple3<String, String, Long>,Object> {
        //准备一个MapState存储订单信息<订单号,订单时间>
        private MapState<String,Long> mapState = null;

        private long interval = 0L;
        public MyKeyedProcessFunction(long interval) {
            this.interval = interval;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
           //创建状态描述器
            MapStateDescriptor<String, Long> descriptor = new MapStateDescriptor<>("mapState", String.class, Long.class);
            mapState = getRuntimeContext().getMapState(descriptor);
        }

        //处理进来的每个元素/订单,然后注册定时器,到时候判断是否进行了好评
        @Override
        public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Object> out) throws Exception {
            //把订单信息存入状态中方便后续使用
            mapState.put(value.f1,value.f2);

            //注册定时器在interval时间后执行/在value.f2 + interval时间时执行
            ctx.timerService().registerProcessingTimeTimer(value.f2 + interval);
        }

        //实现定时器执行方法
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
            //定时器触发的时候需要检查状态中的订单是否已经好评了
            Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();
            while (iterator.hasNext()){
                Map.Entry<String, Long> entry = iterator.next();
                String orderId = entry.getKey();
                Long orderTime = entry.getValue();
                //判断该订单是否已经评价--实际中需要调用外部订单系统的接口,我们自己简单起见直接调用模拟的方法
                if(isEvaluate(orderId)){
                    //已经评价过了
                    System.out.println("该订单:"+orderId+"用户已评价");
                    //移除当前订单
                    iterator.remove();//迭代器可以直接移除元素
                    //保险一定状态中也移除
                    mapState.remove(orderId);
                }else{
                    //没有评价
                    //注意:一个key(用户)有很多订单,有的可能超时,有的可能还未超时
                    //所以需要判断是否超时
                    if(System.currentTimeMillis() - orderTime >= interval){
                        //超时且未评价,需要系统给予自动好评
                        System.out.println("该订单:"+orderId+"已超时未评价,系统给予自动好评");
                        //移除当前订单
                        iterator.remove();//迭代器可以直接移除元素
                        //保险一定状态中也移除
                        mapState.remove(orderId);

                    }/*else{
                        //未超时,不用管

                    }*/
                }
            }
        }

        //模拟订单系统,传入订单id,返回该订单是否已经评价
        public boolean isEvaluate(String orderId){
            //下面这行代码会随机返回订单是否已经评价
            return new Random().nextInt(10) % 2 == 0;
        }
    }

    /**
     * 自定义source实时产生订单数据Tuple3<用户id,订单id,订单生成时间>
     */
    public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
        private boolean flag = true;
        @Override
        public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                String userId = random.nextInt(5) + "";
                String orderId = UUID.randomUUID().toString();
                long currentTimeMillis = System.currentTimeMillis();
                ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-05-03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flink实现订单自动好评
    • 需求
      • 数据
        • ​​​​​​​编码步骤
          • 1.env
          • 2.source
          • 3.transformation
          • 4.sink
          • 5.execute
        • 参考代码
          • ​​​​​​​参考效果
            • 实现代码:
            相关产品与服务
            大数据
            全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档