Skip to main content

Aggregate

Aggregation pipelines process documents through a sequence of stages, where the output of each stage is the input to the next. This is MongoDB's most powerful query mechanism — it supports grouping, joining collections, computing new fields, bucketing, full-text search, vector search, and more.

Full background is in the official MongoDB documentation.

Building a pipeline

Pipeline stages are added by chaining methods on the Aggregate object:

import mongo4cats.operations.{Accumulator, Aggregate, Filter, Projection, Sort}

val pipeline = Aggregate
.matchBy(Filter.gte("amount", 100)) // $match — filter early to reduce work
.group("$category", Accumulator // $group — group by category
.sum("total", "$amount")
.count("count")
.first("categoryName", "$category.name"))
.sort(Sort.desc("total")) // $sort — order results
.limit(10) // $limit — top 10 categories

Executing the pipeline

import mongo4cats.bson.Document

// Get the first result
val first: IO[Option[Document]] = collection.aggregate[Document](pipeline).first

// Collect all results
val all: IO[Iterable[Document]] = collection.aggregate[Document](pipeline).all

// Stream results
val stream: fs2.Stream[IO, Document] = collection.aggregate[Document](pipeline).stream

With a typed result

If the pipeline output maps to a Scala case class, use aggregateWithCodec:

import io.circe.generic.auto._
import mongo4cats.circe._

final case class CategorySummary(category: String, total: Double, count: Int)

val result: fs2.Stream[IO, CategorySummary] =
collection.aggregateWithCodec[CategorySummary](pipeline).stream

Stage reference

Filtering and shaping

StageMethodDescription
$match.matchBy(filter)Filter documents
$project.project(projection)Include/exclude/compute fields
$addFields / $set.addFields(...) / .set(...)Add new computed fields
$unset.unset(fields*)Remove fields
$replaceWith.replaceWith(expr)Replace document with an expression

Grouping and counting

StageMethodDescription
$group.group(id, accumulator)Group documents and accumulate values
$count.count / .count(field)Count documents into a field
$sortByCount.sortByCount(expr)Group by an expression and sort by count
$bucket.bucket(groupBy, boundaries)Categorise into fixed buckets
$bucketAuto.bucketAuto(groupBy, n)Automatically determine n buckets
$facet.facet(facets*)Run multiple sub-pipelines in parallel

Pagination

StageMethodDescription
$sort.sort(sort)Sort documents
$skip.skip(n)Skip the first n documents
$limit.limit(n)Limit to n documents
$sample.sample(n)Randomly sample n documents

Joining collections

// Simple equality join — adds a "category" array field
Aggregate.lookup("categories", "categoryId", "_id", "category")

// Join with a sub-pipeline (e.g. with additional filtering)
val subPipeline = Aggregate.matchBy(Filter.eq("active", true))
Aggregate.lookup("categories", subPipeline, "category")

$graphLookup for recursive lookups (e.g. org charts, friend-of-friend):

Aggregate.graphLookup(
from = "employees",
startWith = "$reportsTo",
connectFromField = "reportsTo",
connectToField = "_id",
as = "reportingHierarchy"
)

Unwinding arrays

$unwind deconstructs an array field so that each element becomes its own document:

// Emit one document per tag
Aggregate.unwind("$tags")

// Preserve documents where the array is missing or empty
import mongo4cats.models.collection.UnwindOptions
Aggregate.unwind("$tags", UnwindOptions().preserveNullAndEmptyArrays(true))

Writing results

// Write results to a collection (same database)
Aggregate.out("outputCollection")

// Write to a different database
Aggregate.out("otherDb", "outputCollection")

// Merge results into a collection (upsert semantics)
Aggregate.merge("targetCollection")

collection.aggregate[Document](pipeline).toCollection

Set window fields

Compute running totals, moving averages, and other window functions:

import com.mongodb.client.model.{WindowOutputField, Windows}

Aggregate.setWindowFields(
partitionBy = "$category",
sortBy = Sort.asc("date"),
outputs = Seq(
WindowOutputField.sum("runningTotal", "$amount", Windows.unboundedPreceding())
)
)

Union

Combine documents from two collections:

val otherPipeline = Aggregate.matchBy(Filter.eq("source", "external"))
Aggregate.unionWith("otherCollection", otherPipeline)

Densify and fill (MongoDB 5.1+)

