mongoshake使用手册

利用mongoshake完成mongodb数据迁移

mongoshake概述

MongoShake是采用GO语言编写的MongoDB数据同步订阅工具,其通过读取oplog操作日志来捕获数据,业务通过消费对应的日志数据来实现不同的场景,例如灾备或多活。项目地址:alibaba/MongoShake

MongoShake将捕获到的日志数据发送给不同的tunnel通道,其类型包含:

  • Direct:直接写入目标MongoDB
  • RPC:通过net/rpc方式连接
  • TCP:通过TCP方式连接
  • File:通过文件的方式关联
  • Kafka:利用kafka作为消息中转
  • Mock:测试试用,不写入tunnel

MongoShake在源端上支持Sharding、replset、mongod三种架构,目标端则支持mongod和mongos。对于replset环境,建议优先从secondary节点读取,减轻primary的压力。

除此之外,MongoShake还具备以下特性:

  • 并行复制:支持并行复制,粒度可为id、collection或auto。
  • HA:定期对上下文进行保存,当服务切换或重启后能够继续提供服务
  • 黑白名单:可以进行db或collection的筛选
  • 压缩:发送日志数据前进行日志压缩,格式支持gzip、zlib或deflate
  • Checkpoint:checkpoint记录了同步位点信息,根据位点信息能够知道当前同步到哪个位置
  • 同步模式:支持全量同步、增量同步和全量+增量

安装配置mongoshake

下载mongoshake二进制包,当前版本为v2.8.4

1
wget https://github.com/alibaba/MongoShake/releases/download/release-v2.8.4-20230425/mongo-shake-v2.8.4.tgz

解压mongoshake安装包

1
tar -xzf mongo-shake-v2.8.4.tgz

修改配置文件(collector.conf)

参数 说明
full_sync.http_port 全量监控端口
incr_sync.http_port 增量监控端口
sync_mode 同步模式:all(全量+增量),full(全量),incr(增量)
mongo_urls 源端mongodb连接串,如果源端为sharding,则配置为后端所有shard副本集连接地址,分号分割
mongo_cs_url 如果源端为sharding,则配置为config节点连接地址
mongo_s_url 如果源端为sharding,则配置为mongos的地址,多个mongos以逗号分割
tunnel 通道模式,可选direct、rpc、tcp、kafka等。direct用于直接写入目标mongodb
tunnel.address 目标端mongodb地址,sharding配置mongos地址,格式与mongo_urls对齐
mongo_connect_mode 连接模式。secondaryPreferred表示优先从secondary拉取
filter.namespace.black 黑名单过滤。可以是db,也可以是db.collection,分号分割
filter.namespace.white 白名单过滤。可以是db,也可以是db.collection,分号分割
filter.ddl_enable 是否开启DDL同步,源端sharding架构暂不支持开启
checkpoint.storage.url checkpoint集合保存地址,默认写入源端
checkpoint.storage.db checkpoint集合的库名
checkpoint.storage.collection checkpoint集合名
checkpoint.start_position 如果checkpoint不存在,则根据指定时间进行增量同步,仅适用于incr_sync.mongo_fetch_method=oplog的场景
transform.namespace namespace对象重命名,例如db1.tab1:db2:tab2
skip.nsshardkey.verify 分片键一致性检查,sharding->sharding场景下,如果分片键不一致检查失败会退出同步
full_sync.reader.collection_parallel 全量同步并发拉取的表数量
full_sync.reader.write_document_parallel 全量同步时同一张表并发写进程数量
full_sync.reader.document_batch_size 全量同步时目标端写入时每个线程的文档数量
full_sync.reader.fetch_batch_size 全量同步时源端每次读取文档数量
full_sync.collection_exist_drop 目标库存在时是否在同步前先删除
full_sync.create_index 全量同步完成后索引的创建方式,none不创建,background后台创建,foreground前台创建
incr_sync.mongo_fetch_method 增量同步模式,可选oplog或change_stream(需要>4.0)
incr_sync.worker 增量同步并行写入线程数
incr_sync.target_delay 配置复制延迟
incr_sync.executor.upsert update语句是否采用upsert选项
incr_sync.executor.insert_on_dup_update 插入数据存在时,是否转换为update

启动mongoshake开启同步

1
./start.sh collector.conf

如需停止同步可执行stop.sh

1
./stop.sh mongoshake.pid

监控同步进程

全量监控

用户可以通过curl对full_sync.http_port的值来进行全量监控,提供了下列接口

  • conf:查看配置文件
  • Progress:查看全量同步进度
  • Index:查看索引同步情况
  • Sentinel:控制全量运行,目前仅支持限速
1
curl -s  http://127.0.0.1:9101/progress | python -m json.tool

