[TOC]

一、介绍

clickhouse社区在21.8版本中引入了clickhouse-keeper, 用于替代zookeeper. clickhouse-keeper是完全兼容zookeeper协议的分布式协调服务。目前该特性处于preproduction状态,官方还在继续完善,想用于生产环境的同学建议还是让子弹飞一会儿

clickhouse-keeper底层依赖raft协议(nuraft库)实现多节点之间状态的线性一致性。而zookeeper则使用ZAB保障一致性

1.1 clickhouse-keeper相比zookeeper的优点

为什么要引入clickhouse-keeper呢?主要是ck使用zookeeper有着众多痛点,

  • 使用java开发
  • 运维不便
  • 要求独立部署
  • zxid overflow问题
  • snapshot和log没有经过压缩
  • 不支持读的线性一致性

而keeper存在着以下优点:

  • 使用c++开发,技术栈与ck统一
  • 即可独立部署,又可集成到ck中
  • 没有zxid overflow问题
  • 读性能更好,写性能相当
  • 支持对snapshot和log的压缩和校验
  • 支持读写的线性一致性

1.2 如何部署

部署clickhouse-keeper应该注意以下几点

  • 使用ssd存储append log
  • 整个集群的节点数量不超过9
  • 一次最多修改一个节点的config
  • 重要参数:heart_beat_interval_ms/election_timeout_lower_bound_ms/election_timeout_upper_bound_ms/can_become_leader/priority/quorum_reads, 具体含义参考:https://clickhouse.com/docs/zh/operations/clickhouse-keeper/

clickhosue-keeper有以下几种不同的部署方式

1 独立部署:类似zookeeper的部署方式,整个clickhouse集群依赖一个独立的clickhouse-keeper集群。

图片

2 每个shard一组keeper: 此时keeper并不独立部署,而是嵌在ck进程中。

图片

3 所有shard共享一组keeper: 此时keeper并不独立部署,而是嵌在ck进程中。

图片

1.3 如何迁移

既然clichouse-keeper优势这么明显,那么如何将zookeeper中的数据迁移到keeper中呢?官方提供了迁移工具clickhouse-keeper-converter, 它能够将zk中的数据dump成keeper能够加载的snapshot。

迁移步骤如下

  • 停止所有zk节点
  • 找到zk leader节点
  • 重启zk leader节点,并再次停止(这一步是为了让leader节点生成一份snapshot)
  • 运行clickhouse-keeper-converter,生成keeper的snapshot文件
  • 启动keeper, 使其加载上一步中的snapshot

二、源码走读

以下仅分析keeper作为独立进程启动时的相关代码

入口:mainEntryClickHouseKeeper → Keeper::main → KeeperTCPHandler::runImpl

2.1 KeeperTCPHandler

KeeperTCPHandler是Keeper中注册的tcp请求回调。

  • keeper端调用KeeperTCPHandlerFactory::createConnection创建连接

  • keeper端接收client端的handshake请求,格式:handshake_length(4B) | protocol_version(4B) | last_zxid_seen(8B) | timeout_ms(4B)

  • keeper端校验handshake成功后,并为其分配sessionid(KeeperDispatcher::getSessionID), 发送handshake响应,格式:SERVER_HANDSHAKE_LENGTH(4B) | protocol_version(4B) | session_timeout(4B) | session_id(8B) | password

  • keeper端将session_id 注册到KeeperDispatcher中(KeeperDispatcher::registerSession)

  • keeper端读取client端请求(KeeperTCPHandler::receiveRequest), 协议:length(4B) | xid (4B) | opnum(4B) | data data部分的格式由请求类型决定。详情见:ZooKeeperRequestFactory::ZooKeeperRequestFactory

    • 如果请求类型为close, 退出当前io线程
    • 如果请求类型为heart beat, 重置session timeout 计时器
    • 如果是其他普通类型的请求,从response队列中取出返回数据,发送给client端

2.2 KeeperDispatcher

KeeperTCPHandler中依赖KeeperDispatcher实现对client端请求的处理,同时保持了keeper集群中状态的线性一致。

