Watch (Change Streams)

Change streams let your application react to changes in a MongoDB collection in real time. When documents are inserted, updated, replaced, or deleted, the stream emits a change event document describing what happened.

Change streams require a MongoDB replica set or sharded cluster (MongoDB 3.6+). They are not available on standalone deployments.

Basic usage

watch returns a WatchQueryBuilder whose .stream method produces an fs2.Stream (or ZStream in the ZIO module):

import cats.effect.IO
import mongo4cats.bson.Document

// Emit a Document for every change in the collection
val changes: fs2.Stream[IO, Document] = collection.watch.stream

changes.evalMap(event => IO.println(s"Change: $event")).compile.drain
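
For context, here is a minimal sketch of how the snippet above might be wired into a complete application. The connection string, the database name "shop", and the collection name "orders" are assumptions made for illustration; the deployment behind the connection string must be a replica set or sharded cluster.

```scala
import cats.effect.{IO, IOApp}
import mongo4cats.client.MongoClient

object WatchExample extends IOApp.Simple {
  // Assumed connection string; it must point at a replica set or sharded cluster
  val connectionString: String = "mongodb://localhost:27017"

  def run: IO[Unit] =
    MongoClient.fromConnectionString[IO](connectionString).use { client =>
      for {
        database   <- client.getDatabase("shop")     // hypothetical database name
        collection <- database.getCollection("orders") // hypothetical collection name
        // Print every change event until the stream fails or the app is cancelled
        _ <- collection.watch.stream
          .evalMap(event => IO.println(s"Change: $event"))
          .compile
          .drain
      } yield ()
    }
}
```

The stream does not complete on its own, so the application keeps running until it is interrupted or the stream raises an error.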

Filtering events

Pass an Aggregate pipeline to filter or transform events before they reach your application. This is more efficient than filtering on the client side, because MongoDB applies the pipeline server-side:

import mongo4cats.operations.{Aggregate, Filter}

// Only receive events whose fullDocument.amount is >= 100
val bigChanges: fs2.Stream[IO, Document] =
  collection.watch(Aggregate.matchBy(Filter.gte("fullDocument.amount", 100))).stream

You can also transform the event shape using $project or $addFields:

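import mongo4cats.operations.{Aggregate, Filter, Projection}

// Keep only insert events, and only the two fields of interest
val simplified: fs2.Stream[IO, Document] =
  collection
    .watch(
      Aggregate
        .matchBy(Filter.eq("operationType", "insert"))
        .project(Projection.include("fullDocument").include("operationType"))
    )
    .stream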

Change event structure

Each emitted document follows the MongoDB change event schema. Key fields:

| Field | Description |
|---|---|
| operationType | "insert", "update", "replace", "delete", "invalidate" |
| fullDocument | The full document after the change (for insert/replace; for update, only if fullDocument option is set) |
| documentKey | The _id of the changed document |
| updateDescription | For update events: updatedFields and removedFields |
| ns | Namespace: { db, coll } |
| clusterTime | The server timestamp of the change |
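
Application code often branches on operationType. The following is a small sketch of one way to model the values listed above as an ADT. ChangeKind and its members are hypothetical names and are not part of mongo4cats; the input is the plain operationType string taken from an event.

```scala
// Hypothetical ADT for the operationType values listed above (not part of mongo4cats)
sealed trait ChangeKind

object ChangeKind {
  case object Insert     extends ChangeKind
  case object Update     extends ChangeKind
  case object Replace    extends ChangeKind
  case object Delete     extends ChangeKind
  case object Invalidate extends ChangeKind
  // Fallback for any operation type not listed above
  final case class Other(name: String) extends ChangeKind

  def fromOperationType(value: String): ChangeKind = value match {
    case "insert"     => Insert
    case "update"     => Update
    case "replace"    => Replace
    case "delete"     => Delete
    case "invalidate" => Invalidate
    case other        => Other(other)
  }

  // Per the fullDocument row: present for insert and replace without extra options
  def hasFullDocumentByDefault(kind: ChangeKind): Boolean = kind match {
    case Insert | Replace => true
    case _                => false
  }
}
```

Keeping the fallback case means an unexpected operation type is surfaced to the caller instead of raising a MatchError.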

Requesting the full document on updates

By default, update events include only the changed fields in updateDescription, not the full document. Setting the fullDocument option to UPDATE_LOOKUP makes the server look up the current version of the document and attach it to each update event. Because the lookup happens after the update itself, the attached document may already reflect later changes:

import com.mongodb.client.model.changestream.FullDocument

collection.watch.fullDocument(FullDocument.UPDATE_LOOKUP).stream

Resume tokens

Change streams can be resumed after a disconnect using a resume token. The token is available on each event:

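import org.bson.BsonDocument

// Last token seen; persist it somewhere durable if it must survive a restart
var lastToken: Option[BsonDocument] = None

// Call resumeAfter only when a token is known; otherwise the stream starts from now
val watcher = lastToken.fold(collection.watch)(token => collection.watch.resumeAfter(token))

watcher.stream
  .evalMap { event =>
    // Record the token inside IO so the update runs as part of the effect
    IO { lastToken = Some(event.getResumeToken) } *> IO.println(s"Event: $event")
  }
  .compile
  .drain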
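
The resume logic can also be factored into a reusable combinator. The sketch below is a generic fs2 helper, not a mongo4cats API: Resumable, resumable, open, and tokenOf are hypothetical names. It assumes that open builds a stream from an optional token (for a change stream, by calling resumeAfter only when a token is present) and that tokenOf extracts the token from an event.

```scala
import cats.effect.{IO, Ref}
import fs2.Stream

object Resumable {
  // Hypothetical helper: reopens the stream from the last seen token whenever it fails
  def resumable[T, A](open: Option[T] => Stream[IO, A], tokenOf: A => T): Stream[IO, A] =
    Stream.eval(Ref.of[IO, Option[T]](None)).flatMap { last =>
      def loop: Stream[IO, A] =
        Stream.eval(last.get).flatMap { token =>
          open(token)
            // Remember the token of every event before passing it downstream
            .evalTap(event => last.set(Some(tokenOf(event))))
            // On failure, reopen from the most recent token
            .handleErrorWith(_ => loop)
        }
      loop
    }
}
```

This sketch retries immediately and indefinitely. A production version would likely add a delay between attempts, limit the number of retries, and store the token durably instead of in memory.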

Using with a client session

client.startSession.use { session =>
  collection.watch(session).stream.evalMap(IO.println).compile.drain
}

Watching multiple collections or a database

To watch all collections in a database, call watch on a MongoDatabase[F] instance. To watch the entire deployment, call it on a MongoClient[F]. The API is identical.
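
As a rough sketch of the paragraph above, the following merges a database-level and a deployment-level stream into one. It assumes, as stated above, that watch on MongoDatabase[F] and MongoClient[F] has the same shape as on a collection. The database name "shop" and the describe helper are hypothetical.

```scala
import cats.effect.IO
import mongo4cats.client.MongoClient

object WatchScopes {
  // Hypothetical helper that tags each event with the scope it was observed on
  def describe(scope: String)(event: Any): String = s"[$scope] $event"

  def watchDatabaseAndDeployment(client: MongoClient[IO]): IO[Unit] =
    client.getDatabase("shop").flatMap { database =>
      // Events from every collection in the "shop" database
      val databaseEvents = database.watch.stream.map(e => describe("shop")(e))
      // Events from every database in the deployment (assumed API, see above)
      val deploymentEvents = client.watch.stream.map(e => describe("deployment")(e))
      databaseEvents.merge(deploymentEvents).evalMap(line => IO.println(line)).compile.drain
    }
}
```

Because the deployment-level stream already covers the "shop" database, each change in that database is printed twice here, once per scope.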