欢迎您关注《大数据成神之路》
某广告公司在网页上投递动态图片广告,广告的展现形式是根据热点图片动态生成的。为了收入的最大化,需要统计每个广告的点击数来决定哪些广告可以投放的更长时间,哪些需要及时更换。大部分的广告生命周期很短,实时获取广告的点击数可以让我们快速确定哪些广告对业务是关键的。所以我们理想的解决方案是有流处理数据的能力,可以统计所有广告的点击量以及统计实时的点击量。
来看下我们业务数据链路
广告点击数据通过手机或者电脑的网页传递到“数据提取”,提取后的数据经过“数据处理”计算实时的点击数,最后存储到数据库,使用“数据查询”用于统计分析,统计每个广告的点击总数。 根据我们的数据特点,整个数据链路的数据输入输出如下:
针对每个点击事件我们使用asset id以及cost 两个字段来表示一个广告信息,例如:
asset [asset id] cost [actual cost] asset aksh1hf98qwdst9q7 cost 39 asset aksh1hf98qwdst9q8 cost 19
经过上图中步骤2:数据处理后,我们把结果集存储到一个数据表中,数据表可以用于上图步骤3使用Sql查询,例如:
select asset, count from clicks order by count desc asset count ----------------- ----- aksh1hf98qwdst9q7 2392 aksh1hf98qwdst9q8 2010 aksh1hf98qwdst9q6 1938
基于以上诉求选择StructuredStreaming + Redis Stream作为解决方案。先介绍下方案中涉及到的组件。
现在让我们看下如何使用StructuredStreaming + Redis Stream
通过上图可以看到点击数据首先存储到Redis Stream,然后通过StructuredStreaming消费数据、处理聚合数据,再把处理的结果入库到Redis,最后通过Spark Sql查询Redis进行统计分析。下面分别看下每个步骤:
Redis Stream是Redis内置的数据结构,具备每秒百万级别的读写能力,另外存储的数据可以根据时间自动排序。Spark-Redis连接器支持使用Redis Stream作为数据源,非常适用这个场景,把Redis Stream数据对接到Spark 引擎。
Spark的StructuredStreaming 非常适合此场景的数据处理部分,Spark-Redis连接器可以获取Redis Stream的数据转换成Spark的DataFrames。在StructuredStreaming处理流数据的过程中,可以对微批次数据或者整体数据进行查询。数据的处理结果可以通过自定义的“writer”输出到不同的目的地,本场景中我们直接把数据输出到Redis的Hash数据结构。
Spark-Redis连接器可以把Redis的数据结构映射成Spark的DataFrames,然后我们把DataFrames创建成一个临时表,表的字段映射Redis的Hash数据结构。借助Redis的亚毫米级的延迟,使用Spark-SQL进行实时的数据查询。
通过下面实例介绍下开发的步骤
Redis Streams 是一个append-only的数据结构。部署Redis Streams后使用redis-cli向Redis发送数据。 redis-cli使用方法可参考redis-cli连接。下面的命令是Redis向Stream clicks发送数据。
XADD clicks MAXLEN ~ 1000000 * asset aksh1hf98qw7tt9q7 cost 29
在StructuredStreaming中把数据处理步骤分成3个子步骤。
在Spark中读取Redis Stream数据需要确定如何去连接Redis,以及Redis Stream的schema信息。这里使用Spark-Redis连接器,需要创建一个SparkSession并带上Redis的连接信息。
val spark = SparkSession .builder() .appName("StructuredStreaming on Redis") .config("spark.redis.host", redisHost) .config("spark.redis.port", redisPort) .config("spark.redis.auth", redisPassword) .getOrCreate()
在Spark中构建schema,我们给流数据命名为“clicks”,并且需要设置参数“stream.kes”的值为“clicks”。由于Redis Stream中的数据包含两个字段:“asset”和“cost”,所以我们要创建StructType映射这两个字段。
val clicks = spark .readStream .format("redis") .option("stream.keys", redisTableName) .schema(StructType(Array( StructField("asset", StringType), StructField("cost", LongType) ))) .load()
在这里统计下每个asset的点击次数,可以创建一个DataFrames根据asset汇聚数据。
val bypass = clicks.groupBy("asset").count()
最后一个步骤启动StructuredStreaming。
val query = bypass .writeStream .outputMode("update") .foreach(clickWriter) .start()
我们通过自定义的ClickForeachWriter向Redis写数据。ClickForeachWriter继承自FroeachWriter,使用Redis的Java客户端Jedis连接到Redis。
class ClickForeachWriter(redisHost: String, redisPort: String, redisPassword: String) extends ForeachWriter[Row] { var jedis: Jedis = _ def connect() = { val shardInfo: JedisShardInfo = new JedisShardInfo(redisHost, redisPort.toInt) shardInfo.setPassword(redisPassword) jedis = new Jedis(shardInfo) } override def open(partitionId: Long, version: Long): Boolean = { true } override def process(value: Row): Unit = { val asset = value.getString(0) val count = value.getLong(1) if (jedis == null) { connect() } jedis.hset("click:" + asset, "asset", asset) jedis.hset("click:" + asset, "count", count.toString) jedis.expire("click:" + asset, 300) } override def close(errorOrNull: Throwable): Unit = {} }
程序完成打包后,可以通过Spark控制台提交任务,运行Spark StructuredStreaming任务。
--class com.aliyun.spark.redis.StructuredStremingWithRedisStream --jars /spark_on_redis/ali-spark-redis-2.3.1-SNAPSHOT_2.3.2-1.0-SNAPSHOT.jar,/spark_on_redis/commons-pool2-2.0.jar,/spark_on_redis/jedis-3.0.0-20181113.105826-9.jar --driver-memory 1G --driver-cores 1 --executor-cores 1 --executor-memory 2G --num-executors 1 --name spark_on_polardb /spark_on_redis/structuredstreaming-0.0.1-SNAPSHOT.jar xxx1 6379 xxx2 clicks
参数说明:
数据查询使用Spark-SQL创建表读取Redis Hash数据库。这里使用Spark控制台的“交互式查询”,输入如下语句:
CREATE TABLE IF NOT EXISTS clicks(asset STRING, count INT) USING org.apache.spark.sql.redis OPTIONS ( 'host' 'xxx1', 'port' '6379', 'auth' 'xxx2', 'table' 'click' )
参数说明:
然后运行查询语句:
select * from clicks;
例如下图:
Spark-SQL通过Spark-Redis连接器直接查询Redis数据,统计了广告的点击数。
本文分享自微信公众号 - 大数据技术与架构(import_bigdata)
原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。
原始发表时间:2019-06-06
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句