2.2.1 初始化

dispatcher启动时会在后台生成三个线程(Keeper::main -> Context::initializeKeeperDispatcher -> KeeperDispatcher::initialize):

  • requestThread: 从request queue中取出request, 交给KeeperServer处理(KeeperServer::putRequestBatch),得到response, 放入response queue
  • responseThread: 从response queue中拿到response, 调用对应session id的callback(session_to_response_callback), 将响应数据写入到tcp发送缓冲区中。
  • sessionCleanerTask: 从KeeperKeeper找到不活跃的session id(KeeperServer::getDeadSessions), 从内存中删除(KeeperDispatcher::finishSession)
  • snapshotThread: 监听snapshot queue,调用callback制作快照(KeeperStateMachine::create_snapshot)。制作快照的时机由nuraft lib决定。

2.3 KeeperServer

KeeperServer基于nuraft实现了一个完整的raft实例。

KeeperServer = KeeperStateMachine + KeeperStateManager + KeeperLogStore + nuraft::raft_server + nuraft::asio_service

在nuraft库中,总共包含5个模块:

  • raft server: 协调处理来自client和其他raft实例的请求和响应
  • asio layer: 负责与client/其他node的网络交互,定时器,线程池
  • log store: 负责raft logs的读、写、合并
  • state machine: raft上层的状态机,负责事务的提交和回滚,以及快照管理
  • state manager: 存储和加载集群配置和状态

其中 log store/state machine/state manager都需要用户自行实现。

因此在clickhouse-keeper中,KeeperLogStore通过继承实现了log store模块,KeeperStateMachine实现了state machine, KeeperStateManager实现了state manager

而在test-keeper中,则通过SummingStateMachine实现了state machine, InMemoryLogStore实现了log store. 而test-keeper主要作用是在clickhouse的集成测试中替代zookeeper

2.3.1 KeeperLogStore

我们知道,在raft算法中, leader通过append entry到log实现leader和follower之间的replication.

clickhouse-keeper中,单个append entry的内存表示如下:

struct` `ChangelogRecord``{``  ``ChangelogRecordHeader header;``  ``nuraft::ptr blob;``};` `struct` `ChangelogRecordHeader``{``  ``ChangelogVersion version = CURRENT_CHANGELOG_VERSION;``  ``uint64_t index = 0; ``/// entry log number``  ``uint64_t term = 0;``  ``nuraft::log_val_type value_type{};``  ``uint64_t blob_size = 0;``};

append entry序列化到磁盘上的格式:

checksum(8B) | version(1B) | index(8B) | term(8B) | value_type(4B) | blob_size(8B) | blob(blob_size B)

KeeperLogStore继承自nuraft::log_store

实现了append entry的:

  • 写:追加写(KeeperLogStore::append) 覆盖写(KeeperLogStore::write_at) 批量写(KeeperLogStore::apply_pack) 同步(KeeperLogState::flush)
  • 读:随机读(entry_at) 顺序读(last_entry) 批量读(KeeperLogStore::log_entries)
  • 合并: compact。为什么要对log进行compact呢?因为KeeperStateMachine中生成了snapshot之后, snapshot中up_to_log_idx为最大的已提交log index. 那么log store中up_to_log_idx(included)之前的entry都可以从磁盘上删除。

2.3.2 KeeperStateMachine

继承自nuraft::state_machine, 实现了以下接口:

  • commit: 将某个log entry提交到状态机上(KeeperStorage::processRequest)

  • last_commit_index: 返回最近提交的log entry index

  • apply_snapshot: 将最新的snapshot引用到内部状态机上(KeeperSnapshotManager::deserializeSnapshotFromBuffer)

  • pre_commit: 无论在leader还是follower上,pre_commit都发生在append entry到log store之后,state machine commit之前。在这里为空实现

  • rollback: 发生在follower发现leader发送过来的append entry与自己冲突时,此时follower会先对state machine进行回滚,然后再truncate append entry直到与leader一致,最后再写入leader发送过来的append entry。这里为空实现。

  • last_snapshot: 获取最近的snapshot的元数据

