[TOC]

前言

在上篇文章中,我们介绍了clickhouse的新特性——MergeTree启动加速,及其使用方法。本文将介绍MergeTree启动加速的设计原理。

MergeTree启动原理

part结构

我们知道,在clickhouse中,MergeTree表由一个个part组成。每个part对应一个目录,该目录下有两类文件:元数据文件,数据文件和projection part目录(如果该表创建了projection的话)

$ ll ./20210321_310_310_0 
total 36
-rw-r----- 1 root root   28 Feb  9 14:26 primary.idx
-rw-r----- 1 root root    4 Feb  9 14:26 partition.dat
-rw-r----- 1 root root    4 Feb  9 14:26 minmax_day.idx
-rw-r----- 1 root root   10 Feb  9 14:26 default_compression_codec.txt
-rw-r----- 1 root root 1648 Feb  9 14:26 data.mrk3
-rw-r----- 1 root root 2110 Feb  9 14:26 data.bin
-rw-r----- 1 root root    1 Feb  9 14:26 count.txt
-rw-r----- 1 root root  790 Feb  9 14:26 columns.txt
-rw-r----- 1 root root  252 Feb  9 14:26 checksums.txt

其中元数据文件包括:

  • partition.dat: 记录本part所在分区的值
  • primary.idx: 主键索引文件,记录每个Granule起始行的主键值,需配合.mrk3文件使用
  • count.txt: 记录本part的数据行数
  • checksums.txt: 记录本part下所有文件的size和checksum
  • minmax_XX.idx: 分区索引或二级minmax索引文件,记录对应字段的min值和max值
  • default_compression_codec.txt: 记录该part默认的压缩编码,如ZTSD, LZ4等
  • *.mrk, *.mrk2, *.mrk3: 不同版本的mark文件,它记录着每个Granule的起始行数、在对应的数据文件中的偏移位置和解压后的Block中的偏移位置。
  • uuid.txt: 记录本part的唯一id。仅在assign_part_uuids = true时才会出现。
  • ttl.txt: 记录本part的过期时间

其中数据文件包括:

  • XX.bin。part有三种类型,Wide, Compact, InMemory。当part类型为Wide时,每个字段对应一个bin文件和mark文件。当part类型为Compact时,part目录下仅有一个全局的data.bin和data.mrk3文件。当part类型为InMemory时,没有part目录,只有一个全局的wal.bin文件,因此InMemory Part对启动性能的影响很小,不在我们考虑范围之内。

启动流程

clickhouse-server启动之后:

  • 首先加载系统库(包括system/information_schema/INFORMATION_SCHEMA)的元数据,然后attach系统库下的表。
  • 然后调用loadMetadata加载除了系统库之外的database
  • 最后创建HTTP、TCP等server,提供对外查询

系统库中表的数量有限,主要是第二步比较耗时

    try
    {
        auto & database_catalog = DatabaseCatalog::instance();
        /// We load temporary database first, because projections need it.
        database_catalog.initializeAndLoadTemporaryDatabase();
        loadMetadataSystem(global_context);
        /// After attaching system databases we can initialize system log.
        global_context->initializeSystemLogs();
        global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
        /// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
        attachSystemTablesServer(global_context, *database_catalog.getSystemDatabase(), has_zookeeper);
        attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA));
        attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
        /// Firstly remove partially dropped databases, to avoid race with MaterializedMySQLSyncThread,
        /// that may execute DROP before loadMarkedAsDroppedTables() in background,
        /// and so loadMarkedAsDroppedTables() will find it and try to add, and UUID will overlap.
        database_catalog.loadMarkedAsDroppedTables();
        /// Then, load remaining databases
        loadMetadata(global_context, default_database);
        startupSystemTables();
        database_catalog.loadDatabases();
        /// After loading validate that default database exists
        database_catalog.assertDatabaseExists(default_database);
    }

	...
    {
        attachSystemTablesAsync(global_context, *DatabaseCatalog::instance().getSystemDatabase(), async_metrics);

        {
            std::lock_guard lock(servers_lock);
            createServers(config(), listen_hosts, listen_try, server_pool, async_metrics, servers);
            if (servers.empty())
                throw Exception(
                    "No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)",
                    ErrorCodes::NO_ELEMENTS_IN_CONFIG);
        }

        if (servers.empty())
             throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)",
                ErrorCodes::NO_ELEMENTS_IN_CONFIG);

