前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于 flink 的电商用户行为数据分析【6】| APP市场推广统计

基于 flink 的电商用户行为数据分析【6】| APP市场推广统计

作者头像
大数据梦想家
发布2021-01-27 16:53:48
4660
发布2021-01-27 16:53:48
举报

前言

本篇是flink 的「电商用户行为数据分析」的第6篇文章,为大家带来的是市场营销商业指标统计分析APP市场推广统计的内容,通过本期内容的学习,你同样能够学会处理一些特定场景领域下的方法。话不多说,我们直入正题!

在这里插入图片描述
在这里插入图片描述

模块创建和数据准备

继续在UserBehaviorAnalysis下新建一个maven module作为子项目,命名为MarketAnalysis

这个模块中我们没有现成的数据,所以会用自定义的测试源来产生测试数据流,或者直接用生成测试数据文件。

APP市场推广统计

随着智能手机的普及,在如今的电商网站中已经有越来越多的用户来自移动端,相比起传统浏览器的登录方式,手机APP成为了更多用户访问电商网站的首选。对于电商企业来说,一般会通过各种不同的渠道对自己的APP进行市场推广,而这些渠道的统计数据(比如,不同网站上广告链接的点击量、APP下载量)就成了市场营销的重要商业指标

首先我们考察分渠道的市场推广统计。在src/main/scala下创建AppMarketingByChannel.scala文件。由于没有现成的数据,所以我们需要自定义一个测试源来生成用户行为的事件流。

自定义测试数据源

定义一个源数据的样例类MarketingUserBehavior,再定义一个SourceFunction,用于产生用户行为源数据,命名为SimulatedEventSource

代码语言:javascript
复制
// 定义一个输入数据的样例类  保存电商用户行为的样例类
  case class MarketingUserBehavior(userId: String, behavior: String, channel: String, timestamp: Long)

  // 定义一个输出结果的样例类   保存 市场用户点击次数
  case class MarketingViewCount(windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long)

  // 自定义数据源
  class SimulateEventSource extends RichParallelSourceFunction[MarketingUserBehavior] {

    // 定义是否运行的标识符
    var running: Boolean = true
    // 定义渠道的集合
    val channelSet: Seq[String] = Seq("AppStore", "XiaomiStore", "HuaweiStore", "weibo", "wechat", "tieba")
    // 定义用户行为的集合
    val behaviorTypes: Seq[String] = Seq("BROWSE", "CLICK", "PURCHASE", "UNINSTALL")
    // 定义随机数发生器
    val rand: Random.type = Random

    // 重写 run 方法
    override def run(ctx: SourceFunction.SourceContext[MarketingUserBehavior]): Unit = {

      // 获取到 Long类型的最大值
      val maxElements: Long = Long.MaxValue
      // 设置初始值
      var count: Long = 0L

      // 随机生成所有数据
      while (running && count < maxElements) {

        // 生成一个随机数
        val id: String = UUID.randomUUID().toString
        // 获取随机行为
        val behaviorType: String = behaviorTypes(rand.nextInt(behaviorTypes.size))
        // 获取随机渠道
        val channel: String = channelSet(rand.nextInt(channelSet.size))
        // 获取到当前的系统时间
        val ts: Long = System.currentTimeMillis()
        // 输出生成的用户行为的事件流
        ctx.collect(MarketingUserBehavior(id, behaviorType, channel, ts))
        // count + 1
        count += 1
        // 设置休眠的时间
        TimeUnit.MICROSECONDS.sleep(10L)

      }
    }

    override def cancel(): Unit = running = false
  }

分渠道统计

另外定义一个窗口处理的输出结果样例类 MarketingViewCount,并自定义 ProcessWindowFunction进行处理,完整代码如下:

代码语言:javascript
复制
import java.sql.Timestamp
import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

/*
 * @Author: Alice菌
 * @Date: 2020/12/7 17:32
 * @Description: 
    电商用户行为数据分析:  市场营销商业指标统计分析
    APP市场推广统计   - - > 分渠道统计
 */
object AppMarketingByChannel {

  // 定义一个输入数据的样例类  保存电商用户行为的样例类
  case class MarketingUserBehavior(userId: String, behavior: String, channel: String, timestamp: Long)

  // 定义一个输出结果的样例类   保存 市场用户点击次数
  case class MarketingViewCount(windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long)

