Menu Docs
Página inicial do Docs
/ / /
Driver de fluxos reativos do Java

Framework de aggregation

O pipeline de agregação é uma estrutura para agregação de dados, modelada sobre o conceito de pipelines de processamento de dados.

Para saber mais sobre agregação, consulte Aggregation Pipeline no manual do servidor.

Você deve configurar os seguintes componentes para executar os exemplos de código neste guia:

  • Uma test.restaurants coleção preenchida com documentos do restaurants.json arquivo nos ativos de documentação do Github.

  • As seguintes declarações de importação:

import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Filters;
import org.bson.Document;

Importante

Este guia usa implementações personalizadas de Subscriber , que são descritas no guia Amostra de implementações personalizadas de assinantes .

Primeiro, conecte a um MongoDB deployment e, em seguida, declare e defina as instâncias MongoDatabase e MongoCollection .

O código a seguir se conecta a uma MongoDB deployment standalone em execução em localhost na porta 27017. Em seguida, define a variável database para fazer referência ao banco de dados test e a variável collection para fazer referência à coleção restaurants :

MongoClient mongoClient = MongoClients.create();
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("restaurants");

Para saber mais sobre como se conectar a sistemas do MongoDB, consulte o tutorial Conectar ao MongoDB .

Para executar a agregação, passe uma lista de estágios de agregação para o método MongoCollection.aggregate() . O driver fornece a classe assistente Aggregates que contém construtores para estágios de agregação .

Neste exemplo, o aggregation pipeline executa as seguintes tarefas:

  • Usa um estágio $match para filtrar documentos nos quais o campo de array categories contém o elemento "Bakery". O exemplo utiliza Aggregates.match() para construir o estágio $match .

  • Utiliza um estágio $group para agrupar os documentos correspondentes pelo campo stars , acumulando uma contagem de documentos para cada valor distinto de stars. O exemplo utiliza Aggregates.group() para construir o estágio $group e Accumulators.sum() para construir a expressão acumulador . Para as expressões acumulador para uso no estágio $group , o driver fornece Accumulators classe assistente .

collection.aggregate(
Arrays.asList(
Aggregates.match(Filters.eq("categories", "Bakery")),
Aggregates.group("$stars", Accumulators.sum("count", 1))
)
).subscribe(new PrintDocumentSubscriber());

Para expressões de acumulador do $group , o driver fornece a classe assistente do Accumulators . Para outras expressões de agregação , construa manualmente a expressão utilizando a classe Document .

No exemplo a seguir, o aggregation pipeline usa um estágio $project para retornar somente o campo name e o campo calculado firstCategory cujo valor é o primeiro elemento na array categories . O exemplo utiliza Aggregates.project() e vários métodos de classe Projections para construir o estágio $project :

collection.aggregate(
Arrays.asList(
Aggregates.project(
Projections.fields(
Projections.excludeId(),
Projections.include("name"),
Projections.computed(
"firstCategory",
new Document("$arrayElemAt", Arrays.asList("$categories", 0))
)
)
)
)
).subscribe(new PrintDocumentSubscriber());

Para $explain um pipeline de agregação , chame o método AggregatePublisher.explain() :

collection.aggregate(
Arrays.asList(
Aggregates.match(Filters.eq("categories", "Bakery")),
Aggregates.group("$stars", Accumulators.sum("count", 1))))
.explain()
.subscribe(new PrintDocumentSubscriber());

Você pode executar uma query do Atlas Search criando e executando um pipeline de agregação que contenha um dos seguintes estágios de pipeline:

  • $search

  • $searchMeta

O driver Java Reactive Streams fornece os métodos Aggregates.search() e Aggregates.searchMeta() para executar queries no Atlas Search.

Para saber mais sobre os estágios do pipeline do Atlas Search, consulte Escolher o estágio do pipeline de agregação na documentação do Atlas.

Você pode criar os critérios de pesquisa em seu estágio de pipeline do Atlas Search usando Operadores de pesquisa.

O driver Java Reactive Streams fornece métodos de assistente para os seguintes operadores:

Operador
Descrição

Executa uma pesquisa por uma palavra ou frase que contém uma sequência de caracteres de uma string de entrada incompleta.

Combina dois ou mais operadores em uma única query.

