mongoshake概述
MongoShake是采用GO语言编写的MongoDB数据同步订阅工具,其通过读取oplog操作日志来捕获数据,业务通过消费对应的日志数据来实现不同的场景,例如灾备或多活。项目地址:alibaba/MongoShake
MongoShake将捕获到的日志数据发送给不同的tunnel通道,其类型包含:
- Direct:直接写入目标MongoDB
- RPC:通过net/rpc方式连接
- TCP:通过TCP方式连接
- File:通过文件的方式关联
- Kafka:利用kafka作为消息中转
- Mock:测试试用,不写入tunnel
MongoShake在源端上支持Sharding、replset、mongod三种架构,目标端则支持mongod和mongos。对于replset环境,建议优先从secondary节点读取,减轻primary的压力。
除此之外,MongoShake还具备以下特性:
- 并行复制:支持并行复制,粒度可为id、collection或auto。
- HA:定期对上下文进行保存,当服务切换或重启后能够继续提供服务
- 黑白名单:可以进行db或collection的筛选
- 压缩:发送日志数据前进行日志压缩,格式支持gzip、zlib或deflate
- Checkpoint:checkpoint记录了同步位点信息,根据位点信息能够知道当前同步到哪个位置
- 同步模式:支持全量同步、增量同步和全量+增量
安装配置mongoshake
下载mongoshake二进制包,当前版本为v2.8.4
|
|
解压mongoshake安装包
|
|
修改配置文件(collector.conf)
参数 | 说明 |
---|---|
full_sync.http_port | 全量监控端口 |
incr_sync.http_port | 增量监控端口 |
sync_mode | 同步模式:all(全量+增量),full(全量),incr(增量) |
mongo_urls | 源端mongodb连接串,如果源端为sharding,则配置为后端所有shard副本集连接地址,分号分割 |
mongo_cs_url | 如果源端为sharding,则配置为config节点连接地址 |
mongo_s_url | 如果源端为sharding,则配置为mongos的地址,多个mongos以逗号分割 |
tunnel | 通道模式,可选direct、rpc、tcp、kafka等。direct用于直接写入目标mongodb |
tunnel.address | 目标端mongodb地址,sharding配置mongos地址,格式与mongo_urls对齐 |
mongo_connect_mode | 连接模式。secondaryPreferred表示优先从secondary拉取 |
filter.namespace.black | 黑名单过滤。可以是db,也可以是db.collection,分号分割 |
filter.namespace.white | 白名单过滤。可以是db,也可以是db.collection,分号分割 |
filter.ddl_enable | 是否开启DDL同步,源端sharding架构暂不支持开启 |
checkpoint.storage.url | checkpoint集合保存地址,默认写入源端 |
checkpoint.storage.db | checkpoint集合的库名 |
checkpoint.storage.collection | checkpoint集合名 |
checkpoint.start_position | 如果checkpoint不存在,则根据指定时间进行增量同步,仅适用于incr_sync.mongo_fetch_method=oplog的场景 |
transform.namespace | namespace对象重命名,例如db1.tab1:db2:tab2 |
skip.nsshardkey.verify | 分片键一致性检查,sharding->sharding场景下,如果分片键不一致检查失败会退出同步 |
full_sync.reader.collection_parallel | 全量同步并发拉取的表数量 |
full_sync.reader.write_document_parallel | 全量同步时同一张表并发写进程数量 |
full_sync.reader.document_batch_size | 全量同步时目标端写入时每个线程的文档数量 |
full_sync.reader.fetch_batch_size | 全量同步时源端每次读取文档数量 |
full_sync.collection_exist_drop | 目标库存在时是否在同步前先删除 |
full_sync.create_index | 全量同步完成后索引的创建方式,none不创建,background后台创建,foreground前台创建 |
incr_sync.mongo_fetch_method | 增量同步模式,可选oplog或change_stream(需要>4.0) |
incr_sync.worker | 增量同步并行写入线程数 |
incr_sync.target_delay | 配置复制延迟 |
incr_sync.executor.upsert | update语句是否采用upsert选项 |
incr_sync.executor.insert_on_dup_update | 插入数据存在时,是否转换为update |
启动mongoshake开启同步
|
|
如需停止同步可执行stop.sh
|
|
监控同步进程
全量监控
用户可以通过curl对full_sync.http_port的值来进行全量监控,提供了下列接口
- conf:查看配置文件
- Progress:查看全量同步进度
- Index:查看索引同步情况
- Sentinel:控制全量运行,目前仅支持限速
|
|
如数据库资源负载过高,可进行限速
|
|
查看同步速率
|
|
增量监控
用户可以通过curl对full_sync.http_port的值来进行全量监控,提供了下列接口
- Conf:查看配置文件信息
- Repl:复制的整体信息
- Queue:syncer内部的队列情况
- Worker: worker内部的情况
- Executer:写入端的统计信息
- Persist:内部角色persist的内部情况
- Sentinel:内部控制情况
|
|
除了上诉方式,也可以通过mongoshake-stat --port=9100
的方式查看同步状态
参数 | 说明 |
---|---|
logs_get/sec | 每秒获取的oplog数量 |
logs_repl/sec | 每秒执行重放的oplog数量 |
logs_success/sec | 每秒重放成功的oplog数量 |
lsn.time | 最后发送oplog的时间 |
lsn_ack.time | 目标端确认写入的时间 |
lsn_ckpt.time | Checkpoint持久化的时间 |
now.time | 当前时间 |
replset | 源数据库副本集名称 |
tps/sec | 每秒TPS数量 |
存在的问题
源端只暴露mongos地址
当源端为sharding架构,且只能暴露mongos地址时,按文档最佳实践——常见场景配置 · alibaba/MongoShake Wiki · GitHub 配置,同步存在下列问题:
1. checkpoint集合数据不更新
实际使用中发现在全量同步结束后,checkpoint集合会生成一个怪异的TS,根据日志信息该checkpoint转换为时间戳后为2038-01-19 11:14:07,且无论任何环境任何时间生成的checkpoint都相同 并且checkpoint数据不会更新,这样就会导致任务重启后增量同步异常,无法同步数据
猜想时间:源端只配置mongos的情况下,进行ALL模式同步,全量同步结束后内存和checkpoint集合的数据不一致,不重启就没问题,重启后就会读取checkpoint集合里面的数据来恢复,但默认的时间是2038-01-19 11:14:07,导致增量同步获取不到数据
github issues早已有人反馈这个bug,但暂未修复该问题,最新的2.4.8版本依旧存在该问题https://github.com/alibaba/MongoShake/issues/834
根据文档显示,mongoshake对于change stream的实现是通过解析ts来构建resumeToken,我们可以指定一个略早一点的LSN_ACK对应的ts。目前想到的解决方案就是任务重新启动前获取监控或日志信息中的LSN_ACK值来手动更新到checkpoint表中。
|
|
手动调整后检查增量同步恢复正常,并且能够自动更新维护checkpoint表了.
2. mongos启动报错
如果mongo_s_url配置了1个以上的mongos地址时,mongoshake启动会报下列错误,只填一个地址则正常
|
|
3. checkpoint.start_position参数无法转换
incr+change_stream配置下,checkpoint.start_position无法正确转换,mongoshake会将时间转换为oplog模式下的时间戳,与change_stream格式下的checkpoint格式不同,导致checkpoint.start_position无效,无法同步启动前的差异数据
如果了解如何转换为change stream的ts,可以手动创建一个checkpoint来完成增量同步。据了解,目前有一个aliyun改写版本优化了,但未公开