00:00
好,同学们,我们刚才呢,把需求一广告黑名单的流程我们给大家梳理了一下,并且通过画图的方式把里面的每个步骤呢给大家写出来了啊,希望大家能够搞明白啊,那我接下来呢,就顺着这个流程把代码呢咱们完善一下,那首先我们这个REQUIREMENT1就在这儿了啊,那我们其实在之前呢,是为了给大家演示这个数据的获取啊,所以我这里呢,咱们准备创建一个新的吧,好不好,咱们写上啊REQUIREMENT1我们叫black list啊,一个黑名单咱们来,呃,咱们这里呢,拷贝拷贝以后放过来,那我们来看一看,那首先我们的第一个大家可以看到我们现在这边呢,其实就已经获取了咱们从卡夫卡读取的那个广告点击的数据,对不对,但是啊,你的这个数据是一行一行的,对于我来讲呢,其实可能啊,呃并不是很好,所以呢,我们咱们这样啊,咱们来点点一下叫VAR,或者呢,咱们这样吧,啊,咱们来咱们创建一个。
01:00
类,然后呢,把我们的数据转换成类来操作,可能会更加简单一些啊,咱们就叫case class,叫广告的date,就是广告的点击啊,那么这里面呢,首先第一个就是TS啊,咱们的string,然后还有我们area区域,还有呢,我们的city啊,咱们的城市,还有就是用户了,嗯,咱们的还有一个就是广告,咱们来这个string就可以了啊好,那这个呢,我们写上叫广告。广告,我们的广告点击数据好,然后把我们的每一个呀,给它稍微的变一变啊,来我们这里咱们写上写个花括号吧,啊写个花括号,嗯,其实不写也没事啊,咱们写个小括号吧,嗯,也可以啊,来咱们就是每一个我们的卡夫卡的数据,咱们的date,嗯,给他放过来,咱们在这里呢,放到这边,然后把它的value取到啊。我们这里呢,给它拿过来,咱们就写上啊,咱们叫做value,或者我这边写上卡夫卡呢,咱们叫卡夫卡的那个date呢,然后咱们从卡夫卡的date当中取我们的数据,对吧,那这个数据呢,就被取到了,取到了以后给它分解一下,点split,我们是用空格给它隔开的,所以呢,我们这里应该有多条,那么有多条的情况下呢,咱们这里呢,就直接来了,写上一个零啊,写个逗号,然后这个逗号,然后拷贝一下,嗯,来1234 OK,那么写个四啊,把这个去掉,写个四,然后这边写上一个三,然后写上一个二,然后写上一个一,这样的话,我们的数据就有了啊,咱们就有了,有了以后在这里我们就写上了啊,这是我们的那个数据,咱们叫做嗯,就叫啊咱们的date吧,嗯,这个倒无所谓啊。
02:49
好,我们的date等于它就可以了,那这么写完以后,那我们接下来就得按照咱们的步骤一块来分析一下了,首先我们的第一步是要干嘛呢?你把你的数据拿到以后,我们要判断咱们的用户是不是在黑名单当中,那你得把每条数据的渠道啊,那么这个时候啊,记住啊,这个判断用户在不在黑名单当中,你要记住咱们有三个箭头可以指向黑名单,而这两个箭头是要拉入到黑名单,那就意味着黑名单的数据不是固定不变的,它应该是动态发生变化的,而且是在流程的后面来发生变化,对不对?所以在这种情况下,它应该是一种动态获取,也就是说是周期性的获取,而不是说就取一次,如果按照我们现在的逻辑啊,可能你取一次判断一下,那可能就有问题了,所以我们这里的黑名单应该是周期性的获取啊,所以我们说一下咱们叫周期性啊获取咱们叫黑名单。
03:50
数据,然后呢,我们再写上todo啊,判断我们的,嗯,点击用户他是否在我们黑名单中啊,黑名单中那如果要在的话,那没说的数据我就不要了,是吧,不用做统计了,但恰恰呢,如果你不在黑名单当中啊来嗯。
04:08
如果来咱们说一下来,如果我们的用户他不在我们的黑名单中,那么我们要进行啊,我们的统计啊,把它点击的那个数量给他统计出来,记住这回点击啊,那么进行统计数量,那这个数量呢,其实它并不是一天的,记住啊,它是我们的每一个我们的采集周期的,但是我们要真正做的可不是说每个采集周期,而是一天的,所以这个呢,它会稍微有点问题,但没关系,我们后面还有别的步骤,这是我们的第一个逻辑啊,就是我们要统计它的数量。好,那么第一段逻辑应该是这个样子的,那么第二段逻辑是什么呢?就是你统计完数量以后,由于啊可能存在一些机器程序,它会在短期内会有大量的数据过来,所以这个时候你的统计数量是有可能超过阈值的,所以啊,如果来我们说一下,如果我们的统计数量它超过我们的点击阈值,那么将我们的用户拉入啊到我们的黑名单,对吧,那也就是这种情况,但是呢,如果它没有超过呢?哎,来如过,如果没有超过我们的阈值啊,超过阈值,那么我们需要啊,将需要将我们当天的广告啊当天。
05:34
当天的那个广告,我们的点击进行什么呢?聚合啊的点击数量进行更新啊更新,那么你更新之后,你就是你当前没有超过,但你更新之后是不是有可能超过呀,所以来我们来判断判断,我们更新后啊,咱们叫更新后的那个点击数据,点击数据啊,是否我们叫超过阈值,那么如果超过啊,如果超过那么啊将我们用户拉入到黑名单,诶这就可以了啊,所以啊,它是这样的一个步骤,所以呢,我们用文字的方式,你会发现它分成两个大的步骤,那么第一个大的步骤呢,是把黑名单取出来,而下面这个步骤呢,是加入黑名单,这是两个截然不同的概念啊,所以咱们分着来写。
06:23
好,那我们首先写第一个,那第一个当中把这个咱们拷贝过来,拷贝以后点记住啊,它有一个周期性的概念。这我们之前讲过了,咱们有一个方法大家还记得吗?叫做点叫transform啊,这个可以拿到我们的RDD,而且是周期性的拿到咱们的RDD,哎,就在这儿了,所以啊,你周期性的获取黑名单的数据,那我这里就写上了啊来大家看,我现在把这句话我就放到这个位置啊,放到这个位置,嗯,好了,我们要周期性的来取我们的黑名单的数据,对吧,那怎么取啊?
07:01
那肯定是要读取买circle,可是你买circle当中有没有对应的表呢?所以我们打开啊,咱们打开我们这里面有一个叫SPA circle,那我觉得咱们在这里再创建一个吧,来我们再创建一个咱们叫做Spark,叫做什么呢?Streaming啊,然后这个我们来选择UTF杠八点击完确定以后,按照我们之前的这个理解呢,我们要创建咱们的表,这里面就会有一个咱们的black list啊,里面有个user ID,这是我们的黑名单,所以呢,我们拷贝一下,呃,拷贝以后在咱们的这个位置呢,我点一下啊,来把这个放过来给它执行一下,执行以以后黑名单就有了。还有一个我要存放单日各用户点击广告的次数,那么这里面会有一个日期,会有一个用户的ID,还会有一个广告ID,还会有一个count啊,所以咱们这里呢,咱们拷贝一下来拷贝拷贝之后来把这个。
08:01
拿过来,拿过来以后两张表就创建出来了,那我接下来就应该读取黑名单啊,所以呢,我们读取黑名单怎么读取啊,那我们肯定通过JDBC嘛,所以来啊,写上通过我们的JDBC啊,来周期性获取黑名单的数据,那么你要周期性的获取我们黑名单的数据的话,那么这个BC该怎么做呢?其实我们在之前给大家讲的时候啊,我们这里呢,其实会有JDBC的那些操作,对吧,咱们讲过在我们SPA当中,我们这里会有C,比方说一些连接配置,这些东西咱们都试过,对不对,没有问题,但是呢,我们每一回都这么写啊,他太麻烦,所以我们怎么办呢?咱们这样来做啊,同学们看,嗯。我们在这里面,咱们会有一个,咱们找一下会有一个工具类,我们事先的给大家准备了一下,叫jdbc u,呃,所以呢,我们在这个地方,我想想啊,咱们在这咱们创建一个吧,嗯,咱们叫做new,创建一个package,我们叫U,好了,那我现在呢,点击new啊,我们在这里给它创建咱们叫jdb CU u,嗯,好,放过来,那么你放过来的话,首先我们需要通过它的嗯工具类来得到它的连接对象,这个连接对象啊,大家可以看到在我们这里呢,会有这么一个操作,来咱们拷贝啊,拷贝拷贝以后放到这里,那么首先这里有个叫data source叫数据源啊,给它导一下,嗯,数据源。
09:33
好了,还有我们的property啊,这个property我们放过来,然后呢,这个呢是读取咱们的配置文件,这个咱不要啊,这个咱不要了,把这个呢,我们去掉吧,我看看啊,把这个去掉,这个不要,然后呢,我们叫driver class name,这个没问题,然后呢,连接串,这个接串咱们之前有啊,所以我们的JDBC,咱们的连接串就是它呀,嗯,拷贝拷贝以后我们放到这个位置啊,把这个去掉好放过来,放过来以后这个改成叫Spark streaming,然后呢,这个U呢是我们的root,那这个咱们就不从配置文件里面取了啊,然后这个呢,我们给它来一个啊,这个呢,我们写上叫密码123123,然后呢,这个叫做data source,就是数据库连接呀,咱不是有一个叫德鲁伊嘛,咱们之前依赖的时候对吧,那这个呢,我们就是一个德鲁伊的一个连接词的数量,给个50吧,啊,给个写个50。
10:28
写完了以后,这是我们的一个工厂类factory啊,咱们叫德鲁伊。好了,那我现在这么写完以后呢,接下来下面呢,有一个叫获取数据库的连接,它会从咱们的数据源当中,把咱们的这个什么我们的数据源当中的连接啊给它拿到,这个是connection,咱们叫做Java circle connection啊好,那我现在呢,通过jdbc u啊,咱们来这里呢,我就把每一条数据呢,我们都进行操作,然后呢,进行回好吧,同学们来我们这里呢,写上一个我们的map啊,这个map,那么里面每一个就是一条数据,所以呢,放过来啊,嗯,好了,然后呢,JDBC,我们的点啊这个给它导一下,嗯,咱们点叫做什么呢?Get connection。
11:17
OK,那我们写上叫connection啊,你取完以后别忘了,最后是要给它close掉的啊,给它关掉,嗯,那么你中间干嘛呀,我要读取数据库,那我是不是应该查询数据呢?所以我们来写上,咱们叫做点,咱们叫prepare statement,那么然后呢,写词课文叫select啊星from,这个from呢,就是从我们的blacklist当中,大家看我们有张表叫blacklist,把它里面的这个给它拿到,你拿到以后它会返回结果,因为咱们这个表中啊就一个字段叫UID,所以啊,我这其实直接写UID就可以了。好,那么你这么写完以后呢,我会得到一个叫做a prop statement,就是它好了,当你完成以后,它是需要给它close掉关闭的啊,如果你不关闭的情况下,就有可能会出现资源后进啊,连接池连不上的情况,这是有可能会出现的啊好,我们接着往下,然后干嘛呢?我们去真正的执行它,因为是一个查询操作,所以我们叫excute query啊,然后点VAR,我们叫做RS,哎,叫查询结果集,因为可能有多条,对吧,可能一条都没有,有可能有多条,所以呢,我们well啊,咱们协商,咱们叫RS啊R s.next这些操作呀,记住啊,就是纯粹JDBC的操作了,这个在大家之前学买S的时候应该都是学过的啊,我们这并不会做过多的讲解,好吧,同学们来,我们的RS有了之后呢,现在干嘛呢,我们要准备把咱们的查询结果给它保存起来,所以在我们的前面呢,我先事先呢。
12:55
上一个叫做什么呢?叫做list,那等于list buffer,那咱们来放过来,咱们写上一个three,嗯,OK,把这个呢拿过来,我们叫list buffer,嗯,咱们的muable啊,List buffer,然后我们就往里面放就行了,点我们叫做增个a pen,追加,追加以后2S点咱们叫get string啊,给个我们的一就可以了啊,因为呢,我们这个就一个字段嘛,所以我们就取一个就够了啊,然后呢,不断的往这个集合里面放,放完之后,那这个查询就结束了,那查询结束的话,R s.close这样的话就是OK的啊,那好,那如果OK的情况下,同学们记住在我当前啊,黑名单已经被我取到了,记住这个就是那个黑名单的数据了,咱们叫black list,那就是他啊。
13:46
好,那你拿到了以后,那么你接下来干什么呀?诶大家看来拷贝拷贝以后啊,我们就要判断一下咱们的数据在不在黑名单当中了,对不对,所以咱们这个date里面,咱们拷贝,拷贝以后咱们点它里面,诶咱们的这个date我确认一下有一个叫U,这个U在不在我们这个里面呢?所以我们的black list咱们可以来啊嗯。
14:13
咱们叫做我们看看我确认一下,咱们叫做点,我们叫做什么呢?看看它是不是包含在里面,如果它在里面的话,那这个数据我们就是有问题的,所以呢,我确定一下啊,咱们的这个地方该怎么去做它啊,嗯,那我们这个地方叫做map,其实也可以啊,那我现在来啊,咱们做一个我们的判断,嗯,判断来我们接着写啊,就是把这个去掉。嗯,我们写上叫做点,咱们看是不是包含我们当前的那个date,点我们的U啊好。呃,那我想想啊,咱们刚才这样包含的,我觉得有点问题啊,为什么呢?因为我们的这个查询啊,它应该是周期性获取,其实不应该在这里是吧?所以我放的位置好像放错了吧,哎,同刚我放错了,我在这个代码呢,应该放前面去啊,放前面去在这个地方周期性的获取,获取之后,然后呢,其实我们是应该给它一个过滤的,对吧?啊,所以来啊,我放出的位置有点问题啊,放到位置有点问题,放过来咱们RDD,咱们点咱们叫ER啊,咱们叫做fielder,这个fielder呢,我们给它拿过来啊,拿过来咱们写上叫做date OK,放到这边这个data有没有冲突啊,好像没有冲突对不对,所以我们现在这么写是可以的啊,那么判断点击用户是否在黑名单当中,那么如果他在里面的话,我们就不能要,所以它不在里面,我们要保留,所以我加一个感叹号,就是这个意思啊。
15:49
好了,那这个地方我们就叫field r DD了啊,就意味着我们过滤后的数据就在这个位置,那么你过滤后的数据,那我们这里不就应该干嘛呀,进行统计了吗?所以我们把这句话呢,放到这儿来啊,来放到我们这个位置,放到这里以后,那我现在呢,就得做统计了,但是统计之前呢,我们需要做一这么个操作,咱们叫field r dd.map我需要把它的格式稍微的我们变一变,所以呢,我们写个括号,括号然后写上一个date,好,那我现在想变成什么呢?你要想做统计的话,那我们心在首先应该有一个K,有一个V,那么这个V呢,其实就是一,那这个K是什么?按照咱们的需求啊,他是这么说的,需求当中它描述了这么一个概念,他是说我们的每天。
16:40
某个广告的用户,那所以呢,我们应该以天为单位,对不对,所以咱们这应该写个天,那么我们写上一个括号,那么这个应该写上一个我们的什么呢?我们叫做day,对吧?天嘛,嗯,OK,然后再来我们还有,嗯,还有什么呢?我们有一个用户叫U。还有一个啊,咱们叫做广告,诶咱们ad啊,等于它,所以说呢,也就意味着我们的day,然后呢,我们的这个叫它,然后呢,我们的U,诶好放过来,那么咱们的UR呢,其实就在这个date里面,它里面就有个U,然后呢,我们date当中就会有一个广告,这都没问题,所以啊这么写是对的,但这个天就有点不太对的,为什么呢?因为咱们这个天呀,咱们给大家准备的这个数据是什么呢?同学们想想,它其实是一个我们的毫秒时间戳。
17:32
那如何把这个毫秒把它变成天呢,对不对,所以啊,大家会来看啊,我们这里呢,想变成天的话,那我们这里可以给它格式化一下,比方说我们写上咱们叫做SDF,等于叫做new,咱们叫simple啊,咱们叫simple date。咱们叫simple date form,然后写上咱们就叫YYYY啊,然后MMDD,年月日嘛,那所以说我们这里来,我们写上它,咱们叫做点,点了以后,这个地方我们有一个叫做format啊,把一个我们的日期变成字符串,那你把一个我们的日期变成字符串,那日期在哪呢?日期给它来一个叫做new date,那这个时候我们写上叫Java,点我们的u.date记住它里面是可以传一个时间戳的,这个时间戳咱们的date里面恰恰是有的,咱们叫做TS,不过它是个字符串,我们这里应该给它to long。
18:34
啊,这样的话是一个时间戳,那时间戳给它格式化成年月日,那我们这不就有了吗?对不对?那么当你去完成操作以后,大家想想这个不就是word count吗?对不对?我们的一不就是count吗?咱们前面的这三个合在一块就叫word,所以啊,我们这里呢,给它来点,我们叫reduce by key,那么两两聚合,聚合之后咱们接下来就把它统计完成了,大家想想是不是这样的,所以啊,咱们的这个地方的transform返回其实就是我们的count啊,咱们这边写上咱们叫做DS吧,这就是我们转换之后的一个stream叫散流,对吧?哎,就这个意思啊,好,那我现在呢,把这一步算是完成了,最起码我们的数据我要判断在不在黑名单当中啊。
我来说两句