文章目录
[TOC]
前言
在上篇文章中,我们介绍了clickhouse的新特性——MergeTree启动加速,及其使用方法。本文将介绍MergeTree启动加速的设计原理。
MergeTree启动原理
part结构
我们知道,在clickhouse中,MergeTree表由一个个part组成。每个part对应一个目录,该目录下有两类文件:元数据文件,数据文件和projection part目录(如果该表创建了projection的话)
$ ll ./20210321_310_310_0
total 36
-rw-r----- 1 root root 28 Feb 9 14:26 primary.idx
-rw-r----- 1 root root 4 Feb 9 14:26 partition.dat
-rw-r----- 1 root root 4 Feb 9 14:26 minmax_day.idx
-rw-r----- 1 root root 10 Feb 9 14:26 default_compression_codec.txt
-rw-r----- 1 root root 1648 Feb 9 14:26 data.mrk3
-rw-r----- 1 root root 2110 Feb 9 14:26 data.bin
-rw-r----- 1 root root 1 Feb 9 14:26 count.txt
-rw-r----- 1 root root 790 Feb 9 14:26 columns.txt
-rw-r----- 1 root root 252 Feb 9 14:26 checksums.txt
其中元数据文件包括:
- partition.dat: 记录本part所在分区的值
- primary.idx: 主键索引文件,记录每个Granule起始行的主键值,需配合.mrk3文件使用
- count.txt: 记录本part的数据行数
- checksums.txt: 记录本part下所有文件的size和checksum
- minmax_XX.idx: 分区索引或二级minmax索引文件,记录对应字段的min值和max值
- default_compression_codec.txt: 记录该part默认的压缩编码,如ZTSD, LZ4等
- *.mrk, *.mrk2, *.mrk3: 不同版本的mark文件,它记录着每个Granule的起始行数、在对应的数据文件中的偏移位置和解压后的Block中的偏移位置。
- uuid.txt: 记录本part的唯一id。仅在assign_part_uuids = true时才会出现。
- ttl.txt: 记录本part的过期时间
其中数据文件包括:
- XX.bin。part有三种类型,Wide, Compact, InMemory。当part类型为Wide时,每个字段对应一个bin文件和mark文件。当part类型为Compact时,part目录下仅有一个全局的data.bin和data.mrk3文件。当part类型为InMemory时,没有part目录,只有一个全局的wal.bin文件,因此InMemory Part对启动性能的影响很小,不在我们考虑范围之内。
启动流程
clickhouse-server启动之后:
- 首先加载系统库(包括system/information_schema/INFORMATION_SCHEMA)的元数据,然后attach系统库下的表。
- 然后调用loadMetadata加载除了系统库之外的database
- 最后创建HTTP、TCP等server,提供对外查询
系统库中表的数量有限,主要是第二步比较耗时
try
{
auto & database_catalog = DatabaseCatalog::instance();
/// We load temporary database first, because projections need it.
database_catalog.initializeAndLoadTemporaryDatabase();
loadMetadataSystem(global_context);
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
attachSystemTablesServer(global_context, *database_catalog.getSystemDatabase(), has_zookeeper);
attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
/// Firstly remove partially dropped databases, to avoid race with MaterializedMySQLSyncThread,
/// that may execute DROP before loadMarkedAsDroppedTables() in background,
/// and so loadMarkedAsDroppedTables() will find it and try to add, and UUID will overlap.
database_catalog.loadMarkedAsDroppedTables();
/// Then, load remaining databases
loadMetadata(global_context, default_database);
startupSystemTables();
database_catalog.loadDatabases();
/// After loading validate that default database exists
database_catalog.assertDatabaseExists(default_database);
}
...
{
attachSystemTablesAsync(global_context, *DatabaseCatalog::instance().getSystemDatabase(), async_metrics);
{
std::lock_guard lock(servers_lock);
createServers(config(), listen_hosts, listen_try, server_pool, async_metrics, servers);
if (servers.empty())
throw Exception(
"No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
}
if (servers.empty())
throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
在loadMetadata
中,
- 首先遍历metadata目录,获取每个database的名称和元数据路径
- 对于每个database,加载对应的
.sql文件,获取create database query并执行, 并将该database加入到catalog中 - 执行
TablesLoader::loadTables
,对每个database对象执行loadStoredObjects
,即完成该database下每个table的加载。对于Engine为MergeTree Family的表来说,加载的主要工作就是读取所有part下的元数据文件到内存中。 - 执行
TablesLoader::startupTables
,对每个database对象执行startupTables
。对于非Replicated的MergeTree Family表,启动包括清理过期part、清理过期WAL、清理空part、启动后台disk move任务(如果该表配置了多disk的话)等工作。对于Replicated的MergeTree Family表,启动包括开启副本间同步、副本选主、启用副本、启动后台disk move等工作
void loadMetadata(ContextMutablePtr context, const String & default_database_name)
{
...
for (fs::directory_iterator it(path); it != dir_end; ++it)
{
if (it->is_symlink())
continue;
const auto current_file = it->path().filename().string();
if (!it->is_directory())
{
/// TODO: DETACH DATABASE PERMANENTLY ?
if (fs::path(current_file).extension() == ".sql")
{
String db_name = fs::path(current_file).stem();
if (!isSystemOrInformationSchema(db_name))
databases.emplace(unescapeForFileName(db_name), fs::path(path) / db_name);
}
...
TablesLoader::Databases loaded_databases;
for (const auto & [name, db_path] : databases)
{
loadDatabase(context, name, db_path, has_force_restore_data_flag);
loaded_databases.insert({name, DatabaseCatalog::instance().getDatabase(name)});
}
TablesLoader loader{context, std::move(loaded_databases), has_force_restore_data_flag, /* force_attach */ true};
loader.loadTables();
loader.startupTables();
...
}
在DatabaseOrdinary::loadStoredObjects
中
- 首先遍历database目录是下所有*.sql文件。对于每个文件,读取其中的建表语句并解析成AST
- 对于该database目录下的每张表,生成对应的Storage对象,并attach到本database中。注意这个过程以表为最小粒度并发的,并发度等于cpu物理核数
void DatabaseOrdinary::loadStoredObjects(
ContextMutablePtr local_context, bool force_restore, bool force_attach, bool skip_startup_tables)
{
/** Tables load faster if they are loaded in sorted (by name) order.
* Otherwise (for the ext4 filesystem), `DirectoryIterator` iterates through them in some order,
* which does not correspond to order tables creation and does not correspond to order of their location on disk.
*/
ParsedTablesMetadata metadata;
loadTablesMetadata(local_context, metadata);
...
/// Attach tables.
for (const auto & name_with_path_and_query : metadata.parsed_tables)
{
const auto & name = name_with_path_and_query.first;
const auto & path = name_with_path_and_query.second.path;
const auto & ast = name_with_path_and_query.second.ast;
const auto & create_query = ast->as<const ASTCreateQuery &>();
if (!create_query.is_dictionary)
{
pool.scheduleOrThrowOnError([&]()
{
loadTableFromMetadata(local_context, path, name, ast, force_restore);
/// Messages, so that it's not boring to wait for the server to load for a long time.
logAboutProgress(log, ++tables_processed, total_tables, watch);
});
}
}
pool.wait();
...
}
那么MergeTree表加载元数据的操作发生在哪里呢?发生在Storage的构造函数中,参考StorageReplicatedMergeTree::StorageReplicatedMergeTree
-> MergeTreeData::loadDataParts
。在loadDataParts
中
首先遍历每一个disk下的表的数据目录,收集其中的part。注意这个过程是并行的,并行度为本表关联的disk数量。
- 对于InMemory part, 加载wal文件中的数据。
- 对于Wide或Compact part,记录part名到disk的映射关系。
然后加载Wide和Compact part。
loadDataPartsFromDisk
最后加载InMemory part.
loadDataPartsFromWAL
void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
...
for (const auto & disk_ptr : disks)
{
if (disk_ptr->isBroken())
continue;
auto & disk_parts = disk_part_map[disk_ptr->getName()];
auto & disk_wal_parts = disk_wal_part_map[disk_ptr->getName()];
pool.scheduleOrThrowOnError([&, disk_ptr]()
{
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
/// Skip temporary directories, file 'format_version.txt' and directory 'detached'.
if (startsWith(it->name(), "tmp") || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME
|| it->name() == MergeTreeData::DETACHED_DIR_NAME)
continue;
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
disk_parts.emplace_back(std::make_pair(it->name(), disk_ptr));
else if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME && settings->in_memory_parts_enable_wal)
{
std::unique_lock lock(wal_init_lock);
if (write_ahead_log != nullptr)
throw Exception(
"There are multiple WAL files appeared in current storage policy. You need to resolve this manually",
ErrorCodes::CORRUPTED_DATA);
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
disk_wal_parts.push_back(std::move(part));
}
...
if (num_parts > 0)
loadDataPartsFromDisk(
broken_parts_to_detach, duplicate_parts_to_remove, pool, num_parts, parts_queue, skip_sanity_checks, settings);
if (!parts_from_wal.empty())
loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal, part_lock);
}
一个MergeTree表的In memory是相当有限的,我们重点分析Wide和Compact part的加载,即loadDataPartsFromDisk
。每个part的加载流程如下:
- 加载uuid.txt文件
- 加载columns.txt文件
- 加载checksums.txt文件
- 加载任意mark文件。对于Compact part,只有一个mark文件:data.mrk3
- 从checksums中获取每个column和secondary index文件的大小。
- 加载primary.idx文件
- 加载ttl.txt文件
- 加载projection parts中的metadata文件。
- 根据checksums校验metadata和data文件的一致性
- 加载default_compression_codec.txt文件
注意MergeTree表加载part的过程是并行的,并发度默认为cpu硬件核数。当MergeTree表完成所有part的加载后,便可利用元数据构造的分区索引、主键索引、二级索引、Projection加速查询。这就是为什么clickhouse-server必须等待所有表完成加载后才可对外提供查询服务的原因。
为什么慢
从上面的流程可以看到,加载一个MergeTree part最多读取8个metadata 文件。假如clickhouse示例上有70w part, metadata文件的数量为560w。如果MergeTree还创建了一个或多个projection, metadata文件数量可能会超过1000w。对这么多文件的并发读取会瞬间把disk ioutil打满,disk read成为性能瓶颈。这就是为什么MergeTree启动慢的原因。
启动加速优化
优化方案
优化的核心在于减少disk io次数。因此我们决定引入RocksDB用于缓存metadata file。RocksDB启动之后会自身的LSM文件,文件个数远远少于metadata文件数,文件位置在磁盘上也更加集中。当所有part的metadata都缓存到RocksDB时,clickhouse启动时仅从RocksDB中就可获得所有part的metadata,总体来看disk io的次数大大减小了。
缓存结构
RocksDB中:
- key: 考虑到同一张表的part可能分布在不同的disk中,所以key由两部分组成,disk name和part相对disk的路径,如
default:store/b47/b47ee9c8-afa1-41ac-adfb-2e0dc42de819/6_7_7_0/primary.idx
- value: metadata文件的内容
缓存一致性
既然用RockDB作为缓存层,那么如何保证缓存层和持久化层也就是磁盘文件的一致性呢?主要分为两大类场景:
- 读meta:当需要读取某个part的元数据时,首先从缓存层查询,如果缓存未命中, 则读取持久化层的结果,并将该结果更新到缓存层并返回结果。如果缓存命中则直接返回结果。该操作主要发生在clickhouse-server启动阶段
- 写meta:当需要更新某个part的元数据时,对缓存层和持久化层进行双写。该操作可能发生在插入数据、删除分区、后台数据合并/移动、ALTER TABLE等场景下。
那么是否需要考虑缓存的并发问题呢? 一个cache key唯一对应一个metadata文件,而clickhouse MergeTree的设计已经保证了不可能有多写或读写并发的情况出现。因此在设计缓存层时无需考虑这个问题。
如何检测不一致
增加了一层缓存层,那么
源码走读
总结
文章作者 后端侠
上次更新 0001-01-01