我们说一个正常的数仓DWD层的数据分析之前,我们先总结一下不同的处理函数的区别

比如MapFunction ProcessFunction SinkFunction

图片

基本总结为上图,上面的图中也可以看出,Rich比简单的Function功能更强大,而唯一能使用侧输出的,只有ProcessFunction

那么接下来,我们继续DWD层的开发

我们通过了分流的手段,将数据分为了独立的topic,这就需要规划topic对应的指标了

基本总结如下

图片

进行了指标的梳理,形成了上面的指标表格

那么,我们就按照上面的指标,依次的书写代码

首先是uv, 独立访客,或者叫做每日的活跃用户

识别的逻辑基本如下

首先是判断这是这个访客打开的第一个页面

然后是访客在这一天内的第一次进入

确认完成后,将数据保存在Kafka中,作为DWM层

那么书写代码基本如下

首先准备一个数据源

StreamExecutionEnvironment env =

StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, “linux1:9092”);

props.put(ConsumerConfig.GROUP_ID_CONFIG, “atguigu”);

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”);

final DataStreamSource<String> kafkaDS = env.addSource(

new FlinkKafkaConsumer<String>(

“dwd_page_log”,

new SimpleStringSchema(),

props

)

);

然后将string转换为一个JsonObject

final SingleOutputStreamOperator<JSONObject> mapDS =

kafkaDS.map(JSON::parseObject);

其次对用户进行分组,按照JSON中的mid进行分组

final KeyedStream<JSONObject, String> jsonKS = mapDS.keyBy(json -> json.getJSONObject(“common”).getString(“mid”));

然后进行用户的过滤,这里我们使用一个RichFilter函数

这里我们需要使用到今日作为一个状态,所以需要在open函数中进行状态的设置

@Override

public void open(Configuration parameters) throws Exception {

// TODO 给状态设定过期时间

final ValueStateDescriptor<String> lastDate = new ValueStateDescriptor<>(“lastDate”, String.class);

final StateTtlConfig ondDay = StateTtlConfig.newBuilder(Time.days(1)).build();

lastDate.enableTimeToLive(ondDay);

lastDateState = getRuntimeContext().getState(lastDate);

sdf = new SimpleDateFormat(“yyyy-MM-dd”);

}

分别给

private ValueState<String> lastDateState = null;

private SimpleDateFormat sdf = null;

属性进行了初始化

然后对数据进行正式的过滤

我们将顺序进行判断

是否存在last_page_id,如果存在,就说明不是第一次的访问,就直接过滤掉

如果没有,就查看跳入时间是不是状态时间,如果一样,就说明是冗余的数据,需要过滤

不一样,则进行数据传输,并更新时间状态,基本上的代码如下

                    // TODO 过滤数据

@Override

public boolean filter(JSONObject json) throws Exception {

// TODO 1. 判断是否存在上一页数据

final String lastPage =

json.getJSONObject(“page”).getString(“last_page_id”);

if (StringUtils.isNotEmpty(lastPage) ) {

// TODO 1.2 如果有的场合,那么说明不是第一回访问,那么直接过滤掉

return false;

} else {

// TODO 1.1 如果没有的场合,查看跳入时间和当前状态时间是否相同

final String lastDate = lastDateState.value();

final Long ts = json.getLong(“ts”);

final String jsonDate = sdf.format(ts);

if ( StringUtils.isNotEmpty(lastDate)

&& lastDate.equals(jsonDate)

) {

// TODO 1.1.1 如果相同,那么直接过滤掉

return false;

} else {

// TODO 1.1.2 如果不相同,那么数据向后传输

// 更新时间状态

lastDateState.update(jsonDate);

return true;

}

}

}

}

之后便是创建一个sink,从而将数据再次发出给Kafka

.map(json->json.toJSONString()).addSink(

new FlinkKafkaProducer<String>(

“linux1:9092”,

Const.TOPIC_DWM_LOG_UV,

new SimpleStringSchema()

)

);

其次,第二个kpi的计算是跳出率

跳出率就是一个用户访问了网站的一个页面就退出,不在访问网站的其他页面

那么这就需要我们保存哪些只访问一个页面的访客数据

判断的逻辑基本如下

这个页面是用户近期访问的第一个页面,这个可以根据用户是否存在last_page_id来判断,然后很长的一段时间,用户没有访问其他的页面

对于第一个特征,保留last_page_id为空的就可以了,第二个访问的判断,需要使用到CEP技术中的时间观念

基本的框架和上面的UV一致,先是创建一个Source和Map函数,

不过由于涉及到了CEP中的时间概念

所以,需要生成水位线和事件时间

 final KeyedStream<JSONObject, String> midKS = jsonDS.assignTimestampsAndWatermarks(

WatermarkStrategy.<JSONObject>forMonotonousTimestamps()

.withTimestampAssigner(

new SerializableTimestampAssigner<JSONObject>() {

@Override

public long extractTimestamp(JSONObject element, long recordTimestamp) {

return element.getLong(“ts”);

}

}

)

).keyBy(json -> json.getJSONObject(“common”).getString(“mid”));

