/**
* @Description 新老用户统计分析
* 原来是根据数据中的某个字段
* 现在我们是根据每个 device(设备) 来判断是否是新老用户
* 思考 => device放在状态里面去呢?
* 我们的实现:状态 + 布隆过滤器
*/
public class OsUserCntAppV3 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream = environment.readTextFile("data/access.json");
environment.setParallelism(1); //设置并行度为1方便观察
SingleOutputStreamOperator<Access> filter = stream.map(new MapFunction<String, Access>() {
@Override
public Access map(String s) throws Exception {
// json 转 Access
try {
return JSON.parseObject(s, Access.class);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
//这里是只要不为空的数据 x != null等于把上面的空的数据过滤掉
}).filter(x -> x != null).filter(new FilterFunction<Access>() {
@Override
public boolean filter(Access access) throws Exception {
//只过滤出来 event='startup'的数据
return "startup".equals(access.event);
}
});
//根据设别类型进行keyBy
filter.keyBy(x->x.deviceType)
//全窗口 key I O
.process(new KeyedProcessFunction<String, Access, Access>() {
private ValueState<BloomFilter<String>> state;
@Override
public void open(Configuration parameters) throws Exception {
//使用布隆过滤器
//import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter; 注意路径
//状态初始化 使用ValueState描述符 布隆过滤器传入设备编号
//初始化传入名字和类型
ValueStateDescriptor<BloomFilter<String>> descriptor =
new ValueStateDescriptor<>("s", TypeInformation.of(new TypeHint<BloomFilter<String>>() {}));
state = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Access value, Context context, Collector<Access> collector) throws Exception {
//来一条处理一条 "device":"4759947c-cd47-433c-ac8f-ae923a6d38b6" 设备ID
String device = value.device;
//状态中获取值
BloomFilter<String> bloomFilter = state.value();
//布隆过滤器固定写法 数据值
if(null==bloomFilter){
bloomFilter=BloomFilter.create(Funnels.unencodedCharsFunnel(),10000);
}
//来一条数据判断一次看过滤器是否包含
//mightContain() 可能包含 !bloomFilter.mightContain(device) 肯定不包含
if(!bloomFilter.mightContain(device)){
//不包含放入设备ID
bloomFilter.put(device);
//1是新用户 这里布直接修改nu字段 应该会造成进来的可能全是 nu=1 这里使用新字段nu2
//nu2第一次赋值为1后是新用户 再进来就会给布隆写成0了
value.nu2=1;
//更新状态
state.update(bloomFilter);
}
//输出
collector.collect(value);
}
}).print();
environment.execute("OsUserCntAppV1");
}
}
event='startup', net='WiFi', channel='华为商城', uid='user_1', nu=1, nu2=1, ip='171.11.85.21',
event='startup', net='WiFi', channel='华为商城', uid='user_1', nu=1, nu2=0, ip='171.11.85.21'