聚合框架
聚合管道是一个数据聚合框架,以数据处理管道的概念为模型。
要学习;了解有关聚合的更多信息,请参阅服务器手册中的聚合管道。
先决条件
您必须设置以下组件才能运行本指南中的代码示例:
一个
test.restaurants
restaurants.json
集合,其中填充了来自文档资产Github中 文件的文档。以下 import 语句:
import com.mongodb.reactivestreams.client.MongoClients; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Accumulators; import com.mongodb.client.model.Projections; import com.mongodb.client.model.Filters; import org.bson.Document;
重要
本指南使用自定义Subscriber
实现,如自定义订阅者实现示例指南中所述。
连接到 MongoDB 部署
首先,连接到 MongoDB 部署,然后声明并定义MongoDatabase
和MongoCollection
实例。
以下代码连接到在端口27017
上的localhost
上运行的独立 MongoDB 部署。 然后,定义database
变量以引用test
数据库,并collection
变量以引用restaurants
集合:
MongoClient mongoClient = MongoClients.create(); MongoDatabase database = mongoClient.getDatabase("test"); MongoCollection<Document> collection = database.getCollection("restaurants");
要了解有关连接到 MongoDB 部署的更多信息,请参阅连接到 MongoDB教程。
执行聚合
要执行聚合,请将聚合阶段列表传递给MongoCollection.aggregate()
方法。 驾驶员提供了Aggregates
辅助类,其中包含聚合阶段的构建者器。
在此示例中,聚合管道执行以下任务:
使用
$match
阶段筛选categories
数组字段包含元素"Bakery"
的文档。 该示例使用Aggregates.match()
构建$match
阶段。
使用
$group
阶段按stars
字段对匹配文档进行群组,并累积每个不同的stars
值的文档计数。 该示例使用Aggregates.group()
构建$group
阶段,并使用Accumulators.sum()
构建累加器表达式。 对于在$group
阶段使用的累加器表达式,驾驶员提供了Accumulators
辅助类。
collection.aggregate( Arrays.asList( Aggregates.match(Filters.eq("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1)) ) ).subscribe(new PrintDocumentSubscriber());
使用聚合表达式
对于$group
累加器表达式,驾驶员提供了Accumulators
辅助类。 对于其他聚合表达式,请使用Document
类手动构建表达式。
在以下示例中,聚合管道使用$project
阶段仅返回name
字段和计算字段firstCategory
,其值是categories
数组中的第一个元素。 该示例使用Aggregates.project()
和各种Projections
类方法构建$project
阶段:
collection.aggregate( Arrays.asList( Aggregates.project( Projections.fields( Projections.excludeId(), Projections.include("name"), Projections.computed( "firstCategory", new Document("$arrayElemAt", Arrays.asList("$categories", 0)) ) ) ) ) ).subscribe(new PrintDocumentSubscriber());
解释聚合
要$explain
聚合管道,请调用AggregatePublisher.explain()
方法:
collection.aggregate( Arrays.asList( Aggregates.match(Filters.eq("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1)))) .explain() .subscribe(new PrintDocumentSubscriber());
Atlas Search
您可以通过创建并运行包含以下管道阶段之一的聚合管道来执行Atlas Search查询:
$search
$searchMeta
Java Reactive Streams驾驶员提供聚合。 搜索() 和 Aggregates.searchMeta() 方法执行Atlas Search查询。
要学习;了解有关Atlas Search管道阶段的更多信息,请参阅Atlas文档中的选择聚合管道阶段。
创建管道搜索阶段
您可以使用 Search 操作符在Atlas Search管道阶段中创建搜索条件。
Java Reactive Streams驾驶员为以下操作符提供了辅助方法:
Operator | 说明 |
---|---|
从不完整输入字符串中搜索包含字符序列的单词或短语。 | |
将两个或多个操作符组合到一个查询中。 | |
检查字段是否与您指定的值匹配。映射到 | |
测试指定索引字段名称的路径在文档中是否存在。 | |
在给定路径搜索由BSON数字、日期、布尔值、objectId、uuid 或字符串值组成的大量,并返回该字段的值等于指定大量中任意值的文档。 | |
返回与输入文档类似的文档。 | |
支持对数字、日期和GeoJSON point值进行查询和评分。 | |
使用索引配置中指定的分析器搜索包含有序术语序列的文档。 | |
支持查询索引字段和值的组合。 | |
支持对数字、日期和字符串值进行查询和评分。映射到 | |
将查询字段解释为正则表达式。 | |
使用在索引配置中指定的分析器执行全文搜索。 | |
启用在搜索字符串中使用可匹配任何字符的特殊字符的查询。 |
管道搜索阶段示例
注意
Atlas样本数据集
此示例使用Atlas示例数据集中的 sample_mflix.movies
集合。要学习;了解如何设立免费套餐的Atlas 集群并加载示例数据集,请参阅Atlas文档中的Atlas入门教程。
在运行此示例之前,您必须在 movies
集合上创建一个具有以下定义的Atlas Search索引:
{ "mappings": { "dynamic": true, "fields": { "title": { "analyzer": "lucene.keyword", "type": "string" }, "genres": { "normalizer": "lowercase", "type": "token" } } } }
要学习;了解有关创建Atlas Search索引的更多信息,请参阅 索引指南的Atlas Search索引管理部分。
以下代码创建具有以下规范的 $search
阶段:
检查
genres
大量是否包含"Comedy"
在
fullplot
字段中搜索字段"new york"
匹配介于
1950
和2000
(含)之间的year
值搜索以术语
"Love"
开头的title
值
Bson searchStageFilters = Aggregates.search( SearchOperator.compound() .filter( List.of( SearchOperator.in(fieldPath("genres"), List.of("Comedy")), SearchOperator.phrase(fieldPath("fullplot"), "new york"), SearchOperator.numberRange(fieldPath("year")).gtLt(1950, 2000), SearchOperator.wildcard(fieldPath("title"), "Love *") ))); Bson projection = Aggregates.project(Projections.fields( Projections.include("title", "year", "genres") )); List<Bson> aggregateStages = List.of(searchStageFilters, projection); Publisher<Document> publisher = movies.aggregate(aggregateStages); publisher.subscribe(new SubscriberHelpers.PrintDocumentSubscriber()); Mono.from(publisher).block();
{"_id": ..., "genres": ["Comedy", "Romance"], "title": "Love at First Bite", "year": 1979} {"_id": ..., "genres": ["Comedy", "Drama"], "title": "Love Affair", "year": 1994}
要学习;了解有关Atlas Search助手方法的更多信息,请参阅驱动程序核心API文档中的SearchOperator接口参考。
更多信息
要查看表达式操作符的完整列表,请参阅MongoDB Server手册中的聚合操作符。
要学习;了解如何组装聚合管道并查看示例,请参阅MongoDB Server手册中的聚合管道。
要学习;了解有关创建管道阶段的更多信息,请参阅MongoDB Server手册中的聚合阶段。
要学习;了解有关解释MongoDB操作的更多信息,请参阅MongoDB Server手册中的解释输出和查询计划。
API 文档
要学习;了解有关本指南中提到的类和方法的更多信息,请参阅以下API文档: