您现在的位置是:网站首页> 编程资料编程资料
springboot整合mongodb changestream的示例代码_MongoDB_
2023-05-27
406人已围观
简介 springboot整合mongodb changestream的示例代码_MongoDB_
前言
changestream是monggodb的3.6版本之后出现的一种基于collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更
想必对mysql主从复制原理比较熟悉的同学应该知道,其根本就是从节点通过监听binlog日志,然后解析binlog日志数据达到数据同步的目的,于是,基于mysql主从复制原理,阿里开源了canal这样的数据同步中间件工具
Change Stream 介绍
Chang Stream(变更记录流) 是指collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更。
关于changestream做如下说明,提供参考
- 在该特性出现之前,开发者可通过拉取 oplog达到同样的目的;
- 但 oplog 的处理及解析相对复杂,而且存在被回滚的风险,如果使用不当的话还会带来性能问题;
- Change Stream 可以与aggregate framework结合使用,对变更集进行进一步的过滤或转换;
- 由于Change Stream 利用了存储在 oplog 中的信息,因此对于单进程部署的MongoDB无法支持Change Stream功能,其只能用于启用了副本集的独立集群或分片集群
changestream可用于监听的mongodb目标类型
- 单个集合,除系统库(admin/local/config)之外的集合,3.6版本支持
- 单个数据库,除系统库(admin/local/config)之外的数据库集合,4.0版本支持
- 整个集群,整个集群内除去系统库( (admin/local/config)之外的集合 ,4.0版本支持
一个Change Stream Event的基本结构如下所示:
{ _id : { }, "operationType" : "", "fullDocument" : { }, "ns" : { "db" : "", "coll" : " }, "updateDescription" : { "updatedFields" : { }, "removedFields" : [ "", ... ] } "clusterTime" : , "txnNumber" : , "lsid" : { "id" : , "uid" : } } 关于上面的数据结构,做简单的解释说明,
- _id,变更事件的Token对象
- operationType,变更类型(见下面介绍)
- fullDocument,文档内容
- ns,监听的目标
- ns.db,变更的数据库
- ns.coll,变更的集合
- documentKey,变更文档的键值,含_id字段
- updateDescription,变更描述
- updateDescription.updatedFields,变更中更新字段
- updateDescription.removedFields,变更中删除字段
- clusterTime,对应oplog的时间戳
- txnNumber,事务编号,仅在多文档事务中出现,4.0版本支持
- lsid,事务关联的会话编号,仅在多文档事务中出现,4.0版本支持
Change Steram支持的变更类型,对于上面的operationType 这个参数,主要包括有以下几个:
- insert,插入文档
- delete,删除文档
- replace,替换文档,当执行replace操作指定upsert时,可能是insert事件
- update,更新文档,当执行update操作指定upsert时,可能是insert事件
- invalidate,失效事件,比如执行了collection.drop或collection.rename
以上的几种类型,可以简单理解为,监听的mongo用户操作的事件类型,比如新增数据,删除数据,修改数据等
以上为changestream的必备理论知识,想要深入学习的话无比要了解,下面通过实操来展示下changestream的使用
环境准备
mongdb复制集群,本例的复制集群对应的mongodb版本为 4.0.X

登录primary节点,创建一个数据库

友情提醒:数据库需要提前创建
1、启动两个Mongo shell,一个操作数据库,一个watch
在其中一个窗口执行如下命令,开启监听
cursor = db.comment.watch()

2、在另一个窗口下,给上面的articledb插入一条数据

数据写入成功后,在第一个窗口下,执行下面的命令:
cursor.next()

说明已经成功监听到新增的数据,修改、删除事件可以做类似的操作即可
以上先通过shell窗口展示了一下changestream的使用效果,接下来,将通过程序演示下如何在客户端集成并使用changestream
Java客户端操作changestream
1、引入maven依赖
org.mongodb mongo-java-driver 3.12.2 com.alibaba fastjson 1.2.75
2、测试类核心代码
import com.mongodb.*; import com.mongodb.client.MongoDatabase; import org.bson.conversions.Bson; import java.util.List; import static java.util.Collections.singletonList; import com.alibaba.fastjson.JSONObject; import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.ChangeStreamDocument; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static java.util.Arrays.asList; public class MongoTest { private static Logger logger = LoggerFactory.getLogger(MongoTest.class); public static void main(String[] args) { showmogodbdata(); } private static void showmogodbdata() { String sURI = "mongodb://IP:27017"; MongoClient mongoClient = new MongoClient(new MongoClientURI(sURI)); MongoDatabase database = mongoClient.getDatabase("articledb"); MongoCollection collec = database.getCollection("comment"); List pipeline = singletonList(Aggregates.match(Filters.or( Document.parse("{'fullDocument.articleid': '100007'}"), Filters.in("operationType", asList("insert", "update", "delete"))))); MongoCursor> cursor = collec.watch(pipeline).iterator(); while (cursor.hasNext()) { ChangeStreamDocument next = cursor.next(); logger.info("输出mogodb的next的对应的值" + next.toString()); String Operation = next.getOperationType().getValue(); String tableNames = next.getNamespace().getCollectionName(); System.out.println(tableNames); //获取主键id的值 String pk_id = next.getDocumentKey().toString(); //同步修改数据的操作 if (next.getUpdateDescription() != null) { JSONObject jsonObject = JSONObject.parseObject(next.getUpdateDescription().getUpdatedFields().toJson()); System.out.println(jsonObject); } //同步插入数据的操作 if (next.getFullDocument() != null) { JSONObject jsonObject = JSONObject.parseObject(next.getFullDocument().toJson()); //同步删除数据的操作 if (next.getUpdateDescription() == null && Operation.matches("delete")) { JSONObject jsonObject = JSONObject.parseObject(pk_id); } } 这段程序主要分为几个核心部分,做如下解释说明,
- 连接mogodb服务端及相关配置
- 通过pipline开启watch监听
- 监听到特定数据库下集合的数据变化,然后打印出变化的数据
启动这段程序,观察控制台日志数据

在未对articledb数据库下的comment集合做任何操作之前,由于watch为检测到任何数据变化,所以无法进入到while循环中,接下来,从shell端给comment集合新增一条数据,然后再次观察控制台数据变化

可以看到,控制台很快就检测到变化的数据

以下为完整的日志数据
{ operationType=OperationType{value='insert'}, resumeToken={"_data": "8262138891000000022B022C0100296E5A1004B9065629412942F8852D592B9FD441B946645F696400646213889158B116A29C3FD1140004"}, namespace=articledb.comment, destinationNamespace=null, fullDocument=Document{{_id=6213889158b116a29c3fd114, articleid=100010, content=hello kafka, userid=1010, nickname=marry}}, documentKey={"_id": {"$oid": "6213889158b116a29c3fd114"}}, clusterTime=Timestamp{value=7067142396626075650, seconds=1645447313, inc=2}, updateDescription=null, txnNumber=null, lsid=null}
至于在业务中的具体使用,可以结合自身的情况,举例来说,应用程序只想监听修改数据的事件,那么就可以在修改数据事件的监听逻辑中,解析变化后的数据做后续的操作
springboot整合changestream
在实际开发中,更通用的场景是整合到springboot工程中使用,有过一定的开发经验的同学应该很容易想到核心的逻辑长什么样了,和canal的客户端操作类似,需要在一个配置类去监听即可
下面来看看具体的整合步骤
1、引入核心依赖
org.springframework.boot spring-boot-starter-web spring-boot-starter-test test spring-boot-starter
2、核心配置文件
本例演示的是基于上文搭建的mongodb复制集群
server.port=8081 #mongodb配置 spring.data.mongodb.uri=mongodb://IP:27017,IP:27018,IP:27019/articledb?maxPoolSize=512
3、编写实体类,映射comment集合中的字段
import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; @Document(collection="comment") public class Comment { @Id private String articleid; private String content; private String userid; private String nickname; private Date createdatetime; public String getArticleid() { return articleid; } public void setArticleid(String articleid) { this.articleid = articleid; public String getContent() { return content;
相关内容
- springboot整合mongodb changestream的示例代码_MongoDB_
- 关于mongoDB数据库添加账号的问题_MongoDB_
- 关于mongoDB数据库添加账号的问题_MongoDB_
- mongodb linux下集群搭建过程_MongoDB_
- mongodb linux下集群搭建过程_MongoDB_
- 聊聊MongoDB 带访问控制的副本集部署问题_MongoDB_
- 教你使用mongoose实现多集合关联查询_MongoDB_
- MongoDB高效读写海量数据的方法_MongoDB_
- 使用mongoshake实现mongodb数据同步的操作方法_MongoDB_
- SpringBoot系列之MongoDB Aggregations用法详解_MongoDB_
点击排行
本栏推荐
