Docs Menu
Docs Home
/ / /
Java Reactive Streams ドライバー

集計フレームワーク

集計パイプラインは、データ処理パイプラインの概念をモデル化したデータ集計のフレームワークです。

集計の詳細については、サーバー マニュアルの集計パイプラインを参照してください。

このガイドのコード例を実行するには、次のコンポーネントを設定する必要があります。

  • test.restaurantsドキュメントrestaurants.json アセットGithubの ファイルのドキュメントが入力された コレクション。

  • 次のインポート ステートメントは次のとおりです。

import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Filters;
import org.bson.Document;

重要

このガイドでは、カスタムSubscriber実装を使用します。これについては、 カスタム サブスクリプション実装ガイドで説明されています。

まず、MongoDB 配置に接続し、 インスタンスとMongoDatabase MongoCollectionインスタンスを 宣言して定義します。

次のコードは、ポート27017localhostで実行されているスタンドアロンの MongoDB 配置に接続します。 次に、 testデータベースを参照するためのdatabase変数と、 restaurantsコレクションを参照するためのcollection変数を定義します。

MongoClient mongoClient = MongoClients.create();
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("restaurants");

MongoDB 配置への接続の詳細については、「 MongoDB への接続」チュートリアルを参照してください。

集計を実行するには、集計ステージのリストをMongoCollection.aggregate()メソッドに渡します。 このドライバーは、集計ステージのビルダを含むAggregatesヘルパー クラスを提供します。

この例では、集計パイプラインは次のタスクを実行しています。

  • $matchステージを使用して、 categories配列フィールドに要素"Bakery"を含むドキュメントをフィルタリングします。 この例では、 Aggregates.match()を使用して$matchステージを構築しています。

  • $groupステージを使用して、一致するドキュメントをstarsフィールドでグループ化し、 starsの個別の値ごとにドキュメントの数を累積します。 この例では、 Aggregates.group()を使用して$groupステージを構築し、 Accumulators.sum()を使用してアキュムレータ式を構築します。 $groupステージ内で使用するアキュムレータ式の場合、ドライバーはAccumulatorsヘルパー クラスを提供します。

collection.aggregate(
Arrays.asList(
Aggregates.match(Filters.eq("categories", "Bakery")),
Aggregates.group("$stars", Accumulators.sum("count", 1))
)
).subscribe(new PrintDocumentSubscriber());

$groupアキュムレータ式の場合、ドライバーはAccumulatorsヘルパー クラスを提供します。 その他の集計式は、 Documentクラスを使用して式を手動で構築します。

次の例では、集計パイプラインは$projectステージを使用して、 nameフィールドと、値がcategories配列の最初の要素である計算フィールドfirstCategoryのみを返します。 この例では、 Aggregates.project()とさまざまなProjectionsクラスのメソッドを使用して$projectステージを構築します。

collection.aggregate(
Arrays.asList(
Aggregates.project(
Projections.fields(
Projections.excludeId(),
Projections.include("name"),
Projections.computed(
"firstCategory",
new Document("$arrayElemAt", Arrays.asList("$categories", 0))
)
)
)
)
).subscribe(new PrintDocumentSubscriber());

集計パイプラインを$explainするには、 AggregatePublisher.explain()メソッドを呼び出します。

collection.aggregate(
Arrays.asList(
Aggregates.match(Filters.eq("categories", "Bakery")),
Aggregates.group("$stars", Accumulators.sum("count", 1))))
.explain()
.subscribe(new PrintDocumentSubscriber());

次のいずれかのパイプラインステージを含む集計パイプラインを作成して実行中ことで、Atlas Search クエリを実行できます。

  • $search

  • $searchMeta

Java Reactive Streams ドライバーは、Atlas Search クエリを実行するための Aggregates.search() メソッドと Aggregates.searchMeta() メソッドを提供します。

Atlas Searchパイプラインステージの詳細については、Atlas ドキュメントの「 集計パイプライン ステージの選択 」を参照してください。

検索演算子を使用して、Atlas Searchパイプラインステージで検索条件を作成できます。

Java Reactive Streams ドライバーは、次の演算子のヘルパーメソッドを提供します。

演算子
説明

不完全な入力 string からの文字シーケンスを含む単語またはフレーズを検索します。

