Bucket Shuffle Join 是在 Doris 0.14 版本中正式加入的新功能。旨在为某些 Join 查询提供本地性优化,来减少数据在节点间的传输耗时,来加速查询。
它的设计、实现和效果可以参阅
上面的图片展示了Bucket Shuffle Join的工作原理。SQL语句为 A表 join B表,并且join的等值表达式命中了A的数据分布列。而Bucket Shuffle Join会根据A表的数据分布信息,将B表的数据发送到对应的A表的数据存储计算节点。Bucket Shuffle Join开销如下:
B < min(3B, A + B)
B <= min(3B, B)
可见,相比于Broadcast Join与Shuffle Join, Bucket Shuffle Join有着较为明显的性能优势。减少数据在节点间的传输耗时和Join时的内存开销。相对于Doris原有的Join方式,它有着下面的优点
将session变量enable_bucket_shuffle_join
设置为true
,则FE在进行查询规划时就会默认将能够转换为Bucket Shuffle Join的查询自动规划为Bucket Shuffle Join。
set enable_bucket_shuffle_join = true;
在FE进行分布式查询规划时,优先选择的顺序为 Colocate Join -> Bucket Shuffle Join -> Broadcast Join -> Shuffle Join。但是如果用户显式hint了Join的类型,如:
select * from test join [shuffle] baseall on test.k1 = baseall.k1;
则上述的选择优先顺序则不生效。
该session变量在0.14版本默认为true
, 而0.13版本需要手动设置为true
。
可以通过explain
命令来查看Join是否为Bucket Shuffle Join:
| 2:HASH JOIN |
| | join op: INNER JOIN (BUCKET_SHUFFLE) |
| | hash predicates: |
| | colocate: false, reason: table not in the same group |
| | equal join conjunct: `test`.`k1` = `baseall`.`k1`
在Join类型之中会指明使用的Join方式为:BUCKET_SHUFFLE
。
在绝大多数场景之中,用户只需要默认打开session变量的开关就可以透明的使用这种Join方式带来的性能提升,但是如果了解Bucket Shuffle Join的规划规则,可以帮助我们利用它写出更加高效的SQL。
where
条件使分区裁剪的策略能够生效。Colocation Join 是在 Doris 0.9 版本中引入的新功能。旨在为某些 Join 查询提供本地性优化,来减少数据在节点间的传输耗时,加速查询。
最初的设计、实现和效果可以参阅 。
Colocation Join 功能经过一次改版,设计和使用方式和最初设计稍有不同。本文档主要介绍 Colocation Join 的原理、实现、使用方式和注意事项。
注意:这个属性不会被CCR同步,如果这个表是被CCR复制而来的,即PROPERTIES中包含is_being_synced = true
时,这个属性将会在这个表中被擦除。
Colocation Join 功能,是将一组拥有相同 CGS 的 Table 组成一个 CG。并保证这些 Table 对应的数据分片会落在同一个 BE 节点上。使得当 CG 内的表进行分桶列上的 Join 操作时,可以通过直接进行本地数据 Join,减少数据在节点间的传输耗时。
一个表的数据,最终会根据分桶列值 Hash、对桶数取模的后落在某一个分桶内。假设一个 Table 的分桶数为 8,则共有 [0, 1, 2, 3, 4, 5, 6, 7]
8 个分桶(Bucket),我们称这样一个序列为一个 BucketsSequence
。每个 Bucket 内会有一个或多个数据分片(Tablet)。当表为单分区表时,一个 Bucket 内仅有一个 Tablet。如果是多分区表,则会有多个。
为了使得 Table 能够有相同的数据分布,同一 CG 内的 Table 必须保证以下属性相同:
同一个 CG 内的表,分区的个数、范围以及分区列的类型不要求一致。
在固定了分桶列和分桶数后,同一个 CG 内的表会拥有相同的 BucketsSequence。而副本数决定了每个分桶内的 Tablet 的多个副本,存放在哪些 BE 上。假设 BucketsSequence 为 [0, 1, 2, 3, 4, 5, 6, 7]
,BE 节点有 [A, B, C, D]
4个。则一个可能的数据分布如下:
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
| 0 | | 1 | | 2 | | 3 | | 4 | | 5 | | 6 | | 7 |
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
| A | | B | | C | | D | | A | | B | | C | | D |
| | | | | | | | | | | | | | | |
| B | | C | | D | | A | | B | | C | | D | | A |
| | | | | | | | | | | | | | | |
| C | | D | | A | | B | | C | | D | | A | | B |
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
CG 内所有表的数据都会按照上面的规则进行统一分布,这样就保证了,分桶列值相同的数据都在同一个 BE 节点上,可以进行本地数据 Join。
建表时,可以在 PROPERTIES
中指定属性 "colocate_with" = "group_name"
,表示这个表是一个 Colocation Join 表,并且归属于一个指定的 Colocation Group。
示例:
CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
"colocate_with" = "group1"
);
如果指定的 Group 不存在,则 Doris 会自动创建一个只包含当前这张表的 Group。如果 Group 已存在,则 Doris 会检查当前表是否满足 Colocation Group Schema。如果满足,则会创建该表,并将该表加入 Group。同时,表会根据已存在的 Group 中的数据分布规则创建分片和副本。 Group 归属于一个 Database,Group 的名字在一个 Database 内唯一。在内部存储是 Group 的全名为 dbId_groupName
,但用户只感知 groupName。
SinceVersion dev
2.0 版本中,Doris 支持了跨Database的 Group。在建表时,需使用关键词 __global__
作为 Group 名称的前缀。如:
CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
"colocate_with" = "__global__group1"
);
__global__
前缀的 Group 不再归属于一个 Database,其名称也是全局唯一的。
通过创建 Global Group,可以实现跨 Database 的 Colocate Join。
当 Group 中最后一张表彻底删除后(彻底删除是指从回收站中删除。通常,一张表通过 DROP TABLE
命令删除后,会在回收站默认停留一天的时间后,再删除),该 Group 也会被自动删除。
以下命令可以查看集群内已存在的 Group 信息。
SHOW PROC '/colocation_group';
+-------------+--------------+--------------+------------+----------------+----------+----------+
| GroupId | GroupName | TableIds | BucketsNum | ReplicationNum | DistCols | IsStable |
+-------------+--------------+--------------+------------+----------------+----------+----------+
| 10005.10008 | 10005_group1 | 10007, 10040 | 10 | 3 | int(11) | true |
+-------------+--------------+--------------+------------+----------------+----------+----------+
Colocation 副本均衡和修复
一节)。通过以下命令可以进一步查看一个 Group 的数据分布情况:
SHOW PROC '/colocation_group/10005.10008';
+-------------+---------------------+
| BucketIndex | BackendIds |
+-------------+---------------------+
| 0 | 10004, 10002, 10001 |
| 1 | 10003, 10002, 10004 |
| 2 | 10002, 10004, 10001 |
| 3 | 10003, 10002, 10004 |
| 4 | 10002, 10004, 10003 |
| 5 | 10003, 10002, 10001 |
| 6 | 10003, 10004, 10001 |
| 7 | 10003, 10004, 10002 |
+-------------+---------------------+
可以对一个已经创建的表,修改其 Colocation Group 属性。示例:
ALTER TABLE tbl SET ("colocate_with" = "group2");
也可以通过以下命令,删除一个表的 Colocation 属性:
ALTER TABLE tbl SET ("colocate_with" = "");
当对一个具有 Colocation 属性的表进行增加分区(ADD PARTITION)、修改副本数时,Doris 会检查修改是否会违反 Colocation Group Schema,如果违反则会拒绝。
Colocation 表的副本分布需要遵循 Group 中指定的分布,所以在副本修复和均衡方面和普通分片有所区别。
Group 自身有一个 Stable 属性,当 Stable 为 true 时,表示当前 Group 内的表的所有分片没有正在进行变动,Colocation 特性可以正常使用。当 Stable 为 false 时(Unstable),表示当前 Group 内有部分表的分片正在做修复或迁移,此时,相关表的 Colocation Join 将退化为普通 Join。
副本只能存储在指定的 BE 节点上。所以当某个 BE 不可用时(宕机、Decommission 等),需要寻找一个新的 BE 进行替换。Doris 会优先寻找负载最低的 BE 进行替换。替换后,该 Bucket 内的所有在旧 BE 上的数据分片都要做修复。迁移过程中,Group 被标记为 Unstable。
Doris 会尽力将 Colocation 表的分片均匀分布在所有 BE 节点上。对于普通表的副本均衡,是以单副本为粒度的,即单独为每一个副本寻找负载较低的 BE 节点即可。而 Colocation 表的均衡是 Bucket 级别的,即一个 Bucket 内的所有副本都会一起迁移。我们采用一个简单的均衡算法,即在不考虑副本实际大小,而只根据副本数量,将 BucketsSequence 均匀的分布在所有 BE 上。具体算法可以参阅 ColocateTableBalancer.java
中的代码注释。
注1:当前的 Colocation 副本均衡和修复算法,对于异构部署的 Doris 集群效果可能不佳。所谓异构部署,即 BE 节点的磁盘容量、数量、磁盘类型(SSD 和 HDD)不一致。在异构部署情况下,可能出现小容量的 BE 节点和大容量的 BE 节点存储了相同的副本数量。
注2:当一个 Group 处于 Unstable 状态时,其中的表的 Join 将退化为普通 Join。此时可能会极大降低集群的查询性能。如果不希望系统自动均衡,可以设置 FE 的配置项
disable_colocate_balance
来禁止自动均衡。然后在合适的时间打开即可。(具体参阅高级操作
一节)
对 Colocation 表的查询方式和普通表一样,用户无需感知 Colocation 属性。如果 Colocation 表所在的 Group 处于 Unstable 状态,将自动退化为普通 Join。
举例说明:
表1:
CREATE TABLE `tbl1` (
`k1` date NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` int(11) SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
PARTITION BY RANGE(`k1`)
(
PARTITION p1 VALUES LESS THAN ('2019-05-31'),
PARTITION p2 VALUES LESS THAN ('2019-06-30')
)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
"colocate_with" = "group1"
);
表2:
CREATE TABLE `tbl2` (
`k1` datetime NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` double SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
"colocate_with" = "group1"
);
查看查询计划:
DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
+----------------------------------------------------+
| Explain String |
+----------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`tbl1`.`k1` | |
| PARTITION: RANDOM |
| |
| RESULT SINK |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN |
| | hash predicates: |
| | colocate: true |
| | `tbl1`.`k2` = `tbl2`.`k2` |
| | tuple ids: 0 1 |
| | |
| |----1:OlapScanNode |
| | TABLE: tbl2 |
| | PREAGGREGATION: OFF. Reason: null |
| | partitions=0/1 |
| | rollup: null |
| | buckets=0/0 |
| | cardinality=-1 |
| | avgRowSize=0.0 |
| | numNodes=0 |
| | tuple ids: 1 |
| | |
| 0:OlapScanNode |
| TABLE: tbl1 |
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
| partitions=0/2 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 0 |
+----------------------------------------------------+
如果 Colocation Join 生效,则 Hash Join 节点会显示 colocate: true
。
如果没有生效,则查询计划如下:
+----------------------------------------------------+
| Explain String |
+----------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`tbl1`.`k1` | |
| PARTITION: RANDOM |
| |
| RESULT SINK |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN (BROADCAST) |
| | hash predicates: |
| | colocate: false, reason: group is not stable |
| | `tbl1`.`k2` = `tbl2`.`k2` |
| | tuple ids: 0 1 |
| | |
| |----3:EXCHANGE |
| | tuple ids: 1 |
| | |
| 0:OlapScanNode |
| TABLE: tbl1 |
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
| partitions=0/2 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 0 |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 03 |
| UNPARTITIONED |
| |
| 1:OlapScanNode |
| TABLE: tbl2 |
| PREAGGREGATION: OFF. Reason: null |
| partitions=0/1 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 1 |
+----------------------------------------------------+
HASH JOIN 节点会显示对应原因:colocate: false, reason: group is not stable
。同时会有一个 EXCHANGE 节点生成。
disable_colocate_relocate
是否关闭 Doris 的自动 Colocation 副本修复。默认为 false,即不关闭。该参数只影响 Colocation 表的副本修复,不影响普通表。
disable_colocate_balance
是否关闭 Doris 的自动 Colocation 副本均衡。默认为 false,即不关闭。该参数只影响 Colocation 表的副本均衡,不影响普通表。
以上参数可以动态修改,设置方式请参阅 HELP ADMIN SHOW CONFIG;
和 HELP ADMIN SET CONFIG;
。
disable_colocate_join
是否关闭 Colocation Join 功能。在 0.10 及之前的版本,默认为 true,即关闭。在之后的某个版本中将默认为 false,即开启。
use_new_tablet_scheduler
在 0.10 及之前的版本中,新的副本调度逻辑与 Colocation Join 功能不兼容,所以在 0.10 及之前版本,如果 disable_colocate_join = false
,则需设置 use_new_tablet_scheduler = false
,即关闭新的副本调度器。之后的版本中,use_new_tablet_scheduler
将衡为 true。
Doris 提供了几个和 Colocation Join 有关的 HTTP Restful API,用于查看和修改 Colocation Group。
该 API 实现在 FE 端,使用 fe_host:fe_http_port
进行访问。需要 ADMIN 权限。
查看集群的全部 Colocation 信息
GET /api/colocate
返回以 Json 格式表示内部 Colocation 信息。
{
"msg": "success",
"code": 0,
"data": {
"infos": [
["10003.12002", "10003_group1", "10037, 10043", "1", "1", "int(11)", "true"]
],
"unstableGroupIds": [],
"allGroupIds": [{
"dbId": 10003,
"grpId": 12002
}]
},
"count": 0
}
将 Group 标记为 Stable 或 Unstable
标记为 Stable
POST /api/colocate/group_stable?db_id=10005&group_id=10008
返回:200
标记为 Unstable
DELETE /api/colocate/group_stable?db_id=10005&group_id=10008
返回:200
设置 Group 的数据分布
该接口可以强制设置某一 Group 的数分布。
POST /api/colocate/bucketseq?db_id=10005&group_id=10008
Body:
[[10004,10002],[10003,10002],[10002,10004],[10003,10002],[10002,10004],[10003,10002],[10003,10004],[10003,10004],[10003,10004],[10002,10004]]
返回 200
其中 Body 是以嵌套数组表示的 BucketsSequence 以及每个 Bucket 中分片分布所在 BE 的 id。
注意,使用该命令,可能需要将 FE 的配置 disable_colocate_relocate
和 disable_colocate_balance
设为 true。即关闭系统自动的 Colocation 副本修复和均衡。否则可能在修改后,会被系统自动重置。
Runtime Filter 是在 Doris 0.15 版本中正式加入的新功能。旨在为某些 Join 查询在运行时动态生成过滤条件,来减少扫描的数据量,避免不必要的I/O和网络传输,从而加速查询。
它的设计、实现和效果可以参阅 。
A join B on A.a=B.b
中的A.a=B.b
,在查询规划时基于此生成join conjuncts,包含join Build和Probe使用的expr,其中Build expr在Runtime Filter中称为src expr,Probe expr在Runtime Filter中称为target expr。Runtime Filter在查询规划时生成,在HashJoinNode中构建,在ScanNode中应用。
举个例子,当前存在T1表与T2表的Join查询,它的Join方式为HashJoin,T1是一张事实表,数据行数为100000,T2是一张维度表,数据行数为2000,Doris join的实际情况是:
| > HashJoinNode <
| | |
| | 100000 | 2000
| | |
| OlapScanNode OlapScanNode
| ^ ^
| | 100000 | 2000
| T1 T2
|
显而易见对T2扫描数据要远远快于T1,如果我们主动等待一段时间再扫描T1,等T2将扫描的数据记录交给HashJoinNode后,HashJoinNode根据T2的数据计算出一个过滤条件,比如T2数据的最大和最小值,或者构建一个Bloom Filter,接着将这个过滤条件发给等待扫描T1的ScanNode,后者应用这个过滤条件,将过滤后的数据交给HashJoinNode,从而减少probe hash table的次数和网络开销,这个过滤条件就是Runtime Filter,效果如下:
| > HashJoinNode <
| | |
| | 6000 | 2000
| | |
| OlapScanNode OlapScanNode
| ^ ^
| | 100000 | 2000
| T1 T2
|
如果能将过滤条件(Runtime Filter)下推到存储引擎,则某些情况下可以利用索引来直接减少扫描的数据量,从而大大减少扫描耗时,效果如下:
| > HashJoinNode <
| | |
| | 6000 | 2000
| | |
| OlapScanNode OlapScanNode
| ^ ^
| | 6000 | 2000
| T1 T2
|
可见,和谓词下推、分区裁剪不同,Runtime Filter是在运行时动态生成的过滤条件,即在查询运行时解析join on clause确定过滤表达式,并将表达式广播给正在读取左表的ScanNode,从而减少扫描的数据量,进而减少probe hash table的次数,避免不必要的I/O和网络传输。
Runtime Filter主要用于大表join小表的优化,如果左表的数据量太小,或者右表的数据量太大,则Runtime Filter可能不会取得预期效果。
与Runtime Filter相关的查询选项信息,请参阅以下部分:
runtime_filter_type
: 包括Bloom Filter、MinMax Filter、IN predicate、IN Or Bloom Filter、Bitmap Filter,默认会使用IN Or Bloom Filter,部分情况下同时使用Bloom Filter、MinMax Filter、IN predicate时性能更高。runtime_filter_mode
: 用于调整Runtime Filter的下推策略,包括OFF、LOCAL、GLOBAL三种策略,默认设置为GLOBAL策略runtime_filter_wait_time_ms
: 左表的ScanNode等待每个Runtime Filter的时间,默认1000msruntime_filters_max_num
: 每个查询可应用的Runtime Filter中Bloom Filter的最大数量,默认10runtime_bloom_filter_min_size
: Runtime Filter中Bloom Filter的最小长度,默认1048576(1M)runtime_bloom_filter_max_size
: Runtime Filter中Bloom Filter的最大长度,默认16777216(16M)runtime_bloom_filter_size
: Runtime Filter中Bloom Filter的默认长度,默认2097152(2M)runtime_filter_max_in_num
: 如果join右表数据行数大于这个值,我们将不生成IN predicate,默认1024下面对查询选项做进一步说明。
使用的Runtime Filter类型。
类型: 数字(1, 2, 4, 8, 16)或者相对应的助记符字符串(IN, BLOOM_FILTER, MIN_MAX, IN_OR_BLOOM_FILTER
, BITMAP_FILTER),默认8(IN_OR_BLOOM_FILTER
),使用多个时用逗号分隔,注意需要加引号,或者将任意多个类型的数字相加,例如:
set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";
等价于:
set runtime_filter_type=7;
使用注意事项
runtime_filter_max_in_num
调整),否则使用Bloom filter。用于控制Runtime Filter在instance之间传输的范围。
类型: 数字(0, 1, 2)或者相对应的助记符字符串(OFF, LOCAL, GLOBAL),默认2(GLOBAL)。
使用注意事项
LOCAL:相对保守,构建的Runtime Filter只能在同一个instance(查询执行的最小单元)上同一个Fragment中使用,即Runtime Filter生产者(构建Filter的HashJoinNode)和消费者(使用RuntimeFilter的ScanNode)在同一个Fragment,比如broadcast join的一般场景;
GLOBAL:相对激进,除满足LOCAL策略的场景外,还可以将Runtime Filter合并后通过网络传输到不同instance上的不同Fragment中使用,比如Runtime Filter生产者和消费者在不同Fragment,比如shuffle join。
大多数情况下GLOBAL策略可以在更广泛的场景对查询进行优化,但在有些shuffle join中生成和合并Runtime Filter的开销超过给查询带来的性能优势,可以考虑更改为LOCAL策略。
如果集群中涉及的join查询不会因为Runtime Filter而提高性能,您可以将设置更改为OFF,从而完全关闭该功能。
在不同Fragment上构建和应用Runtime Filter时,需要合并Runtime Filter的原因和策略可参阅
Runtime Filter的等待耗时。
类型: 整数,默认1000,单位ms
使用注意事项
在开启Runtime Filter后,左表的ScanNode会为每一个分配给自己的Runtime Filter等待一段时间再扫描数据,即如果ScanNode被分配了3个Runtime Filter,那么它最多会等待3000ms。
因为Runtime Filter的构建和合并均需要时间,ScanNode会尝试将等待时间内到达的Runtime Filter下推到存储引擎,如果超过等待时间后,ScanNode会使用已经到达的Runtime Filter直接开始扫描数据。
如果Runtime Filter在ScanNode开始扫描之后到达,则ScanNode不会将该Runtime Filter下推到存储引擎,而是对已经从存储引擎扫描上来的数据,在ScanNode上基于该Runtime Filter使用表达式过滤,之前已经扫描的数据则不会应用该Runtime Filter,这样得到的中间数据规模会大于最优解,但可以避免严重的裂化。
如果集群比较繁忙,并且集群上有许多资源密集型或长耗时的查询,可以考虑增加等待时间,以避免复杂查询错过优化机会。如果集群负载较轻,并且集群上有许多只需要几秒的小查询,可以考虑减少等待时间,以避免每个查询增加1s的延迟。
每个查询生成的Runtime Filter中Bloom Filter数量的上限。
类型: 整数,默认10
使用注意事项 目前仅对Bloom Filter的数量进行限制,因为相比MinMax Filter和IN predicate,Bloom Filter构建和应用的代价更高。
如果生成的Bloom Filter超过允许的最大数量,则保留选择性大的Bloom Filter,选择性大意味着预期可以过滤更多的行。这个设置可以防止Bloom Filter耗费过多的内存开销而导致潜在的问题。
选择性=(HashJoinNode Cardinality / HashJoinNode left child Cardinality)
-- 因为目前FE拿到Cardinality不准,所以这里Bloom Filter计算的选择性与实际不准,因此最终可能只是随机保留了部分Bloom Filter。
仅在对涉及大表间join的某些长耗时查询进行调优时,才需要调整此查询选项。
包括runtime_bloom_filter_min_size
、runtime_bloom_filter_max_size
、runtime_bloom_filter_size
,用于确定Runtime Filter使用的Bloom Filter数据结构的大小(以字节为单位)。
类型: 整数
使用注意事项 因为需要保证每个HashJoinNode构建的Bloom Filter长度相同才能合并,所以目前在FE查询规划时计算Bloom Filter的长度。
如果能拿到join右表统计信息中的数据行数(Cardinality),会尝试根据Cardinality估计Bloom Filter的最佳大小,并四舍五入到最接近的2的幂(以2为底的log值)。如果无法拿到右表的Cardinality,则会使用默认的Bloom Filter长度runtime_bloom_filter_size
。runtime_bloom_filter_min_size
和runtime_bloom_filter_max_size
用于限制最终使用的Bloom Filter长度最小和最大值。
更大的Bloom Filter在处理高基数的输入集时更有效,但需要消耗更多的内存。假如查询中需要过滤高基数列(比如含有数百万个不同的取值),可以考虑增加runtime_bloom_filter_size
的值进行一些基准测试,这有助于使Bloom Filter过滤的更加精准,从而获得预期的性能提升。
Bloom Filter的有效性取决于查询的数据分布,因此通常仅对一些特定查询额外调整其Bloom Filter长度,而不是全局修改,一般仅在对涉及大表间join的某些长耗时查询进行调优时,才需要调整此查询选项。
explain
命令可以显示的查询计划中包括每个Fragment使用的join on clause信息,以及Fragment生成和使用Runtime Filter的注释,从而确认是否将Runtime Filter应用到了期望的join on clause上。
runtime filters: filter_id[type] <- table.column
。runtime filters: filter_id[type] -> table.column
。下面例子中的查询使用了一个ID为RF000的Runtime Filter。
CREATE TABLE test (t1 INT) DISTRIBUTED BY HASH (t1) BUCKETS 2 PROPERTIES("replication_num" = "1");
INSERT INTO test VALUES (1), (2), (3), (4);
CREATE TABLE test2 (t2 INT) DISTRIBUTED BY HASH (t2) BUCKETS 2 PROPERTIES("replication_num" = "1");
INSERT INTO test2 VALUES (3), (4), (5);
EXPLAIN SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2;
+-------------------------------------------------------------------+
| Explain String |
+-------------------------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`t1` |
| |
| 4:EXCHANGE |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: HASH_PARTITIONED: `default_cluster:ssb`.`test`.`t1` |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN (BUCKET_SHUFFLE) |
| | equal join conjunct: `test`.`t1` = `test2`.`t2` |
| | runtime filters: RF000[in] <- `test2`.`t2` |
| | |
| |----3:EXCHANGE |
| | |
| 0:OlapScanNode |
| TABLE: test |
| runtime filters: RF000[in] -> `test`.`t1` |
| |
| PLAN FRAGMENT 2 |
| OUTPUT EXPRS: |
| PARTITION: HASH_PARTITIONED: `default_cluster:ssb`.`test2`.`t2` |
| |
| 1:OlapScanNode |
| TABLE: test2 |
+-------------------------------------------------------------------+
-- 上面`runtime filters`的行显示了`PLAN FRAGMENT 1`的`2:HASH JOIN`生成了ID为RF000的IN predicate,
-- 其中`test2`.`t2`的key values仅在运行时可知,
-- 在`0:OlapScanNode`使用了该IN predicate用于在读取`test`.`t1`时过滤不必要的数据。
SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2;
-- 返回2行结果[3, 4];
-- 通过query的profile(set enable_profile=true;)可以查看查询内部工作的详细信息,
-- 包括每个Runtime Filter是否下推、等待耗时、以及OLAP_SCAN_NODE从prepare到接收到Runtime Filter的总时长。
RuntimeFilter:in:
- HasPushDownToEngine: true
- AWaitTimeCost: 0ns
- EffectTimeCost: 2.76ms
-- 此外,在profile的OLAP_SCAN_NODE中还可以查看Runtime Filter下推后的过滤效果和耗时。
- RowsVectorPredFiltered: 9.320008M (9320008)
- VectorPredEvalTime: 364.39ms
HLL
或者BITMAP
;COALESCE/IFNULL/CASE
,因为当outer join上层其他join的join on clause包含NULL-checking表达式并生成Runtime Filter时,将这个Runtime Filter下推到outer join的左表时可能导致结果不正确;PlanNode.Conjuncts
生成的Runtime Filter下推,与HashJoinNode的eqJoinConjuncts
和otherJoinConjuncts
不同,PlanNode.Conjuncts
生成的Runtime Filter在测试中发现可能会导致错误的结果,例如IN
子查询转换为join时,自动生成的join on clause将保存在PlanNode.Conjuncts
中,此时应用Runtime Filter可能会导致结果缺少一些行。Doris 支持两种物理算子,一类是 Hash Join,另一类是 Nest Loop Join。
作为分布式的 MPP 数据库, 在 Join 的过程中是需要进行数据的 Shuffle。数据需要进行拆分调度,才能保证最终的 Join 结果是正确的。举个简单的例子,假设关系S 和 R 进行Join,N 表示参与 Join 计算的节点的数量;T 则表示关系的 Tuple 数目。
Doris 支持 4 种 Shuffle 方式
Broadcast Join
它要求把右表全量的数据都发送到左表上,即每一个参与 Join 的节点,它都拥有右表全量的数据,也就是 T(R)。
它适用的场景是比较通用的,同时能够支持 Hash Join 和 Nest loop Join,它的网络开销 N * T(R)。
左表数据不移动,右表数据发送到左表数据的扫描节点。
Shuffle Join
当进行 Hash Join 时候,可以通过 Join 列计算对应的 Hash 值,并进行 Hash 分桶。
它的网络开销则是:T(S) + T(R),但它只能支持 Hash Join,因为它是根据 Join 的条件也去做计算分桶的。
左右表数据根据分区,计算的结果发送到不同的分区节点上。
Bucket Shuffle Join
Doris 的表数据本身是通过 Hash 计算分桶的,所以就可以利用表本身的分桶列的性质来进行 Join 数据的 Shuffle。假如两张表需要做 Join,并且 Join 列是左表的分桶列,那么左表的数据其实可以不用去移动右表通过左表的数据分桶发送数据就可以完成 Join 的计算。
它的网络开销则是:T(R)相当于只 Shuffle 右表的数据就可以了。
左表数据不移动,右表数据根据分区计算的结果发送到左表扫表的节点
Colocate
它与 Bucket Shuffle Join 相似,相当于在数据导入的时候,根据预设的 Join 列的场景已经做好了数据的 Shuffle。那么实际查询的时候就可以直接进行 Join 计算而不需要考虑数据的 Shuffle 问题了。
数据已经预先分区,直接在本地进行 Join 计算
Shuffle方式 | 网络开销 | 物理算子 | 适用场景 |
---|---|---|---|
BroadCast | N * T(R) | Hash Join / Nest Loop Join | 通用 |
Shuffle | T(S) + T(R) | Hash Join | 通用 |
Bucket Shuffle | T(R) | Hash Join | Join条件中存在左表的分布式列,且左表执行时为单分区 |
Colocate | 0 | Hash Join | Join条件中存在左表的分布式列,且左右表同属于一个Colocate Group |
N : 参与 Join 计算的 Instance 个数
T(关系) : 关系的 Tuple 数目
上面这 4 种方式灵活度是从高到低的,它对这个数据分布的要求是越来越严格,但 Join 计算的性能也是越来越好的。
Doris 在进行 Hash Join 计算时会在右表构建一个哈希表,左表流式的通过右表的哈希表从而得出 Join 结果。而 RuntimeFilter 就是充分利用了右表的 Hash 表,在右表生成哈希表的时候,同时生成一个基于哈希表数据的一个过滤条件,然后下推到左表的数据扫描节点。通过这样的方式,Doris 可以在运行时进行数据过滤。
假如左表是一张大表,右表是一张小表,那么利用右表生成的过滤条件就可以把绝大多数在 Join 层要过滤的数据在数据读取时就提前过滤,这样就能大幅度的提升 Join 查询的性能。
当前 Doris 支持三种类型 RuntimeFilter
Runtime Filter 适用的场景有两个要求:
当符合上面两个条件的情况下,开启 Runtime Filter 就能收获比较好的效果
当 Join 列为左表的 Key 列时,RuntimeFilter 会下推到存储引擎。Doris 本身支持延迟物化,
延迟物化简单来说是这样的:假如需要扫描 A、B、C 三列,在 A 列上有一个过滤条件: A 等于 2,要扫描 100 行的话,可以先把 A 列的 100 行扫描出来,再通过 A = 2 这个过滤条件过滤。之后通过过滤完成后的结果,再去读取 B、C 列,这样就能极大的降低数据的读取 IO。所以说 Runtime Filter 如果在 Key 列上生成,同时利用 Doris 本身的延迟物化来进一步提升查询的性能。
Doris 提供了三种不同的 Runtime Filter 类型:
数据库一旦涉及到多表 Join,Join 的顺序对整个 Join 查询的性能是影响很大的。假设有三张表 Join,参考下面这张图,左边是 a 表跟 b 张表先做 Join,中间结果的有 2000 行,然后与 c 表再进行 Join 计算。
接下来看右图,把 Join 的顺序调整了一下。把 a 表先与 c 表 Join,生成的中间结果只有 100,然后最终再与 b 表 Join 计算。最终的 Join 结果是一样的,但是它生成的中间结果有 20 倍的差距,这就会产生一个很大的性能 Diff 了。
Doris 目前支持基于规则的 Join Reorder 算法。它的逻辑是:
Doris Join 调优的方法:
上面的 4 步基本上完成了一个标准的 Join 调优流程,接着就是实际去查询验证它,看看效果到底怎么样。
如果前面 4 种方式串联起来之后,还是不奏效。这时候可能就需要去做 Join 语句的改写,或者是数据分布的调整、需要重新去 Recheck 整个数据分布是否合理,包括查询 Join 语句,可能需要做一些手动的调整。当然这种方式是心智成本是比较高的,也就是说要在尝试前面方式不奏效的情况下,才需要去做进一步的分析。
一个四张表 Join 的查询,通过 Profile 的时候发现第二个 Join 耗时很高,耗时 14 秒。
进一步分析 Profile 之后,发现 BuildRows,就是右表的数据量是大概 2500 万。而 ProbeRows ( ProbeRows 是左表的数据量)只有 1 万多。这种场景下右表是远远大于左表,这显然是个不合理的情况。这显然说明 Join 的顺序出现了一些问题。这时候尝试改变 Session 变量,开启 Join Reorder。
set enable_cost_based_join_reorder = true
这次耗时从 14 秒降到了 4 秒,性能提升了 3 倍多。
此时再 Check Profile 的时候,左右表的顺序已经调整正确,即右表是小表,左表是大表。基于小表去构建哈希表,开销是很小的,这就是典型的一个利用 Join Reorder 去提升 Join 性能的一个场景
存在一个慢查询,查看 Profile 之后,整个 Join 节点耗时大概44秒。它的右表有 1000 万,左表有 6000 万,最终返回的结果也只有 6000 万。
这里可以大致的估算出过滤率是很高的,那为什么 Runtime Filter 没有生效呢?通过 Query Plan 去查看它,发现它只开启了 IN 的 Runtime Filter。
当右表超过1024行的话, IN 是不生效的,所以根本起不到什么过滤的效果,所以尝试调整 RuntimeFilter 的类型。
这里改为了 BloomFilter,左表的 6000 万条数据过滤了 5900 万条。基本上 99% 的数据都被过滤掉了,这个效果是很显著的。查询也从原来的 44 秒降到了 13 秒,性能提升了大概也是三倍多。
下面是一个比较极端的 Case,通过一些环境变量调优也没有办法解决,因为它涉及到 SQL Rewrite,所以这里列出来了原始的 SQL 。
select 100.00 * sum (case
when P_type like 'PROMOS'
then 1 extendedprice * (1 - 1 discount)
else 0
end ) / sum(1 extendedprice * (1 - 1 discount)) as promo revenue
from lineitem, part
where
1_partkey = p_partkey
and 1_shipdate >= date '1997-06-01'
and 1 shipdate < date '1997-06-01' + interval '1' month
这个 Join 查询是很简单的,单纯的一个左右表的 Join 。当然它上面有一些过滤条件,打开 Profile 的时候,发现整个查询 Hash Join 执行了三分多钟,它是一个 BroadCast 的 Join,它的右表有 2 亿条,左表只有 70 万。在这种情况下选择了 Broadcast Join 是不合理的,这相当于要把 2 亿条做一个 Hash Table,然后用 70 万条遍历两亿条的 Hash Table ,这显然是不合理的。
为什么会产生不合理的 Join 顺序呢?其实这个左表是一个 10 亿条级别的大表,它上面加了两个过滤条件,加完这两个过滤条件之后, 10 亿条的数据就剩 70 万条了。但 Doris 目前没有一个好的统计信息收集的框架,所以它不知道这个过滤条件的过滤率到底怎么样。所以这个 Join 顺序安排的时候,就选择了错误的 Join 的左右表顺序,导致它的性能是极其低下的。
下图是改写完成之后的一个 SQL 语句,在 Join 后面添加了一个Join Hint,在Join 后面加一个方括号,然后把需要的 Join 方式写入。这里选择了 Shuffle Join,可以看到右边它实际查询计划里面看到这个数据确实是做了 Partition ,原先 3 分钟的耗时通过这样的改写完之后只剩下 7 秒,性能提升明显
最后我们总结 Doris Join 优化调优的四点建议: