Framework de aggregation
O pipeline de agregação é uma estrutura para agregação de dados, modelada sobre o conceito de pipelines de processamento de dados.
Para saber mais sobre agregação, consulte Aggregation Pipeline no manual do servidor.
Pré-requisitos
Você deve configurar os seguintes componentes para executar os exemplos de código neste guia:
Uma
test.restaurants
coleção preenchida com documentos dorestaurants.json
arquivo nos ativos de documentação do Github.As seguintes declarações de importação:
import com.mongodb.reactivestreams.client.MongoClients; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Accumulators; import com.mongodb.client.model.Projections; import com.mongodb.client.model.Filters; import org.bson.Document;
Importante
Este guia usa implementações personalizadas de Subscriber
, que são descritas no guia Amostra de implementações personalizadas de assinantes .
Conecte-se a um MongoDB deployment
Primeiro, conecte a um MongoDB deployment e, em seguida, declare e defina as instâncias MongoDatabase
e MongoCollection
.
O código a seguir se conecta a uma MongoDB deployment standalone em execução em localhost
na porta 27017
. Em seguida, define a variável database
para fazer referência ao banco de dados test
e a variável collection
para fazer referência à coleção restaurants
:
MongoClient mongoClient = MongoClients.create(); MongoDatabase database = mongoClient.getDatabase("test"); MongoCollection<Document> collection = database.getCollection("restaurants");
Para saber mais sobre como se conectar a sistemas do MongoDB, consulte o tutorial Conectar ao MongoDB .
Fazer aggregation
Para executar a agregação, passe uma lista de estágios de agregação para o método MongoCollection.aggregate()
. O driver fornece a classe assistente Aggregates
que contém construtores para estágios de agregação .
Neste exemplo, o aggregation pipeline executa as seguintes tarefas:
Usa um estágio
$match
para filtrar documentos nos quais o campo de arraycategories
contém o elemento"Bakery"
. O exemplo utilizaAggregates.match()
para construir o estágio$match
.
Utiliza um estágio
$group
para agrupar os documentos correspondentes pelo campostars
, acumulando uma contagem de documentos para cada valor distinto destars
. O exemplo utilizaAggregates.group()
para construir o estágio$group
eAccumulators.sum()
para construir a expressão acumulador . Para as expressões acumulador para uso no estágio$group
, o driver forneceAccumulators
classe assistente .
collection.aggregate( Arrays.asList( Aggregates.match(Filters.eq("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1)) ) ).subscribe(new PrintDocumentSubscriber());
Usar expressões de agregação
Para expressões de acumulador do $group
, o driver fornece a classe assistente do Accumulators
. Para outras expressões de agregação , construa manualmente a expressão utilizando a classe Document
.
No exemplo a seguir, o aggregation pipeline usa um estágio $project
para retornar somente o campo name
e o campo calculado firstCategory
cujo valor é o primeiro elemento na array categories
. O exemplo utiliza Aggregates.project()
e vários métodos de classe Projections
para construir o estágio $project
:
collection.aggregate( Arrays.asList( Aggregates.project( Projections.fields( Projections.excludeId(), Projections.include("name"), Projections.computed( "firstCategory", new Document("$arrayElemAt", Arrays.asList("$categories", 0)) ) ) ) ) ).subscribe(new PrintDocumentSubscriber());
Explicar uma agregação
Para $explain
um pipeline de agregação , chame o método AggregatePublisher.explain()
:
collection.aggregate( Arrays.asList( Aggregates.match(Filters.eq("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1)))) .explain() .subscribe(new PrintDocumentSubscriber());
Atlas Search
Você pode executar uma query do Atlas Search criando e executando um pipeline de agregação que contenha um dos seguintes estágios de pipeline:
$search
$searchMeta
O driver Java Reactive Streams fornece os métodos Aggregates.search() e Aggregates.searchMeta() para executar queries no Atlas Search.
Para saber mais sobre os estágios do pipeline do Atlas Search, consulte Escolher o estágio do pipeline de agregação na documentação do Atlas.
Criar estágios de pesquisa de pipeline
Você pode criar os critérios de pesquisa em seu estágio de pipeline do Atlas Search usando Operadores de pesquisa.
O driver Java Reactive Streams fornece métodos de assistente para os seguintes operadores:
Operador | Descrição |
---|---|
Executa uma pesquisa por uma palavra ou frase que contém uma sequência de caracteres de uma string de entrada incompleta. | |
Combina dois ou mais operadores em uma única query. | |
Verifica se um campo corresponde a um valor que você especificou. Mapeia para os métodos | |
Testa se existe um caminho para um nome de campo indexado especificado em um documento. | |
Executa uma pesquisa por uma array de valores de número BSON, data, boolean, objectId, uuid ou string no caminho fornecido e retorna documentos em que o valor do campo é igual a qualquer valor na array especificada. | |
Retorna documentos semelhantes aos documentos de entrada. | |
Suporta a consulta e pontuação de valores numéricos, de data e de ponto GeoJSON . | |
Executa uma pesquisa de documentos contendo uma sequência ordenada de termos usando o analisador especificado na configuração do índice. | |
Suporta a realização de query de uma combinação de campos e valores indexados. | |
Suporta a consulta e pontuação de valores numéricos, de data e de cadeia de caracteres. Mapeia para os métodos | |
Interpreta o campo de query como uma expressão regular. | |
Executa uma pesquisa de texto completo usando o analisador especificado na configuração do índice. | |
Habilita queries que usam caracteres especiais na string de pesquisa que podem corresponder a qualquer caractere. |
Exemplo de estágio de pesquisa de pipeline
Observação
Conjunto de Dados de Amostra do Atlas
Este exemplo utiliza a collection sample_mflix.movies
a partir do conjunto de dados de amostra do Atlas. Para saber como configurar um Atlas cluster de camada grátis e carregar o conjunto de dados de amostra, consulte o tutorial Introdução ao Atlas na documentação do Atlas.
Antes de executar este exemplo, você deve criar um índice do Atlas Search na coleção movies
que tenha a seguinte definição:
{ "mappings": { "dynamic": true, "fields": { "title": { "analyzer": "lucene.keyword", "type": "string" }, "genres": { "normalizer": "lowercase", "type": "token" } } } }
Para saber mais sobre como criar índices de Atlas Search, consulte a seção Gerenciamento de índices de Atlas Search do guia Índices.
O seguinte código cria um estágio $search
que tem as seguintes especificações:
Verifica se a array
genres
inclui"Comedy"
Pesquisa no campo
fullplot
a frase"new york"
Corresponde a
year
valores entre1950
e2000
, inclusivePesquisa
title
valores que começam com o termo"Love"
Bson searchStageFilters = Aggregates.search( SearchOperator.compound() .filter( List.of( SearchOperator.in(fieldPath("genres"), List.of("Comedy")), SearchOperator.phrase(fieldPath("fullplot"), "new york"), SearchOperator.numberRange(fieldPath("year")).gtLt(1950, 2000), SearchOperator.wildcard(fieldPath("title"), "Love *") ))); Bson projection = Aggregates.project(Projections.fields( Projections.include("title", "year", "genres") )); List<Bson> aggregateStages = List.of(searchStageFilters, projection); Publisher<Document> publisher = movies.aggregate(aggregateStages); publisher.subscribe(new SubscriberHelpers.PrintDocumentSubscriber()); Mono.from(publisher).block();
{"_id": ..., "genres": ["Comedy", "Romance"], "title": "Love at First Bite", "year": 1979} {"_id": ..., "genres": ["Comedy", "Drama"], "title": "Love Affair", "year": 1994}
Para saber mais sobre os métodos assistente do Atlas Search, consulte a referência da interface SearchOperator na documentação da API Driver Core.
Informações adicionais
Para ver uma lista completa de operadores de expressão , consulte Operadores de aggregation no manual do MongoDB Server .
Para saber mais sobre como montar um pipeline de agregação e ver exemplos, consulte Pipeline de agregação no manual do MongoDB Server .
Para saber mais sobre como criar estágios de pipeline, consulte Estágios de agregação no manual do MongoDB Server .
Para saber mais sobre como explicar as operações do MongoDB , consulte Explicar planos de saída e query no manual do MongoDB Server .
Documentação da API
Para saber mais sobre as aulas e os métodos mencionados neste guia, consulte a seguinte documentação da API: