00:00
那接下来我们就正式的开始准备要在呃实际的这个代码里边做一个项目的实现了,我们这个项目呢,呃,最后还是用这个Java来编写,用idea作为集成开发环境啊,进行设置这样一个ma项目啊,用ma作为项目管理的包管理的工具,所以首先呢,我们在这个idea里边还是create一个new project。把它创建出来,我们先把这个整个项目的架子先搭好,首先com.at at,硅谷group ID artif artifact ID,我这个叫做就是项目名称啊,User behavior。呃,Analysis对吧,用户行为分析啊,把这个先创建好。创建一个新项目,呃,那么首先我们要处理的就应该是这个POM文件对吧?呃,这里边大家说呃首先要注意一下,就是我们接下来其实这个项目里边有各种各样不同的拈块,所以呃,接下来的问题就在于我是不是相当于应该有一个不同层级的这种拈块的,呃,层级关系先要设置出来啊啊所以接下来我首先是在下边直接去new一个module,然后接下来呢,呃,我还是啊没没去管理这样的一个模块啊,一个module,我当前这个模块就叫做第一个模块啊,实时的热门商品统计对吧?啊,我直接可以把它叫叫成这个hot items。
01:35
直接把这个模块先创建出来。然后我们就想到了,其实接下来我们的任务是不是就是在外边的这个副项目里边啊,它其实是不会去写具体代码的,具体代码应该都放在子模块里面去写,对不对啊,所以外边的这个source目录其实是可以直接删掉的。啊,然后大家看把子模块创建出来之后,负模块这负项目这边其实就多了一个modules对吧,直接在这里就就有这样一个关系了,然后接下来我们其实就是子模块也有一个po沫文件,如果说我们是所有模块通用的依赖的话,就放在负项目里边,如果是当前子模块单独独享的这个,呃,依赖的话就放在自己的里边,对吧?呃,这个模块管理依赖管理就会更加的合理一些。
02:25
好,呃,那接下来首先大家可能会想到我这里边怎么样去呃写这个po文件呢?那这里边我们既然是用flink去做数据分析吗?那是不是首先必须要有flink的相关依赖啊,那这个依赖是应该放在负项目还是子子模块呢?当然是放在负项里边对吧?啊,就是所有的拈肯定都是拿flink做的嘛,另外还涉及到就是是不是应该肯定要有卡夫卡相关的依赖啊,对吧?我们整整体来讲,真实项目数据源一般都是基于这个卡夫卡读径来的消费卡夫卡数据,所以卡夫卡的比方说它的这个客户端连接工具啊,另外还有就是我们flink卡夫卡的那个connector,这些都要引入进来,所以接下来我们看一下这个po文件到底应该怎么写啊,首先大家注意一下,跟之前不一样,我们之前都是直接上来dependenceies对吧,然后下面写依赖就完事,现在呢,我是首先写了一个properties。
03:23
这些是用来干什么的呢?大家直观的看一下,这其实就是对我当前是不是要做版本控制,版本管理啊,因为在实际生产项目当中,有可能我们一个大的项目下面的模块具体的需求可能是不同的,呃,同事不同的工程师来做的开发,那假如说你用的这个flink是一点十,我用的是01:11,他用的是1.8,那这个肯定就最后就乱了,对吧?我们提交的时候,这个你可能用到了不同的那个性能啊,这个最后如果不兼容的话,这个就很麻烦了啊,Flink的话,这个应该大家还都是一致的,因为我们当前生产环境集群肯定是固定的嘛,但是假如说用到的这个,比方说卡夫卡的版本对吧,其他的一些工具的版本,那其实如果出现版本冲突就很麻烦,所以最好的方式就是。
04:13
在呃,副项目的这个po文件里边,直接把它统一,就像属性一样啊,把它直接配置出来,我们当前flink的版本是一点十点一。Scla b re version这个单独列出来了,因为大家会发现在很多地方都对scla有依赖,对吧?呃,所以我这里面把它单独列出来,2.12,我们现在skyla的版本是2.12,那最后还有一个卡夫卡的版本,这个其实是卡夫卡客户端的一个版本啊,因为大家知道卡夫卡连接器的版本是不是应该跟弗link的版本是跟着的呀?啊,这个其实不需要写啊,我们这里边是那个客户端的版本是2.2.0。呃,那所以这样写出来之后,后续如果我们想要去升级版本的话。大家想也不用一个一个去找依赖了,对吧,直接是不是在这儿全局直接改一个版本,后面就都改了啊,这个其实是非常简单,也是实际项目当中最常见的一种配置方法,好,那然后接下来大家看一下这个dependency需要哪些,首先我们要引入flink,那是不是flink Java和flink streaming Java,然后SKY版本2.12是不是都必须要引入啊啊大家看一下这个写法啊,就这里边我引入对应的那个属性property的时候,可以用一个Dollar符加一个花括号,把对应的这个。
05:28
大家看这里边的这个名称是不是直接引入啊,然后就相当于一个变量对吧,把这个变量放在这里了,甚至我可以直接做这个拼接,你看这里边flink streaming Java后边下划线,然后是do scale b version,那是不是后边相当于就是用2.12替代啊,哎,所以这个就是,呃,这样写的话就会非常的通用性非常的好啊呃,下面还有这个对应这个连接器啊呃,对应的这个flink stream Java的版本当然也是一点十点一了。那另外我们还要引入当前,呃,这个是卡夫卡本身卡卡客户端的那个版本,对吧?这里引入一个卡夫卡,然后用的用的是卡夫卡version,另外我们还要引入弗Li卡夫卡之间的那个连接器,这个连接器我们用的是flink的版本,因为是flink官方给我们提供的,对吧?啊,所以这就是我们当前需要的东西,然后下边还有一个build build里边可能是有一些插件需要引入,我当前引入的是一个ma compiler plug in编译的这个插件,大家看我是指定了当前的版本,是不是必须是。
06:31
这个1.8呀,对吧,然后编码方式UTF8对吧?啊,如果大家已经配置好的话,这个其实不加也可以。我这里面就统一把这个po文件里的内容copy过来。直接放在下边。好,那其实大家会想到,如果我开启这个就是自动引入的话,这里边如果直接写在这个负项目的po文件里边的话,那子模块里边会需要去引入吗?哦,这个写错了,这个是子模块对吧?大家看到我们这里边引入的这个都是子模块里的啊。
07:06
Dependency property。好,我把这个先删掉,然后放到这个负项目的下边。我们来做一个引入,那大家想这个负项目引入子模块里面有没有呢?哎,对,大家看一下这个我们刷新一下啊,当前这个负项目下边dependency都有了,子模块下边是不是自然就引入了呀?呃,这个是没有问题的啊,所以接下来我们直接直观的这个依赖都都有了,接下来我们就看当前的模块到底应该怎么写。代码当然是应该写在source main Java下边,那这里边呢,还涉及到我们当前的数据呢,呃,数据本来应该是从卡夫卡里边读取,但我现在没有现成的数据,所以我们是放在了给大家准备好的这个data里边,有一个user behavior.csv对吧?所以我们现在是从文件里面读取数据,直接把它放在resources下边。
08:08
先把这个引入进来,呃,大家会看到就是这个数据量有多大呢。大概大概48万多条啊,当然对于我们这个测试来讲是够用了,但是这对于真实的大数据处理场景来讲,这个数据量其实也不大啊,我们只是通过这个啊,去感受一下当前我们这个项目整个架构怎么做就可以了,然后里边的字段,诶,大家回忆起来,User ID对吧,Item ID,还有那个类别的品类ID后面还有他当前的行为啊,PV car对吧,FA,最后还有一个时间戳。这这是已经做过ETL的,因为逗号分割嘛,一看就能就能看出来啊啊那接下来我们就可以去写代码了,首先我们想到既然是用到了这样的一个数据,我们后边是不是代码里边需要把它包装成对应的那个po类型啊,诶所以这里边我们是不是应该先做一个,先做一个这个po的一个定义啊啊所以当前我把这个定义成啊,可以加上前面这个包名啊,com.at硅谷反写域名,然后接下来我当前这个是在hot it。
09:18
呃,Analysis下边对吧,然后当前呢,我在新创建一个一个package,名字叫beans啊还是啊,就叫做B,接下来当前我这个名字叫user behavior。好把这个类创建出来,然后其实大家知道下面我们主要就是要定义一些。定义。私有属性主要需要的是什么呢?呃,应该就是用户ID对吧?Private长整型是一个长整型的用户IDUID,然后是不是还需要有一个长整型的item ID啊,商品ID,然后接下来还有一个一个类别ID对吧?Private啊,这个我们是用那个integer啊,就是类别相对来讲少一些category ID。
10:12
啊,那另外后面还有两个字段,一个是当前的行为,一个是时间戳,那行为大家知道其实是string对吧,Private string这个行为我们就叫做behavior吧。最后还有一个时间戳private长整型time step啊,这就是我们定义好的这些属性吧啊,然后接下来自然是符合这个抓病的标准啊呃,那那我接下来要去生成这个constructor,首先先来一个空餐的constructor,然后。呃,方便我们方便我们定义的话,方便使用的话,把这个直接全包起来,生成一个这样的带参数的对吧?啊,然后接下来还有就是是不是geter set都要生成啊,私有的属性啊,公开的这个get set都要生成,最后为了方便我们打印输出,把对应的那个to string也生成一下,对吧?诶,这就是我们想要的这个user behavior po类,然后接下来这里边还涉及到另外一个一个po,哎,那就是我们中间窗口的那个聚合结果,是不是也要想做一个包装啊,对吧?中间我们不是管它叫做item view count嘛,里边主要就三个字段,当前的商品ID,然后它属于哪个窗口对吧?那最后它的那个count值到底是多少,就是把那个要统计出来包装成一个po啊,所以接下来还是新建一个class。
11:40
这个叫做item view count,把这个先创建出来,同样首先上来之后还是啊private,呃,长整型的这个item ID,然后接下来,呃,大家想那个window and应该是什么类型。我们也可以把它转换成一个时间,就是年月日十分秒,对吧,但是就是对,如果大家直接从那个window里边去获取的话,获取到的是不是也是一个时间戳啊,诶,所以它其实就是一个长整型对吧?我们直接把这个时间戳拿出来就完事了,所以这里边是一个window and,最后还有一个count值啊,这个干脆我简单粗暴,三个都是长整型得了,对吧?因为count数量我也不知道到底有多少,有可能非常大,好,所以直接放在这儿,这就是三个定义好的属性字段啊,那接下来当然就是空餐,还是得创建出来。
12:35
接下来是带参数的构造方法,另外公开的get set全创建出来,最后再来一个to string,对吧?哎,这就是我们定义好的预先啊,先把对应的这些数据什么的都先定义好。
我来说两句