Flink User Portrait (Part 2): Recommendation

Author: 算法之名
Published: 2021-11-15 17:15:06

Flink User Portrait

Creating the preferred-brand tag for user portraits

Create a product brand tag class:

@Data
public class Brand {
    private Long userId;
    private Long productId;
    private String brand;
    private Long nums = 0L;
    private String groupField;
    private Long timeInfo;
}

A BrandMap class implementing the MapFunction interface, which turns each scan event into a Brand record:

public class BrandMap implements MapFunction<String,Brand> {
    @Override
    public Brand map(String value) throws Exception {
        ScanOpertor scanOpertor = JSONObject.parseObject(value,ScanOpertor.class);
        Long userId = scanOpertor.getUserId();
        Long productId = scanOpertor.getProductId();
        String tablename = "product";
        String rowkey = productId + "";
        String famliyname = "info";
        String colum = "product_brand";
        //Look up the scanned product's brand in HBase (product table, column info:product_brand)
        String brandString = HbaseUtil.getdata(tablename, rowkey, famliyname, colum);
        Brand brand = new Brand();
        brand.setBrand(brandString);
        Long timeInfo = DateUntil.getCurrentHourStart(System.currentTimeMillis());
        String groupField = "brand==" + timeInfo + "==" + userId
                + "==" + brandString;
        brand.setGroupField(groupField);
        brand.setTimeInfo(timeInfo);
        brand.setUserId(userId);
        brand.setProductId(productId);
        brand.setNums(1L);

        return brand;
    }
}

When we add a new product:

INSERT INTO `product` VALUES (1, 1, '大牛洗发水', '洗发水', 23.2000000000, 23, '2021-11-01 19:13:44', '2021-11-01 19:13:51', '内蒙古包头市', '大牛', 10.0000000000, 'RF-666')

The product's brand can then be seen in HBase (the stored value \xE5\xA4\xA7\xE7\x89\x9B is the UTF-8 encoding of 大牛):

scan 'product',{COLUMNS=>'info:product_brand'}
ROW                   COLUMN+CELL                                               
 1                    column=info:product_brand, timestamp=1636809177766, value=
                      \xE5\xA4\xA7\xE7\x89\x9B

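When a user views this product, the view is recorded. The purpose of this step is to store, for every hour, which brands each user clicked and how many times.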
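The hourly bucket comes from DateUntil.getCurrentHourStart, whose source is not shown in this article. A minimal sketch of what it presumably does, together with the BrandMap group key, could look like this; the class HourlyBrandKey and its methods are hypothetical, and the sketch assumes epoch-millisecond timestamps:

```java
/**
 * Hypothetical stand-in for DateUntil.getCurrentHourStart and the
 * BrandMap group key; not the author's implementation.
 */
public class HourlyBrandKey {
    private static final long HOUR_MS = 60L * 60 * 1000;

    // Truncate epoch millis to the start of the hour. Epoch millis are UTC-based;
    // for zones with whole-hour offsets (e.g. UTC+8) the hour boundary is the same instant.
    public static long hourStart(long epochMillis) {
        return epochMillis - Math.floorMod(epochMillis, HOUR_MS);
    }

    // Same layout as BrandMap: "brand==<hourStart>==<userId>==<brand>"
    public static String groupField(long hourStart, long userId, String brand) {
        return "brand==" + hourStart + "==" + userId + "==" + brand;
    }

    public static void main(String[] args) {
        long first = 1_636_809_177_766L;              // timestamp from the HBase scan above
        long tenMinutesLater = first + 10 * 60 * 1000L;
        // Both clicks fall into the same hour, so they share one group key
        System.out.println(groupField(hourStart(first), 1L, "Daniu"));
        System.out.println(hourStart(first) == hourStart(tenMinutesLater));
    }
}
```

Because every click by the same user on the same brand within one hour produces the same key, a keyed one-hour window can add them up with BrandReduce.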

A BrandReduce class implementing the ReduceFunction interface, which adds up the click counts:

public class BrandReduce implements ReduceFunction<Brand> {
    @Override
    public Brand reduce(Brand value1, Brand value2) throws Exception {
        Long numbers1 = value1.getNums();
        String groupField = value1.getGroupField();
        Long userId = value1.getUserId();
        String brandString = value1.getBrand();
        Long timeInfo = value1.getTimeInfo();

        Long numbers2 = value2.getNums();

        Brand brand = new Brand();
        brand.setUserId(userId);
        brand.setBrand(brandString);
        brand.setGroupField(groupField);
        brand.setTimeInfo(timeInfo);
        brand.setNums(numbers1 + numbers2);
        return brand;
    }
}

A BrandSink class implementing SinkFunction, which stores the hourly brand counts:

public class BrandSink implements SinkFunction<Brand> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    public void invoke(Brand value, Context context) throws Exception {
        if (value != null) {
            Long timeInfo = value.getTimeInfo();
            String brandString = value.getBrand();
            Long numbers = value.getNums();
            String tablename = "brand_info";
            Map<String, String> dataMap = new HashMap<>();
            dataMap.put("timeinfo",timeInfo + "");
            dataMap.put("brandlabel",brandString);
            dataMap.put("numbers",numbers + "");
            Set<String> fields = new HashSet<>();
            fields.add("timeinfo");
            fields.add("numbers");
            clickUntil.saveData(tablename,dataMap,fields);
        }
    }
}

A UserBrandSaveMap class implementing the MapFunction interface:

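public class UserBrandSaveMap implements MapFunction<Brand,Brand> {
    //Number of preferred brands kept per user
    private static final int TOP_N = 5;

    @Override
    public Brand map(Brand value) throws Exception {
        Long userId = value.getUserId();
        String brandString = value.getBrand();
        Long timeInfo = value.getTimeInfo();
        String tablename = "user_info";
        String rowkey = userId + "";
        String famliyname = "info";
        String colum = "brandlist";
        //Read the user's brand preference history, stored as [{"key":brand,"value":count},...]
        String brandListString = HbaseUtil.getdata(tablename, rowkey, famliyname, colum);
        List<Map<String,String>> result = new ArrayList<>();
        if (StringUtils.isNotBlank(brandListString)) {
            for (Map map : JSONObject.parseArray(brandListString, Map.class)) {
                Map<String,String> entry = new HashMap<>();
                entry.put("key", map.get("key").toString());
                entry.put("value", map.get("value").toString());
                result.add(entry);
            }
        }
        //The input is an hourly aggregate, so add its click count rather than a fixed 1
        long clicks = value.getNums() == null ? 1L : value.getNums();
        if (brandString != null) {
            boolean found = false;
            for (Map<String,String> map : result) {
                //Same brand as one already in the history: raise its preference value
                if (brandString.equals(map.get("key"))) {
                    map.put("value", (Long.parseLong(map.get("value")) + clicks) + "");
                    found = true;
                    break;
                }
            }
            //First click on this brand: add it, otherwise new brands never enter the list
            if (!found) {
                Map<String,String> entry = new HashMap<>();
                entry.put("key", brandString);
                entry.put("value", clicks + "");
                result.add(entry);
            }
        }
        //Sort the user's brands by preference (descending) and keep the top 5
        result.sort((o1, o2) -> Long.compare(Long.parseLong(o2.get("value")),
                Long.parseLong(o1.get("value"))));
        if (result.size() > TOP_N) {
            result = new ArrayList<>(result.subList(0, TOP_N));
        }
        String data = JSONObject.toJSONString(result);
        HbaseUtil.putdata(tablename,rowkey,famliyname,colum,data);
        //Emit one record per (hour, brand) so the next stage counts users per brand and hour
        Brand brand = new Brand();
        String groupField = "brandBy==" + timeInfo + "==" + brandString;
        brand.setGroupField(groupField);
        brand.setTimeInfo(timeInfo);
        brand.setBrand(brandString);
        brand.setNums(1L);

        return brand;
    }
}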

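This step stores the ranking of each user's five most preferred brands: every click on a brand raises the user's preference for that brand by one, and the list is re-sorted and saved again.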
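The increment, re-sort and truncate step can be tried without HBase or fastjson. The sketch below is a plain-Java rendering that assumes the stored JSON has already been parsed into a brand-to-count map; TopBrands and update are hypothetical names. It also inserts a brand the user has never clicked before, which a loop that only updates existing entries would miss.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java version of the "increment, re-sort, keep top 5" step. */
public class TopBrands {
    static final int LIMIT = 5;

    // history: brand -> preference count (assumed already parsed from the brandlist JSON).
    // Returns a new list sorted by count, highest first, with at most LIMIT entries.
    public static List<Map.Entry<String, Long>> update(Map<String, Long> history,
                                                       String brand, long clicks) {
        Map<String, Long> merged = new LinkedHashMap<>(history);
        // Adds to an existing brand, or inserts a brand seen for the first time
        merged.merge(brand, clicks, Long::sum);
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(merged.entrySet());
        // List.sort is stable, so equal counts keep their previous order
        sorted.sort(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()));
        return new ArrayList<>(sorted.subList(0, Math.min(LIMIT, sorted.size())));
    }
}
```

For example, with a history of {A=3, B=1}, five more clicks on B move B to the top with a count of 6.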

A UserBrandReduce class implementing the ReduceFunction interface:

public class UserBrandReduce implements ReduceFunction<Brand> {
    @Override
    public Brand reduce(Brand value1, Brand value2) throws Exception {
        Long numbers1 = value1.getNums();
        String groupField = value1.getGroupField();
        String brandString = value1.getBrand();
        Long timeInfo = value1.getTimeInfo();

        Long numbers2 = value2.getNums();

        Brand brand = new Brand();
        brand.setBrand(brandString);
        brand.setGroupField(groupField);
        brand.setTimeInfo(timeInfo);
        brand.setNums(numbers1 + numbers2);
        return brand;
    }
}

A UserBrandSink class implementing the SinkFunction interface:

public class UserBrandSink implements SinkFunction<Brand> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    public void invoke(Brand value, Context context) throws Exception {
        if (value != null) {
            Long timeInfo = value.getTimeInfo();
            String brandString = value.getBrand();
            Long numbers = value.getNums();
            String tablename = "user_brand_info";
            Map<String, String> dataMap = new HashMap<>();
            dataMap.put("timeinfo",timeInfo + "");
            dataMap.put("userbrandlabel",brandString);
            dataMap.put("numbers",numbers + "");
            Set<String> fields = new HashSet<>();
            fields.add("timeinfo");
            fields.add("numbers");
            clickUntil.saveData(tablename,dataMap,fields);
        }
    }
}

Then the Flink streaming job:

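import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class BrandAnaly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("scan",
                new SimpleStringSchema(),properties);
        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        DataStream<Brand> map = data.map(new BrandMap());
        //Hourly click count per (hour, user, brand). The window must be keyed:
        //timeWindowAll() ignores the key and merges every user's clicks into one record.
        //Processing time is stated explicitly because timeWindow() is deprecated since
        //Flink 1.12, where the default time characteristic became event time.
        DataStream<Brand> reduce = map.keyBy(Brand::getGroupField)
                .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
                .reduce(new BrandReduce());
        reduce.addSink(new BrandSink());

        //Update each user's top-5 brand list, then count users per (hour, brand)
        DataStream<Brand> userMap = reduce.map(new UserBrandSaveMap());
        DataStream<Brand> userReduce = userMap.keyBy(Brand::getGroupField)
                .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
                .reduce(new UserBrandReduce());
        userReduce.addSink(new UserBrandSink());

        env.execute("portrait brand type");
    }
}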
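To see why the windows in this job must be keyed, here is a plain-Java simulation (no Flink involved) of keyBy(...) followed by a tumbling window and a summing reduce, next to what a non-keyed all-window does. TumblingWindowSim and Click are hypothetical names; window starts are aligned to the epoch, which is how Flink's tumbling windows behave with the default offset of 0.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of keyed vs. non-keyed tumbling windows (no Flink). */
public class TumblingWindowSim {
    /** One click: arrival time in ms, the group key and a count (1 from the map step). */
    public static final class Click {
        final long time;
        final String key;
        final long nums;

        public Click(long time, String key, long nums) {
            this.time = time;
            this.key = key;
            this.nums = nums;
        }
    }

    private static long windowStart(long time, long sizeMs) {
        return time - Math.floorMod(time, sizeMs);
    }

    // keyBy(key) + tumbling window + sum: one result per (key, window)
    public static Map<String, Long> keyedWindowSum(List<Click> clicks, long sizeMs) {
        Map<String, Long> out = new TreeMap<>();
        for (Click c : clicks) {
            out.merge(c.key + "@" + windowStart(c.time, sizeMs), c.nums, Long::sum);
        }
        return out;
    }

    // windowAll + sum: the key is ignored, one result per window
    public static Map<Long, Long> allWindowSum(List<Click> clicks, long sizeMs) {
        Map<Long, Long> out = new TreeMap<>();
        for (Click c : clicks) {
            out.merge(windowStart(c.time, sizeMs), c.nums, Long::sum);
        }
        return out;
    }
}
```

In the all-window version the per-user, per-brand counts collapse into a single number per hour, which is not what BrandSink is meant to store.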

Recommendation

Popular product statistics

Popular (hot) products are the products that users buy the most.
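As an illustration of this definition, the sketch below counts paid orders per product, applying the same payStatus > 0 filter as HistoryHotProductMap, and ranks the products. OrderLite and topN are hypothetical, and breaking ties by product id is an arbitrary choice made here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch: rank products by number of paid orders (hypothetical types). */
public class HotProducts {
    /** Minimal stand-in for the Order entity: only the fields used here. */
    public static final class OrderLite {
        final Long productId;
        final Integer payStatus;

        public OrderLite(Long productId, Integer payStatus) {
            this.productId = productId;
            this.payStatus = payStatus;
        }
    }

    public static List<Long> topN(List<OrderLite> orders, int n) {
        Map<Long, Long> counts = new HashMap<>();
        for (OrderLite o : orders) {
            // Same filter as HistoryHotProductMap: only paid orders count as purchases
            if (o.payStatus != null && o.payStatus > 0) {
                counts.merge(o.productId, 1L, Long::sum);
            }
        }
        // Most purchased first; equal counts ordered by product id
        return counts.entrySet().stream()
                .sorted(Map.Entry.<Long, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<Long, Long>comparingByKey()))
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```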

Create a historical hot-product class:

@Data
public class HistoryHotProduct {
    private Long productId;
    private Long productTypeId;
    private Long numbers;
    private String groupField;
}

A HistoryHotProductMap class implementing the FlatMapFunction interface, which emits one record per paid order:

public class HistoryHotProductMap implements FlatMapFunction<String,HistoryHotProduct> {
    @Override
    public void flatMap(String value, Collector<HistoryHotProduct> out) throws Exception {
        Order order = JSONObject.parseObject(value,Order.class);
        Integer payStatus = order.getPayStatus();
        //Only count orders that are not unpaid (payStatus > 0)
        if (payStatus != null && payStatus > 0) {
            HistoryHotProduct historyHotProduct = new HistoryHotProduct();
            historyHotProduct.setProductId(order.getProductId());
            historyHotProduct.setProductTypeId(order.getProductTypeId());
            historyHotProduct.setNumbers(1L);
            String groupFiled = "HistoryHotProduct==" + order.getProductId() + "=="
                    + order.getProductTypeId();
            historyHotProduct.setGroupField(groupFiled);
            out.collect(historyHotProduct);
        }
    }
}

A HistoryHotProductReduce class implementing the ReduceFunction interface, which adds up the counts:

public class HistoryHotProductReduce implements ReduceFunction<HistoryHotProduct> {
    @Override
    public HistoryHotProduct reduce(HistoryHotProduct value1, HistoryHotProduct value2) throws Exception {
        Long numbers1 = value1.getNumbers();
        Long numbers2 = value2.getNumbers();
        value1.setNumbers(numbers1 + numbers2);
        return value1;
    }
}

A HistoryHotProductSink class implementing the SinkFunction interface:

public class HistoryHotProductSink implements SinkFunction<HistoryHotProduct> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    public void invoke(HistoryHotProduct value, Context context) throws Exception {
        if (value != null) {
            Map<String, String> data = new HashMap<>();
            Long productId = value.getProductId();
            Long productTypeId = value.getProductTypeId();
            Long numbers = value.getNumbers();
            data.put("productId", productId + "");
            data.put("productTypeId", productTypeId + "");
            data.put("numbers", numbers + "");
            String tablename = "history_hot_product";
            Set<String> fields = new HashSet<>();
            fields.add("productId");
            fields.add("productTypeId");
            fields.add("numbers");
            clickUntil.saveData(tablename,data,fields);
        }
    }
}

Then the Flink streaming job:

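import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class HistoryHotProductAnaly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("order",
                new SimpleStringSchema(),properties);
        //Start from the latest offset (ignored when the job is restored from a checkpoint)
        myConsumer.setStartFromLatest();
        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        DataStream<HistoryHotProduct> map = data.flatMap(new HistoryHotProductMap());
        //Paid-order count per product in 5-hour processing-time windows
        //(timeWindow() is deprecated since Flink 1.12 and defaults to event time there)
        DataStream<HistoryHotProduct> reduce = map.keyBy(HistoryHotProduct::getGroupField)
                .window(TumblingProcessingTimeWindows.of(Time.hours(5)))
                .reduce(new HistoryHotProductReduce());
        reduce.addSink(new HistoryHotProductSink());

        env.execute("history hot product");
    }
}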

Historical rating statistics

Create a review table, evaluate, in the database:

DROP TABLE IF EXISTS `evaluate`;
CREATE TABLE `evaluate` (
  `id` bigint(20) NOT NULL,
  `user_id` bigint(20) DEFAULT NULL,
  `order_id` bigint(20) DEFAULT NULL,
  `product_id` bigint(20) DEFAULT NULL,
  `product_type_id` bigint(20) DEFAULT NULL,
  `evaluate_time` datetime DEFAULT NULL,
  `score` int(255) DEFAULT NULL,
  `content` varchar(500) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

The corresponding entity class:

@Data
public class Evaluate {
    private Long id;
    private Long userId;
    private Long orderId;
    private Long productId;
    private Long productTypeId;
    private Date evaluateTime;
    private Integer score; //1-5: 1-2 negative, 3-4 neutral, 5 positive
    private String content; //review text
}
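The score comment above defines three buckets. A small helper, hypothetical and not part of the original project, makes the mapping explicit and shows which scores the ScoreMap filter (score > 2) keeps:

```java
/** Hypothetical helper mapping Evaluate.score (1-5) to the buckets in its comment. */
public class ReviewScore {
    public enum Sentiment { NEGATIVE, NEUTRAL, POSITIVE }

    // 1-2 negative, 3-4 neutral, 5 positive
    public static Sentiment classify(int score) {
        if (score < 1 || score > 5) {
            throw new IllegalArgumentException("score must be between 1 and 5: " + score);
        }
        if (score <= 2) {
            return Sentiment.NEGATIVE;
        }
        return score <= 4 ? Sentiment.NEUTRAL : Sentiment.POSITIVE;
    }

    // ScoreMap's filter (score > 2) keeps exactly the non-negative reviews
    public static boolean keptByScoreMap(int score) {
        return classify(score) != Sentiment.NEGATIVE;
    }
}
```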

Run in the HBase shell:

create 'evaluate','info'

Run from Kafka's bin directory to create the evaluate topic:

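./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic evaluate

On Kafka 2.2 and later the topic can also be created with --bootstrap-server localhost:9092 instead of --zookeeper localhost:2181; Kafka 3.0 removed the --zookeeper option from kafka-topics.sh.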

A score tag entity class:

@Data
public class Score {
    private Long productId;
    private Long productTypeId;
    private Long numbers;
    private String groupField;
}

A ScoreMap class implementing the FlatMapFunction interface, which keeps neutral and positive reviews:

public class ScoreMap implements FlatMapFunction<String,Score> {
    @Override
    public void flatMap(String value, Collector<Score> out) throws Exception {
        Evaluate evaluate = JSONObject.parseObject(value,Evaluate.class);
        Integer score = evaluate.getScore();
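        //Keep neutral and positive reviews (score 3-5); skip negative ones and missing scores
        if (score != null && score > 2) {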
            Score scoreResult = new Score();
            Long productId = evaluate.getProductId();
            Long productTypeId = evaluate.getProductTypeId();
            scoreResult.setProductId(productId);
            scoreResult.setProductTypeId(productTypeId);
            scoreResult.setNumbers(1L);
            String groupField = "score==" + productId + "==" + productTypeId;
            scoreResult.setGroupField(groupField);
            out.collect(scoreResult);
        }
    }
}

A ScoreReduce class implementing the ReduceFunction interface:

public class ScoreReduce implements ReduceFunction<Score> {
    @Override
    public Score reduce(Score value1, Score value2) throws Exception {
        Long numbers1 = value1.getNumbers();
        Long numbers2 = value2.getNumbers();
        value1.setNumbers(numbers1 + numbers2);
        return value1;
    }
}

A ScoreSink class implementing the SinkFunction interface:

public class ScoreSink implements SinkFunction<Score> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    public void invoke(Score value, Context context) throws Exception {
        if (value != null) {
            Map<String, String> data = new HashMap<>();
            Long productId = value.getProductId();
            Long productTypeId = value.getProductTypeId();
            Long numbers = value.getNumbers();
            data.put("productId", productId + "");
            data.put("productTypeId", productTypeId + "");
            data.put("numbers", numbers + "");
            String tablename = "score";
            Set<String> fields = new HashSet<>();
            fields.add("productId");
            fields.add("productTypeId");
            fields.add("numbers");
            clickUntil.saveData(tablename,data,fields);
        }
    }
}

Then the Flink streaming job:

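import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ScoreAnaly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("evaluate",
                new SimpleStringSchema(),properties);
        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        DataStream<Score> map = data.flatMap(new ScoreMap());
        //Non-negative review count per product in 5-hour processing-time windows
        DataStream<Score> reduce = map.keyBy(Score::getGroupField)
                .window(TumblingProcessingTimeWindows.of(Time.hours(5)))
                .reduce(new ScoreReduce());
        reduce.addSink(new ScoreSink());

        env.execute("portrait score");
    }
}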

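Recent hot product statistics

"Recent" refers to the last 10 days: recent hot products are the best-selling products of the past 10 days.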
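The job in this section stores one count per product per yyyyMMdd date, and because each 5-hour window emits its own partial count, a single day can end up with several rows. Reading back "the last 10 days" could then look like the following sketch; DailyRow and lastNDays are hypothetical names, and the dates are assumed to be in the same time zone that produced them.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;

/** Sketch: sum stored daily counts for the last N days (hypothetical types). */
public class RecentWindow {
    // Same pattern as DateUntil.transferDate(date, "yyyyMMdd")
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    /** One stored row: the dateTime and numbers columns of recent_hot_product. */
    public static final class DailyRow {
        final String dateTime;
        final long numbers;

        public DailyRow(String dateTime, long numbers) {
            this.dateTime = dateTime;
            this.numbers = numbers;
        }
    }

    // Sums all rows dated within the `days` days ending at `today` (both ends inclusive)
    public static long lastNDays(List<DailyRow> rows, LocalDate today, int days) {
        LocalDate from = today.minusDays(days - 1L);
        long total = 0;
        for (DailyRow r : rows) {
            LocalDate d = LocalDate.parse(r.dateTime, DAY);
            if (!d.isBefore(from) && !d.isAfter(today)) {
                total += r.numbers;
            }
        }
        return total;
    }
}
```

With today = 2021-11-15, the 10-day range runs from 20211106 through 20211115, so a row dated 20211105 is not counted.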

Create a recent hot-product tag entity class:

@Data
public class RecentHotProduct {
    private Long productId;
    private Long productTypeId;
    private Long numbers;
    private String dateTime;
    private String groupField;
}

Add a static method to DateUntil:

public static String transferDate(Date date,String dateFormatStr) {
    DateFormat dateFormat = new SimpleDateFormat(dateFormatStr);
    return dateFormat.format(date);
}

A RecentHotProductMap class implementing the FlatMapFunction interface, which emits one record per paid order, keyed by product and day:

public class RecentHotProductMap implements FlatMapFunction<String,RecentHotProduct> {
    @Override
    public void flatMap(String value, Collector<RecentHotProduct> out) throws Exception {
        Order order = JSONObject.parseObject(value,Order.class);
        Integer payStatus = order.getPayStatus();
        if (payStatus > 0) {
            RecentHotProduct recentHotProduct = new RecentHotProduct();
            Date date = order.getCreateTime();
            Long productId = order.getProductId();
            Long productTypeId = order.getProductTypeId();
            String dateString = DateUntil.transferDate(date,"yyyyMMdd");
            String groupField = "RecentHotProduct==" + productId + "=="
                    + productTypeId + "==" + dateString;
            recentHotProduct.setDateTime(dateString);
            recentHotProduct.setGroupField(groupField);
            recentHotProduct.setProductId(productId);
            recentHotProduct.setProductTypeId(productTypeId);
            recentHotProduct.setNumbers(1L);
            out.collect(recentHotProduct);
        }
    }
}

A RecentHotProductReduce class implementing the ReduceFunction interface:

public class RecentHotProductReduce implements ReduceFunction<RecentHotProduct> {
    @Override
    public RecentHotProduct reduce(RecentHotProduct value1, RecentHotProduct value2) throws Exception {
        Long numbers1 = value1.getNumbers();
        Long numbers2 = value2.getNumbers();
        value1.setNumbers(numbers1 + numbers2);
        return value1;
    }
}

A RecentHotProductSink class implementing the SinkFunction interface:

public class RecentHotProductSink implements SinkFunction<RecentHotProduct> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    public void invoke(RecentHotProduct value, Context context) throws Exception {
        if (value != null) {
            Map<String, String> data = new HashMap<>();
            Long productId = value.getProductId();
            Long productTypeId = value.getProductTypeId();
            Long numbers = value.getNumbers();
            String dateTime = value.getDateTime();
            data.put("productId", productId + "");
            data.put("productTypeId", productTypeId + "");
            data.put("numbers", numbers + "");
            data.put("dateTime",dateTime);
            String tablename = "recent_hot_product";
            Set<String> fields = new HashSet<>();
            fields.add("productId");
            fields.add("productTypeId");
            fields.add("numbers");
            clickUntil.saveData(tablename,data,fields);
        }
    }
}

Then the Flink streaming job:

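import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class RecentHotProductAnaly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("order",
                new SimpleStringSchema(),properties);
        //Start from the latest offset (ignored when the job is restored from a checkpoint)
        myConsumer.setStartFromLatest();
        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        DataStream<RecentHotProduct> map = data.flatMap(new RecentHotProductMap());
        //Paid-order count per product and day, emitted every 5 hours (processing time)
        DataStream<RecentHotProduct> reduce = map.keyBy(RecentHotProduct::getGroupField)
                .window(TumblingProcessingTimeWindows.of(Time.hours(5)))
                .reduce(new RecentHotProductReduce());
        reduce.addSink(new RecentHotProductSink());

        env.execute("recent hot product");
    }
}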

High-quality product statistics

Create a high-quality product tag entity class:

@Data
public class HighQualityProduct {
    private Long productId;
    private Long productTypeId;
    private Long numbers;
    private Integer scoreTotal;
    private String dateTime;
    private String groupField;
}

A HighQualityProductMap class implementing the FlatMapFunction interface, which emits each review's score keyed by product and day:

public class HighQualityProductMap implements FlatMapFunction<String,HighQualityProduct> {
    @Override
    public void flatMap(String value, Collector<HighQualityProduct> out) throws Exception {
        Evaluate evaluate = JSONObject.parseObject(value, Evaluate.class);
        Integer score = evaluate.getScore();
        Date date = evaluate.getEvaluateTime();
        String dateTime = DateUntil.transferDate(date,"yyyyMMdd");
        HighQualityProduct highQualityProduct = new HighQualityProduct();
        Long productId = evaluate.getProductId();
        Long productTypeId = evaluate.getProductTypeId();
        highQualityProduct.setProductId(productId);
        highQualityProduct.setProductTypeId(productTypeId);
        highQualityProduct.setNumbers(1L);
        highQualityProduct.setScoreTotal(score);
        highQualityProduct.setDateTime(dateTime);
        String groupField = "HighQualityProduct==" + productId + "==" + productTypeId
                + "==" + dateTime;
        highQualityProduct.setGroupField(groupField);
        out.collect(highQualityProduct);
    }
}

A HighQualityProductReduce class implementing the ReduceFunction interface, which adds up both the review count and the score total:

public class HighQualityProductReduce implements ReduceFunction<HighQualityProduct> {
    @Override
    public HighQualityProduct reduce(HighQualityProduct value1, HighQualityProduct value2) throws Exception {
        Long numbers1 = value1.getNumbers();
        Long numbers2 = value2.getNumbers();
        Integer score1 = value1.getScoreTotal();
        Integer score2 = value2.getScoreTotal();
        value1.setNumbers(numbers1 + numbers2);
        value1.setScoreTotal(score1 + score2);
        return value1;
    }
}
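HighQualityProductReduce keeps a running count and a running score sum instead of an average. A short sketch with hypothetical names shows why: (count, sum) pairs combine correctly in any grouping, and the average rating can be derived at the end as scoreTotal / numbers, whereas averaging partial averages gives the wrong result.

```java
/** Sketch of the (numbers, scoreTotal) accumulator used by HighQualityProductReduce. */
public class QualityScore {
    public static final class Acc {
        final long numbers;     // number of reviews
        final long scoreTotal;  // sum of their scores

        public Acc(long numbers, long scoreTotal) {
            this.numbers = numbers;
            this.scoreTotal = scoreTotal;
        }
    }

    // Same rule as HighQualityProductReduce: add the counts and add the score sums
    public static Acc combine(Acc a, Acc b) {
        return new Acc(a.numbers + b.numbers, a.scoreTotal + b.scoreTotal);
    }

    // Average rating derived from the accumulator; 0 when there are no reviews
    public static double average(Acc a) {
        return a.numbers == 0 ? 0.0 : (double) a.scoreTotal / a.numbers;
    }

    public static void main(String[] args) {
        Acc five = new Acc(1, 5);
        Acc fourAndThree = combine(new Acc(1, 4), new Acc(1, 3));
        System.out.println(average(combine(five, fourAndThree)));        // 4.0
        System.out.println((average(five) + average(fourAndThree)) / 2); // 4.25: averaging averages is wrong
    }
}
```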

A HighQualityProductSink class implementing the SinkFunction interface:

public class HighQualityProductSink implements SinkFunction<HighQualityProduct> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
    
    @Override
    public void invoke(HighQualityProduct value, Context context) throws Exception {
        if (value != null) {
            Map<String, String> data = new HashMap<>();
            Long productId = value.getProductId();
            Long productTypeId = value.getProductTypeId();
            Long numbers = value.getNumbers();
            Integer scoreTotal = value.getScoreTotal();
            String dateTime = value.getDateTime();
            data.put("productId", productId + "");
            data.put("productTypeId", productTypeId + "");
            data.put("numbers", numbers + "");
            data.put("scoreTotal",scoreTotal + "");
            data.put("dateTime",dateTime);
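            //Assumed table name; writing to recent_hot_product would mix these rows with the recent-hot-product job's output
            String tablename = "high_quality_product";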
            Set<String> fields = new HashSet<>();
            fields.add("productId");
            fields.add("productTypeId");
            fields.add("numbers");
            fields.add("scoreTotal");
            clickUntil.saveData(tablename,data,fields);
        }
    }
}

Then the Flink streaming job:

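import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class HighQualityProductAnaly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("evaluate",
                new SimpleStringSchema(),properties);
        //Start from the latest offset (ignored when the job is restored from a checkpoint)
        myConsumer.setStartFromLatest();
        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        DataStream<HighQualityProduct> map = data.flatMap(new HighQualityProductMap());
        //Review count and score total per product and day, in 5-hour processing-time windows
        DataStream<HighQualityProduct> reduce = map.keyBy(HighQualityProduct::getGroupField)
                .window(TumblingProcessingTimeWindows.of(Time.hours(5)))
                .reduce(new HighQualityProductReduce());
        reduce.addSink(new HighQualityProductSink());

        env.execute("high quality product");
    }
}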

View count statistics

Create a view-count tag entity class:

@Data
public class ScanTimes {
    private Long productId;
    private Long productTypeId;
    private Long numbers;
    private String dateTime;
    private String groupField;
}

Add another static method to DateUntil:

public static Date getCurrentTime(Long visitTime) {
    return new Date(visitTime);
}

A ScanTimesMap class implementing the FlatMapFunction interface, which emits one record per view, keyed by product and day:

public class ScanTimesMap implements FlatMapFunction<String,ScanTimes> {
    @Override
    public void flatMap(String value, Collector<ScanTimes> out) throws Exception {
        ScanOpertor scanOpertor = JSONObject.parseObject(value,ScanOpertor.class);
        ScanTimes scanTimes = new ScanTimes();
        Long productId = scanOpertor.getProductId();
        Long productTypeId = scanOpertor.getProductTypeId();
        Long time = scanOpertor.getScanTime();
        Date date = DateUntil.getCurrentTime(time);
        String dateTime = DateUntil.transferDate(date,"yyyyMMdd");
        scanTimes.setProductId(productId);
        scanTimes.setProductTypeId(productTypeId);
        scanTimes.setDateTime(dateTime);
        String groupField = "ScanTimes==" + productId + "==" + productTypeId
                + "==" + dateTime;
        scanTimes.setGroupField(groupField);
        scanTimes.setNumbers(1L);
        out.collect(scanTimes);
    }
}

A ScanTimesReduce class implementing the ReduceFunction interface:

public class ScanTimesReduce implements ReduceFunction<ScanTimes> {
    @Override
    public ScanTimes reduce(ScanTimes value1, ScanTimes value2) throws Exception {
        Long numbers1 = value1.getNumbers();
        Long numbers2 = value2.getNumbers();
        value1.setNumbers(numbers1 + numbers2);
        return value1;
    }
}

A ScanTimesSink class implementing the SinkFunction interface:

public class ScanTimesSink implements SinkFunction<ScanTimes> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
    
    @Override
    public void invoke(ScanTimes value, Context context) throws Exception {
        if (value != null) {
            Map<String, String> data = new HashMap<>();
            Long productId = value.getProductId();
            Long productTypeId = value.getProductTypeId();
            Long numbers = value.getNumbers();
            String dateTime = value.getDateTime();
            data.put("productId", productId + "");
            data.put("productTypeId", productTypeId + "");
            data.put("numbers", numbers + "");
            data.put("dateTime",dateTime);
            String tablename = "scan_times";
            Set<String> fields = new HashSet<>();
            fields.add("productId");
            fields.add("productTypeId");
            fields.add("numbers");
            clickUntil.saveData(tablename,data,fields);
        }
    }
}

Then the Flink streaming job:

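import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ScanTimesAnaly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("scan",
                new SimpleStringSchema(),properties);
        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        DataStream<ScanTimes> map = data.flatMap(new ScanTimesMap());
        //View count per product and day, in 5-hour processing-time windows
        DataStream<ScanTimes> reduce = map.keyBy(ScanTimes::getGroupField)
                .window(TumblingProcessingTimeWindows.of(Time.hours(5)))
                .reduce(new ScanTimesReduce());
        reduce.addSink(new ScanTimesSink());

        env.execute("scan times");
    }
}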
This article is part of the Tencent Cloud self-media sharing program and was shared from the author's personal site/blog.