Skip to content

Commit edc57ef

Browse files
committed
Implement support for whenMatched pipelines ($merge aggregation) #152
1 parent 9969345 commit edc57ef

23 files changed

+304
-68
lines changed

core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/Aggregation.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import de.bwaldvogel.mongo.backend.aggregation.stage.LookupWithPipelineStage;
3030
import de.bwaldvogel.mongo.backend.aggregation.stage.MatchStage;
3131
import de.bwaldvogel.mongo.backend.aggregation.stage.MergeStage;
32-
import de.bwaldvogel.mongo.backend.aggregation.stage.OrderByStage;
32+
import de.bwaldvogel.mongo.backend.aggregation.stage.SortStage;
3333
import de.bwaldvogel.mongo.backend.aggregation.stage.OutStage;
3434
import de.bwaldvogel.mongo.backend.aggregation.stage.ProjectStage;
3535
import de.bwaldvogel.mongo.backend.aggregation.stage.RedactStage;
@@ -104,7 +104,7 @@ public static Aggregation fromPipeline(List<Document> pipeline, DatabaseResolver
104104
break;
105105
case "$sort":
106106
Document orderBy = (Document) stage.get(stageOperation);
107-
aggregation.addStage(new OrderByStage(orderBy));
107+
aggregation.addStage(new SortStage(orderBy));
108108
break;
109109
case "$project":
110110
Document projection = (Document) stage.get(stageOperation);
@@ -150,7 +150,7 @@ public static Aggregation fromPipeline(List<Document> pipeline, DatabaseResolver
150150
case "$sortByCount":
151151
Object expression = stage.get(stageOperation);
152152
aggregation.addStage(new GroupStage(new Document(ID_FIELD, expression).append("count", new Document("$sum", 1))));
153-
aggregation.addStage(new OrderByStage(new Document("count", -1).append(ID_FIELD, 1)));
153+
aggregation.addStage(new SortStage(new Document("count", -1).append(ID_FIELD, 1)));
154154
break;
155155
case "$bucket":
156156
Document bucket = (Document) stage.get(stageOperation);
@@ -270,6 +270,10 @@ private boolean isLastStage(TerminalStage stage) {
270270
return stages.indexOf(stage) == stages.size() - 1;
271271
}
272272

273+
public List<AggregationStage> getStages() {
274+
return stages;
275+
}
276+
273277
public boolean isModifying() {
274278
return stages.stream().anyMatch(AggregationStage::isModifying);
275279
}

core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/Expression.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1773,18 +1773,34 @@ static Object evaluate(Object expression, Document document) {
17731773
}
17741774
String value = ((String) expression).substring(1);
17751775
if (value.startsWith("$")) {
1776-
if (value.equals("$ROOT")) {
1777-
return document;
1778-
} else if (value.startsWith("$ROOT.")) {
1779-
String subKey = value.substring("$ROOT.".length());
1780-
return Utils.getSubdocumentValue(document, subKey);
1781-
}
1782-
Object subdocumentValue = Utils.getSubdocumentValue(document, value);
1783-
if (!(subdocumentValue instanceof Missing)) {
1784-
return subdocumentValue;
1785-
}
1786-
String variable = value.substring(1);
1787-
throw new MongoServerError(17276, "Use of undefined variable: " + variable);
1776+
final String variableName;
1777+
if (value.contains(".")) {
1778+
variableName = value.substring(0, value.indexOf('.'));
1779+
} else {
1780+
variableName = value;
1781+
}
1782+
1783+
Object variableValue = Utils.getSubdocumentValue(document, variableName);
1784+
if (variableValue instanceof Missing) {
1785+
if (variableName.equals("$ROOT")) {
1786+
variableValue = document;
1787+
} else {
1788+
throw new MongoServerError(17276, "Use of undefined variable: " + variableName.substring(1));
1789+
}
1790+
}
1791+
1792+
if (!value.equals(variableName)) {
1793+
String path = value.substring(variableName.length() + 1);
1794+
final Document evaluatedVariableValue;
1795+
if (variableValue instanceof Document) {
1796+
evaluatedVariableValue = (Document) variableValue;
1797+
} else {
1798+
evaluatedVariableValue = (Document) evaluate(variableValue, document);
1799+
}
1800+
return Utils.getSubdocumentValue(evaluatedVariableValue, path);
1801+
} else {
1802+
return variableValue;
1803+
}
17881804
}
17891805
return Utils.getSubdocumentValueCollectionAware(document, value);
17901806
} else if (expression instanceof Document) {

core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/stage/AbstractLookupStage.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,23 @@
99

1010
abstract class AbstractLookupStage implements AggregationStage {
1111

12-
protected String stageName = "$lookup";
1312
static final String FROM = "from";
1413
static final String AS = "as";
1514

15+
@Override
16+
public String name() {
17+
return "$lookup";
18+
}
19+
1620
String readStringConfigurationProperty(Document configuration, String name) {
1721
Object value = configuration.get(name);
1822
if (value == null) {
19-
throw new FailedToParseException("missing '" + name + "' option to " + stageName + " stage specification: " + configuration);
23+
throw new FailedToParseException("missing '" + name + "' option to " + name() + " stage specification: " + configuration);
2024
}
2125
if (value instanceof String) {
2226
return (String) value;
2327
}
24-
throw new FailedToParseException("'" + name + "' option to " + stageName + " must be a string, but was type " + Utils.describeType(value));
28+
throw new FailedToParseException("'" + name + "' option to " + name() + " must be a string, but was type " + Utils.describeType(value));
2529
}
2630

2731
Document readOptionalDocumentArgument(Document configuration, String name) {
@@ -32,13 +36,13 @@ Document readOptionalDocumentArgument(Document configuration, String name) {
3236
if (value instanceof Document) {
3337
return (Document) value;
3438
}
35-
throw new FailedToParseException(stageName + " argument '" + name + ": " + Json.toJsonValue(value) + "' must be an object, is type " + Utils.describeType(value));
39+
throw new FailedToParseException(name() + " argument '" + name + ": " + Json.toJsonValue(value) + "' must be an object, is type " + Utils.describeType(value));
3640
}
3741

3842
void ensureAllConfigurationPropertiesAreKnown(Document configuration, Set<String> configurationKeys) {
3943
for (String name : configuration.keySet()) {
4044
if (!configurationKeys.contains(name)) {
41-
String message = "unknown argument to " + stageName + ": " + name;
45+
String message = "unknown argument to " + name() + ": " + name;
4246
throw new FailedToParseException(message);
4347
}
4448
}

core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/stage/AddFieldsStage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ public AddFieldsStage(Document addFields) {
1919
this.addFields = addFields;
2020
}
2121

22+
@Override
23+
public String name() {
24+
return "$addFields";
25+
}
26+
2227
@Override
2328
public Stream<Document> apply(Stream<Document> stream) {
2429
return stream.map(this::projectDocument);

core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/stage/AggregationStage.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
public interface AggregationStage {
88

9+
String name();
10+
911
Stream<Document> apply(Stream<Document> stream);
1012

1113
default boolean isModifying() {

core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/stage/BucketStage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ public BucketStage(Document document) {
3737
output = getAndValidateOutput(document.get("output"));
3838
}
3939

40+
@Override
41+
public String name() {
42+
return "$bucket";
43+
}
44+
4045
private void validateValuePresent(Object value) {
4146
if (value == null) {
4247
throw new MongoServerError(40198, "$bucket requires 'groupBy' and 'boundaries' to be specified.");

core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/stage/FacetStage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ public FacetStage(Document facetsConfiguration, DatabaseResolver databaseResolve
2626
}
2727
}
2828

29+
@Override
30+
public String name() {
31+
return "$facet";
32+
}
33+
2934
@Override
3035
public Stream<Document> apply(Stream<Document> stream) {
3136
List<Document> allDocuments = stream.collect(Collectors.toList());

core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/stage/GraphLookupStage.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ public class GraphLookupStage extends AbstractLookupStage {
4343
private final MongoCollection<?> collection;
4444

4545
public GraphLookupStage(Document configuration, MongoDatabase mongoDatabase) {
46-
stageName = "$graphLookup";
4746
String from = readStringConfigurationProperty(configuration, FROM);
4847
collection = mongoDatabase.resolveCollection(from, false);
4948
readStringConfigurationProperty(configuration, FROM);
@@ -56,6 +55,11 @@ public GraphLookupStage(Document configuration, MongoDatabase mongoDatabase) {
5655
ensureAllConfigurationPropertiesAreKnown(configuration, CONFIGURATION_KEYS);
5756
}
5857

58+
@Override
59+
public String name() {
60+
return "$graphLookup";
61+
}
62+
5963
Integer readOptionalIntegerConfigurationProperty(Document configuration, String name) {
6064
Object value = configuration.get(name);
6165
if (value == null) {

core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/stage/GroupStage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ public GroupStage(Document groupQuery) {
3232
accumulatorSuppliers = Accumulator.parse(groupQuery);
3333
}
3434

35+
@Override
36+
public String name() {
37+
return "$group";
38+
}
39+
3540
@Override
3641
public Stream<Document> apply(Stream<Document> stream) {
3742
Map<Object, Collection<Accumulator>> accumulatorsPerKey = new TreeMap<>(ValueComparator.asc());

core/src/main/java/de/bwaldvogel/mongo/backend/aggregation/stage/IndexStatsStage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ public IndexStatsStage(MongoCollection<?> collection) {
1616
this.collection = collection;
1717
}
1818

19+
@Override
20+
public String name() {
21+
return "$indexStats";
22+
}
23+
1924
@Override
2025
public Stream<Document> apply(Stream<Document> stream) {
2126
return collection.getIndexes().stream()

0 commit comments

Comments
 (0)