集計フレームワーク
集計パイプラインは、データ処理パイプラインの概念をモデル化したデータ集計のフレームワークです。
集計の詳細については、サーバー マニュアルの集計パイプラインを参照してください。
前提条件
このガイドのコード例を実行するには、次のコンポーネントを設定する必要があります。
test.restaurants
ドキュメントrestaurants.json
アセットGithubの ファイルのドキュメントが入力された コレクション。次のインポート ステートメントは次のとおりです。
import com.mongodb.reactivestreams.client.MongoClients; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Accumulators; import com.mongodb.client.model.Projections; import com.mongodb.client.model.Filters; import org.bson.Document;
重要
このガイドでは、カスタムSubscriber
実装を使用します。これについては、 カスタム サブスクリプション実装ガイドで説明されています。
MongoDB 配置への接続
まず、MongoDB 配置に接続し、 インスタンスとMongoDatabase
MongoCollection
インスタンスを 宣言して定義します。
次のコードは、ポート27017
のlocalhost
で実行されているスタンドアロンの MongoDB 配置に接続します。 次に、 test
データベースを参照するためのdatabase
変数と、 restaurants
コレクションを参照するためのcollection
変数を定義します。
MongoClient mongoClient = MongoClients.create(); MongoDatabase database = mongoClient.getDatabase("test"); MongoCollection<Document> collection = database.getCollection("restaurants");
MongoDB 配置への接続の詳細については、「 MongoDB への接続」チュートリアルを参照してください。
集計の実行
集計を実行するには、集計ステージのリストをMongoCollection.aggregate()
メソッドに渡します。 このドライバーは、集計ステージのビルダを含むAggregates
ヘルパー クラスを提供します。
この例では、集計パイプラインは次のタスクを実行しています。
$match
ステージを使用して、categories
配列フィールドに要素"Bakery"
を含むドキュメントをフィルタリングします。 この例では、Aggregates.match()
を使用して$match
ステージを構築しています。
$group
ステージを使用して、一致するドキュメントをstars
フィールドでグループ化し、stars
の個別の値ごとにドキュメントの数を累積します。 この例では、Aggregates.group()
を使用して$group
ステージを構築し、Accumulators.sum()
を使用してアキュムレータ式を構築します。$group
ステージ内で使用するアキュムレータ式の場合、ドライバーはAccumulators
ヘルパー クラスを提供します。
collection.aggregate( Arrays.asList( Aggregates.match(Filters.eq("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1)) ) ).subscribe(new PrintDocumentSubscriber());
集計式の使用
$group
アキュムレータ式の場合、ドライバーはAccumulators
ヘルパー クラスを提供します。 その他の集計式は、 Document
クラスを使用して式を手動で構築します。
次の例では、集計パイプラインは$project
ステージを使用して、 name
フィールドと、値がcategories
配列の最初の要素である計算フィールドfirstCategory
のみを返します。 この例では、 Aggregates.project()
とさまざまなProjections
クラスのメソッドを使用して$project
ステージを構築します。
collection.aggregate( Arrays.asList( Aggregates.project( Projections.fields( Projections.excludeId(), Projections.include("name"), Projections.computed( "firstCategory", new Document("$arrayElemAt", Arrays.asList("$categories", 0)) ) ) ) ) ).subscribe(new PrintDocumentSubscriber());
集計の説明
集計パイプラインを$explain
するには、 AggregatePublisher.explain()
メソッドを呼び出します。
collection.aggregate( Arrays.asList( Aggregates.match(Filters.eq("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1)))) .explain() .subscribe(new PrintDocumentSubscriber());
Atlas Search
次のいずれかのパイプラインステージを含む集計パイプラインを作成して実行中ことで、Atlas Search クエリを実行できます。
$search
$searchMeta
Java Reactive Streams ドライバーは、Atlas Search クエリを実行するための Aggregates.search() メソッドと Aggregates.searchMeta() メソッドを提供します。
Atlas Searchパイプラインステージの詳細については、Atlas ドキュメントの「 集計パイプライン ステージの選択 」を参照してください。
パイプライン検索ステージの作成
検索演算子を使用して、Atlas Searchパイプラインステージで検索条件を作成できます。
Java Reactive Streams ドライバーは、次の演算子のヘルパーメソッドを提供します。
演算子 | 説明 |
---|---|
不完全な入力 string からの文字シーケンスを含む単語またはフレーズを検索します。 | |
2 つ以上の演算子を 1 つのクエリに結合します。 | |
フィールドが指定した値と一致するかどうかを確認します。 | |
指定されたインデックス付きフィールド名へのパスがドキュメント内に存在するかどうかをテストします。 | |
指定されたパスにあるBSON番号、日付、ブール値、objectId、uuid、または string 値の配列を検索し、フィールドの値が指定された配列内の任意の値と等しいドキュメントを返します。 | |
入力ドキュメントに類似するドキュメントを返します。 | |
数値、日付、 GeoJSONポイント値のクエリとスコアリングをサポートします。 | |
インデックス構成で指定されたアナライザを使用して、順序付けられたタームのシーケンスを含むドキュメントを検索します。 | |
インデックス付きフィールドと値の組み合わせのクエリをサポートします。 | |
数値、日付、string 値のクエリとスコアリングをサポートします。 | |
クエリフィールドを 正規式として解釈します。 | |
インデックス構成で指定したアナライザを使用して全文検索を実行します。 | |
検索stringに任意の文字と一致する特殊文字を使用するクエリを有効にします。 |
パイプライン検索ステージの例
注意
Atlas サンプル データセット
この例では、Atlasサンプルデータセットの sample_mflix.movies
コレクションを使用します。無料階層の Atlas クラスターをセットアップし、サンプルデータセットをロードする方法については、Atlas ドキュメントの「 Atlas を使い始める 」チュートリアルを参照してください。
この例を実行する前に、次の定義を持つ Atlas Searchインデックスを movies
コレクションに作成する必要があります。
{ "mappings": { "dynamic": true, "fields": { "title": { "analyzer": "lucene.keyword", "type": "string" }, "genres": { "normalizer": "lowercase", "type": "token" } } } }
Atlas Search インデックスの作成の詳細については、 インデックスガイドの「Atlas Search インデックス マネジメント」セクションを参照してください。
次のコードでは、次の仕様を持つ $search
ステージが作成されます。
genres
配列に"Comedy"
が含まれていることを確認fullplot
フィールドでフレーズ"new york"
を検索します1950
から2000
までのyear
値と一致します"Love"
というタームで始まるtitle
値を検索します
Bson searchStageFilters = Aggregates.search( SearchOperator.compound() .filter( List.of( SearchOperator.in(fieldPath("genres"), List.of("Comedy")), SearchOperator.phrase(fieldPath("fullplot"), "new york"), SearchOperator.numberRange(fieldPath("year")).gtLt(1950, 2000), SearchOperator.wildcard(fieldPath("title"), "Love *") ))); Bson projection = Aggregates.project(Projections.fields( Projections.include("title", "year", "genres") )); List<Bson> aggregateStages = List.of(searchStageFilters, projection); Publisher<Document> publisher = movies.aggregate(aggregateStages); publisher.subscribe(new SubscriberHelpers.PrintDocumentSubscriber()); Mono.from(publisher).block();
{"_id": ..., "genres": ["Comedy", "Romance"], "title": "Love at First Bite", "year": 1979} {"_id": ..., "genres": ["Comedy", "Drama"], "title": "Love Affair", "year": 1994}
Atlas Searchヘルパーメソッドの詳細については、ドライバー コアAPIドキュメントの SearchOperator インターフェース参照を参照してください。
詳細情報
式演算子の完全なリストを表示するには、 マニュアルの「 集計演算子MongoDB Server 」を参照してください。
集計パイプラインの組み立てと例については、 マニュアルの「 集計パイプライン MongoDB Server」を参照してください。
パイプライン ステージの作成の詳細については、 マニュアルの「 集計ステージMongoDB Server 」を参照してください。
MongoDB操作の説明の詳細については、 マニュアルの「 出力 と クエリプラン の説明MongoDB Server 」を参照してください。
API ドキュメント
このガイドで言及されているクラスとメソッドについて詳しくは、次の API ドキュメントを参照してください。