MongoShake数据同步&迁移工具

一、简介

MongoShake是一个以golang语言进行编写的通用的平台型服务,通过读取MongoDB集群的Oplog操作日志,对MongoDB的数据进行复制,后续通过操作日志实现特定需求。日志可以提供很多场景化的应用,为此,我们在设计时就考虑了把MongoShake做成通用的平台型服务。通过操作日志,我们提供日志数据订阅消费PUB/SUB功能,可通过SDK、Kafka、MetaQ等方式灵活对接以适应不同场景(如日志订阅、数据中心同步、Cache异步淘汰等)。集群数据同步是其中核心应用场景,通过抓取oplog后进行回放达到同步目的,实现灾备和多活的业务场景。

1.1 应用场景举例

  1. MongoDB集群间数据的异步复制,免去业务双写开销。
  2. MongoDB集群间数据的镜像备份(当前1.0开源版本支持受限)
  3. 日志离线分析
  4. 日志订阅
  5. 数据路由。根据业务需求,结合日志订阅和过滤机制,可以获取关注的数据,达到数据路由的功能。
  6. Cache同步。日志分析的结果,知道哪些Cache可以被淘汰,哪些Cache可以进行预加载,反向推动Cache的更新。
  7. 基于日志的集群监控

1.2 功能介绍

MongoShake从源库抓取oplog数据,然后发送到各个不同的tunnel通道。源库支持:ReplicaSet,Sharding,Mongod,目的库支持:Mongos,Mongod。现有通道类型有:
- Direct:直接写入目的MongoDB
- RPC:通过net/rpc方式连接
- TCP:通过tcp方式连接
- File:通过文件方式对接
- Kafka:通过Kafka方式对接
- Mock:用于测试,不写入tunnel,抛弃所有数据

消费者可以通过对接tunnel通道获取关注的数据,例如对接Direct通道直接写入目的MongoDB,或者对接RPC进行同步数据传输等。此外,用户还可以自己创建自己的API进行灵活接入。

MongoShake对接的源数据库支持单个mongod,replica set和sharding三种模式。目的数据库支持mongod和mongos。如果源端数据库为replica set,我们建议对接备库以减少主库的压力;如果为sharding模式,那么每个shard都将对接到MongoShake并进行并行抓取。对于目的库来说,可以对接多个mongos,不同的数据将会哈希后写入不同的mongos。

二、 安装及配置

2.1 下载

wget https://github.com/alibaba/MongoShake/releases/download/release-v2.6.5-20210630/mongo-shake-v2.6.5.tar.gz;
tar zxvf mongo-shake-v2.6.5.tar.gz;
cd mongo-shake-v2.6.5;

2.2 修改配置

collector.conf:

# 同步模式,all表示全量+增量同步,full表示全量同步,incr表示增量同步。
sync_mode = all
# 源MongoDB连接串信息,逗号分隔同一个副本集内的结点,分号分隔分片sharding实例,免密模式
# 可以忽略“username:password@”,注意,密码里面不能含有'@'符号。
# 分片集:
mongo_urls = mongodb://192.168.121.221:27001,192.168.121.222:27001;mongodb://192.168.121.222:27002,192.168.121.223:27002;mongodb://192.168.121.223:27003,192.168.121.221:27003
# 如果源端是sharding,此处需要配置源端sharding的cs的地址
mongo_cs_url = mongodb://192.168.121.221:21000,192.168.121.222:21000,192.168.121.223:21000
# 如果源端采用change stream拉取,这里还需要配置至少一个mongos的地址,多个mongos地址以逗号(,)分割
mongo_s_url = mongodb://192.168.121.221:20000,192.168.121.222:20000,192.168.121.223:20000
#目的端连接串信息,分号分割不同的mongos。
tunnel.address = mongodb://192.168.121.224:20000;192.168.121.225:20000;192.168.121.226:20000
# checkpoint存储信息,用于支持断点续传。可以为第三方的,默认不用填写
MongoDB
checkpoint.storage.url =
# 内部发送的worker数目,如果机器性能足够,可以提高worker个数。
incr_sync.worker = 2
# 对于目的端是kafka等非direct tunnel,启用多少个序列化线程,必须为"incr_sync.worker"的倍数。
# 默认为"incr_sync.worker"的值。
incr_sync.tunnel.write_thread = 4

2.3 插入测试数据

mongos> for(i=0;i<10000000;i++){    db.log.insert({name:'nametest',age:i})     }

2.4 启动同步

./collector.linux -conf=collector.conf



当出现如下日志时,即代表全量数据同步已完成,并进入增量数据同步模式。


如果同步过程中源集群有大量写入,就有有失败的提示

同步过程中源集群监控

目标集群其他节点

目标集群mongoshake节点

2.5 监控MongoShake状态

./mongoshake-stat --port=9100

2.6 校验

yum install pymongo -y;
./comparison.py --src="192.168.121.223:20000" --dest="192.168.121.226:20000" --count=10000000 --excludeDbs=admin,local,mongoshake --excludeCollections=system.profile --comparisonMode=all

THE END
分享
二维码
< <上一篇
下一篇>>
文章目录
关闭
目 录