  // 自定义数据源
  class SimulateEventSource extends RichParallelSourceFunction[MarketingUserBehavior] {

    // 定义是否运行的标识符
    var running: Boolean = true
    // 定义渠道的集合
    val channelSet: Seq[String] = Seq("AppStore", "XiaomiStore", "HuaweiStore", "weibo", "wechat", "tieba")
    // 定义用户行为的集合
    val behaviorTypes: Seq[String] = Seq("BROWSE", "CLICK", "PURCHASE", "UNINSTALL")
    // 定义随机数发生器
    val rand: Random.type = Random

    // 重写 run 方法
    override def run(ctx: SourceFunction.SourceContext[MarketingUserBehavior]): Unit = {

      // 获取到 Long类型的最大值
      val maxElements: Long = Long.MaxValue
      // 设置初始值
      var count: Long = 0L

      // 随机生成所有数据
      while (running && count < maxElements) {

        // 生成一个随机数
        val id: String = UUID.randomUUID().toString
        // 获取随机行为
        val behaviorType: String = behaviorTypes(rand.nextInt(behaviorTypes.size))
        // 获取随机渠道
        val channel: String = channelSet(rand.nextInt(channelSet.size))
        // 获取到当前的系统时间
        val ts: Long = System.currentTimeMillis()
        // 输出生成的用户行为的事件流
        ctx.collect(MarketingUserBehavior(id, behaviorType, channel, ts))
        // count + 1
        count += 1
        // 设置休眠的时间
        TimeUnit.MICROSECONDS.sleep(10L)

      }
    }

    override def cancel(): Unit = running = false
  }

  def main(args: Array[String]): Unit = {

    // 创建流处理的环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置并行度
    env.setParallelism(1)
    // 设置时间特征为事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.addSource(new SimulateEventSource()) //  添加数据源
      .assignAscendingTimestamps(_.timestamp) // 设置水印
      .filter(_.behavior != "UNINSTALL") // 过滤掉 卸载 的数据
      .map(data => {
        ((data.channel, data.behavior), 1L)
      })
      .keyBy(_._1) //以渠道和行为作为key分组
      .timeWindow(Time.hours(1), Time.seconds(1)) // 设置滑动窗口,窗口大小为1h,滑动距离为1s
      .process(new MarketingCountByChannel) // 调用自定义处理方法
      .print() // 输出结果

    // 执行程序
    env.execute("app marketing by channel job")

  }

  // 自定义处理函数
  class MarketingCountByChannel() extends ProcessWindowFunction[((String, String), Long), MarketingViewCount, (String, String), TimeWindow] {

    override def process(key: (String, String), context: Context, elements: Iterable[((String, String), Long)], out: Collector[MarketingViewCount]): Unit = {

      // 根据 context 对象分别获取到 Long 类型的 窗口的开始和结束时间
      //context.window.getStart是长整形   所以new 一个 变成String类型
      val startTs: String = new Timestamp(context.window.getStart).toString
      val endTs: String = new Timestamp(context.window.getEnd).toString

      // 获取到 渠道
      val channel: String = key._1
      // 获取到 行为
      val behaviorType: String = key._2
      // 获取到 次数
      val count: Int = elements.size

      // 输出结果
      out.collect(MarketingViewCount(startTs, endTs, channel, behaviorType, count))
    }
  }
}

运行效果

部分运行结果
部分运行结果

不分渠道(总量)统计

同样我们还可以考察不分渠道的市场推广统计,这样得到的就是所有渠道推广的总量。在src/main/scala下创建AppMarketingStatistics.scala文件,代码如下:

代码语言:javascript
复制
import java.sql.Timestamp
import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random
/*
 * @Author: Alice菌
 * @Date: 2020/12/10 22:45
 * @Description: 
    电商用户行为数据分析:  市场营销商业指标统计分析
    APP市场推广统计   - - > 不分渠道(总量)统计
 */
object AppMarketingStatistics {

  // 定义一个输入数据的样例类  保存电商用户行为的样例类
  case class MarketingUserBehavior(userId: String, behavior: String, channel: String, timestamp: Long)

  // 定义一个输出结果的样例类   保存 市场用户点击次数
  case class MarketingViewCount(windowStart: String, windowEnd: String, count: Long)