然后书写CEP表达式

final Pattern<JSONObject, JSONObject> pattern = Pattern

.<JSONObject>begin(“start”)

.where(

new SimpleCondition<JSONObject>() {

@Override

public boolean filter(JSONObject json) throws Exception {

final String page = json.getJSONObject(“page”).getString(“last_page_id”);

if (StringUtils.isEmpty(page)) {

return true;

} else {

return false;

}

}

}

).next(“next”)

.where(

new SimpleCondition<JSONObject>() {

@Override

public boolean filter(JSONObject json) throws Exception {

final String page = json.getJSONObject(“page”).getString(“page_id”);

if (StringUtils.isNotEmpty(page)) {

return true;

} else {

return false;

}

}

}

).within(Time.seconds(10));

final PatternStream<JSONObject> ps = CEP.pattern(midKS, pattern);

第一个where中筛选出了所有不存在last_page_id的

第二个where中筛选出了还有page_id

要求第二个的时间距离必须要在第一个的10s内

这样所有的符合条件的就是用户没有跳出的

这时候我们取反集

        // TODO 对超时数据进行处理

OutputTag<String> timeoutTag = new OutputTag<String>(“timeout”){};

ps.flatSelect(

timeoutTag,

new PatternFlatTimeoutFunction<JSONObject, String>() {

@Override

public void timeout(Map<String, List<JSONObject>> map, long l, Collector<String> collector) throws Exception {

final List<JSONObject> start = map.get(“start”);

// TODO 将每一个超时的数据都放入到侧输出流中

for ( JSONObject json : start ) {

collector.collect(json.toJSONString());

}

}

},

new PatternFlatSelectFunction<JSONObject, String>() {

@Override

public void flatSelect(Map<String, List<JSONObject>> map, Collector<String> collector) throws Exception {

}

}

).getSideOutput(timeoutTag).addSink(

new FlinkKafkaProducer<String>(

“linux1:9092”,

Const.TOPIC_DWM_LOG_JUMP_OUT,

new SimpleStringSchema()

)

);

如上面所示,我们只搜集其中的timeout数据,保留到侧输出流中,输出到Kafka中

其次我们涉及到了一个宽表的需求

因为订单是一个非常重要的数据,很多的分析都依赖订单表进行统计

为了统计更加的方便,订单表会被整合为宽表

比如下面的表会被整合为一订单宽表

图片

不过我们有两种数据的存储方式,分别对应着事实数据 – kafka,维度数据 – Hbase

我们需要考虑不同维度的整合

分别是事实数据和事实数据关联,也就是流和流之间的关联

事实数据和维度数据关联,也就是流计算之中查询外部数据源

那么我们就按照上面的不同维度的划分,来进行计算不同的代码数据

首先是流和流之间的管理

我们将存在于Kafka的dwd_order_info和dwd_order_detail

为了方便进行序列化和反序列化,我们首先创建对应的Bean

Order_info如下

@Data

public class OrderInfo {

Long id;

Long province_id;

String order_status;

Long user_id;

BigDecimal total_amount;

BigDecimal activity_reduce_amount;

BigDecimal coupon_reduce_amount;

BigDecimal original_total_amount;

BigDecimal feight_fee;

String expire_time;

String create_time;

String operate_time;

String create_date; // 把其他字段处理得到

String create_hour;

Long create_ts;

}

Order_detail如下:

@Data

public class OrderDetail {

Long id;

Long order_id ;

Long sku_id;

BigDecimal order_price ;

Long sku_num ;

String sku_name;

String create_time;

BigDecimal split_total_amount;

BigDecimal split_activity_amount;

BigDecimal split_coupon_amount;

Long create_ts;

}

然后是Kafka的数据联合

首先分别创建Order_info和Order_detail的数据流

        final DataStreamSource<String> orderDS = env.addSource(

new FlinkKafkaConsumer<String>(

Const.TOPIC_DWD_DB_ORDER,

new SimpleStringSchema(),

kafkaProp

)

);

final SingleOutputStreamOperator<OrderInfo> orderInfoTimeDS = orderDS.map(

new RichMapFunction<String, OrderInfo>() {

private SimpleDateFormat sdf = null;

@Override

public void open(Configuration parameters) throws Exception {

sdf = new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”);

}

@Override

public OrderInfo map(String value) throws Exception {

final OrderInfo orderInfo = JSONObject.parseObject(value, OrderInfo.class);

orderInfo.setCreate_ts(sdf.parse(orderInfo.getCreate_time()).getTime());

return orderInfo;

}

}

).assignTimestampsAndWatermarks(

WatermarkStrategy.<OrderInfo>forMonotonousTimestamps()

.withTimestampAssigner(

new SerializableTimestampAssigner<OrderInfo>() {

@Override

public long extractTimestamp(OrderInfo element, long recordTimestamp) {

return element.getCreate_ts();

}

}

)

)

        final DataStreamSource<String> orderDetailDS = env.addSource(

new FlinkKafkaConsumer<String>(

Const.TOPIC_DWD_DB_ORDER_DETAIL,

new SimpleStringSchema(),

kafkaProp

)

);

