1. Introduction to MongoShake

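MongoShake is a general-purpose platform service tool written in Golang by Alibaba Cloud. It replicates MongoDB data by reading MongoDB's oplog (operation log), and that replicated stream can then be used for specific needs.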

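MongoShake also supports subscribing to and consuming the log data. It can be integrated flexibly through an SDK, Kafka, MetaQ, and other channels, which suits scenarios such as log subscription, data-center synchronization, and asynchronous cache eviction.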


Download link:

Package directory layout:

[root@dn5 software]# tree mongoshake-v2.4.16
mongoshake
├── ChangeLog
├── collector.conf
├── collector.conf.db2db
├── collector.darwin
├── collector.linux
├── collector.windows
├── comparison.py
├── diagnostic
│   └── mongoshake.testrs.journal
├── hypervisor
├── logs
│   ├── collector.log
│   ├── collector.log.1
│   ├── mongoshake.log
│   ├── mongoshake.log.1
│   ├── mongoshake.log.2
│   ├── mongoshake.log.3
│   ├── mongoshake.log.4
│   ├── mongoshake.log.5
│   ├── mongoshake.log.6
│   ├── mongoshake.log.7
│   ├── receiver.log
│   └── receiver.log.1
├── mongoshake.pid
├── mongoshake-stat
├── receiver.conf
├── receiver.darwin
├── receiver.linux
├── receiver.windows
├── start.sh
└── stop.sh

2 directories, 30 files

2. Data sources supported by MongoShake

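Source database                 | Target database
Self-managed MongoDB on ECS     | Self-managed MongoDB on ECS
Local self-managed MongoDB      | Local self-managed MongoDB
Alibaba Cloud MongoDB instance  | Alibaba Cloud MongoDB instance
Third-party cloud MongoDB       | Third-party cloud MongoDB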

Important notes

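  • 1. As the table above shows, the source and target databases must be of the same type. Synchronizing between different types of data sources (for example, a cloud MongoDB to a local self-managed MongoDB) is not supported and may fail. One known case: synchronizing a Huawei Cloud MongoDB ReplicaSet cluster to a local self-managed database fails with the error: Oplog Tailer initialize failed: no oplog ns in mongo.
  • 2. A local standalone MongoDB does not generate an oplog, so the source MongoDB must be a cluster deployment (for example, a replica set).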

If the source is a standalone installation, MongoShake reports the following error on startup:

[root@cdh-test5 mongoshake]# ./collector.linux -conf=collector.conf -verbose
[2022/04/21 11:08:12 CST] [WARN] 
______________________________
\                             \           _         ______ |
 \                             \        /   \___-=O'/|O'/__|
  \  MongoShake, Here we go !!  \_______\          / | /    )
  /                             /        '/-==__ _/__|/__=-|  -GM
 /        Alibaba Cloud        /         *             \ | |
/                             /                        (o)
------------------------------

if you have any problem, please visit https://github.com/alibaba/MongoShake/wiki/FAQ

[2022/04/21 11:08:12 CST] [INFO] New session to mongodb://127.0.0.1:40001 successfully
[2022/04/21 11:08:12 CST] [INFO] Close session with mongodb://127.0.0.1:40001
[2022/04/21 11:08:12 CST] [INFO] New session to mongodb://127.0.0.1:40001 successfully
[2022/04/21 11:08:12 CST] [CRIT] There has no oplog collection in mongo db server
[2022/04/21 11:08:12 CST] [INFO] Close session with mongodb://127.0.0.1:40001
[11:08:12 CST 2022/04/21] [CRIT] (mongoshake/collector/coordinator.(*ReplicationCoordinator).sanitizeMongoDB:134) There has no oplog collection in mongo db server
no oplog ns in mongo. See https://github.com/alibaba/MongoShake/wiki/FAQ#q-how-to-solve-the-oplog-tailer-initialize-failed-no-oplog-ns-in-mongo-error
[2022/04/21 11:08:12 CST] [CRIT] run replication failed: no oplog ns in mongo. See https://github.com/alibaba/MongoShake/wiki/FAQ#q-how-to-solve-the-oplog-tailer-initialize-failed-no-oplog-ns-in-mongo-error
[2022/04/21 11:08:12 CST] [WARN] 
                ##### | #####
Oh we finish ? # _ _ #|# _ _ #
               #      |      #
         |       ############
                     # #
  |                  # #
                    #   #
         |     |    #   #      |        |
  |  |             #     #               |
         | |   |   # .-. #         |
                   #( O )#    |    |     |
  |  ################. .###############  |
   ##  _ _|____|     ###     |_ __| _  ##
  #  |                                |  #
  #  |    |    |    |   |    |    |   |  #
   ######################################
                   #     #
                    #####

[11:08:12 CST 2022/04/21] [CRIT] (main.startup:139) run replication failed: no oplog ns in mongo. See https://github.com/alibaba/MongoShake/wiki/FAQ#q-how-to-solve-the-oplog-tailer-initialize-failed-no-oplog-ns-in-mongo-error
[root@cdh-test5 mongoshake]# 

For how to install a MongoDB cluster, see the author's other article:

3. A first look at MongoShake

This section only demonstrates using MongoShake to import MongoDB data into Kafka in real time.

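This is fairly simple to do in MongoShake. You only need to configure the collector (the controller, which connects to the source and fetches the oplog data) and the receiver (which replays and parses the oplog and writes it to the downstream sink, here Kafka).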

3.1 Collector configuration and startup

1). Collector-side configuration

  1. collector.conf
conf.version = 6
# incremental sync only
sync_mode=incr
#mongo_urls=mongodb://127.0.0.1:40001
mongo_urls=mongodb://111.111.111.111:40001,111.111.111.112:40002,111.111.111.113:40003

tunnel = kafka
tunnel.address = mongo_shake_test@dn3.test.com:9092,dn4.test.com:9092,dn5.test.com:9092
tunnel.message = json
# number of worker threads; the replayer (oplog replay worker) count in receiver.conf should match this value
incr_sync.worker = 8
incr_sync.mongo_fetch_method = oplog
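The Kafka tunnel.address above packs the topic and the broker list into one string of the form topic@broker1,broker2 (the comments in receiver.conf describe this format and name "mongoshake" as the default topic). As a minimal sketch of how a script of your own might split that value, assuming the helper name parse_kafka_tunnel_address (hypothetical, not part of MongoShake) and assuming that a value without "@" is a bare broker list that falls back to the default topic:

```python
from typing import List, Tuple

# Default topic as stated in the receiver.conf comments.
DEFAULT_TOPIC = "mongoshake"


def parse_kafka_tunnel_address(address: str) -> Tuple[str, List[str]]:
    """Split 'topic@broker1,broker2' into (topic, [broker, ...]).

    Hypothetical helper for illustration; it is not MongoShake code.
    """
    topic, sep, brokers = address.partition("@")
    if not sep:
        # Assumption: no '@' means the whole string is the broker list.
        topic, brokers = DEFAULT_TOPIC, address
    broker_list = [b.strip() for b in brokers.split(",") if b.strip()]
    return topic.strip(), broker_list


if __name__ == "__main__":
    value = "mongo_shake_test@dn3.test.com:9092,dn4.test.com:9092,dn5.test.com:9092"
    topic, brokers = parse_kafka_tunnel_address(value)
    print(topic)
    print(brokers)
```

A helper like this can be handy when a monitoring or consumer script should read the topic and brokers from the same configuration file instead of duplicating them.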
  2. Start the collector
[root@dn5 bin]# ./collector.linux -conf=collector.conf -verbose


[2022/04/21 20:54:23 CST] [WARN] 
______________________________
\                             \           _         ______ |
 \                             \        /   \___-=O'/|O'/__|
  \  MongoShake, Here we go !!  \_______\          / | /    )
  /                             /        '/-==__ _/__|/__=-|  -GM
 /        Alibaba Cloud        /         *             \ | |
/                             /                        (o)
------------------------------

if you have any problem, please visit https://github.com/alibaba/MongoShake/wiki/FAQ

[2022/04/21 20:54:23 CST] [INFO] New session to mongodb://111.111.111.111:40001 successfully
[2022/04/21 20:54:23 CST] [INFO] Close session with mongodb://111.111.111.111:40001
[2022/04/21 20:54:23 CST] [INFO] New session to mongodb://111.111.111.111:40001 successfully
[2022/04/21 20:54:23 CST] [INFO] Close session with mongodb://111.111.111.111:40001
[2022/04/21 20:54:23 CST] [INFO] Collector startup. shard_by[collection] gids[[]]
[2022/04/21 20:54:23 CST] [INFO] Collector configuration {"ConfVersion":6,"Id":"mongoshake","MasterQuorum":false,"FullSyncHTTPListenPort":9101,"IncrSyncHTTPListenPort":9100,"SystemProfilePort":9200,"LogLevel":"info","LogDirectory":"","LogFileName":"mongoshake.log","LogFlush":false,"SyncMode":"incr","MongoUrls":["mongodb://111.111.111.111:40001"],"MongoCsUrl":"","MongoSUrl":"","MongoConnectMode":"secondaryPreferred","Tunnel":"kafka","TunnelAddress":["mongo_shake_test@dn3.testhdp.com:9092,dn4.testhdp.com:9092,dn5.testhdp.com:9092"],"TunnelMessage":"json","FilterNamespaceBlack":null,"FilterNamespaceWhite":null,"FilterPassSpecialDb":null,"FilterDDLEnable":false,"CheckpointStorageUrl":"mongodb://111.111.111.111:40001","CheckpointStorageDb":"mongoshake","CheckpointStorageCollection":"ckpt_default","CheckpointStartPosition":1,"TransformNamespace":null,"FullSyncReaderCollectionParallel":6,"FullSyncReaderWriteDocumentParallel":8,"FullSyncReaderReadDocumentCount":0,"FullSyncReaderDocumentBatchSize":128,"FullSyncCollectionDrop":false,"FullSyncCreateIndex":"foreground","FullSyncReaderOplogStoreDisk":false,"FullSyncReaderOplogStoreDiskMaxSize":256000,"FullSyncExecutorInsertOnDupUpdate":false,"FullSyncExecutorFilterOrphanDocument":false,"FullSyncExecutorMajorityEnable":false,"IncrSyncMongoFetchMethod":"oplog","IncrSyncChangeStreamWatchFullDocument":false,"IncrSyncOplogGIDS":null,"IncrSyncShardKey":"collection","IncrSyncShardByObjectIdWhiteList":null,"IncrSyncWorker":8,"IncrSyncTargetDelay":0,"IncrSyncWorkerOplogCompressor":"none","IncrSyncWorkerBatchQueueSize":64,"IncrSyncAdaptiveBatchingMaxSize":1024,"IncrSyncFetcherBufferCapacity":256,"IncrSyncExecutorUpsert":false,"IncrSyncExecutorInsertOnDupUpdate":false,"IncrSyncConflictWriteTo":"none","IncrSyncExecutorMajorityEnable":false,"CheckpointStorage":"database","CheckpointInterval":5000,"FullSyncExecutorDebug":false,"IncrSyncDBRef":false,"IncrSyncExecutor":1,"IncrSyncExecutorDebug":false,"IncrSyncReaderDebug":"","IncrSyncCollisionEnable":false,"IncrSyncReaderBufferTime":1,"Version":"improve-2.4.16,7bd0a9be075037779760fe8704ea2e95d25d908b,release,go1.10.3,2020-11-04_14:33:56","IncrSyncTunnel":"","IncrSyncTunnelAddress":null,"IncrSyncTunnelMessage":"","HTTPListenPort":0,"SystemProfile":0}
[2022/04/21 20:54:23 CST] [INFO] New session to mongodb://111.111.111.111:40001 successfully
[2022/04/21 20:54:23 CST] [INFO] Close session with mongodb://111.111.111.111:40001
[2022/04/21 20:54:23 CST] [INFO] New session to mongodb://111.111.111.111:40001 successfully
[2022/04/21 20:54:23 CST] [INFO] Close session with mongodb://111.111.111.111:40001
[2022/04/21 20:54:23 CST] [INFO] all node timestamp map: map[testrs:{7088979101280632833 7089039621664800769}]
[2022/04/21 20:54:23 CST] [INFO] New session to mongodb://111.111.111.111:40001 successfully
[2022/04/21 20:54:23 CST] [INFO] testrs Regenerate checkpoint but won't persist. content: {"name":"testrs","ckpt":1,"version":2,"fetch_method":"","oplog_disk_queue":"","oplog_disk_queue_apply_finish_ts":1}
[2022/04/21 20:54:23 CST] [INFO] testrs checkpoint using mongod/replica_set: {"name":"testrs","ckpt":1,"version":2,"fetch_method":"","oplog_disk_queue":"","oplog_disk_queue_apply_finish_ts":1}, ckptRemote set? [false]
[2022/04/21 20:54:23 CST] [INFO] sync mode run incr
[2022/04/21 20:54:23 CST] [INFO] start running with mode[incr], fullBeginTs[0[0, 0]]
[2022/04/21 20:54:23 CST] [INFO] start incr replication
[2022/04/21 20:54:23 CST] [INFO] RealSourceIncrSync[0]: url[mongodb://111.111.111.111:40001], name[testrs], startTimestamp[4294967296]
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-0 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-1 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-2 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-3 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-4 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-5 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-6 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-7 start working with jobs batch queue. buffer capacity 64
[2022/04/21 20:54:23 CST] [CRIT] start incr sync server with port[9100] failed: listen tcp :9100: bind: address already in use
[2022/04/21 20:54:23 CST] [INFO] Syncer[testrs] poll oplog syncer start. ckpt_interval[5000ms], gid[[]], shard_key[collection]
[20:54:23 CST 2022/04/21] [CRIT] (mongoshake/collector/coordinator.(*ReplicationCoordinator).startOplogReplication.func1:69) start incr sync server with port[9100] failed: listen tcp :9100: bind: address already in use
[2022/04/21 20:54:23 CST] [INFO] Oplog sync[testrs] create checkpoint manager with url[mongodb://111.111.111.111:40001] table[mongoshake.ckpt_default] start-position[4294967296[1, 0]]
[2022/04/21 20:54:23 CST] [INFO] New session to mongodb://111.111.111.111:40001 successfully
[2022/04/21 20:54:23 CST] [INFO] testrs Regenerate checkpoint but won't persist. content: {"name":"testrs","ckpt":4294967296,"version":2,"fetch_method":"","oplog_disk_queue":"","oplog_disk_queue_apply_finish_ts":1}
[2022/04/21 20:54:23 CST] [INFO] load checkpoint value: {"name":"testrs","ckpt":4294967296,"version":2,"fetch_method":"","oplog_disk_queue":"","oplog_disk_queue_apply_finish_ts":1}
[2022/04/21 20:54:23 CST] [INFO] persister replset[testrs] update fetch status to: store memory and apply
[2022/04/21 20:54:23 CST] [INFO] testrs Regenerate checkpoint but won't persist. content: {"name":"testrs","ckpt":4294967296,"version":2,"fetch_method":"","oplog_disk_queue":"","oplog_disk_queue_apply_finish_ts":1}
[2022/04/21 20:54:23 CST] [INFO] set query timestamp: 4294967296[1, 0]
[2022/04/21 20:54:23 CST] [INFO] start fetcher with src[mongodb://111.111.111.111:40001] replica-name[testrs] query-ts[4294967296[1, 0]]
[2022/04/21 20:54:23 CST] [INFO] oplogReader[src:mongodb://111.111.111.111:40001 replset:testrs] ensure network
[2022/04/21 20:54:23 CST] [INFO] New session to mongodb://111.111.111.111:40001 successfully
[2022/04/21 20:54:23 CST] [WARN] oplog_reader current starting point[4294967296[1, 0]] is smaller than the oldest timestamp[7088979101280632833[1650531567, 1]]!
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-3 transfer retransmit:false send [1] logs. reply_acked [7088991015519911938[1650534341, 2]], list_unack [0] 
[2022/04/21 20:54:23 CST] [INFO] Collector-worker-6 transfer retransmit:false send [1] logs. reply_acked [7088984680443150338[1650532866, 2]], list_unack [0] 
[2022/04/21 20:54:24 CST] [INFO] Collector-worker-6 transfer retransmit:false send [1] logs. reply_acked [7089039059024084993[1650545527, 1]], list_unack [0] 
[2022/04/21 20:54:28 CST] [INFO] [name=testrs, stage=incr, get=1422, filter=1419, worker_consume=3, worker_apply=3, worker_failed_times=0, success=3, tps=0, ckpt_times=0, retransimit_times=0, tunnel_traffic=470.00B, lsn_ckpt={0[0, 0], 1970-01-01 08:00:00}, lsn_ack={7089039059024084993[1650545527, 1], 2022-04-21 20:52:07}]]
[2022/04/21 20:54:33 CST] [INFO] [name=testrs, stage=incr, get=1423, filter=1420, worker_consume=3, worker_apply=3, worker_failed_times=0, success=3, tps=0, ckpt_times=0, retransimit_times=0, tunnel_traffic=470.00B, lsn_ckpt={0[0, 0], 1970-01-01 08:00:00}, lsn_ack={7089039059024084993[1650545527, 1], 2022-04-21 20:52:07}]]
[2022/04/21 20:54:38 CST] [INFO] [name=testrs, stage=incr, get=1423, filter=1420, worker_consume=3, worker_apply=3, worker_failed_times=0, success=3, tps=0, ckpt_times=0, retransimit_times=0, tunnel_traffic=470.00B, lsn_ckpt={0[0, 0], 1970-01-01 08:00:00}, lsn_ack={7089039059024084993[1650545527, 1], 2022-04-21 20:52:07}]]
[2022/04/21 20:54:43 CST] [INFO] [name=testrs, stage=incr, get=1424, filter=1421, worker_consume=3, worker_apply=3, worker_failed_times=0, success=3, tps=0, ckpt_times=0, retransimit_times=0, tunnel_traffic=470.00B, lsn_ckpt={0[0, 0], 1970-01-01 08:00:00}, lsn_ack={7089039059024084993[1650545527, 1], 2022-04-21 20:52:07}]]
[2022/04/21 20:54:48 CST] [INFO] [name=testrs, stage=incr, get=1424, filter=1421, worker_consume=3, worker_apply=3, worker_failed_times=0, success=3, tps=0, ckpt_times=0, retransimit_times=0, tunnel_traffic=470.00B, lsn_ckpt={0[0, 0], 1970-01-01 08:00:00}, lsn_ack={7089039059024084993[1650545527, 1], 2022-04-21 20:52:07}]]
[2022/04/21 20:54:53 CST] [INFO] [name=testrs, stage=incr, get=1426, filter=1422, worker_consume=3, worker_apply=3, worker_failed_times=0, success=3, tps=0, ckpt_times=0, retransimit_times=0, tunnel_traffic=470.00B, lsn_ckpt={0[0, 0], 1970-01-01 08:00:00}, lsn_ack={7089039059024084993[1650545527, 1], 2022-04-21 20:52:07}]]
[2022/04/21 20:54:54 CST] [INFO] Syncer[testrs] batcher flushes cached oplog
[2022/04/21 20:54:54 CST] [INFO] Collector-worker-6 transfer retransmit:false send [1] logs. reply_acked [7089039767693688833[1650545692, 1]], list_unack [0] 
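The collector log prints oplog positions in the form 7088979101280632833[1650531567, 1]. Judging from the pairs shown in the log, the large number is a 64-bit value whose high 32 bits hold the Unix seconds and whose low 32 bits hold the per-second increment. The sketch below decodes such a value; the function names decode_ts and ts_to_datetime are hypothetical, and the UTC+8 zone is an assumption taken from the CST timestamps in the log.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

# Assumption: the log's "CST" is China Standard Time (UTC+8).
CST = timezone(timedelta(hours=8))


def decode_ts(ts: int) -> Tuple[int, int]:
    """Split a 64-bit oplog position into (unix_seconds, increment)."""
    seconds = ts >> 32            # high 32 bits
    increment = ts & 0xFFFFFFFF   # low 32 bits
    return seconds, increment


def ts_to_datetime(ts: int, tz: timezone = CST) -> datetime:
    """Convert the seconds part of an oplog position to a datetime."""
    seconds, _ = decode_ts(ts)
    return datetime.fromtimestamp(seconds, tz)


if __name__ == "__main__":
    for value in (4294967296, 7088979101280632833, 7089039059024084993):
        print(value, decode_ts(value), ts_to_datetime(value).isoformat())
```

Decoding 4294967296 gives (1, 0), which matches the start-position[4294967296[1, 0]] entry in the log: with CheckpointStartPosition set to 1, the collector starts from the oldest oplog entry still available, which is why the WARN line about the starting point being smaller than the oldest timestamp appears.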

2). Receiver-side configuration and startup

  1. receiver.conf
[root@dn5 mongoshake]# cat receiver.conf 
# global log level. lower level message
# will be filter
log.level = info
# log directory. log and pid files are stored in this directory.
# if not set, the default is "./logs/" under the current path.
log.dir =
# log file name.
log.file = receiver.log
# log flush enable. If set to false, logs are buffered and some may not be
# printed on exit. If set to true, every log line is flushed immediately, which
# significantly reduces performance; true is recommended when debugging.
log.flush = false

# profiling on net/http/profile
system_profile_port = 9500

# tunnel pipeline type. now we support rpc,tcp,file,mock,kafka
#tunnel = rpc
tunnel = kafka
# tunnel target resource url
# for rpc. this is receiver socket address
# for tcp. this is receiver socket address
# for file. this is the file path, for instance "data"
# for mock. this is useless. mongoshake will generate random data including "i", "d", "u", "n"
# for kafka. this is the topic and brokers address which split by comma, for
# instance: topic@brokers1,brokers2, default topic is "mongoshake"
#tunnel.address = 127.0.0.1:30033
tunnel.address = mongo_shake_test@dn3.test.com:9092,dn4.test.com:9092,dn5.test.com:9092

# replayer worker concurrency. must equal to the collector worker number
replayer = 8
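Both configuration files state that the receiver's replayer count must equal the collector's worker count (incr_sync.worker). A small pre-flight check can catch a mismatch before either process is started. The following is only a sketch: parse_conf and workers_match are hypothetical helpers, and the parser assumes the simple "key = value" layout with "#" comments seen in the two files above.

```python
from typing import Dict

COLLECTOR_CONF = """
conf.version = 6
# incremental sync only
sync_mode=incr
tunnel = kafka
incr_sync.worker = 8
"""

RECEIVER_CONF = """
log.level = info
tunnel = kafka
# replayer worker concurrency. must equal to the collector worker number
replayer = 8
"""


def parse_conf(text: str) -> Dict[str, str]:
    """Parse 'key = value' lines, skipping blanks and '#' comments."""
    settings = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")  # split at the first '=' only
        if sep:
            settings[key.strip()] = value.strip()
    return settings


def workers_match(collector_text: str, receiver_text: str) -> bool:
    """Return True when incr_sync.worker equals replayer."""
    collector = parse_conf(collector_text)
    receiver = parse_conf(receiver_text)
    return int(collector["incr_sync.worker"]) == int(receiver["replayer"])


if __name__ == "__main__":
    print(workers_match(COLLECTOR_CONF, RECEIVER_CONF))
```

In practice the two strings would be read from collector.conf and receiver.conf on disk; inline samples are used here so the sketch runs on its own.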
  2. Start the receiver
[root@dn5 mongoshake]# ./receiver.linux -conf=receiver.conf -verbose

[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] ExampleReplayer start. pending queue capacity 256
[2022/04/20 22:30:18 CST] [INFO] receiver is starting...

3). Watching data changes on the Kafka side

[root@dn3 ~]# kafka-console-consumer --zookeeper dn3.test.com:2181 --topic mongo_shake_test --from-beginning

...

{"ts":7088984680443150338,"h":536529010439892526,"v":2,"op":"i","ns":"runoob.runoob","o":[{"Name":"_id","Value":"626122023df47cb4576149bb"},{"Name":"name","Value":"David"}],"o2":null}
{"ts":7088991015519911938,"h":7122990587426851361,"v":2,"op":"i","ns":"test.runoob","o":[{"Name":"_id","Value":"626127c52a312206504466ec"},{"Name":"name","Value":"Andy"}],"o2":null}

{"ts":7088984680443150338,"h":536529010439892526,"v":2,"op":"i","ns":"runoob.runoob","o":[{"Name":"_id","Value":"626122023df47cb4576149bb"},{"Name":"name","Value":"David"}],"o2":null}
{"ts":7088991015519911938,"h":7122990587426851361,"v":2,"op":"i","ns":"test.runoob","o":[{"Name":"_id","Value":"626127c52a312206504466ec"},{"Name":"name","Value":"Andy"}],"o2":null}
{"ts":7089039059024084993,"h":6914250978284002788,"v":2,"op":"i","ns":"runoob.runoob","o":[{"Name":"_id","Value":"626153772a312206504466ed"},{"Name":"name","Value":"Echo"}],"o2":null}
{"ts":7089039767693688833,"h":9017971889662735601,"v":2,"op":"i","ns":"runoob.runoob","o":[{"Name":"_id","Value":"6261541c2a312206504466ee"},{"Name":"name","Value":"John"}],"o2":null}
{"ts":7089040119881007106,"h":4077432025564863895,"v":2,"op":"i","ns":"runoob.test","o":[{"Name":"_id","Value":"6261546e2a312206504466ef"},{"Name":"name","Value":"lily"},{"Name":"age","Value":"20"}],"o2":null}

Troubleshooting

Q: How to solve the "Oplog Tailer initialize failed" error?
A: If the error comes from the syncer, check whether the source database can be reached with the mongo command. If the error comes from a worker, check your tunnel configuration.

Q: How to solve the "Oplog Tailer initialize failed: no reachable servers" error?
A: First, check that your MongoDB is reachable. This error also occurs if you configure only a single node in mongo_urls. We highly recommend listing every MongoDB address (primary, secondary and hidden) in mongo_urls, whether the source is a replica set or a sharded cluster. If you insist on configuring a single node, set mongo_connect_mode = standalone, which is available since v2.0.6.

Q: How to solve the "error type[*mgo.QueryError] error[no such cmd: applyOps]" error?
A: applyOps in DDL is not supported for sharding.

Q: How to solve the "Oplog Tailer initialize failed: no oplog ns in mongo" error?
A: This is usually caused by insufficient account permissions, so check the account's permission on the oplog collection. If the source is a sharded cluster, the account must be added on each shard, because mongos has no local database. For a sharded source, mongo_urls should list the shard addresses separated by semicolons (;), for example: mongo_urls: mongodb://user1:passwd1@10.1.1.1:20011,10.1.1.2:20112;mongodb://user2:passwd2@10.1.2.1:20011,10.1.2.2:20112. Since v2.0.6, MongoShake does not throw this error when the sync mode is full sync (sync_mode = document).
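For a sharded source, the answer above describes mongo_urls as one MongoDB URL per shard, joined by semicolons, with each URL listing its hosts separated by commas. To check such a value by eye, it can help to break it into per-shard host lists. This is a minimal sketch with a hypothetical helper name (split_shard_hosts); it does plain string splitting and is not how MongoShake itself parses the setting.

```python
from typing import List

SCHEME = "mongodb://"


def split_shard_hosts(mongo_urls: str) -> List[List[str]]:
    """Return one list of 'host:port' strings per semicolon-separated shard URL."""
    shards = []
    for url in mongo_urls.split(";"):
        url = url.strip()
        if not url:
            continue
        rest = url[len(SCHEME):] if url.startswith(SCHEME) else url
        # Drop 'user:password@' if present; rpartition keeps '@' inside passwords safe.
        _, _, hosts = rest.rpartition("@")
        # Drop any '/database?options' suffix after the host list.
        hosts = hosts.split("/", 1)[0]
        shards.append([h.strip() for h in hosts.split(",") if h.strip()])
    return shards


if __name__ == "__main__":
    value = (
        "mongodb://user1:passwd1@10.1.1.1:20011,10.1.1.2:20112;"
        "mongodb://user2:passwd2@10.1.2.1:20011,10.1.2.2:20112"
    )
    for index, hosts in enumerate(split_shard_hosts(value)):
        print(index, hosts)
```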