import com.mongodb.client.model.densify.{DensifyOptions, DensifyRange}
import com.mongodb.client.model.fill.{FillOptions, FillOutputField}

// Fill in missing date steps
Aggregate.densify("date", DensifyRange.fullRangeWithStep(DensifyOptions.densifyOptions(), 1, "day"))

// Replace null/missing values
Aggregate.fill(
FillOptions.fillOptions().sortBy(Sort.asc("date").toBson),
FillOutputField.locf("price") // last observation carried forward
)

Accumulator reference

Accumulator is used with .group(id, accumulator) to compute aggregate values per group:

val acc = Accumulator
.sum("totalAmount", "$amount") // $sum
.count("count") // $sum: 1
.avg("avgAmount", "$amount") // $avg
.first("firstItem", "$name") // $first
.last("lastItem", "$name") // $last
.min("minAmount", "$amount") // $min
.max("maxAmount", "$amount") // $max
.push("items", "$name") // $push — collect into array
.addToSet("uniqueItems", "$name") // $addToSet — collect unique values

Combining pipelines

Two Aggregate values can be concatenated:

val stage1 = Aggregate.matchBy(Filter.eq("active", true))
val stage2 = Aggregate.sort(Sort.desc("score")).limit(5)
val combined = stage1.combinedWith(stage2)

These stages require a MongoDB Atlas cluster with Atlas Search enabled.

import com.mongodb.client.model.search.{SearchOperator, SearchOptions}

// Full-text Atlas Search
Aggregate.search(
SearchOperator.text(FieldSearchPath.fieldPath("description"), "functional programming")
)

// Vector search (requires a vector index)
import com.mongodb.client.model.search.{FieldSearchPath, VectorSearchOptions}

Aggregate.vectorSearch(
path = FieldSearchPath.fieldPath("embedding"),
queryVector = queryEmbedding,
index = "vector_index",
limit = 10L,
options = VectorSearchOptions.approximateVectorSearchOptions(100L)
)

Using raw Document expressions

The typed Aggregate builders cover the most common stages, but MongoDB's aggregation language has hundreds of operators. Whenever a built-in method doesn't exist for a particular operator (e.g. $filter, $map, $reduce, $cond, $switch, custom $expr expressions), you can drop down to raw Document / BsonValue values and pass them directly to addFields, set, project, and group.

Conditional expressions with $cond

A common use case is computing a derived field whose value depends on another field. addFields accepts any Document as the expression value:

import mongo4cats.bson.{BsonValue, Document}
import mongo4cats.bson.syntax._
import mongo4cats.operations.{Aggregate, Filter}

// Label each order as "high" or "standard" based on its total value
val pipeline = Aggregate
.matchBy(Filter.eq("status", "completed"))
.addFields(
"priority" -> Document(
"$cond" := Document(
"if" := BsonValue.document("$gte" := BsonValue.array(BsonValue.string("$total"), BsonValue.int(500))),
"then" := BsonValue.string("high"),
"else" := BsonValue.string("standard")
)
)
)

Filtering arrays inline with $filter

$filter keeps only the array elements that match a condition. Pass the expression as a Document to addFields:

// Keep only the order lines where quantity > 0
val pipeline = Aggregate
.matchBy(Filter.eq("status", "pending"))
.addFields(
"activeLines" -> Document(
"$filter" := Document(
"input" := "$lines",
"as" := "line",
"cond" := BsonValue.document(
"$gt" := BsonValue.array(BsonValue.string("$$line.qty"), BsonValue.int(0))
)
)
)
)

Computed projections from raw expressions

Projection.computed(fieldName, expression) accepts any value as the expression, so you can shape the output document using raw operators:

import mongo4cats.operations.Projection

val pipeline = Aggregate
.matchBy(Filter.eq("category", "electronics"))
.project(
Projection
.excludeId
.include("name")
.computed(
"discountedPrice",
Document("$multiply" := BsonValue.array(BsonValue.string("$price"), BsonValue.double(0.9)))
)
.computed(
"inStock",
Document("$gt" := BsonValue.array(BsonValue.string("$qty"), BsonValue.int(0)))
)
)

The general rule

Any place in the Aggregate or Projection API that accepts a TExpression type parameter will accept a Document or BsonValue. This means the entire MongoDB aggregation expression language is accessible — the typed builders are a convenience layer on top of the same underlying BSON.