快捷搜索:  汽车  科技

flink强化学习实时推荐(flink实战教程-使用set实时计算当天网站uv)

flink强化学习实时推荐(flink实战教程-使用set实时计算当天网站uv)1>Result{ dateTime='2020-06-2119:23:30'type='IOS' uv=136} 2>Result{ dateTime='2020-06-2119:23:30'type='Android' uv=150} 1>Result{ dateTime='2020-06-2119:23:30'type='H5' uv=134} 1>Result{ dateTime='2020-06-2119:23:31'type='IOS' uv=164} 2>Result{ dateTime='2020-06-2119:23:31'type='Android' uv=177} 1>Res

背景

对于web网站,我们一般会有这样的需求,实时的计算出来当天网站的uv,尽可能快的展示出来。今天我们就讲一下基于java的set集合做一下实时uv的统计。

简易需求:

  • 实时计算出当天零点截止到当前时间各个端(android ios h5)下的uv
  • 每秒钟更新一次统计结果
案例讲解

模拟source

首先我们模拟生成一下最简单的数据,生成一个flink的二元组Tuple2.分别表示分类和用户id

publicstaticclassMySourceimplementsSourceFunction<Tuple2<String Integer>>{ privatevolatilebooleanisRunning=true; Stringcategory[]={"Android" "IOS" "H5"}; @Override publicvoidrun(SourceContext<Tuple2<String Integer>>ctx)throwsException{ while(isRunning){ Thread.sleep(10); //具体是哪个端的用户 Stringtype=category[(int)(Math.random()*(category.length))]; //随机生成10000以内的int类型数据作为userid intuserid=(int)(Math.random()*10000); ctx.collect(Tuple2.of(type userid)); } } @Override publicvoidcancel(){ isRunning=false; } }

flink强化学习实时推荐(flink实战教程-使用set实时计算当天网站uv)(1)

定义窗口

接下来我们定义一个周期是一天的滑动窗口,因为我们要每秒钟输出窗口的数据,所以我们紧接着窗口定义了一个1秒的触发器。

DataStream<Tuple2<String Integer>>dataStream=env.addSource(newMySource()); dataStream.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.days(1) Time.hours(-8))) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1))) .aggregate(newMyAggregate() newWindowResult()) .print();

自定义聚合算子

接下来我们自定义一个聚合算子来实现该功能。

对于聚合算子的理解可以参考这个文章:

https://www.toutiao.com/i6835575784409661959/

publicstaticclassMyAggregate implementsAggregateFunction<Tuple2<String Integer> Set<Integer> Integer>{ @Override publicSet<Integer>createAccumulator(){ returnnewHashSet<>(); } @Override publicSet<Integer>add(Tuple2<String Integer>value Set<Integer>accumulator){ accumulator.add(value.f1); returnaccumulator; } @Override publicIntegergetResult(Set<Integer>accumulator){ returnaccumulator.size(); } @Override publicSet<Integer>merge(Set<Integer>a Set<Integer>b){ a.addAll(b); returna; } }

处理输出结果

我们这里将结果输出到控制台,实际的生产中我们可以将数据写入redis或者hbase等。

1>Result{ dateTime='2020-06-2119:23:30'type='IOS' uv=136} 2>Result{ dateTime='2020-06-2119:23:30'type='Android' uv=150} 1>Result{ dateTime='2020-06-2119:23:30'type='H5' uv=134} 1>Result{ dateTime='2020-06-2119:23:31'type='IOS' uv=164} 2>Result{ dateTime='2020-06-2119:23:31'type='Android' uv=177} 1>Result{ dateTime='2020-06-2119:23:31'type='H5' uv=167} 2>Result{ dateTime='2020-06-2119:23:32'type='Android' uv=205} 1>Result{ dateTime='2020-06-2119:23:32'type='IOS' uv=193} 1>Result{ dateTime='2020-06-2119:23:32'type='H5' uv=198}

完整代码请参考 https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/windows/RealTimePvUv_Set.java

欢迎关注我的公众号:【大数据技术与应用实战】获取更多精彩内容

猜您喜欢: