在实时数仓中,我们经常需要做维表关联,但是用户维表一般在业务库中,业务方是不允许大数据部门直接到业务数据库进行维表关联,因为这会影响线上业务。此时我们需要将用户维表采集到大数据平台,然后事实表就可以直接跟维表进行关联,从而生成事实宽表,具体场景如下图所示。
答案,是不行的。因为在生产环境中,我们还需要考虑如下问题:
总体来说,维表关联有三种基础的方式,分别为实时lookup数据库关联、预加载维表关联和维表变更日志关联。对于不同的关联方式,我们该如何选择呢?我们有哪些考量和选择的依据呢?我们主要通过以下指标来进行衡量。
所谓实时lookup数据库关联,就是在用户自定义函数中通过关联字段直接访问数据库实现关联的方式。
开发量非常小,数据库压力随着流量增长会越来也大,只能基于ProcessingTime做关联(哪怕Flink设置的基于EventTime),如果数据有延迟或者重放,会得到不一致的结果。
实时lookup数据库关联还可以再细分为三种方式:同步lookup数据库关联、异步lookup数据库关联和带缓存的数据库lookup关联。
同步实时数据库lookup关联实现最简单,只需要在一个 RichMapFunction或者 RichFlat-MapFunction中访问数据库,处理好关联逻辑后将结果数据输出即可。
缺点:
适合场景:流量低的作业,且维表更新实时性要求不高/维表更新频率低的场景
异步实时数据库lookup关联需要借助AsyncIO来异步访问维表数据。AsyncIO可以充分利用数据库提供的异步client库并发处理lookup请求,提高task并行实例的吞吐量。AsyncIO提供lookup结果的有序和无序输出,由用户自己选择是否保证event的顺序。
缺点:
适合场景:流量不大的作业,且维表更新实时性要求不高/维表更新频率低的场景
同步和异步lookup方式的通病是数据库负载高(相同维表数据多次lookup还是要走数据库)为了解决这个问题,可引入缓存来减少直接对数据库的请求。而且缓存不需要通过 checkpoint 机制持久化,所以采用本地缓存,例如Guava Cache可以比较轻松的实现。
缺点:
相对于实时lookup数据库关联每条数据都要请求数据库(带缓存时请求略有减少),预加载维表关联是在作业启动时就把维表加载到内存中,不过如果没有命中数据就关联不上了。
预加载维表关联还可以再细分为四种方式:启动时预加载维表、启动时预加载分区维表、启动时预加载维表+定时刷新和启动时预加载维表+定时lookup数据库。
启动时预加载维表实现比较简单,作业初始化时,在用户函数的open方法中读取数据库的维表数据放到内存中,且缓存的维表数据不作为State,每次重启时open方法都被再次执行,从而加载新的维表数据。
缺点:
适合场景:适合维表较小,且维表更新实时性要求不高/维表更新频率低的场景,例如IP库、用户信息表
如果维表很大,每个subtask(task并行实例)都加载全量维表数据会耗费太多内存。因此可以按照业务数据相同的分区规则加载对应的分区维表数据即可。注意这里不要用keyBy(hash的),一定要自定义分区。例如业务数据按照user_id划分为:subtask1(0-499),subtask2(500-100)…,那么在open方法中要根据subtask的id和并行度来计算当前subtask应该记载的维表数据。
优点:
缺点:
适合场景:适合维表大,且维表更新实时性要求不高/维表更新频率低的场景
启动时加载维表有两个主要限制:1、维表大小限制(分区解决);2、维表数据不能更新限制;启动时预加载维表+定时刷新就是为了解决后面那个限制。定时刷新可以采用 ProcessFucntion 提供的 Timer 或者直接在open方法中初始化一个额外线程(池)来做,区别是Timer 要求KeyedStream(Data-Stream#partitionCustom 并不会返回一个 KeyedStream),所以open方法中额外线程处理刷新是个好办法。
优点:
缺点:
适合场景:维表大小都可以,但维表更新实时性要求不是特别高/维表更新频率较低的场景,满足大部分维表关联业务
其实就是将启动预加载维表和实时look两种方式混合使用,将预加载的维表作为缓存给实时lookup使用,未命中则到数据库里查找。
优点:
缺点:
适合场景:适合流量低,且维表更新实时性要求不高/维表更新频率低的场景
二者结合时,注意事项:
前面两种关联方式,是1个流和一个静态表的关联,维表变更日志关联是把维表changelog放到数据流中,然后两个流做join。所谓变更日志类似于MySQL的binlog,通常由维表数据库端将日志push到消息队列(例如Kafka)。
我们称业务数据流为main stream,称维表变更数据流为dimchange stream。
因为维表的变更日志是带时间戳的,因此我们能都知道同一条维表数据在不同时刻的状态。
维表变更日志关联还可以再细分为三种方式:Processing Time 维表变更日志关联、Event Time 维表变更日志关联和Temporal Table Join。
基于ProcessTime做关联,可以利用keyBy 将两个流中关联字段相同的数据划分到KeyedCoProcessFunction的同一分区,然后用MapState把维表数据保存下来。在业务数据流的一条记录进到函数时,到 State 中查找有无符合条件的 join 对象,若有则关联输出结果,若无则根据 join 的类型决定是直接丢弃还是与空值关联。这里要注意的是,State 的大小要尽量控制好。首先是只保存每个 key 最新的维度数据值,其次是要给 State 设置好 TTL,让 Flink 可以自动清理。
优点:
缺点:
适合场景:对维表的变更实时性要求比较高的场景,但数准确性要求不太高的场景
跟ProcessTime维表变更日志关联基本一样,不同的是维表变更日志会保存多个时间版本,当业务数据到来时,根绝他的EventTime找到当时的维表数据跟它关联,而不是总用最新数据,低延时的同时又能大大提高准确性。因为State没有提供基于EventTime的TTL,需要自己设计和时间过期的State的清理策略(例如一个Event Time Timer,定时器不要太多,太多影响性能),也可以根据版本数来控制清理逻辑,比如保存5个版本,当新的维表变更数据到来时,判断是否超过了5个版本,超过了就把最老的版本清理掉。
优点:
缺点:
Temporal Table Join 是 Flink SQL/Table API 原生支持的,它对两个数据流的输入都进行了缓存,因此比基于 Event Time 的维表变更日志关联,它可以容忍任意数据流的延迟,数据准确性更高。
Temporal Table Join 在 SQL/Table API 使用时是十分简单的,但如果想在 DataStream API 中使用,则需要自己实现对应的逻辑。
总体思路是使用一个 KeyedCoProcessFunction,将 dimchange 数据流以时间版本为 key 保存在 MapState 中(与基于 Event Time 的维表变更日志关联相同),再将业务数据流和输出结果也用 State 缓存起来(同样以 Event Time 为 key),一直等到 Watermark 提升到它们对应的 Event Time,才把结果输出和将两个数据流的输入清理掉。
建议不要自己实现,乖乖的用Flink SQL/Table API。
优点:
缺点:
适合场景:实现方式最复杂,准确性最好,适合数据准确性要求高且要求容忍一定延迟(分钟级)的业务
利用Flink实现实时数仓,实现维表关联的方式非常多,既可以实时lookup数据库关联,又可以预加载维表关联,还可以维表变更日志关联。具体选择哪种方式实现,还需要结合前面具体的指标来衡量,并根据业务需求来选择最合适的实现方式。