Skip to main content

Aggregate

Aggregation operations can be used for processing data from multiple MongoDB collections and returning combined results. In MongoDB aggregations are represented in a form of data processing pipelines where documents go through multiple transformations defined in each step. More detailed explanation of the aggregation process can be found in the official documentation.

To create such aggregation pipeline, Aggregate constructor can be used:

import mongo4cats.operations.{Accumulator, Aggregate, Sort}

// specification for grouping multiple transactions from the same group:
val accumulator = Accumulator
.sum("count", 1) // number of transactions in a given group
.sum("totalAmount", "$amount") // total amount
.first("categoryId", "$category._id") // id of a category under which all transactions are grouped

val aggregation = Aggregate
.group("$category", accumulator) // group all transactions by categoryId and accumulate result into a given specification
.lookup("categories", "categoryId", "_id", "category") // find a category for each group of transactions by category id
.sort(Sort.desc("totalAmount")) // define the order of the produced results

Once the aggregation pipeline is defined, the aggregation operation can be executed by calling aggregate method on a MongoCollection[F] instance. Similarly to find, the result of aggregate can be returned in a form of a single (first) document, list of all documents or a stream:

import mongo4cats.bson.Document

val result: IO[Option[Document]] = collection.aggregate[Document](aggregation).first
val result: IO[Iterable[Document]] = collection.aggregate[Document](aggregation).all
val result: fs2.Stream[IO, Document] = collection.aggregate[Document](aggregation).stream

Analogously to distinct, the result of an aggregation can be tied to a specific class:

val result: fs2.Stream[IO, MyClass] = collection.aggregateWithCodec[MyClass](aggregation).stream

If aggregation pipeline ends with the $out stage (write document to a specified collection), toCollection method can be used:

val result: IO[Unit] = collection.aggregate[Document](aggregation).toCollection