flink强化学习实时推荐(flink实战教程-使用set实时计算当天网站uv)
flink强化学习实时推荐(flink实战教程-使用set实时计算当天网站uv)1>Result{ dateTime='2020-06-2119:23:30'type='IOS' uv=136} 2>Result{ dateTime='2020-06-2119:23:30'type='Android' uv=150} 1>Result{ dateTime='2020-06-2119:23:30'type='H5' uv=134} 1>Result{ dateTime='2020-06-2119:23:31'type='IOS' uv=164} 2>Result{ dateTime='2020-06-2119:23:31'type='Android' uv=177} 1>Res
背景对于web网站,我们一般会有这样的需求,实时的计算出来当天网站的uv,尽可能快的展示出来。今天我们就讲一下基于java的set集合做一下实时uv的统计。
简易需求:
- 实时计算出当天零点截止到当前时间各个端(android ios h5)下的uv
- 每秒钟更新一次统计结果
模拟source
首先我们模拟生成一下最简单的数据,生成一个flink的二元组Tuple2.分别表示分类和用户id
publicstaticclassMySourceimplementsSourceFunction<Tuple2<String Integer>>{
privatevolatilebooleanisRunning=true;
Stringcategory[]={"Android" "IOS" "H5"};
@Override
publicvoidrun(SourceContext<Tuple2<String Integer>>ctx)throwsException{
while(isRunning){
Thread.sleep(10);
//具体是哪个端的用户
Stringtype=category[(int)(Math.random()*(category.length))];
//随机生成10000以内的int类型数据作为userid
intuserid=(int)(Math.random()*10000);
ctx.collect(Tuple2.of(type userid));
}
}
@Override
publicvoidcancel(){
isRunning=false;
}
}
定义窗口
接下来我们定义一个周期是一天的滑动窗口,因为我们要每秒钟输出窗口的数据,所以我们紧接着窗口定义了一个1秒的触发器。
DataStream<Tuple2<String Integer>>dataStream=env.addSource(newMySource());
dataStream.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.days(1) Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
.aggregate(newMyAggregate() newWindowResult())
.print();
自定义聚合算子
接下来我们自定义一个聚合算子来实现该功能。
对于聚合算子的理解可以参考这个文章:
https://www.toutiao.com/i6835575784409661959/
publicstaticclassMyAggregate
implementsAggregateFunction<Tuple2<String Integer> Set<Integer> Integer>{
@Override
publicSet<Integer>createAccumulator(){
returnnewHashSet<>();
}
@Override
publicSet<Integer>add(Tuple2<String Integer>value Set<Integer>accumulator){
accumulator.add(value.f1);
returnaccumulator;
}
@Override
publicIntegergetResult(Set<Integer>accumulator){
returnaccumulator.size();
}
@Override
publicSet<Integer>merge(Set<Integer>a Set<Integer>b){
a.addAll(b);
returna;
}
}
处理输出结果
我们这里将结果输出到控制台,实际的生产中我们可以将数据写入redis或者hbase等。
1>Result{ dateTime='2020-06-2119:23:30'type='IOS' uv=136}
2>Result{ dateTime='2020-06-2119:23:30'type='Android' uv=150}
1>Result{ dateTime='2020-06-2119:23:30'type='H5' uv=134}
1>Result{ dateTime='2020-06-2119:23:31'type='IOS' uv=164}
2>Result{ dateTime='2020-06-2119:23:31'type='Android' uv=177}
1>Result{ dateTime='2020-06-2119:23:31'type='H5' uv=167}
2>Result{ dateTime='2020-06-2119:23:32'type='Android' uv=205}
1>Result{ dateTime='2020-06-2119:23:32'type='IOS' uv=193}
1>Result{ dateTime='2020-06-2119:23:32'type='H5' uv=198}
完整代码请参考 https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/windows/RealTimePvUv_Set.java
欢迎关注我的公众号:【大数据技术与应用实战】获取更多精彩内容