loadMetadata中,

  • 首先遍历metadata目录,获取每个database的名称和元数据路径
  • 对于每个database,加载对应的.sql文件,获取create database query并执行, 并将该database加入到catalog中
  • 执行TablesLoader::loadTables,对每个database对象执行loadStoredObjects,即完成该database下每个table的加载。对于Engine为MergeTree Family的表来说,加载的主要工作就是读取所有part下的元数据文件到内存中。
  • 执行TablesLoader::startupTables,对每个database对象执行startupTables。对于非Replicated的MergeTree Family表,启动包括清理过期part、清理过期WAL、清理空part、启动后台disk move任务(如果该表配置了多disk的话)等工作。对于Replicated的MergeTree Family表,启动包括开启副本间同步、副本选主、启用副本、启动后台disk move等工作
void loadMetadata(ContextMutablePtr context, const String & default_database_name)
{
    ...
    for (fs::directory_iterator it(path); it != dir_end; ++it)
    {
        if (it->is_symlink())
            continue;

        const auto current_file = it->path().filename().string();
        if (!it->is_directory())
        {
            /// TODO: DETACH DATABASE PERMANENTLY ?
            if (fs::path(current_file).extension() == ".sql")
            {
                String db_name = fs::path(current_file).stem();
                if (!isSystemOrInformationSchema(db_name))
                    databases.emplace(unescapeForFileName(db_name), fs::path(path) / db_name);
            }
            
    ...
    TablesLoader::Databases loaded_databases;
    for (const auto & [name, db_path] : databases)
    {
        loadDatabase(context, name, db_path, has_force_restore_data_flag);
        loaded_databases.insert({name, DatabaseCatalog::instance().getDatabase(name)});
    }

    TablesLoader loader{context, std::move(loaded_databases), has_force_restore_data_flag, /* force_attach */ true};
    loader.loadTables();
    loader.startupTables();
    ...
}

DatabaseOrdinary::loadStoredObjects

  • 首先遍历database目录是下所有*.sql文件。对于每个文件,读取其中的建表语句并解析成AST
  • 对于该database目录下的每张表,生成对应的Storage对象,并attach到本database中。注意这个过程以表为最小粒度并发的,并发度等于cpu物理核数
void DatabaseOrdinary::loadStoredObjects(
    ContextMutablePtr local_context, bool force_restore, bool force_attach, bool skip_startup_tables)
{
    /** Tables load faster if they are loaded in sorted (by name) order.
      * Otherwise (for the ext4 filesystem), `DirectoryIterator` iterates through them in some order,
      *  which does not correspond to order tables creation and does not correspond to order of their location on disk.
      */

    ParsedTablesMetadata metadata;
    loadTablesMetadata(local_context, metadata);
    ...
    
    /// Attach tables.
    for (const auto & name_with_path_and_query : metadata.parsed_tables)
    {
        const auto & name = name_with_path_and_query.first;
        const auto & path = name_with_path_and_query.second.path;
        const auto & ast = name_with_path_and_query.second.ast;
        const auto & create_query = ast->as<const ASTCreateQuery &>();

        if (!create_query.is_dictionary)
        {
            pool.scheduleOrThrowOnError([&]()
            {
                loadTableFromMetadata(local_context, path, name, ast, force_restore);

                /// Messages, so that it's not boring to wait for the server to load for a long time.
                logAboutProgress(log, ++tables_processed, total_tables, watch);
            });
        }
    }

    pool.wait();
    ...
}

那么MergeTree表加载元数据的操作发生在哪里呢?发生在Storage的构造函数中,参考StorageReplicatedMergeTree::StorageReplicatedMergeTree -> MergeTreeData::loadDataParts。在loadDataParts

  • 首先遍历每一个disk下的表的数据目录,收集其中的part。注意这个过程是并行的,并行度为本表关联的disk数量。

    • 对于InMemory part, 加载wal文件中的数据。
    • 对于Wide或Compact part,记录part名到disk的映射关系。
  • 然后加载Wide和Compact part。loadDataPartsFromDisk

  • 最后加载InMemory part. loadDataPartsFromWAL

void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
	...
    for (const auto & disk_ptr : disks)
    {
        if (disk_ptr->isBroken())
            continue;

        auto & disk_parts = disk_part_map[disk_ptr->getName()];
        auto & disk_wal_parts = disk_wal_part_map[disk_ptr->getName()];

        pool.scheduleOrThrowOnError([&, disk_ptr]()
        {
            for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
            {
                /// Skip temporary directories, file 'format_version.txt' and directory 'detached'.
                if (startsWith(it->name(), "tmp") || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME
                    || it->name() == MergeTreeData::DETACHED_DIR_NAME)
                    continue;

                if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
                    disk_parts.emplace_back(std::make_pair(it->name(), disk_ptr));
                else if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME && settings->in_memory_parts_enable_wal)
                {
                    std::unique_lock lock(wal_init_lock);
                    if (write_ahead_log != nullptr)
                        throw Exception(
                            "There are multiple WAL files appeared in current storage policy. You need to resolve this manually",
                            ErrorCodes::CORRUPTED_DATA);

                    write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
                    for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
                        disk_wal_parts.push_back(std::move(part));
                }
                
    ...
    if (num_parts > 0)
        loadDataPartsFromDisk(
            broken_parts_to_detach, duplicate_parts_to_remove, pool, num_parts, parts_queue, skip_sanity_checks, settings);

    if (!parts_from_wal.empty())
        loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal, part_lock);
}

一个MergeTree表的In memory是相当有限的,我们重点分析Wide和Compact part的加载,即loadDataPartsFromDisk。每个part的加载流程如下:

  • 加载uuid.txt文件
  • 加载columns.txt文件
  • 加载checksums.txt文件
  • 加载任意mark文件。对于Compact part,只有一个mark文件:data.mrk3
  • 从checksums中获取每个column和secondary index文件的大小。
  • 加载primary.idx文件
  • 加载ttl.txt文件
  • 加载projection parts中的metadata文件。
  • 根据checksums校验metadata和data文件的一致性
  • 加载default_compression_codec.txt文件

注意MergeTree表加载part的过程是并行的,并发度默认为cpu硬件核数。当MergeTree表完成所有part的加载后,便可利用元数据构造的分区索引、主键索引、二级索引、Projection加速查询。这就是为什么clickhouse-server必须等待所有表完成加载后才可对外提供查询服务的原因。

为什么慢

从上面的流程可以看到,加载一个MergeTree part最多读取8个metadata 文件。假如clickhouse示例上有70w part, metadata文件的数量为560w。如果MergeTree还创建了一个或多个projection, metadata文件数量可能会超过1000w。对这么多文件的并发读取会瞬间把disk ioutil打满,disk read成为性能瓶颈。这就是为什么MergeTree启动慢的原因。

启动加速优化

优化方案

优化的核心在于减少disk io次数。因此我们决定引入RocksDB用于缓存metadata file。RocksDB启动之后会自身的LSM文件,文件个数远远少于metadata文件数,文件位置在磁盘上也更加集中。当所有part的metadata都缓存到RocksDB时,clickhouse启动时仅从RocksDB中就可获得所有part的metadata,总体来看disk io的次数大大减小了。

缓存结构

RocksDB中:

  • key: 考虑到同一张表的part可能分布在不同的disk中,所以key由两部分组成,disk name和part相对disk的路径,如default:store/b47/b47ee9c8-afa1-41ac-adfb-2e0dc42de819/6_7_7_0/primary.idx
  • value: metadata文件的内容

缓存一致性

image-20220519145315220

既然用RockDB作为缓存层,那么如何保证缓存层和持久化层也就是磁盘文件的一致性呢?主要分为两大类场景:

  • 读meta:当需要读取某个part的元数据时,首先从缓存层查询,如果缓存未命中, 则读取持久化层的结果,并将该结果更新到缓存层并返回结果。如果缓存命中则直接返回结果。该操作主要发生在clickhouse-server启动阶段
  • 写meta:当需要更新某个part的元数据时,对缓存层和持久化层进行双写。该操作可能发生在插入数据、删除分区、后台数据合并/移动、ALTER TABLE等场景下。

那么是否需要考虑缓存的并发问题呢? 一个cache key唯一对应一个metadata文件,而clickhouse MergeTree的设计已经保证了不可能有多写或读写并发的情况出现。因此在设计缓存层时无需考虑这个问题。

如何检测不一致

增加了一层缓存层,那么

源码走读

总结