  • create_snapshot: 在内存中创建snapshot, 并发送到异步队列,由KeeperDispatcher中的snapshotThread后台线程异步写入磁盘。

  • read_logical_snp_obj:leader发送snapshot给落后较多的follower时,会调用该接口。对于一个snapshot,可分多次调用该接口。第一次调用时,obj_id必须是零,此时data_out为空。snapshot数据在第二次调用及以后才会输出。

  • save_logical_snp_obj: follower接受leader发送过来的snapshot时,会调用该接口。分多次调用,同样的,第一次调用时,obj_id = 0,表示snapshot传输的开始。第二次调用时,读取snapshot数据到内存,并序列化到磁盘。详情见:https://github.com/eBay/NuRaft/blob/master/docs/snapshot_transmission.md

KeeperStateMachine = KeeperStorage + KeeperSnapshotManager

其中KeeperStorage实现了同Zookeeper相同的内部状态, KeeperSnapshotManager实现了对多个snapshot的管理,它支持snapshot的序列化和反序列化

当clickhouse-keeper启动时,KeeperStateMachine又是如何从磁盘上初始化的呢?首先生成了KeeperSnapshotManager对象,然后从其中读取最新的snapshot, 并反序列化到KeeperStorage中。这样KeeperStateMachine就有了一个初始状态

2.3.2.1 KeeperStorage

所有的数据都在内存中,是类似zk的状态机。实现了类似zk的所有逻辑操作、会话管理。它的数据主要包含几部分:

  • container: 使用SnapshotableHashTable来存储节点数据, 是一个可生成快照的哈希表,key为zk path, value为zk node(包含data, acl_id, children等)。SnapshotableHashTable由一个list和map组成,map的value是list iterator, list包含了多个版本的kv数据,而map反应了最新版本的kv视图

  • ephemerals: 用于存储每个session对应的临时节点路径

  • sessions_and_watchers: 用于存储每个session监听的节点路径

  • session_and_timeout:每个session对应的超时时间

  • acl_map:acl_id到acl的映射关系

  • watches:当前活跃监听。key: 节点路径,value: list of subscribe session

  • list_watches: 同上,区别在于是否监听子节点

初始化流程:

  • 没有快照时,在container中插入root path: “/”
  • 有快照时,从快照中加载 (KeeperStateMachine::init → KeeperSnapshotManager::deserializeSnapshotFromBuffer)

处理请求流程:

  • 如果是close请求。从storage中清理掉所有相关session, watch,并返回response
  • 如果是heartbeat请求。空处理
  • 其他常规请求。检查auth(如果要求的话),处理请求,添加watch(如果有的话),返回

退出流程:

  • 从内存中删除临时节点和watch相关数据结构

2.3.2.2 KeeperSnapshotManager

管理所有快照文件,支持快照的序列化(到磁盘)和反序列化(到内存缓冲区)

初始化:给定snapshot目录,执行list directory得到所有snapshot开头的快照文件,加载到existing_snapshots中

序列化到磁盘:serializeSnapshotToBuffer + serializeSnapshotBufferToDisk 。输入KeeperStorage, 输出snapshot文件

反序列化到内存:deserializeSnapshotBufferFromDisk + deserializeSnapshotFromBuffer。输入snapshot文件,输出KeeperStorage

KeeperStorage序列化之后的格式: snapshot_version | snapshot_meta | session_id | acl_map | container | session_and_timeout 详情见:KeeperStorageSnapshot::serialize

2.3.3 KeeperStateManager

实现了raft集群配置和状态的存储和加载。

初始化流程:

  • 加载clickhouse-keeper配置文件
  • 从配置生成KeeperLogStore对象
  • 从配置中确定本地raft server/port, 以及cluster_config
  • KeeperLogStore加载本地log文件,起点为KeeperStateMachine中的last_committed_idx

三、参考

altiny ppt: https://www.slideshare.net/Altinity/clickhouse-keeper

clickhouse-keeper文档:https://clickhouse.com/docs/zh/operations/clickhouse-keeper/

nuraft文档:https://github.com/eBay/NuRaft/tree/master/docs