Compaction机制要解决的第一个问题,就是如何选取合适的Tablet进行Compaction。Doris的compaction机制采用“生产者-消费者”(producer-consumer)模式,由producer线程持续生产compaction任务,并将生产出的compaction任务提交给compaction线程池进行消费执行,如图1所示。
Doris BE启动时,会在后台启动一个compaction producer线程,同时会创建一个compaction线程池。producer线程持续地生产compaction任务。
在一轮任务生产过程中,会从每个磁盘各选择出一个tablet执行compaction任务。如果compaction线程池中某个磁盘的compaction任务数达到了上限(通过compaction_task_num_per_disk配置,默认值为2),则这一轮任务生产会跳过该磁盘。如果某一轮生产过程从所有磁盘均没有生产出compaction任务(即compaction线程池中每个磁盘的任务数都已达到上限),则producer线程会进入休眠状态,直到2秒超时唤醒,或线程池中某个compaction任务执行完成被唤醒。
在一轮compaction任务生产过程中,进行单个磁盘的任务生产时,需要遍历BE节点上所有的tablet,首先过滤掉不满足条件的tablet,比如:其他磁盘上的tablet、已经提交给compaction线程池的tablet、正在执行alter操作的tablet、初始化失败的tablet、上次compaction任务失败距离当前时刻的时间间隔小于设定阈值(通过min_compaction_failure_interval_sec配置,默认值为600秒)的tablet,然后从剩余的满足条件的tablet中选择tablet score最高的tablet执行compaction任务。
tablet score通过如下公式计算:
ablet_score = k1 * scan_frequency + k2 * compaction_score
其中,k1和k2分别可以通过参数compaction_tablet_scan_frequency_factor(默认值为0)和参数compaction_tablet_compaction_score_factor(默认值为1)动态配置。scan_frequency 表示tablet当前一段时间的scan频率。compaction score的计算方法会在本文的后面进行详细地介绍。
可以通过参数generate_compaction_tasks_min_interval_ms动态配置任务生产的频率,默认值为10ms,即每生产一轮compaction任务,producer线程会休眠10ms。
可以通过参数cumulative_compaction_rounds_for_each_base_compaction_round动态配置cumulative compaction和base compaction的生产周期,默认值为9,即每生产9轮cumulative compaction任务,然后会生产1轮base compaction任务。
可以通过参数disable_auto_compaction动态配置是否关闭compaction producer的任务生产,默认值为false,即不关闭producer的任务生产。
producer生产出的compaction任务需要提交给compaction线程池执行。为了调节BE节点compaction的内存使用量,Doris增加了对compaction任务提交的permission机制,如图2所示。系统维持一定数量的compaction permits(通过参数total_permits_for_compaction_score配置),每一个compaction任务提交给线程池之前都需要向系统申请permits(permits request),只有获得系统分配的permits之后任务才能被提交给compaction线程池,compaction任务在线程池中执行结束之后需要将自己持有permits归还给系统(permits release)。如果系统当前剩余的可分配的compaction permits数量小于本次compaction任务需要的permits数量,则本次任务提交会被阻塞(compaction任务提交是串行执行的,其他需要提交的任务也会被阻塞),直到有其他compaction任务执行结束并释放permits,使得系统有足够数量的permits分配给当前compaction任务。如果某一个compaction任务需要的permits数量超过系统维持的permits总数,则允许当线程池中所有的任务都执行结束之后,将该compaction任务提交给线程池执行。
Doris中单个compaction任务执行过程中的内存使用量与本次compaction任务合并的segment文件数量有关。一个rowset会包含多个segment文件,而一个compaction任务可能包含多个rowset。因此,使用compaction任务中需要合并的segment文件数量作为compaction任务的permits。通过调整系统维持的compaction permits总量可以对BE节点compaction的内存使用量进行调节。
Compaction任务可以概括为两个阶段:compaction preparation和compaction execution,如图3所示。compaction preparation阶段主要是从tablet中选出需要进行版本合并的rowsets,compaction execution阶段主要进行rowsets的版本合并操作。
在Doris中,compaction任务的preparation阶段在permits request之前执行,从tablet中选出需要进行版本合并的rowsets,通过需要合并的segment文件数量计算compaction permits。compaction任务的execution阶段会真正在线程池中执行,进行版本的合并,如图4所示。
Compaction任务提交到线程池之后,可能会在线程池的等待队列中等待较长的时间都没有被调度,当前tablet在这期间可能发生过clone操作,导致compaction preparation阶段选出的需要进行版本合并的rowset发生了改变,因此,在compaction execution阶段一开始需要判断任务等待调度期间tablet是否发生过clone操作,如果发生过clone操作,则本次compaction任务退出,否则,正常执行rowsets的合并。
Doris也提供了http接口,支持手动触发单个tablet的cumulative compaction或base compaction。
Doris的cumulative compaction每次会在cumulative point之后选择相邻的数个rowset进行合并,主要包含5个步骤,分别是计算cumulative point、生成candidate rowsets、选择input rowsets、执行rowsets合并以及更新cumulative point,如图1。其中,前面三个步骤属于compaction preparation阶段,后面两个步骤属于compaction execution阶段。
目前可供选择的cumulative compaction策略有两种:num_based cumulative compaction和size_based cumulative compaction。cumulative compaction的策略选择可以通过参数cumulative_compaction_policy进行配置(默认为size_based)。num_based cumulative compaction是基于rowset的文件数量进行compaction的选择,该策略会在后面的版本中被丢弃。Size_based cumulative compaction策略通过计算每个rowset的大小来决定compaction的选择,可以显著地减少写放大的系数。
下面将详细地对size_based cumulative compaction策略进行介绍。
版本号比cumulative point小的rowset仅会执行base compaction,而版本号比cumulative point大的rowset仅会执行cumulutive compaction。将一个rowset从cumulative 侧移动到base侧(即增大cumulative point)的行为称为一次Promotion。
如果tablet当前的cumulative point值为-1(初始值),则本次计算的cumulative point值不变,仍为-1;否则,执行以下操作进行cumulative point的计算:
(1) 对tablet下所有的rowset按照版本先后进行排序;
(2) Doris会通过计算promotion size来决定是否要对一个rowset执行Promotion。根据base rowset(start version为0)的大小计算tablet当前的promotion_size:
promotion_size = base_rowset_size * ratio
其中,ratio值可以通过cumulative_size_based_promotion_ratio配置,默认值为0.05。promotion_size被限定在cumulative_size_based_promotion_size_mbytes(默认值为1024MB)与cumulative_size_based_promotion_min_size_mbytes(默认值为64MB)之间,promotion size的计算流程如图2所示。
(3)从base rowset开始,依次遍历每一个rowset,当遇到以下情况,则停止遍历,更新tablet的cumulative point:
a.当前rowset与前一个rowset之间出现版本缺失,则更新cumulative point为前一个rowset的end_version+1;
b.当前rowset不是数据删除版本,同时当前rowset没有发生过版本合并,或当前rowset中的segment文件之间存在overlapping,则更新cumulative point为当前rowset的start_version;
c.当前rowset不是数据删除版本,同时当前rowset的大小小于promotion_size,则更新cumulative point为当前rowset的start_version。
依次遍历tablet中按照版本先后排序好的每一个rowset,如果某一个rowset的版本位于cumulative point之后;并且该rowset的创建时间距离当前时刻超过设定的时间间隔(可以通过cumulative_compaction_skip_window_seconds配置,默认值为30秒),或者该rowset参与过版本合并(rowset的start_version与end_version不相等),则将该rowset作为一个候选rowset。将所有候选rowset依次保存在向量candidate rowsets中。
(1)寻找candidate rowsets中最大的连续版本序列。
遍历candidate rowsets中的每一个rowset,如果某两个相邻的rowset之间出现版本缺失,则将candidate rowsets中第一个缺失版本之前的所有rowset作为新的candidate rowsets;
(2)生成input rowsets。
遍历candidate rowsets中的每一个rowset,将访问完成的rowset保存在向量input rowsets中,当遇到以下情况,遍历结束:
a.某一个rowset为数据删除版本(并使用last_delete_version记录当前数据删除版本,last_delete_version初始值为-1),并且input rowsets中的rowset数量不为0(如果input rowsets中的rowset数量为0,则跳过当前rowset,继续访问下一个rowset);
b.input rowsets中的rowset score(表示rowset中的segment文件数目)之和达到上限阈值(通过max_cumulative_compaction_num_singleton_deltas配置,默认值为1000);
c.遍历过程正常完成,input rowsets中包含了candidate rowsets中所有的rowset。
(3)调整input rowsets。
a.如果input rowsets中所有的rowset大小之和达到promotion_size,则不需要调整input rowsets。
b.如果存在数据删除版本的记录(last_delete_version 不为-1,即生成input rowsets的遍历过程因为存在删除数据版本而结束),并且input rowsets中的rowset数量不为1,则不需要调整input rowsets;如果存在数据删除版本的记录,并且input rowsets中的rowset数量为1,同时该rowset中的segment文件之间存在overlapping,则不需要调整input rowsets;如果存在数据删除版本的记录,并且input rowsets中的rowset数量为1,同时该rowset中的segment文件之间不存在overlapping,则清空input rowsets。
c.如果不存在数据删除版本的记录(last_delete_version 为-1),则遍历input rowsets中的rowset。从第一个rowset开始,计算当前rowset的大小等级(current_level),同时计算input rowsets中除当前rowset之外的其他rowset大小之和的等级(remain_level),如果current_level > remain_level,则从input rowsets中删除当前rowset,否则,停止遍历。
【注】level等级划分由参数cumulative_size_based_promotion_size_mbytes (默认值为1024MB)和cumulative_size_based_compaction_lower_size_mbytes (默认值为64MB)确定。最高的level值为cumulative_size_based_promotion_size_mbytes / 2,下一级level值为上一级level值的1/2,直到level值小于cumulative_size_based_compaction_lower_size_mbytes,则设置该级level值为0,level等级划分流程如图3所示。
计算某一个rowset的level值时,如果level[n-1] > rowset_size >= level[n],则该rowset的level值为level[n]。
将input rowsets中的所有rowset进行合并,生成一个output rowset。在执行rowsets合并时,会创建一个Reader和一个Rowset Writer,Rowset Writer与output rowset相对应。
在Reader底层逻辑中,input rowsets中的每一个rowset都会对应一个Rowset Reader。Reader按照key的排序规则逐行读出input rowsets中的数据,然后通过Rowset Writer写入output rowset。aggregation key数据模型和unique key数据模型中,key相同但分散在不同Rowset中的数据行会在rowsets合并后完成聚合。cumulative compaction不会将delete操作删除的数据行进行真正地删除,这部分工作会在base compaction中进行。
cumulative compaction执行结束之后,需要更新cumulative point。
(1)如果存在数据删除版本的记录(last_delete_version 不为-1,即生成input rowsets的遍历过程因为存在删除数据版本而结束),则更新cumulative point为output_rowset的end_version+1;
(2)如果不存在数据删除版本的记录(last_delete_version 为-1),判断output rowset的大小是否超过promotion_size,如果超过,则更新cumulative point为output_rowset的end_version+1,否则,不更新cumulative point。
【注】cumulative compaction执行之前需要计算一次cumulative point,因为上一次cumulative compaction之后可能发生过base compaction,base rowset发生了变化,因此,promotion size发生了变化,cumulative point也会变化。cumulative compaction执行之前计算cumulative point,是为了确定本次 cumulative compaction的边界;cumulative compaction执行之后更新cumulative point,是为了确定下一次可能发生的base compaction的边界。
在compaction producer线程中,需要依据cumulative compaction score生产cumulative compaction任务。依次遍历tablet中的所有rowset,如果某一个rowset的版本位于cumulative point之后,则将该rowset添加到向量rowset_to_compact。
(1)如果rowset_to_compact中所有rowset的大小之和超过promotion_size,则rowset_to_compact中所有rowset的score之和为当前tablet的cumulative compaction score,即rowset_to_compact中所有rowset的segment文件数目之和。
(2)如果rowset_to_compact中所有rowset的大小之和小于promotion_size,则按照版本先后对rowset_to_compact中的rowset进行排序,然后遍历rowset_to_compact中的每一个rowset。计算当前rowset的大小等级(current_level),同时计算rowset_to_compact中除当前rowset之外的其他rowset大小之和的等级(remain_level),如果current_level > remain_level,则从向量rowset_to_compact中删除当前rowset,否则,停止遍历。rowset_to_compact中所有rowset的score之和为当前tablet的cumulative compaction score。
Doris的base compaction会将cumulative point之前的所有rowset进行合并,主要包含3个步骤,分别是选择input rowsets、检查base compaction的执行条件以及执行rowsets合并,如图1所示。其中,前面两个步骤属于compaction preparation阶段,最后一个步骤属于compaction execution阶段。
(1)选择input rowsets。依次遍历tablet中的每一个rowset,获取所有版本位于cumulative point之前的rowset作为input rowsets。
(2)对input rowsets中的所有rowset按照版本先后进行排序。
依次检查以下条件(如图2所示):
(1)版本连续性条件
遍历input rowsets,判断是否有相邻的两个rowset之间存在版本缺失,如果存在版本缺失,则当前tablet不满足base compaction的执行条件,本次base compaction任务结束;否则,检查下一个条件。
(2)rowset overlapping条件
遍历input rowsets,如果有rowset中不同segment文件之间存在overlapping,则当前tablet不满足base compaction的执行条件,本次base compaction任务结束;否则,检查检查下一个条件。
(3) base rowset条件
如果input rowsets中只有两个rowset,并且base rowset(start version为0)的end version为1,则当前tablet不满足base compaction的执行条件,本次base compaction任务结束;否则,检查下一个条件。
(4)rowset的数量条件
如果input rowsets中rowset的数量超过设定的阈值(通过base_compaction_num_cumulative_deltas配置,默认值为5),则当前tablet满足base compaction的执行条件;否则,检查下一个条件。
(5)rowset size条件
如果input rowsets中除base rowset(start version为0)之外的其他rowset大小之和与base rowset大小之比超过设定的阈值(通过base_cumulative_delta_ratio配置,默认值为0.3),则当前tablet满足base compaction的执行条件;否则,检查检查下一个条件。
(6)时间条件
如果当前tablet上一次成功执行base compaction的时间距离当前时刻超过指定的时间间隔(通过base_compaction_interval_seconds_since_last_operation配置,默认值为86400秒,即1天),则当前tablet满足base compaction的执行条件;否则,当前tablet不满足base compaction的执行条件,本次base compaction任务结束。
将input rowsets中的所有rowset进行合并,生成一个output rowset。与cumulative compaction过程中执行rowsets合并的流程相同,不再赘述。值得一提的是,base compaction过程中会将delete操作删除的数据行真正地删除。
在compaction producer线程中,需要依据base compaction score生产base compaction任务。依次遍历tablet中的每一个rowset,所有位于cumulative point之前的rowset的 score之和为tablet当前的base compaction score,即cumulative point之前的rowset中的segment文件数目之和。
Doris 的数据写入模型使用了 LSM-Tree 类似的数据结构。数据都是以追加(Append)的方式写入磁盘的。这种数据结构可以将随机写变为顺序写。这是一种面向写优化的数据结构,他能增强系统的写入吞吐,但是在读逻辑中,需要通过 Merge-on-Read 的方式,在读取时合并多次写入的数据,从而处理写入时的数据变更。
Merge-on-Read 会影响读取的效率,为了降低读取时需要合并的数据量,基于 LSM-Tree 的系统都会引入后台数据合并的逻辑,以一定策略定期的对数据进行合并。Doris 中这种机制被称为 Compaction。
Doris 中每次数据写入会生成一个数据版本。Compaction的过程就是讲多个数据版本合并成一个更大的版本。Compaction 可以带来以下好处:
每个数据版本内的数据是按主键有序的,但是版本之间的数据是无序的。Compaction后形成的大版本将多个小版本的数据变成有序数据。在有序数据中进行数据检索的效率更高。
数据都是以追加的方式写入的,因此 Delete、Update 等操作都是写入一个标记。Compaction 操作可以处理这些标记,进行真正的数据删除或更新,从而在读取时,不再需要根据这些标记来过滤数据。
在聚合模型下,Compaction 能进一步聚合不同数据版本中相同 key 的数据行,从而增加数据聚合度,减少读取时需要实时进行的聚合计算。
用户可能需要根据实际的使用场景来调整 Compaction 的策略,否则可能遇到如下问题:
在高频写入场景下,短时间内会产生大量的数据版本。如果 Compaction 不及时,就会造成大量版本堆积,最终严重影响写入速度。
Compaction 本质上是将已经写入的数据读取后重写写回的过程,这种数据重复写入被称为写放大。一个好的Compaction策略应该在保证效率的前提下,尽量降低写放大系数。过多的 Compaction 会占用大量的磁盘IO资源,影响系统整体效率。
Doris 中用于控制Compaction的参数非常多。本文尝试以下方面,介绍这些参数的含义以及如果通过调整参数来适配场景。
首先,用户的数据表会按照分区和分桶规则,切分成若干个数据分片(Tablet)存储在不同 BE 节点上。每个 Tablet 都有多个副本(默认为3副本)。Compaction 是在每个 BE 上独立进行的,Compaction 逻辑处理的就是一个 BE 节点上所有的数据分片。
前文说到,Doris的数据都是以追加的方式写入系统的。Doris目前的写入依然是以微批的方式进行的,每一批次的数据针对每个 Tablet 都会形成一个 rowset。而一个 Tablet 是由多个Rowset 组成的。每个 Rowset 都有一个对应的起始版本和终止版本。对于新增Rowset,起始版本和终止版本相同,表示为 [6-6]、[7-7] 等。多个Rowset经过 Compaction 形成一个大的 Rowset,起始版本和终止版本为多个版本的并集,如 [6-6]、[7-7]、[8-8] 合并后变成 [6-8]。
Rowset 的数量直接影响到 Compaction 是否能够及时完成。那么一批次导入会生成多少个 Rowset 呢?这里我们举一个例子:
假设集群有3个 BE 节点。每个BE节点2块盘。只有一张表,2个分区,每个分区3个分桶,默认3副本。那么总分片数量是(2 * 3 * 3)18 个,如果均匀分布在所有节点上,则每个盘上3个tablet。假设一次导入涉及到其中一个分区,则一次导入总共产生9个Rowset,即平均每块盘产生1-2个 Rowset。(这里仅考虑数据完全均匀分布的情况下,实际情况中,可能多个 Tablet 集中在某一块磁盘上。)
从上面的例子我们可以得出,rowset的数量直接取决于表的分片数量。举个极端的例子,如果一个Doris集群只有3个BE节点,但是有9000个分片。那么一次导入,每个BE节点就会新增3000个rowset,则至少要进行3000次compaction,才能处理完所有的分片。所以:
合理的设置表的分区、分桶和副本数量,避免过多的分片,可以降低Compaction的开销。
Doris 中有两种 Compaction 操作,分别称为 Base Compaction(BC) 和 Cumulative Compaction(CC)。BC 是将基线数据版本(以0为起始版本的数据)和增量数据版本合并的过程,而CC是增量数据间的合并过程。BC操作因为涉及到基线数据,而基线数据通常比较大,所以操作耗时会比CC长。
如果只有 Base Compaction,则每次增量数据都要和全量的基线数据合并,写放大问题会非常严重,并且每次 Compaction 都相当耗时。因此我们需要引入 Cumulative Compaction 来先对增量数据进行合并,当增量数据合并后的大小达到一定阈值后,再和基线数据合并。这里我们有一个比较通用的 Compaction 调优策略:
在合理范围内,尽量减少 Base Compaction 操作。
BC 和 CC 之间的分界线成为 Cumulative Point(CP),这是一个动态变化的版本号。比CP小的数据版本会只会触发 BC,而比CP大的数据版本,只会触发CC。
整个过程有点类似 2048 小游戏:只有合并后大小足够,才能继续和更大的数据版本合并。
Compaction 的目的是合并多个数据版本,一是避免在读取时大量的 Merge 操作,二是避免大量的数据版本导致的随机IO。因此,Compaction 策略的重点问题,就是如何选择合适的 tablet,以保证节点上不会出现数据版本过多的数据分片。
一个自然的想法,就是每次都选择数据版本最多的数据分片进行 Compaction。这个策略也是 Doris 的默认策略。这个策略在大部分场景下都能很好的工作。但是考虑到一种情况,就是版本多的分片,可能并不是最频繁访问的分片。而 Compaction 的目的就是优化读性能。那么有可能某一张 “写多读少” 表一直在 Compaction,而另一张 “读多写少” 的表不能及时的 Compaction,导致读性能变差。
因此,Doris 在选择数据分片时还引入了 “读取频率” 的因素。“读取频率” 和 “版本数量” 会根据各自的权重,综合计算出一个 Compaction 分数,分数越高的分片,优先做 Compaction。这两个因素的权重由以下 BE 参数控制(取值越大,权重越高):
compaction_tablet_scan_frequency_factor:“读取频率” 的权重值,默认为 0。
compaction_tablet_compaction_score_factor:“版本数量” 的权重,默认为 1。
> “读取频率” 的权重值默认为0,即默认仅考虑 “版本数量” 这个因素。
Compaction 是一个 生产者-消费者 模型。由一个生产者线程负责选择需要做 Compaction 的数据分片,而多个消费者负责执行 Compaction 操作。
生产者线程只有一个,会定期扫描所有 tablet 来选择合适的 compaction 对象。因为 Base Compaction 和 Cumulative Compaction 是不同类型的任务,因此目前的策略是每生成 9 个 CC 任务,生成一个 BC 任务。任务生成的频率由以下两个参数控制:
cumulative_compaction_rounds_for_each_base_compaction_round:多少个CC任务后生成一个BC任务。
generate_compaction_tasks_min_interval_ms:任务生成的间隔。
> 这两个参数通常情况下不需要调整。
生产者线程产生的任务会被提交到消费者线程池。因为 Compaction 是一个IO密集型的任务,为了保证 Compaction 任务不会过多的占用IO资源,Doris 限制了每个磁盘上能够同时进行的 Compaction 任务数量,以及节点整体的任务数量,这些限制由以下参数控制:
compaction_task_num_per_disk:每个磁盘上的任务数,默认为2。该参数必须大于等于2,以保证 BC 和 CC 任务各自至少有一个线程。
max_compaction_threads:消费者线程,即Compaction线程的总数。默认为 10。
举个例子,假设一个 BE 节点配置了3个数据目录(即3块磁盘),每个磁盘上的任务数配置为2,总线程数为5。则同一时间,最多有5个 Compaction 任务在进行,而每块磁盘上最多有2个任务在进行。并且最多有3个 BC 任务在进行,因为每块盘上会自动预留一个线程给CC任务。
另一方面,Compaction 任务同时也是一个内存密集型任务,因为其本质是一个多路归并排序的过程,每一路是一个数据版本。如果一个 Compaction 任务涉及的数据版本很多,则会占用更多的内存,如果仅限制任务数,而不考虑任务的内存开销,则有可能导致系统内存超限。因此,Doris 在上述任务个数限制之外,还增加了一个任务配额限制:
total_permits_for_compaction_score:Compaction 任务配额,默认 10000。
每个 Compaction 任务都有一个配额,其数值就是任务涉及的数据版本数量。假设一个任务需要合并100个版本,则其配额为100。当正在运行的任务配额总和超过配置后,新的任务将被拒绝。
三个配置共同决定了节点所能承受的 Compaction 任务数量。
一个 Compaction 任务对应的是一个数据分片(Tablet)。消费线程拿到一个 Compaction 任务后,会根据 Compaction 的任务类型,选择 tablet 中合适的数据版本(Rowset)进行数据合并。下面分别介绍 Base Compaction 和 Cumulative Compaction 的数据分片选择策略。
前文说过,BC 任务是增量数据和基线数据的合并任务。并且只有比 Cumulative Point(CP) 小的数据版本才会参与 BC 任务。因此,BC 任务的数据版本选取策略比较简单。
首先,会选取所有版本在 0 到 CP之间的 rowset。然后根据以下几个配置参数,判断是否启动一个 BC 任务:
base_compaction_num_cumulative_deltas:一次 BC 任务最小版本数量限制。默认为5。该参数主要为了避免过多 BC 任务。当数据版本数量较少时,BC 是没有必要的。
base_compaction_interval_seconds_since_last_operation:第一个参数限制了当版本数量少时,不会进行 BC 任务。但我们需要避免另一种情况,即某些 tablet 可能仅会导入少量批次的数据,因此当 Doris 发现一个 tablet 长时间没有执行过 BC 任务时,也会触发 BC 任务。这个参数就是控制这个时间的,默认是 86400,单位是秒。
> 以上两个参数通常情况下不需要修改,在某些情况下如何需要想尽快合并基线数据,可以尝试改小 base_compaction_num_cumulative_deltas 参数。但这个参数只会影响到 “被选中的 tablet”。而 “被选中” 的前提是这个 tablet 的数据版本数量是最多的。
CC 任务只会选取版本比 CP 大的数据版本。其本身的选取策略也比较简单,即从 CP 版本开始,依次向后选取数据版本。最终的数据版本集合由以下参数控制:
min_cumulative_compaction_num_singleton_deltas:一次 CC 任务最少的版本数量限制。这个配置是和 cumulative_size_based_compaction_lower_size_mbytes 配置同时判断的。即如果版本数量小于阈值,并且数据量也小于阈值,则不会触发 CC 任务。以避免躲过不比较的 CC 任务。默认是5。
max_cumulative_compaction_num_singleton_deltas:一次 CC 任务最大的版本数量限制。以防止一次 CC 任务合并的版本数量过多,占用过多资源。默认是1000。
cumulative_size_based_compaction_lower_size_mbytes:一次 CC 任务最少的数据量,和min_cumulative_compaction_num_singleton_delta 同时判断。默认是 64,单位是 MB。
简单来说,默认配置下,就是从 CP 版本开始往后选取 rowset。最少选5个,最多选 1000 个,然后判断数据量是否大于阈值即可。
CC 任务还有一个重要步骤,就是在合并任务结束后,设置新的 Cumulative Point。CC 任务合并完成后,会产生一个合并后的新的数据版本,而我们要做的就是判断这个新的数据版是 “晋升” 到 BC 任务区,还是依然保留在 CC 任务区。举个例子:
假设当前 CP 是 10。有一个 CC 任务合并了 [10-13] [14-14] [15-15] 后生成了 [10-15] 这个版本。如果决定将 [10-15] 版本移动到 BC 任务区,则需修改 CP 为 15,否则 CP 保持不变,依然为 10。
CP 只会增加,不会减少。以下参数决定了是否更新 CP:
cumulative_size_based_promotion_ratio:晋升比率。默认 0.05。
cumulative_size_based_promotion_min_size_mbytes:最小晋升大小,默认 64,单位 MB。
cumulative_size_based_promotion_size_mbytes:最大晋升大小,默认 1024,单位 MB。
以上参数比较难理解,这里我们先解释下 “晋升” 的原则。一个 CC 任务生成的 rowset 的晋升原则,是其数据大小和基线数据的大小在 “同一量级”。这个类似 2048 小游戏,只有相同的数字才能合并形成更大的数字。而上面三个参数,就是用于判断一个新的rowset是否匹配基线数据的数量级。举例说明:
在默认配置下,假设当前基线数据(即所有 CP 之前的数据版本)的数据量为 10GB,则晋升量级为 (10GB * 0.05)512MB。这个数值大于 64 MB 小于 1024 MB,满足条件。所以如果 CC 任务生成的新的 rowset 的大小大于 512 MB,则可以晋升,即 CP 增加。而假设基线数据为 50GB,则晋升量级为(50GB * 0.05)2.5GB。这个数值大于 64 MB 也大于 1024 MB,因此晋升量级会被调整为 1024 MB。所以如果 CC 任务生成的新的 rowset 的大小大于 1024 MB,则可以晋升,即 CP 增加。
从上面的例子可以看出,cumulative_size_based_promotion_ratio 用于定义 “同一量级”,0.05 即表示数据量大于基线数据的 5% 的 rowset 都有晋升的可能,而 cumulative_size_based_promotion_min_size_mbytes 和 cumulative_size_based_promotion_size_mbytes 用于保证晋升不会过于频繁或过于严格。
> 这三个参数会直接影响 BC 和 CC 任务的频率,尤其在高频导入场景下需要适当调整。我们会在后续文章中举例说明。
还有一些参数和 Compaction 相关,在某些情况下需要修改:
disable_auto_compaction:默认为 false,修改为 true 则会禁止 Compaction 操作。该参数仅在一些调试情况,或者 compaction 异常需要临时关闭的情况下才需使用。
通过 DELETE FROM 语句执行的数据删除操作,在 Doris 中也会生成一个数据版本用于标记删除。这种类型的数据版本比较特殊,我们成为 “删除版本”。删除版本只能通过 Base Compaction 任务处理。因此在在遇到删除版本时,Cumulative Point 会强制增加,将删除版本移动到 BC 任务区。因此数据导入和删除交替发生的场景通常会导致 Compaction 灾难。比如以下版本序列:
[11-11] 删除版本
[12-12]
[13-13] 删除版本
[14-14]
[15-15] 删除版本
[16-16]
[17-17] 删除版本
...在这种情况下,CC 任务几乎不会被触发(因为CC任务只能选择一个版本,而无法处理删除版本),所有版本都会交给 Base Compaction 处理,导致 Compaction 进度缓慢。目前Doris还无法很好的处理这种场景,因此需要在业务上尽量避免。
Compaction 的目的是合并多个数据版本,一是避免在读取时大量的 Merge 操作,二是避免大量的数据版本导致的随机IO。并且在这个过程中,Compaction 操作不能占用太多的系统资源。所以我们可以以结果为导向,从以下两个方面反推是否需要调整 Compaction 策略。
Doris 提供数据版本数量的监控数据。如果你部署了 Prometheus + Grafana 的监控,则可以通过 Grafana 仪表盘的 BE Base Compaction Score 和 BE Cumu Compaction Score 图表查看到这个监控数据的趋势图:
> 这个图表展示的是每个 BE 节点,所有 Tablet 中数据版本最多的那个 Tablet 的版本数量,可以反映出当前版本堆积情况。
> 部署方式参阅:http://doris.incubator.apache.org/master/zh-CN/administrator-guide/operation/monitor-alert.html
如果没有安装这个监控,如果你是用的 Palo 0.14.7 版本以上,也可以通过以下命令在命令行查看这个监控数据的趋势图:
mysql> ADMIN SHOW BACKEND METRIC ("nodes" = "30746894", "metrics" = "BE_BASE_COMPACTION_SCORE", "time" = "last 4 hours");
mysql> ADMIN SHOW BACKEND METRIC ("nodes" = "30746894", "metrics" = "BE_CUMU_COMPACTION_SCORE", "time" = "last 4 hours");
> 该命令具体帮助可执行 HELP ADMIN SHOW METRIC 查看
注意这里有两个指标,分别表示 Base Compaction 和 Cumulative Compaction 所对应的版本数量。在大部分情况下,我们只需要查看 Cumulative Compaction 的指标,即可大致了解集群的数据版本堆积情况。
版本是否堆积没有一个明确的界限,而是根据使用场景和查询延迟进行判断的一个经验值。我们可以按照以下步骤进行简单的推断:
观察数据版本数量的趋势,如果趋势平稳,则说明 Compaction 和导入速度基本持平。如果呈上升态势,则说明 Compaction 速度跟不上导入速度了。如果呈下降态势,说明 Compaction 速度超过了导入速度。如果呈上升态势,或在平稳状态但数值较高,则需要考虑调整 Compaction 参数以加快 Compaction 的进度。
通常版本数量维持在 100 以内可以视为正常。而在大部分批量导入或低频导入场景下,版本数量通常为10-20甚至更低。
Compaction 资源占用主要是 IO 和 内存。
对于 Compaction 占用的内存,可以在浏览器打开以下链接:
http://be_host:webserver_port/mem_tracker
在搜索框中输入 AutoCompaction:
则可以查看当前Compaction的内存开销和历史峰值开销。
而对于 IO 操作,目前还没有提供单独的 Compaction 操作的 IO 监控,我们只能根据集群整体的 IO 利用率情况来做判断。我们可以查看监控图 Disk IO util:
或者通过命令在命令行查看(Palo 0.14.7 以上版本):
mysql> ADMIN SHOW BACKEND METRIC ("nodes" = "30746894", "metrics" = "BE_DISK_IO", "time" = "last 4 hours");
这个监控展示的是每个 BE 节点上磁盘的 IO util 指标。数值越高表示IO越繁忙。当然大部分情况下 IO 资源都是查询请求消耗的,这个监控主要用于指导我们是否需要增加或减少 Compaction 任务数。
如果版本数量有上升趋势或者数值较高,则可以从以下两方面优化 Compaction:
在优化 Compaction 执行逻辑之前,我们需要使用一些命令来进一步查看一些Compaction的细节信息。
首先,我们通过监控图找到一个版本数量最高的 BE 节点。然后执行以下命令分析日志:
$> grep "succeed to do base" log/be.INFO.log.20210505-142010 |tail -n 100
$> grep "succeed to do cumu" log/be.INFO.log.20210505-142010 |tail -n 100
以上两个命令可以查看最近100个执行完成的 compaction 任务:
I0505 17:06:56.143455 675 compaction.cpp:135] succeed to do cumulative compaction. tablet=106827682.505347040.d040c1cdf71e5c95-3a002a06127ccd86, output_version=2-2631, current_max_version=2633, disk=/home/disk6/palo.HDD, segments=57. elapsed time=2.29371s. cumulative_compaction_policy=SIZE_BASED.
I0505 17:06:56.520058 666 compaction.cpp:135] succeed to do cumulative compaction. tablet=106822189.1661856168.654562832a620ea6-46fe84c73ea84795, output_version=2-3247, current_max_version=3250, disk=/home/disk2/palo.HDD, segments=22. elapsed time=2.66858s. cumulative_compaction_policy=SIZE_BASED.
通过日志时间可以判断 Compaction 是否在持续正确的执行,通过 elapsed time 可以观察每个任务的执行时间。
我们还可以执行以下命令展示最近100个 compaction 任务的配额(permits):
$> grep "permits" log/be.INFO |tail -n 100
I0505 17:04:07.120920 667 compaction.cpp:83] start cumulative compaction. tablet=106827970.777011641.9c474de1b8ba9199-4addeb135d6834ac, output_version=2-2623, permits: 39
I0505 17:04:13.898777 672 compaction.cpp:83] start cumulative compaction. tablet=106822777.1948936074.a44ac9462e79b76d-4a33ee39559bb0bf, output_version=2-3238, permits: 22
配额和版本数量成正比。
我们可以找到 permits 较大的一个任务对应的 tablet id,如上图permit 为 39 的任务的 tablet id 为 106827970,然后继续分析这个 tablet 的 compaction 情况。
通过 MySQL 客户端连接 Doris 集群后,执行:
mysql> show tablet 106827970;
+--------------------------+-----------+---------------+-----------+---------+----------+-------------+----------+--------+----------------------------------------------------------------------------+
| DbName | TableName | PartitionName | IndexName | DbId | TableId | PartitionId | IndexId | IsSync | DetailCmd |
+--------------------------+-----------+---------------+-----------+---------+----------+-------------+----------+--------+----------------------------------------------------------------------------+
| default_cluster:test | tbl1 | p20210505 | tbl1 | 3828954 | 63708800 | 106826829 | 63709761 | true | SHOW PROC '/dbs/3828954/63708800/partitions/106826829/63709761/106827970'; |
+--------------------------+-----------+---------------+-----------+---------+----------+-------------+----------+--------+----------------------------------------------------------------------------+
然后执行后面的 SHOW PROC 语句,我们可以获得这个 tablet 所有副本的详细信息。其中 VersionCount 列表示对应副本的数据版本数量。我们可以选取一个 VersionCount 较大的副本,在浏览器打开 CompactionStatus 列显示的 URL,得到如下Json结果:
{
"cumulative policy type": "SIZE_BASED",
"cumulative point": 18438,
"last cumulative failure time": "1970-01-01 08:00:00.000",
"last base failure time": "1970-01-01 08:00:00.000",
"last cumulative success time": "2021-05-05 17:18:48.904",
"last base success time": "2021-05-05 16:14:49.786",
"rowsets": [
"[0-17444] 13 DATA NONOVERLAPPING 0200000000b1fb8d344f83103113563dd81740036795499d 2.86 GB",
"[17445-17751] 1 DATA NONOVERLAPPING 0200000000b25183344f83103113563dd81740036795499d 68.61 MB",
"[17752-18089] 1 DATA NONOVERLAPPING 0200000000b2b9a2344f83103113563dd81740036795499d 74.52 MB",
"[18090-18437] 1 DATA NONOVERLAPPING 0200000000b32686344f83103113563dd81740036795499d 76.41 MB",
"[18438-18678] 1 DATA NONOVERLAPPING 0200000000b37084344f83103113563dd81740036795499d 53.07 MB",
"[18679-18679] 1 DATA NONOVERLAPPING 0200000000b36d87344f83103113563dd81740036795499d 3.11 KB",
"[18680-18680] 1 DATA NONOVERLAPPING 0200000000b36d70344f83103113563dd81740036795499d 258.40 KB",
"[18681-18681] 1 DATA NONOVERLAPPING 0200000000b36da0344f83103113563dd81740036795499d 266.98 KB",
],
"stale_rowsets": [
],
"stale version path": [
]
}
这里我们可以看到一个 tablet 的 Cumulative Point,最近一次成功、失败的 BC/CC 任务时间,以及每个 rowset 的版本信息。如上面这个示例,我们可以得出以下结论:
我们还可以进一步的通过以下命令分析指定 tablet id 的日志:
# 查看 tablet 48062815 最近十个任务的配额情况
$> grep permits log/be.INFO |grep 48062815 |tail -n 10
# 查看 tablet 48062815 最近十个执行完成的 compaction 任务
$> grep "succeed to do" log/be.INFO |grep 48062815 |tail -n 10
另外,我们还可以在浏览器打开以下 URL,查看一个 BE 节点当前正在执行的 compaction 任务:
be_host:webserver_port/api/compaction/run_status
{
"CumulativeCompaction": {
"/home/disk2/palo": [],
"/home/disk1/palo": [
"48061239"
]
},
"BaseCompaction": {
"/home/disk2/palo": [],
"/home/disk1/palo": [
"48062815",
"48061276"
]
}
}
这个接口可以看到每个磁盘上当前正在执行的 compaction 任务。
通过以上一系列的分析,我们应该可以对系统的 Compaction 情况有以下判断:
这些结论将指导我们对 Compaction 进行调优。
增加 Compaction 线程数是一个非常直接的加速 Compaction 的方法。但是更多的任务意味着更大的 IO 和 内存开销。尤其在机械磁盘上,因为随机读写问题,有时可能单线程串行执行的效率会高于多线程并行执行。Doris 默认配置为每块盘两个 Compaction 任务(这也是最小的合法配置),最多 10 个任务。如果磁盘数量多于 5,在内存允许的情况下,可以修改 max_compaction_threads 参数增加总任务数,以保证每块盘可以执行两个 Compaction 任务。
对于机械磁盘,不建议增加每块盘的任务数。对于固态硬盘,可以考虑修改 compaction_task_num_per_disk 参数适当增加每块盘的任务数,如修改为 4。注意修改这个参数的同时可能还需同步修改 max_compaction_threads,使得 max_compaction_threads 大于等于 compaction_task_num_per_disk * 磁盘数量。
这个优化方式比较复杂,我们尝试从几个场景出发来说明:
场景一:基线数据量大,Base Compaction 任务执行时间长。
BC 任务执行时间长,意味着一个任务会长时间占用 Compaction 工作线程,从而导致其他 tablet 的 compaction 任务时间被挤占。如果是因为 0 号版本的基线数据量较大导致,则我们可以考虑尽量推迟增量rowset 晋升到 BC 任务区的时间。以下两个参数将影响这个逻辑:
cumulative_size_based_promotion_ratio:默认 0.05,基线数据量乘以这个系数,即晋升阈值。可以调大这个系数来提高晋升阈值。
cumulative_size_based_promotion_size_mbytes:默认 1024MB。如果增量rowset的数据量大于这个值,则会忽略第一个参数的阈值直接晋升。因此需要同时调整这个参数来提升晋升阈值。
当然,提升晋升阈值,会导致单个 BC 任务需要处理更大的数据量,耗时更长,但是总体的数据量会减少。举个例子。基线数据大小为 1024GB,假设晋升阈值分别为 100MB 和 200MB。数据导入速度为 100MB/分钟。每5个版本执行一次 BC。那么理论上在10分钟内,阈值为 100MB 时,BC 任务处理的总数据量为 (1024 + 100 * 5)* 2 = 3048MB。阈值为 200MB 是,BC 任务处理的总数据量为 (1024 + 200 * 5) = 2024 MB。
场景二:增量数据版本数量增长较快,Cumulative Compaction 处理过多版本,耗时较长。
max_cumulative_compaction_num_singleton_deltas 参数控制一个 CC 任务最多合并多少个数据版本,默认值为 1000。我们考虑这样一种场景:针对某一个 tablet,其数据版本的增长速度为 1个/秒。而其 CC 任务的执行时间 + 调度时间是 1000秒(即单个 CC 任务的执行时间加上Compaction再一次调度到这个 tablet 的时间总和)。那么我们可能会看到这个 tablet 的版本数量在 1-1000之间浮动(这里我们忽略基线版本数量)。因为在下一次 CC 任务执行前的 1000 秒内,又会累积 1000 个版本。
这种情况可能导致这个 tablet 的读取效率很不稳定。这时我们可以尝试调小 max_cumulative_compaction_num_singleton_deltas 这个参数,这样一个 CC 所要合并的版本数更少,执行时间更短,执行频率会更高。还是刚才这个场景,假设参数调整到500,而对应的 CC 任务的执行时间 + 调度时间也降低到 500,则理论上这个 tablet 的版本数量将会在 1-500 之间浮动,相比于之前,版本数量更稳定。
当然这个只是理论数值,实际情况还要考虑任务的具体执行时间、调度情况等等。
某些情况下,自动 Compaction 策略可能无法选取到某些 tablet,这时我们可能需要通过 Compaction 接口来主动触发指定 tablet 的 Compaction。我们以 curl 命令举例:
curl -X POST http://192.168.1.1:8040/api/compaction/run?tablet_id=106818600\&schema_hash=6979334\&compact_type=cumulative
这里我们指定 id 为 106818600,schema hash 为 6979334 的 tablet 进行 Cumulative Compaction(compact_type参数为 base 则触发 Base Compaction)。其中 schema hash 可以通过 SHOW TABLET tablet_id 命令得到的 SHOW PROC 命令获取。
如果提交成功,则会返回:
{"status": "Success", "msg": "compaction task is successfully triggered."}
这是一个异步操作,命令只是提交compaction 任务,之后我们可以通过以下 API 来查看任务是否在运行:
curl -X GET http://192.168.1.1:8040/api/compaction/run_status?tablet_id=106818600\&schema_hash=6979334
返回结果:
{
"status" : "Success",
"run_status" : false,
"msg" : "compaction task for this tablet is running",
"tablet_id" : 106818600,
"schema_hash" : 6979334,
"compact_type" : "cumulative"
}
当然也可以直接查看 tablet 的版本情况:
curl -X GET http://192.168.1.1:8040/api/compaction/show?tablet_id=106818600\&schema_hash=6979334