如数据库资源负载过高,可进行限速

1
curl -X POST --data '{"TPS":5000}' 127.0.0.1:9101/sentinel/options

查看同步速率

1
curl -s 127.0.0.1:9101/sentinel | python -m json.tool

增量监控

用户可以通过curl对full_sync.http_port的值来进行全量监控,提供了下列接口

  • Conf:查看配置文件信息
  • Repl:复制的整体信息
  • Queue:syncer内部的队列情况
  • Worker: worker内部的情况
  • Executer:写入端的统计信息
  • Persist:内部角色persist的内部情况
  • Sentinel:内部控制情况
1
curl -s http://127.0.0.1:9100/repl | python -m json.tool

除了上诉方式,也可以通过mongoshake-stat --port=9100的方式查看同步状态

参数 说明
logs_get/sec 每秒获取的oplog数量
logs_repl/sec 每秒执行重放的oplog数量
logs_success/sec 每秒重放成功的oplog数量
lsn.time 最后发送oplog的时间
lsn_ack.time 目标端确认写入的时间
lsn_ckpt.time Checkpoint持久化的时间
now.time 当前时间
replset 源数据库副本集名称
tps/sec 每秒TPS数量

存在的问题

源端只暴露mongos地址

当源端为sharding架构,且只能暴露mongos地址时,按文档最佳实践——常见场景配置 · alibaba/MongoShake Wiki · GitHub 配置,同步存在下列问题:

1. checkpoint集合数据不更新

实际使用中发现在全量同步结束后,checkpoint集合会生成一个怪异的TS,根据日志信息该checkpoint转换为时间戳后为2038-01-19 11:14:07,且无论任何环境任何时间生成的checkpoint都相同 mongoshake-error1 并且checkpoint数据不会更新,这样就会导致任务重启后增量同步异常,无法同步数据

猜想时间:源端只配置mongos的情况下,进行ALL模式同步,全量同步结束后内存和checkpoint集合的数据不一致,不重启就没问题,重启后就会读取checkpoint集合里面的数据来恢复,但默认的时间是2038-01-19 11:14:07,导致增量同步获取不到数据

github issues早已有人反馈这个bug,但暂未修复该问题,最新的2.4.8版本依旧存在该问题https://github.com/alibaba/MongoShake/issues/834

根据文档显示,mongoshake对于change stream的实现是通过解析ts来构建resumeToken,我们可以指定一个略早一点的LSN_ACK对应的ts。目前想到的解决方案就是任务重新启动前获取监控或日志信息中的LSN_ACK值来手动更新到checkpoint表中。

1
db.ckpt_default.updateOne({_id:ObjectId("663f1c9038154e5fdea0dcf1")},{$set:{ckpt:NumberLong("7367659501685571585"),fetch_method:"change_stream"}})

手动调整后检查增量同步恢复正常,并且能够自动更新维护checkpoint表了.

2. mongos启动报错

如果mongo_s_url配置了1个以上的mongos地址时,mongoshake启动会报下列错误,只填一个地址则正常

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
        panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0xa39f17]

goroutine 1 [running]:
main.handleExit()
        /u02/shuntong.zhang/db_src/MongoShake/cmd/collector/collector.go:183 +0x8a
panic(0xc966c0, 0x1373a40)
        /u02/shuntong.zhang/local_bin/go1_15_10/go/src/runtime/panic.go:969 +0x1b9
github.com/alibaba/MongoShake/v2/common.GetDBVersion(0x0, 0x60, 0xd88b21, 0x12, 0x1)
        /u02/shuntong.zhang/db_src/MongoShake/common/db_opertion.go:51 +0x37
main.checkConnection(0x0, 0x0)
        /u02/shuntong.zhang/db_src/MongoShake/cmd/collector/sanitize.go:327 +0x1c9
main.SanitizeOptions(0xc000175ef8, 0xc82080)
        /u02/shuntong.zhang/db_src/MongoShake/cmd/collector/sanitize.go:40 +0x12e
main.main()
        /u02/shuntong.zhang/db_src/MongoShake/cmd/collector/collector.go:59 +0x3ce
- child process[94936] terminated .
- child process killed in 0 seconds , may wrong ! exit 

3. checkpoint.start_position参数无法转换

incr+change_stream配置下,checkpoint.start_position无法正确转换,mongoshake会将时间转换为oplog模式下的时间戳,与change_stream格式下的checkpoint格式不同,导致checkpoint.start_position无效,无法同步启动前的差异数据 mongoshake-error2

如果了解如何转换为change stream的ts,可以手动创建一个checkpoint来完成增量同步。据了解,目前有一个aliyun改写版本优化了,但未公开

comments powered by Disqus