Skip to content

Commit 9969345

Browse files
committed
Refactor terminal stages
1 parent 44135a6 commit 9969345

File tree

4 files changed

+28
-17
lines changed

4 files changed

+28
-17
lines changed

core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/Aggregation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,9 @@ public void validate(Document query) {
265265
}
266266
}
267267

268-
private boolean isLastStage(TerminalStage aggregationStage) {
268+
private boolean isLastStage(TerminalStage stage) {
269269
Assert.notEmpty(stages);
270-
return stages.indexOf(aggregationStage) == stages.size() - 1;
270+
return stages.indexOf(stage) == stages.size() - 1;
271271
}
272272

273273
public boolean isModifying() {

core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/stage/MergeStage.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import de.bwaldvogel.mongo.exception.MongoServerError;
2727
import de.bwaldvogel.mongo.exception.TypeMismatchException;
2828

29-
public class MergeStage implements TerminalStage {
29+
public class MergeStage extends TerminalStage {
3030

3131
private static final Set<String> KNOWN_KEYS = new HashSet<>(Arrays.asList("into", "on", "whenMatched", "whenNotMatched"));
3232

@@ -166,7 +166,7 @@ public String name() {
166166
}
167167

168168
@Override
169-
public Stream<Document> apply(Stream<Document> stream) {
169+
public void applyLast(Stream<Document> stream) {
170170
MongoCollection<?> collection = targetCollectionSupplier.get();
171171

172172
stream.forEach(document -> {
@@ -178,11 +178,7 @@ public Stream<Document> apply(Stream<Document> stream) {
178178
case merge:
179179
Document mergedDocument = existingDocument.clone();
180180
mergedDocument.merge(document);
181-
if (!mergedDocument.get("_id").equals(existingDocument.get("_id"))) {
182-
throw new ImmutableFieldException("$merge failed to update the matching document, did you attempt to modify the _id or the shard key?" +
183-
" :: caused by :: " +
184-
"Performing an update on the path '_id' would modify the immutable field '_id'");
185-
}
181+
assertIdHasNotChanged(existingDocument, mergedDocument);
186182
replaceDocument(collection, existingDocument, mergedDocument);
187183
break;
188184
case replace:
@@ -211,8 +207,14 @@ public Stream<Document> apply(Stream<Document> stream) {
211207
}
212208
}
213209
});
210+
}
214211

215-
return Stream.empty();
212+
private static void assertIdHasNotChanged(Document one, Document other) {
213+
if (!one.get("_id").equals(other.get("_id"))) {
214+
throw new ImmutableFieldException("$merge failed to update the matching document, did you attempt to modify the _id or the shard key?" +
215+
" :: caused by :: " +
216+
"Performing an update on the path '_id' would modify the immutable field '_id'");
217+
}
216218
}
217219

218220
private Document getJoinQuery(Document document) {

core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/stage/OutStage.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package de.bwaldvogel.mongo.backend.aggregation.stage;
22

3-
import java.util.Collections;
43
import java.util.stream.Stream;
54

65
import org.slf4j.Logger;
@@ -12,7 +11,7 @@
1211
import de.bwaldvogel.mongo.exception.IllegalOperationException;
1312
import de.bwaldvogel.mongo.oplog.NoopOplog;
1413

15-
public class OutStage implements TerminalStage {
14+
public class OutStage extends TerminalStage {
1615

1716
private static final Logger log = LoggerFactory.getLogger(OutStage.class);
1817

@@ -30,20 +29,19 @@ public String name() {
3029
}
3130

3231
@Override
33-
public Stream<Document> apply(Stream<Document> stream) {
32+
public void applyLast(Stream<Document> stream) {
3433
if (this.collectionName.contains("$")) {
3534
throw new IllegalOperationException("error with target namespace: Invalid collection name: " + this.collectionName);
3635
}
3736
String tempCollectionName = "_tmp" + System.currentTimeMillis() + "_" + this.collectionName;
3837
MongoCollection<?> tempCollection = database.createCollectionOrThrowIfExists(tempCollectionName);
39-
stream.forEach(document -> tempCollection.insertDocuments(Collections.singletonList(document)));
38+
stream.forEach(tempCollection::addDocument);
4039
MongoCollection<?> existingCollection = database.resolveCollection(this.collectionName, false);
4140
if (existingCollection != null) {
4241
log.info("Dropping existing collection {}", existingCollection);
4342
database.dropCollection(this.collectionName, NoopOplog.get());
4443
}
4544
database.moveCollection(database, tempCollection, this.collectionName);
46-
return Stream.empty();
4745
}
4846

4947
@Override
Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,18 @@
11
package de.bwaldvogel.mongo.backend.aggregation.stage;
22

3-
public interface TerminalStage extends AggregationStage {
3+
import java.util.stream.Stream;
44

5-
String name();
5+
import de.bwaldvogel.mongo.bson.Document;
66

7+
public abstract class TerminalStage implements AggregationStage {
8+
9+
public abstract String name();
10+
11+
abstract void applyLast(Stream<Document> stream);
12+
13+
@Override
14+
public final Stream<Document> apply(Stream<Document> stream) {
15+
applyLast(stream);
16+
return Stream.empty();
17+
}
718
}

0 commit comments

Comments
 (0)