final SingleOutputStreamOperator<OrderDetail> orderDetailTimeDS = orderDetailDS.map(

new RichMapFunction<String, OrderDetail>() {

private SimpleDateFormat sdf = null;

@Override

public void open(Configuration parameters) throws Exception {

sdf = new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”);

}

@Override

public OrderDetail map(String value) throws Exception {

final OrderDetail orderInfo = JSONObject.parseObject(value, OrderDetail.class);

orderInfo.setCreate_ts(sdf.parse(orderInfo.getCreate_time()).getTime());

return orderInfo;

}

}

).assignTimestampsAndWatermarks(

WatermarkStrategy.<OrderDetail>forMonotonousTimestamps()

.withTimestampAssigner(

new SerializableTimestampAssigner<OrderDetail>() {

@Override

public long extractTimestamp(OrderDetail element, long recordTimestamp) {

return element.getCreate_ts();

}

}

)

);

然后我们将两者按照相同的id进行分组

final KeyedStream<OrderInfo, Long> orderInfoKS = orderInfoTimeDS.keyBy(info -> info.getId());

final KeyedStream<OrderDetail, Long> orderDetailKS = orderDetailTimeDS.keyBy(detail -> detail.getOrder_id());

之后首先我们考虑利用connect的方式进行两者的连接

这里强调一个概念,就是flink中的join分为两种,分别是基于时间窗口的Time Windowed JOIN

还有就是基于状态缓存的join intervalJoin

我们这里使用interval join,毕竟interaval join的使用更加简单,

在这里我们设置捕获前后五秒的数据进行连接

final SingleOutputStreamOperator<OrderWide> joinDS = orderInfoKS.intervalJoin(orderDetailKS).between(Time.seconds(-5), Time.seconds(5))

.process(

new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {

@Override

public void processElement(OrderInfo left, OrderDetail right, Context ctx, Collector<OrderWide> out) throws Exception {

out.collect(new OrderWide(left, right));

}

}

);

其次我们会得到一个包含OrderWide的数据流

其次我们将其和HBase进行合并,创建新的数据流

这里我么使用的是异步IO,这里将外部的IO操作进行异步化,单个并行可以连续发多个请求,哪个请求先返回就处理哪个,提高了流处理的效率,也就是AsyncDataStream

那么我们直接看下AsyncDataStream的处理

我们继承了RichAsyncFunction

public class DIMDataFunction extends RichAsyncFunction<OrderWide, OrderWide>

我们首先在open函数中利用文件来模拟HBase的连接

    private Map<Long, Province> prvMap = new HashMap<Long, Province>();

@Override

public void open(Configuration parameters) throws Exception {

// TODO 将Hbase的操作初始化

// TODO 模仿维度表的查询 – 读取文件

final ExecutionEnvironment env =

ExecutionEnvironment.getExecutionEnvironment();

final DataSource<String> file = env.readTextFile(“data/province.txt”);

final List<String> datas = file.collect();

for ( String dat : datas ) {

final String[] split = dat.split(“\t”);

Province p = new Province(

Long.parseLong(split[0]),

split[1],

split[2],

split[3],

split[4],

split[5]

);

prvMap.put(p.getId(), p);

}

}

其次是连接流,将传入的OrderWide进行加工

@Override

public void asyncInvoke(OrderWide input, ResultFuture<OrderWide> resultFuture) throws Exception {

final Long pid = input.getProvince_id();

final Province province = prvMap.get(pid);

input.setProvince_area_code(province.getArea_code());

input.setProvince_3166_2_code(province.getIso_3166_2());

input.setProvince_name(province.getName());

input.setProvince_iso_code(province.getIso_code());

resultFuture.complete(Collections.singleton(input));

}

最外部的使用如下

final SingleOutputStreamOperator<OrderWide> orderWideDS = AsyncDataStream.unorderedWait(

joinDS,

new DIMDataFunction(),

60,

TimeUnit.SECONDS

);

最终就是标准流程,将数据写入到Kafka的Topic中

 orderWideDS.map(w->JSON.toJSONString(w)).addSink(

new FlinkKafkaProducer<String>(

“linux1:9092”,

Const.TOPIC_DWM_DB_ORDER_WIDE,

new SimpleStringSchema()

)

);

那么这就基本完成了我们处理DWM层的基本逻辑,将一种明细转换为另一种明细,方便后面的统计

本阶段也介绍了

使用State进行去重

利用CEP进行数据的筛选

使用intervalJoin处理流join

学会处理维度关联,利用缓存和异步查询进行性能优化

发表评论

邮箱地址不会被公开。 必填项已用*标注