Verifica se um campo corresponde a um valor que você especificou. Mapeia para os métodos equals() e equalsNull()

Testa se existe um caminho para um nome de campo indexado especificado em um documento.

Executa uma pesquisa por uma array de valores de número BSON, data, boolean, objectId, uuid ou string no caminho fornecido e retorna documentos em que o valor do campo é igual a qualquer valor na array especificada.

Retorna documentos semelhantes aos documentos de entrada.

Suporta a consulta e pontuação de valores numéricos, de data e de ponto GeoJSON .

Executa uma pesquisa de documentos contendo uma sequência ordenada de termos usando o analisador especificado na configuração do índice.

Suporta a realização de query de uma combinação de campos e valores indexados.

Suporta a consulta e pontuação de valores numéricos, de data e de cadeia de caracteres. Mapeia para os métodos numberRange() e dateRange()

Interpreta o campo de query como uma expressão regular.

Executa uma pesquisa de texto completo usando o analisador especificado na configuração do índice.

Habilita queries que usam caracteres especiais na string de pesquisa que podem corresponder a qualquer caractere.

Observação

Conjunto de Dados de Amostra do Atlas

Este exemplo utiliza a collection sample_mflix.movies a partir do conjunto de dados de amostra do Atlas. Para saber como configurar um Atlas cluster de camada grátis e carregar o conjunto de dados de amostra, consulte o tutorial Introdução ao Atlas na documentação do Atlas.

Antes de executar este exemplo, você deve criar um índice do Atlas Search na coleção movies que tenha a seguinte definição:

{
"mappings": {
"dynamic": true,
"fields": {
"title": {
"analyzer": "lucene.keyword",
"type": "string"
},
"genres": {
"normalizer": "lowercase",
"type": "token"
}
}
}
}

Para saber mais sobre como criar índices de Atlas Search, consulte a seção Gerenciamento de índices de Atlas Search do guia Índices.

O seguinte código cria um estágio $search que tem as seguintes especificações:

  • Verifica se a array genres inclui "Comedy"

  • Pesquisa no campo fullplot a frase "new york"

  • Corresponde a year valores entre 1950 e 2000, inclusive

  • Pesquisa title valores que começam com o termo "Love"

Bson searchStageFilters = Aggregates.search(
SearchOperator.compound()
.filter(
List.of(
SearchOperator.in(fieldPath("genres"), List.of("Comedy")),
SearchOperator.phrase(fieldPath("fullplot"), "new york"),
SearchOperator.numberRange(fieldPath("year")).gtLt(1950, 2000),
SearchOperator.wildcard(fieldPath("title"), "Love *")
)));
Bson projection = Aggregates.project(Projections.fields(
Projections.include("title", "year", "genres")
));
List<Bson> aggregateStages = List.of(searchStageFilters, projection);
Publisher<Document> publisher = movies.aggregate(aggregateStages);
publisher.subscribe(new SubscriberHelpers.PrintDocumentSubscriber());
Mono.from(publisher).block();
{"_id": ..., "genres": ["Comedy", "Romance"], "title": "Love at First Bite", "year": 1979}
{"_id": ..., "genres": ["Comedy", "Drama"], "title": "Love Affair", "year": 1994}

Para saber mais sobre os métodos assistente do Atlas Search, consulte a referência da interface SearchOperator na documentação da API Driver Core.

Para ver uma lista completa de operadores de expressão , consulte Operadores de aggregation no manual do MongoDB Server .

Para saber mais sobre como montar um pipeline de agregação e ver exemplos, consulte Pipeline de agregação no manual do MongoDB Server .

Para saber mais sobre como criar estágios de pipeline, consulte Estágios de agregação no manual do MongoDB Server .

Para saber mais sobre como explicar as operações do MongoDB , consulte Explicar planos de saída e query no manual do MongoDB Server .

Para saber mais sobre as aulas e os métodos mencionados neste guia, consulte a seguinte documentação da API:

Voltar

Indexes

Nesta página

  • Pré-requisitos
  • Conecte-se a um MongoDB deployment
  • Fazer aggregation
  • Usar expressões de agregação
  • Explicar uma agregação
  • Atlas Search
  • Criar estágios de pesquisa de pipeline
  • Informações adicionais
  • Documentação da API