MongoDB的ChangeStream为用户提供了非常便利的获取变化数据接口,在这里为大家提供一种使用思路和实现方式。
前置条件
-
数据库必须是复制集群模式,可以是单节点,但不能是standalone
官方提供了复制集群模式的配置方式:
https://docs.mongodb.com/manual/administration/replica-set-deployment/如果是已经安装好的standalone模式,也可以转换:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/ -
权限说明
需要有相应database的find、changeStream权限
{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
用到的方法
- watch()
通过此方法可以获取Change Event数据,从而获得变化数据;该方法可以监听MongoClient、MongoDatabase、MongoCollection不同层面的数据变化。 - ChangeStreamIterable.fullDocument(FullDocument fullDocument)
通过此方法可以设置Update、Replace操作的Change Event中是否包含fullDocument。 - ChangeStreamDocument.getFullDocument()
通过此方法可以获取Insert、Update、Replace操作后的Document全部属性值,对于Update、Replace操作需要配合ChangeStreamIterable.fullDocument(FullDocument fullDocument)方法使用。 - ChangeStreamDocument.getUpdateDescription()
通过此方法获取Update、Replace的变化属性及删除属性。 - ChangeStreamIterable.startAtOperationTime()
用于设置获取从哪个时间点开始的变化数据,还有其他几个类似方法,具体请看官方说明(一手资料优于二传手资料)。
实现代码
long lastOperationTime = 0; //设置获取从哪个时间点开始的变化数据
MongoClient mongoClient = MongoClients.create("mongodb://admin:123456@127.0.0.1:27017/?maxPoolSize=10&w=majority");
MongoCursor<ChangeStreamDocument<Document>> cursor = mongoClient.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
.startAtOperationTime(new BsonTimestamp(lastOperationTime))
.iterator();
while(cursor.hasNext()){
ChangeStreamDocument result = cursor.next();
result.getNamespace(); //包含database及collection名
result.getOperationType(); //数据变化类型:INSERT、UPDATE、REPLACE、DELETE
result.getFullDocument(); //Insert、Update、Replace操作后的Document全部属性值
result.getUpdateDescription().getUpdatedFields(); //Update、Replace操作变化的属性
result.getUpdateDescription().getRemovedFields(); //Update、Replace操作删除的字段
}
注意点
- admin/local/config库不能采集;
- 采集Update、Replace、Delete数据不会得到变化前的数据,这点与一些数据库不同;
- url中需要设置w=majority,此参数在部分版本中必须设置,详情可参考官方说明。