
Implementing ID-Mapping with Spark GraphX

Original
Author: 平常心
Last modified: 2020-10-16 16:42:02
Included in the column: 个人总结系列 (Personal Summary Series)

I. Background

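A company usually runs a portfolio of products, and each product issues its own user IDs through its own account registration. When user tables and user-behavior data are consolidated across the whole company, it is essential to determine which user IDs from different products belong to the same person. This underpins business needs such as user-behavior analysis, user profiling, and user data mining.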

II. Implementation

1. Sample user records from the collected data (one JSON object per line):

{"name":"zs","uid":"u_001","account":"","email":"","phoneNbr":"15857635755","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"imei_zs_001","mac":"mac_zs_001","imsi":"imsi_zs_001","androidId":"androidId_zs_001","deviceId":"deviceId_zs_001","uuid":"uuid_zs_001"}}
{"name":"zs","uid":"u_001","account":"","email":"","phoneNbr":"15857635755","birthday":"","isRegistered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"imei_zs_001","mac":"mac_zs_001","androidId":"androidId_zs_001","deviceId":"deviceId_zs_001"}}
{"name":"zs","uid":"u_001","phone":{"imei":"imei_zs_002","mac":"mac_zs_002","imsi":"imsi_zs_002","androidId":"androidId_zs_002","deviceId":"deviceId_zs_001","uuid":"uuid_zs_002"}}
{"name":"zs","uid":"u_001","phone":{"mac":"mac_zs_002","imsi":"imsi_zs_002","deviceId":"deviceId_zs_001","uuid":"uuid_zs_001"}}
{"name":"zs","uid":"u_001","phone":{"imei":"imei_zs_002","mac":"mac_zs_002","imsi":"imsi_zs_002","androidId":"androidId_zs_001","deviceId":"deviceId_zs_001","uuid":"uuid_zs_001"}}
{"name":"ls","uid":"u_002","phone":{"imei":"imei_ls_001","mac":"mac_ls_001","imsi":"imsi_ls_001","androidId":"androidId_ls_001","deviceId":"deviceId_ls_001","uuid":"uuid_ls_001"}}
{"name":"ls","uid":"u_002","phone":{"mac":"mac_ls_001","imsi":"imsi_ls_002","androidId":"androidId_ls_002","deviceId":"deviceId_ls_001","uuid":"uuid_ls_001"}}
{"name":"ls","uid":"u_002","phone":{"mac":"mac_ls_001","imsi":"imsi_ls_002","androidId":"androidId_ls_002","deviceId":"deviceId_ls_001","uuid":"uuid_ls_001"}}
{"name":"ls","uid":"u_002","phone":{"imei":"imei_ls_002","mac":"mac_ls_001","imsi":"imsi_ls_002","androidId":"androidId_ls_002","deviceId":"deviceId_ls_002","uuid":"uuid_ls_002"}}

2. Approach

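Our data typically contains many fields that uniquely identify a user, such as the user ID and email address, plus the unique device identifiers commonly collected by mobile apps (IMEI, IMSI, MAC, Device ID, etc.). The mapping is built on these unique ID fields: every distinct ID value becomes a vertex of a graph, IDs that appear together in the same record are connected by edges, and each connected component is treated as one person and assigned a single OneId.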
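The approach can be tried out on a single machine before bringing in Spark. The following Scala sketch is an illustration, not the article's code: it replaces GraphX with a union-find (disjoint-set) structure, which finds the same connected components, and it labels each component with its smallest identifier, similar to how GraphX's connectedComponents labels a component with its lowest vertex ID (here the comparison is on the identifier strings rather than on hashed Long IDs). OneIdSketch is a hypothetical name, and unlike the Spark code below it applies no edge-frequency threshold.

```scala
import scala.collection.mutable

// Hypothetical single-machine sketch (not the article's GraphX code):
// union-find over identifiers that co-occur in the same record.
object OneIdSketch {
  /** Maps every identifier to its OneId: the smallest identifier in its component. */
  def assign(records: Seq[Seq[String]]): Map[String, String] = {
    val parent = mutable.Map.empty[String, String]

    // Find the root of x (registering x on first sight), with path compression.
    def find(x: String): String = {
      val p = parent.getOrElseUpdate(x, x)
      if (p == x) x
      else {
        val root = find(p)
        parent(x) = root
        root
      }
    }

    // Merge two sets; the smaller root wins, so every root is its component's minimum.
    def union(a: String, b: String): Unit = {
      val ra = find(a)
      val rb = find(b)
      if (ra.compareTo(rb) < 0) parent(rb) = ra
      else if (rb.compareTo(ra) < 0) parent(ra) = rb
    }

    for (ids <- records) {
      ids.foreach(find) // registers identifiers that have no partner
      // Linking neighbours is enough: it connects all IDs of the record.
      ids.zip(ids.drop(1)).foreach { case (a, b) => union(a, b) }
    }
    parent.keys.toList.map(id => id -> find(id)).toMap
  }
}
```

For example, the records (u_001, imei_1, mac_1), (u_002, mac_2) and (imei_1, uuid_9) yield two OneIds: imei_1 for u_001, imei_1, mac_1 and uuid_9, and mac_2 for u_002 and mac_2. Linking consecutive identifiers is sufficient because only connectivity, not the individual edges, decides the component.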

3. Spark implementation

3.1 Initializing the OneId


import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object IdMapFirst {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("id-mapping")
      .master("local[1]")
      .getOrCreate()
    // Implicit conversions needed to turn RDDs into DataFrames (toDF)
    import spark.implicits._


    // Each input line is one JSON record (see the sample above)
    val rawData = spark.read.textFile("file:///E://code//study//spark_ml//file//userInfo.json")
    val data: RDD[Array[String]] = rawData.rdd.map(line => {
      // Parse the line into a JSON object
      val jsonObj = JSON.parseObject(line)
      val uid = jsonObj.getString("uid")

      // Device identifiers live in the nested "phone" object; getString returns null for missing keys
      val phoneObj = jsonObj.getJSONObject("phone")
      val deviceIds =
        if (phoneObj == null) Array.empty[String]
        else Array("imei", "mac", "imsi", "androidId", "deviceId", "uuid").map(key => phoneObj.getString(key))

      // Keep the fixed field order (uid first) and drop null/blank values
      (uid +: deviceIds).filter(StringUtils.isNotBlank(_))
    })

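    // One vertex per identifier; the vertex ID is the identifier's String.hashCode
    // (only 32 bits, so two different identifiers can collide)
    val vertices: RDD[(Long, String)] = data.flatMap(arr => {
      for (id <- arr) yield (id.hashCode.toLong, id)
    })
    // Debug output (printed by the executors)
    vertices.foreach(ele => println(ele._1 + " : " + ele._2))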

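    val edges: RDD[Edge[String]] = data.flatMap(arr => {
      // Every pair of identifiers in the same record becomes an edge
      for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1) yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
    })
      // Count identical edges across records (Edge is a case class, so it works as a key)
      .map(edge => (edge, 1)).reduceByKey(_ + _)
      // Keep only edges seen more than 2 times (an empirical threshold) to filter out noise
      .filter(tp => tp._2 > 2)
      .map(x => x._1)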

    // Build the graph from the vertex and edge sets
    val graph = Graph(vertices, edges)


    // Connected components: VertexRDD[VertexId] holds (vertex ID, smallest vertex ID in its component)
    val res: VertexRDD[VertexId] = graph.connectedComponents().vertices
    val firstIds = res.toDF("id", "guid")

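    // Persist the initial id -> guid mapping for the next run (overwrite so the demo can be re-run)
    firstIds.write.mode("overwrite").parquet("file:///E://code//study//spark_ml//file//userIds_demo")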

    spark.stop()
  }

}
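The edge step above (all pairs within a record, count identical pairs, keep those above the threshold) can be mirrored with plain Scala collections, which makes the threshold easy to experiment with on small samples. EdgeSketch.frequentEdges is a hypothetical helper, not part of the article, and it keeps identifiers as strings instead of hash codes.

```scala
// Hypothetical helper mirroring the edge step of IdMapFirst with plain collections.
object EdgeSketch {
  /** Counts identifier pairs (i < j within a record) and keeps those seen more than minCount times. */
  def frequentEdges(records: Seq[Seq[String]], minCount: Int): Map[(String, String), Int] = {
    // Same double loop as the Spark code: every pair within one record is an edge.
    val pairs: Seq[(String, String)] = for {
      ids <- records
      i <- 0 until ids.length - 1
      j <- i + 1 until ids.length
    } yield (ids(i), ids(j))

    pairs
      .groupBy(identity)                                              // like reduceByKey on (edge, 1)
      .map { case (edge, occurrences) => edge -> occurrences.size }
      .filter { case (_, count) => count > minCount }                 // like .filter(tp => tp._2 > 2)
  }
}
```

With minCount = 2, as in the article, a pair of identifiers must co-occur in at least three records to survive. This is why imei_ls_001, which appears in only one "ls" record, ends up as its own component in the output table of section 3.2.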

3.2 Merging the second batch of data with the initial mapping

import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

object IdMapSecond {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("id-mapping")
      .master("local[1]")
      .getOrCreate()

    // New batch of user records; this demo simply re-reads the same file
    val rawData = spark.read.textFile("file:///E://code//study//spark_ml//file//userInfo.json")
    val data: RDD[Array[String]] = rawData.rdd.map(line => {
      // Parse the line into a JSON object
      val jsonObj = JSON.parseObject(line)
      val uid = jsonObj.getString("uid")

      // Device identifiers live in the nested "phone" object; getString returns null for missing keys
      val phoneObj = jsonObj.getJSONObject("phone")
      val deviceIds =
        if (phoneObj == null) Array.empty[String]
        else Array("imei", "mac", "imsi", "androidId", "deviceId", "uuid").map(key => phoneObj.getString(key))

      // Keep the fixed field order (uid first) and drop null/blank values
      (uid +: deviceIds).filter(StringUtils.isNotBlank(_))
    })

    val vertices: RDD[(Long, String)] = data.flatMap(arr => {
      for (id <- arr) yield (id.hashCode.toLong, id)
    })

    vertices.foreach(ele => println(ele._1 + " : " + ele._2))

    val edges: RDD[Edge[String]] = data.flatMap(arr => {
      // Double for loop: every pair of identifiers in a record becomes an edge
      for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1) yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
    })
      .map(edge => (edge, 1)).reduceByKey(_ + _)
      // Drop edges that occur 2 times or fewer (empirical threshold)
      .filter(tp => tp._2 > 2)
      .map(x => x._1)

    // Load the id -> guid mapping written by the first run (IdMapFirst writes to userIds_demo)
    val firstIdmap = spark.read.parquet("file:///E://code//study//spark_ml//file//userIds_demo")
    val firstVertices = firstIdmap.rdd.map(
      {
        case Row(id_hashcode: VertexId, _: VertexId) =>
          (id_hashcode, "")
      }
    )

    // Link every previous ID to its previous guid so that the earlier components stay connected
    val firstEdges = firstIdmap.rdd.map(row => {
        val id_hashcode = row.getAs[VertexId]("id")
        val guid = row.getAs[VertexId]("guid")
        Edge(id_hashcode,guid,"")
      }
    )

    // Build the graph from today's and the previous vertices and edges
    val graph = Graph(vertices.union(firstVertices), edges.union(firstEdges))

    // result: VertexRDD[VertexId] => (vertex ID, smallest vertex ID in its component)
    val result: VertexRDD[VertexId] = graph.connectedComponents().vertices


    // Previous mapping (id -> guid), collected to the driver and broadcast to the executors
    val idMap = firstIdmap.rdd.map(row => {
      val id_hashcode = row.getAs[VertexId]("id")
      val guid = row.getAs[VertexId]("guid")
      (id_hashcode,guid)
    }
    ).collectAsMap()

    val bcMap = spark.sparkContext.broadcast(idMap)

    import spark.implicits._

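    // Group today's vertices by component label: (label, member IDs)
    val todayIdmap = result.map(tup => (tup._2, tup._1))
      .groupByKey()
      .mapPartitions(iter => {
        // Read the broadcast previous mapping once per partition
        val idmapMap = bcMap.value
        iter.map(tup => {
          // Default: today's component label becomes the guid
          var todayGuid = tup._1
          val ids = tup._2
          // Walk the member IDs and reuse the first guid found in the previous mapping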
          var idFind = false
          for (id <- ids if !idFind) {
            val getGuid = idmapMap.get(id)
            if (getGuid.isDefined) {
              todayGuid = getGuid.get
              idFind = true
            }
          }
          (todayGuid, ids)
        })
      })
      .flatMap(tup => {
        val ids = tup._2
        val guid = tup._1
        for(ele <- ids) yield (ele, guid)
      }).toDF("id", "guid")

    todayIdmap.show()

    todayIdmap.createOrReplaceTempView("id_guid");


    val data2 = data.flatMap( arr => {
      for(id <- arr) yield (id.hashCode.toLong, id)
    }).toDF("id", "str_id")

    data2.createOrReplaceTempView("id_original")

    val output = spark.sql("""select
                             |    t1.id as id_hashcode,
                             |    t2.str_id as id,
                             |    t1.guid as guid
                             |from id_guid t1
                             |left join id_original t2 on t1.id = t2.id
                             |group by t1.id, t2.str_id, t1.guid
                             |order by guid""".stripMargin
    )
    output.show()

    spark.stop()
  }

}
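The guid-reuse rule in IdMapSecond (for each new component, adopt the first previous guid found among its members, otherwise keep the component's own label) can be sketched with plain collections. GuidMergeSketch is a hypothetical name; its two input maps stand in for the grouped result RDD and the broadcast bcMap.

```scala
// Hypothetical sketch of IdMapSecond's guid-reuse rule with plain collections.
object GuidMergeSketch {
  /**
   * components: today's component label -> member vertex IDs
   * previous:   vertex ID -> guid from the previous run
   * Returns vertex ID -> guid for today.
   */
  def merge(components: Map[Long, Seq[Long]], previous: Map[Long, Long]): Map[Long, Long] =
    components.toSeq.flatMap { case (label, members) =>
      // Reuse the first previous guid found among the members, else keep today's label.
      val guid = members.flatMap(id => previous.get(id)).headOption.getOrElse(label)
      members.map(id => id -> guid)
    }.toMap
}
```

As in the Spark code, if one component contains IDs that previously belonged to two different guids, whichever is found first wins. A production job would probably want a deterministic rule here, for example taking the smallest previous guid.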

Output (the first 20 rows, which is the default for show()):

+-----------+----------------+-----------+
|id_hashcode|              id|       guid|
+-----------+----------------+-----------+
|-1381665248|androidId_zs_001|-1908595409|
|  110929767|           u_001|-1908595409|
|-1908595409|     uuid_zs_001|-1908595409|
|-1018465903|     imsi_zs_002|-1908595409|
|-1753513447| deviceId_zs_001|-1908595409|
| -714652388|      mac_zs_002|-1908595409|
|-1908595408|     uuid_zs_002|-1908595408|
|-1884715312|     imei_ls_001|-1884715312|
|-1884715311|     imei_ls_002|-1884715311|
|-1782473362|androidId_ls_001|-1782473362|
| 2140645735| deviceId_ls_001|-1782473361|
|-1419274017|     imsi_ls_002|-1782473361|
|  110929768|           u_002|-1782473361|
| 1985563773|     uuid_ls_001|-1782473361|
|-1115460503|      mac_ls_001|-1782473361|
|-1782473361|androidId_ls_002|-1782473361|
|-1483907198|     imei_zs_001|-1483907198|
|-1483907197|     imei_zs_002|-1483907197|
|-1419274018|     imsi_ls_001|-1419274018|
|-1381665247|androidId_zs_002|-1381665247|
+-----------+----------------+-----------+
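The id_hashcode values above are Java String.hashCode results; for example, "u_001".hashCode is 110929767, matching the table. Because hashCode is only 32 bits, unrelated identifiers can collide and be merged into one person. The sketch below shows a classic collision and one possible 64-bit alternative, the first 8 bytes of an MD5 digest. That alternative is an illustrative choice, not the article's method, and it lowers the collision risk rather than eliminating it; VertexIdSketch is a hypothetical name.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object VertexIdSketch {
  // What the article uses: a 32-bit Java hash widened to Long.
  def hashCodeId(id: String): Long = id.hashCode.toLong

  // Illustrative alternative (not from the article): the first 8 bytes of the
  // identifier's MD5 digest as a 64-bit vertex ID. Collisions become far less
  // likely, but are still possible.
  def md5Id(id: String): Long = {
    val digest = MessageDigest.getInstance("MD5").digest(id.getBytes(StandardCharsets.UTF_8))
    ByteBuffer.wrap(digest).getLong // reads bytes 0..7, big-endian
  }
}
```

"Aa" and "BB" both have hashCode 2112, so with the article's scheme they would become the same vertex.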

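Of course, the guid can also be emitted in MD5 or UUID form instead of as a raw Long. For example, with MD5 via DigestUtils from commons-codec: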

    import org.apache.commons.codec.digest.DigestUtils

    val output = spark.sql("""select
                |    t1.id as id_hashcode,
                |    t2.str_id as id,
                |    t1.guid as guid
                |from id_guid t1
                |left join id_original t2 on t1.id = t2.id
                |group by t1.id, t2.str_id, t1.guid
                |order by guid desc""".stripMargin
    )

    // (original id, guid as a string, MD5 hex of the guid)
    val info: RDD[(String, String, String)] = output.rdd.map(
      row => {
        val id = row.getAs[String]("id")
        val oneId = row.getAs[Long]("guid").toString
        val oneId_Md5 = DigestUtils.md5Hex(oneId)
        (id, oneId, oneId_Md5)
      }
    )
    
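Maven dependencies used by the code above (in addition to Spark Core, Spark SQL, and Spark GraphX):

    <dependency>
      <groupId>commons-codec</groupId>
      <artifactId>commons-codec</artifactId>
      <version>1.13</version>
    </dependency>

    <!-- The code imports org.apache.commons.lang3.StringUtils, which is provided by commons-lang3 -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.9</version>
    </dependency>

    <!-- JSON parsing (com.alibaba.fastjson.JSON) -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.83</version>
    </dependency>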
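If you would rather not depend on commons-codec, both formats can be produced with the JDK alone. This is a sketch with a hypothetical helper name, OneIdFormat. md5Hex encodes the guid's decimal string as UTF-8 and hex-encodes the MD5 digest in lowercase, the same convention as DigestUtils.md5Hex(String); uuid uses UUID.nameUUIDFromBytes, which yields a name-based (version 3, MD5) UUID, so the same guid always maps to the same UUID.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.util.UUID

// Hypothetical JDK-only helpers for turning a numeric guid into a string OneId.
object OneIdFormat {
  // Lowercase hex MD5 of the guid's decimal string (UTF-8).
  def md5Hex(guid: Long): String =
    MessageDigest.getInstance("MD5")
      .digest(guid.toString.getBytes(StandardCharsets.UTF_8))
      .map(b => f"${b & 0xff}%02x")
      .mkString

  // Name-based (version 3, MD5) UUID: deterministic for a given guid.
  def uuid(guid: Long): String =
    UUID.nameUUIDFromBytes(guid.toString.getBytes(StandardCharsets.UTF_8)).toString
}
```

For instance, OneIdFormat.md5Hex(1L) returns "c4ca4238a0b923820dcc509a6f75849b", the well-known MD5 of the string "1".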

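Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization. Reproduction without permission is prohibited.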

In case of infringement, please contact cloudcommunity@tencent.com for removal.

