Skip to content
27 changes: 27 additions & 0 deletions commons-datastore/commons-datastore-mongodb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,36 @@
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
</dependency>
<!--
JUnit and flapdoodle are compile-scope + optional because EmbeddedMongoDBManager
and EmbeddedMongoDBRule (under .mongodb.test) ship in the main jar so downstream
projects can reuse them. They don't transitively propagate: consumers must add
their own test-scope flapdoodle dependency to actually use them.
-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>de.flapdoodle.embed</groupId>
<artifactId>de.flapdoodle.embed.mongo</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>de.flapdoodle.embed</groupId>
<artifactId>de.flapdoodle.embed.process</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>de.flapdoodle.reverse</groupId>
<artifactId>de.flapdoodle.reverse</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<!--TODO Review -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@
import com.mongodb.ServerAddress;
import com.mongodb.ServerCursor;
import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.opencb.commons.datastore.core.QueryOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
* MongoDBCursor wrapper for queries that require a long time to process the results.
* Avoids {@link MongoCursorNotFoundException}.
Expand All @@ -47,6 +52,7 @@ public class MongoPersistentCursor implements MongoCursor<Document> {
private final QueryOptions options;
private final Bson query;
private final Bson projection;
private final List<Bson> pipeline;
private MongoDBCollection collection;

private MongoCursor<Document> mongoCursor;
Expand All @@ -71,6 +77,57 @@ public MongoPersistentCursor(MongoDBCollection collection, Bson query, Bson proj
this.options = options;
this.query = query;
this.projection = projection;
this.pipeline = null;
this.collection = collection;

if (batchSize > 0) {
this.batchSize = batchSize;
}
if (limit > 0) {
this.limit = limit;
}
if (skip > 0) {
this.skip = skip;
}

reset();
}

/**
 * Builds a persistent cursor driven by an aggregation pipeline, reading the
 * pagination parameters (BATCH_SIZE, LIMIT, SKIP) from the given options.
 * <p>
 * For an efficient resume after a cursor expiry, the pipeline's first stage
 * should be a {@code $match}: the resume filter ({@code _id > lastId}) is merged
 * into it. When neither the pipeline nor the options define a sort, a
 * {@code $sort: {_id: 1}} stage is added automatically to guarantee the
 * deterministic ordering that resuming relies on.
 * Will fail if the {@code $project} stage excludes the {@code _id} field.
 *
 * @param collection MongoDB collection the pipeline runs against.
 * @param pipeline   Aggregation pipeline. Must not be empty and should start with {@code $match}.
 * @param options    Query options; BATCH_SIZE, LIMIT and SKIP are read from here. May be null.
 */
public MongoPersistentCursor(MongoDBCollection collection, List<Bson> pipeline, QueryOptions options) {
    this(collection, pipeline, options,
            options == null ? 0 : options.getInt(MongoDBCollection.BATCH_SIZE, 0),
            options == null ? 0 : options.getInt(QueryOptions.LIMIT, 0),
            options == null ? 0 : options.getInt(QueryOptions.SKIP, 0));
}

/**
* Create a persistent cursor backed by an aggregation pipeline with explicit pagination parameters.
*
* @param collection MongoDB collection to run the pipeline against.
* @param pipeline Aggregation pipeline. Must not be empty and should start with {@code $match}.
* @param options Query options (used for sort detection).
* @param batchSize MongoDB cursor batch size (0 = server default).
* @param limit Maximum number of documents to return (0 = unlimited).
* @param skip Number of documents to skip at the start (0 = none).
*/
public MongoPersistentCursor(MongoDBCollection collection, List<Bson> pipeline, QueryOptions options,
int batchSize, int limit, int skip) {
this.options = options;
this.query = null;
this.projection = null;
this.pipeline = new ArrayList<>(pipeline);
this.collection = collection;

if (batchSize > 0) {
Expand All @@ -93,6 +150,15 @@ protected void reset() {
}

/**
 * Re-opens the underlying MongoDB cursor, resuming after {@code lastObjectId}
 * (or from the beginning when it is null). Dispatches to the aggregation or
 * find implementation depending on how this cursor was constructed.
 *
 * @param lastObjectId last {@code _id} already consumed, or null on first run.
 * @return this cursor, for chaining.
 */
protected MongoPersistentCursor resume(Object lastObjectId) {
    if (pipeline == null) {
        resumeFind(lastObjectId);
    } else {
        resumeAggregate(lastObjectId);
    }
    return this;
}

private void resumeFind(Object lastObjectId) {
Bson query;
if (lastObjectId != null) {
query = Filters.and(Filters.gt("_id", lastObjectId), this.query);
Expand All @@ -106,15 +172,69 @@ protected MongoPersistentCursor resume(Object lastObjectId) {
mongoCursor = iterable
.batchSize(batchSize)
.limit(limit)
.skip(skip)
.skip(lastObjectId == null ? skip : 0)
.iterator();
return this;
}

/**
 * Build and execute an aggregation pipeline cursor, injecting a resume filter and ensuring
 * deterministic {@code _id} ordering when resuming after a cursor expiry.
 *
 * @param lastObjectId last {@code _id} already consumed, or null on the first execution.
 */
private void resumeAggregate(Object lastObjectId) {
    List<Bson> activePipeline = new ArrayList<>(pipeline);

    // Inject _id > lastId into the first $match stage so MongoDB can use the index.
    if (lastObjectId != null) {
        Bson resumeFilter = Filters.gt("_id", lastObjectId);

        // Check emptiness BEFORE reading the first stage: calling get(0) on an empty
        // pipeline would throw IndexOutOfBoundsException and the else-branch (which
        // prepends the $match stage) would never be reached.
        if (!activePipeline.isEmpty() && activePipeline.get(0).toBsonDocument().containsKey("$match")) {
            BsonDocument existingMatch = activePipeline.get(0).toBsonDocument().get("$match").asDocument();
            activePipeline.set(0, new Document("$match", Filters.and(existingMatch, resumeFilter)));
        } else {
            activePipeline.add(0, new Document("$match", resumeFilter));
        }
    }

    // Ensure a $sort: {_id: 1} stage is present so that any resume starts past the right point.
    // Only added when no explicit sort stage exists and no sort is specified in options.
    boolean hasSortStage = activePipeline.stream().anyMatch(s -> s.toBsonDocument().containsKey("$sort"));
    if (!hasSortStage && (options == null || !options.containsKey(QueryOptions.SORT))) {
        // Insert before any existing $skip or $limit stages, so the sort applies
        // before pagination truncates the result set.
        int insertPos = activePipeline.size();
        for (int i = 0; i < activePipeline.size(); i++) {
            BsonDocument stage = activePipeline.get(i).toBsonDocument();
            if (stage.containsKey("$skip") || stage.containsKey("$limit")) {
                insertPos = i;
                break;
            }
        }
        activePipeline.add(insertPos, new Document("$sort", new Document("_id", 1)));
    }

    // Apply skip only on the first run; after a resume the _id filter already
    // positions the cursor past everything previously consumed.
    if (skip > 0 && lastObjectId == null) {
        activePipeline.add(new Document("$skip", skip));
    }
    if (limit > 0) {
        activePipeline.add(new Document("$limit", limit));
    }

    AggregateIterable<Document> iterable = newAggregateIterable(activePipeline);
    if (batchSize > 0) {
        iterable.batchSize(batchSize);
    }
    mongoCursor = iterable.iterator();
}

/**
 * Creates the {@link FindIterable} for the given query. Protected so that tests
 * and subclasses can override how the underlying find is executed.
 *
 * @param query      Find filter to apply.
 * @param projection Fields to include/exclude.
 * @param options    Query options forwarded to the native query.
 * @return a new iterable over the matching documents.
 */
protected FindIterable<Document> newFindIterable(Bson query, Bson projection, QueryOptions options) {
    return this.collection.nativeQuery().nativeFind(null, query, projection, options);
}

/**
 * Creates the {@link AggregateIterable} for the given pipeline. Protected so that
 * tests and subclasses can override how the aggregation is executed.
 *
 * @param activePipeline Fully-built pipeline, including any resume/sort/skip/limit stages.
 * @return a new iterable over the aggregation results.
 */
protected AggregateIterable<Document> newAggregateIterable(List<Bson> activePipeline) {
    return collection.nativeQuery().getDbCollection().aggregate(activePipeline);
}

public Object getLastId() {
return lastId;
}
Expand Down
Loading
Loading