  def main(args: Array[String]): Unit = {

    // 定义流处理环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置并行度
    env.setParallelism(1)
    // 设置时间特征为事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.addSource(new SimulateEventSource)  // 添加数据源
      .assignAscendingTimestamps(_.timestamp)
      .filter(_.behavior != "UNINSTALL")
      .map(data => {
        ("key",1L)   // 因为这里我们不分渠道,所以我们就将key值固定,将所有数据放入到同一个组
      })
      .keyBy(_._1)
      .timeWindow(Time.hours(1),Time.seconds(1))   // 设置滑动窗口,窗口大小为1h,滑动距离为1s
      .process(new MarketingCountByChannel) // 调用自定义处理方法
      .print() // 输出结果

    // 执行程序
    env.execute("app marketing by channel job")

  }


  // 自定义数据源
  class SimulateEventSource extends RichParallelSourceFunction[MarketingUserBehavior] {

    // 定义是否运行的标识符
    var running: Boolean = true
    // 定义渠道的集合
    val channelSet: Seq[String] = Seq("AppStore", "XiaomiStore", "HuaweiStore", "weibo", "wechat", "tieba")
    // 定义用户行为的集合
    val behaviorTypes: Seq[String] = Seq("BROWSE", "CLICK", "PURCHASE", "UNINSTALL")
    // 定义随机数发生器
    val rand: Random.type = Random

    // 重写 run 方法
    override def run(ctx: SourceFunction.SourceContext[MarketingUserBehavior]): Unit = {

      // 获取到 Long类型的最大值
      val maxElements: Long = Long.MaxValue
      // 设置初始值
      var count: Long = 0L

      // 随机生成所有数据
      while (running && count < maxElements) {
        // 生成一个随机数
        val id: String = UUID.randomUUID().toString
        // 获取随机行为
        val behaviorType: String = behaviorTypes(rand.nextInt(behaviorTypes.size))
        // 获取随机渠道
        val channel: String = channelSet(rand.nextInt(channelSet.size))
        // 获取到当前的系统时间
        val ts: Long = System.currentTimeMillis()
        // 输出生成的用户行为的事件流
        ctx.collect(MarketingUserBehavior(id, behaviorType, channel, ts))
        // count + 1
        count += 1
        // 设置休眠的时间
        TimeUnit.MICROSECONDS.sleep(10L)

      }
    }

    override def cancel(): Unit = running = false
  }


  // 自定义处理函数
  class MarketingCountByChannel() extends ProcessWindowFunction[(String, Long), MarketingViewCount, String, TimeWindow] {

    override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[MarketingViewCount]): Unit = {

      // 根据 context 对象分别获取到 Long 类型的 窗口的开始和结束时间
      //context.window.getStart是长整形   所以new 一个 变成String类型
      val startTs: String = new Timestamp(context.window.getStart).toString
      val endTs: String = new Timestamp(context.window.getEnd).toString

      // 获取到 次数
      val count: Int = elements.size

      // 输出结果
      out.collect(MarketingViewCount(startTs, endTs,count))

    }
  }
}

运行效果

部分运行结果
部分运行结果

小结

本期关于介绍flink 电商用户行为数据分析之APP市场推广统计的文章就到这里,主要为大家介绍了在自定义数据源的基础上,如何分渠道和不分渠道计算APP市场推广的数据 。考虑到部分小伙伴对于中间的部分代码有疑问,所以我每行都写上了注释,因此详细的过程笔者就不在这里详细赘述了。看了注释仍有疑惑的小伙伴们欢迎添加我的个人微信询问,互相学习,共同进步!你知道的越多,你不知道的也越多,我是Alice,我们下一期见!

受益的朋友记得三连支持小菌!

文章持续更新,可以微信搜一搜「 猿人菌 」第一时间阅读,思维导图,大数据书籍,大数据高频面试题,海量一线大厂面经…期待您的关注!

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-12-12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 模块创建和数据准备
  • APP市场推广统计
  • 自定义测试数据源
  • 分渠道统计
    • 运行效果
    • 不分渠道(总量)统计
      • 运行效果
      • 小结
      相关产品与服务
      智能数据分析
      腾讯云智能数据分析 Intellectual Data Analysis 是新一代云原生大数据敏捷分析解决方案。产品具备存算分离、动态扩缩容等特点,并内置事件、转化、留存、行为路径等成熟分析模型,提供高可用、低成本的全场景敏捷分析服务,可同时满足数据分析师、数据开发工程师和业务决策人的关键分析需求,帮助企业大幅降低数据分析成本,支撑业务更高效决策。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档