2 つ以上の演算子を 1 つのクエリに結合します。

フィールドが指定した値と一致するかどうかを確認します。 equals() メソッドと equalsNull() メソッドにマッピングする

指定されたインデックス付きフィールド名へのパスがドキュメント内に存在するかどうかをテストします。

指定されたパスにあるBSON番号、日付、ブール値、objectId、uuid、または string 値の配列を検索し、フィールドの値が指定された配列内の任意の値と等しいドキュメントを返します。

入力ドキュメントに類似するドキュメントを返します。

数値、日付、 GeoJSONポイント値のクエリとスコアリングをサポートします。

インデックス構成で指定されたアナライザを使用して、順序付けられたタームのシーケンスを含むドキュメントを検索します。

インデックス付きフィールドと値の組み合わせのクエリをサポートします。

数値、日付、string 値のクエリとスコアリングをサポートします。 numberRange() メソッドと dateRange() メソッドにマッピングする

クエリフィールドを 正規式として解釈します。

インデックス構成で指定したアナライザを使用して全文検索を実行します。

検索stringに任意の文字と一致する特殊文字を使用するクエリを有効にします。

注意

Atlas サンプル データセット

この例では、Atlasサンプルデータセットの sample_mflix.moviesコレクションを使用します。無料階層の Atlas クラスターをセットアップし、サンプルデータセットをロードする方法については、Atlas ドキュメントの「 Atlas を使い始める 」チュートリアルを参照してください。

この例を実行する前に、次の定義を持つ Atlas Searchインデックスを moviesコレクションに作成する必要があります。

{
"mappings": {
"dynamic": true,
"fields": {
"title": {
"analyzer": "lucene.keyword",
"type": "string"
},
"genres": {
"normalizer": "lowercase",
"type": "token"
}
}
}
}

Atlas Search インデックスの作成の詳細については、 インデックスガイドの「Atlas Search インデックス マネジメント」セクションを参照してください。

次のコードでは、次の仕様を持つ $search ステージが作成されます。

  • genres 配列に "Comedy" が含まれていることを確認

  • fullplotフィールドでフレーズ "new york" を検索します

  • 1950 から 2000 までの year 値と一致します

  • "Love" というタームで始まる title 値を検索します

Bson searchStageFilters = Aggregates.search(
SearchOperator.compound()
.filter(
List.of(
SearchOperator.in(fieldPath("genres"), List.of("Comedy")),
SearchOperator.phrase(fieldPath("fullplot"), "new york"),
SearchOperator.numberRange(fieldPath("year")).gtLt(1950, 2000),
SearchOperator.wildcard(fieldPath("title"), "Love *")
)));
Bson projection = Aggregates.project(Projections.fields(
Projections.include("title", "year", "genres")
));
List<Bson> aggregateStages = List.of(searchStageFilters, projection);
Publisher<Document> publisher = movies.aggregate(aggregateStages);
publisher.subscribe(new SubscriberHelpers.PrintDocumentSubscriber());
Mono.from(publisher).block();
{"_id": ..., "genres": ["Comedy", "Romance"], "title": "Love at First Bite", "year": 1979}
{"_id": ..., "genres": ["Comedy", "Drama"], "title": "Love Affair", "year": 1994}

Atlas Searchヘルパーメソッドの詳細については、ドライバー コアAPIドキュメントの SearchOperator インターフェース参照を参照してください。

式演算子の完全なリストを表示するには、 マニュアルの「 集計演算子MongoDB Server 」を参照してください。

集計パイプラインの組み立てと例については、 マニュアルの「 集計パイプライン MongoDB Server」を参照してください。

パイプライン ステージの作成の詳細については、 マニュアルの「 集計ステージMongoDB Server 」を参照してください。

MongoDB操作の説明の詳細については、 マニュアルの「 出力 クエリプラン の説明MongoDB Server 」を参照してください。

このガイドで言及されているクラスとメソッドについて詳しくは、次の API ドキュメントを参照してください。

戻る

Indexes

項目一覧

  • 前提条件
  • MongoDB 配置への接続
  • 集計の実行
  • 集計式の使用
  • 集計の説明
  • Atlas Search
  • パイプライン検索ステージの作成
  • 詳細情報
  • API ドキュメント