Skip to content

Commit 5197c21

Browse files
author
Marcelo Vanzin
committed
SHS-NG M1: Add "max" and "last" to kvstore iterators.
This lets callers control where iteration ends, which makes it easier to write Scala code that automatically closes the underlying iterator resources. Previously, code had to use Scala's "takeWhile", convert the result to a list, and manually close the iterators; with these two parameters, that can be avoided in many cases, because iterators close automatically when the last element is reached.
1 parent 7b87021 commit 5197c21

File tree

3 files changed

+167
-40
lines changed

3 files changed

+167
-40
lines changed

common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ public abstract class KVStoreView<T> implements Iterable<T> {
4343
boolean ascending = true;
4444
String index = KVIndex.NATURAL_INDEX_NAME;
4545
Object first = null;
46+
Object last = null;
4647
long skip = 0L;
48+
long max = Long.MAX_VALUE;
4749

4850
public KVStoreView(Class<T> type) {
4951
this.type = type;
@@ -74,7 +76,25 @@ public KVStoreView<T> first(Object value) {
7476
}
7577

7678
/**
77-
* Skips a number of elements in the resulting iterator.
79+
* Stops iteration at the given value of the chosen index.
80+
*/
81+
public KVStoreView<T> last(Object value) {
82+
this.last = value;
83+
return this;
84+
}
85+
86+
/**
87+
* Stops iteration after a number of elements has been retrieved.
88+
*/
89+
public KVStoreView<T> max(long max) {
90+
Preconditions.checkArgument(max > 0L, "max must be positive.");
91+
this.max = max;
92+
return this;
93+
}
94+
95+
/**
96+
* Skips a number of elements at the start of iteration. Skipped elements do not count
97+
* toward the limit set with {@link #max(long)}.
7898
*/
7999
public KVStoreView<T> skip(long n) {
80100
this.skip = n;

common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBIterator.java

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,12 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
3838
private final LevelDBTypeInfo.Index index;
3939
private final byte[] indexKeyPrefix;
4040
private final byte[] end;
41+
private final long max;
4142

4243
private boolean checkedNext;
4344
private T next;
4445
private boolean closed;
46+
private long count;
4547

4648
LevelDBIterator(LevelDB db, KVStoreView<T> params) throws Exception {
4749
this.db = db;
@@ -51,6 +53,7 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
5153
this.ti = db.getTypeInfo(type);
5254
this.index = ti.index(params.index);
5355
this.indexKeyPrefix = index.keyPrefix();
56+
this.max = params.max;
5457

5558
byte[] firstKey;
5659
if (params.first != null) {
@@ -66,14 +69,27 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
6669
}
6770
it.seek(firstKey);
6871

72+
byte[] end = null;
6973
if (ascending) {
70-
this.end = index.end();
74+
end = params.last != null ? index.end(params.last) : index.end();
7175
} else {
72-
this.end = null;
76+
if (params.last != null) {
77+
end = index.start(params.last);
78+
}
7379
if (it.hasNext()) {
74-
it.next();
80+
// When descending, the caller may have set up the start of iteration at a non-existent
81+
// entry that is guaranteed to be after the desired entry. For example, if you have a
82+
// compound key (a, b) where b is an integer, you may seek to the end of the elements that
83+
// have the same "a" value by specifying Integer.MAX_VALUE for "b", and that value may not
84+
// exist in the database. So we need to check here whether the next value actually belongs to
85+
// the set being returned by the iterator before advancing.
86+
byte[] nextKey = it.peekNext().getKey();
87+
if (compare(nextKey, indexKeyPrefix) <= 0) {
88+
it.next();
89+
}
7590
}
7691
}
92+
this.end = end;
7793

7894
if (params.skip > 0) {
7995
skip(params.skip);
@@ -147,6 +163,10 @@ public synchronized void close() throws IOException {
147163
}
148164

149165
private T loadNext() {
166+
if (count >= max) {
167+
return null;
168+
}
169+
150170
try {
151171
while (true) {
152172
boolean hasNext = ascending ? it.hasNext() : it.hasPrev();
@@ -173,11 +193,16 @@ private T loadNext() {
173193
return null;
174194
}
175195

176-
// If there's a known end key and it's found, stop.
177-
if (end != null && Arrays.equals(nextKey, end)) {
178-
return null;
196+
// If there's a known end key and iteration has gone past it, stop.
197+
if (end != null) {
198+
int comp = compare(nextKey, end) * (ascending ? 1 : -1);
199+
if (comp > 0) {
200+
return null;
201+
}
179202
}
180203

204+
count++;
205+
181206
// Next element is part of the iteration, return it.
182207
if (index == null || index.isCopy()) {
183208
return db.serializer.deserialize(nextEntry.getValue(), type);
@@ -228,4 +253,17 @@ private byte[] stitch(byte[]... comps) {
228253
return dest;
229254
}
230255

256+
private int compare(byte[] a, byte[] b) {
257+
int diff = 0;
258+
int minLen = Math.min(a.length, b.length);
259+
for (int i = 0; i < minLen; i++) {
260+
diff += (a[i] - b[i]);
261+
if (diff != 0) {
262+
return diff;
263+
}
264+
}
265+
266+
return a.length - b.length;
267+
}
268+
231269
}

common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java

Lines changed: 102 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -152,111 +152,170 @@ public static void cleanup() throws Exception {
152152

153153
@Test
154154
public void naturalIndex() throws Exception {
155-
testIteration(NATURAL_ORDER, view(), null);
155+
testIteration(NATURAL_ORDER, view(), null, null);
156156
}
157157

158158
@Test
159159
public void refIndex() throws Exception {
160-
testIteration(REF_INDEX_ORDER, view().index("id"), null);
160+
testIteration(REF_INDEX_ORDER, view().index("id"), null, null);
161161
}
162162

163163
@Test
164164
public void copyIndex() throws Exception {
165-
testIteration(COPY_INDEX_ORDER, view().index("name"), null);
165+
testIteration(COPY_INDEX_ORDER, view().index("name"), null, null);
166166
}
167167

168168
@Test
169169
public void numericIndex() throws Exception {
170-
testIteration(NUMERIC_INDEX_ORDER, view().index("int"), null);
170+
testIteration(NUMERIC_INDEX_ORDER, view().index("int"), null, null);
171171
}
172172

173173
@Test
174174
public void naturalIndexDescending() throws Exception {
175-
testIteration(NATURAL_ORDER, view().reverse(), null);
175+
testIteration(NATURAL_ORDER, view().reverse(), null, null);
176176
}
177177

178178
@Test
179179
public void refIndexDescending() throws Exception {
180-
testIteration(REF_INDEX_ORDER, view().index("id").reverse(), null);
180+
testIteration(REF_INDEX_ORDER, view().index("id").reverse(), null, null);
181181
}
182182

183183
@Test
184184
public void copyIndexDescending() throws Exception {
185-
testIteration(COPY_INDEX_ORDER, view().index("name").reverse(), null);
185+
testIteration(COPY_INDEX_ORDER, view().index("name").reverse(), null, null);
186186
}
187187

188188
@Test
189189
public void numericIndexDescending() throws Exception {
190-
testIteration(NUMERIC_INDEX_ORDER, view().index("int").reverse(), null);
190+
testIteration(NUMERIC_INDEX_ORDER, view().index("int").reverse(), null, null);
191191
}
192192

193193
@Test
194194
public void naturalIndexWithStart() throws Exception {
195-
CustomType1 first = pickFirst();
196-
testIteration(NATURAL_ORDER, view().first(first.key), first);
195+
CustomType1 first = pickLimit();
196+
testIteration(NATURAL_ORDER, view().first(first.key), first, null);
197197
}
198198

199199
@Test
200200
public void refIndexWithStart() throws Exception {
201-
CustomType1 first = pickFirst();
202-
testIteration(REF_INDEX_ORDER, view().index("id").first(first.id), first);
201+
CustomType1 first = pickLimit();
202+
testIteration(REF_INDEX_ORDER, view().index("id").first(first.id), first, null);
203203
}
204204

205205
@Test
206206
public void copyIndexWithStart() throws Exception {
207-
CustomType1 first = pickFirst();
208-
testIteration(COPY_INDEX_ORDER, view().index("name").first(first.name), first);
207+
CustomType1 first = pickLimit();
208+
testIteration(COPY_INDEX_ORDER, view().index("name").first(first.name), first, null);
209209
}
210210

211211
@Test
212212
public void numericIndexWithStart() throws Exception {
213-
CustomType1 first = pickFirst();
214-
testIteration(NUMERIC_INDEX_ORDER, view().index("int").first(first.num), first);
213+
CustomType1 first = pickLimit();
214+
testIteration(NUMERIC_INDEX_ORDER, view().index("int").first(first.num), first, null);
215215
}
216216

217217
@Test
218218
public void naturalIndexDescendingWithStart() throws Exception {
219-
CustomType1 first = pickFirst();
220-
testIteration(NATURAL_ORDER, view().reverse().first(first.key), first);
219+
CustomType1 first = pickLimit();
220+
testIteration(NATURAL_ORDER, view().reverse().first(first.key), first, null);
221221
}
222222

223223
@Test
224224
public void refIndexDescendingWithStart() throws Exception {
225-
CustomType1 first = pickFirst();
226-
testIteration(REF_INDEX_ORDER, view().reverse().index("id").first(first.id), first);
225+
CustomType1 first = pickLimit();
226+
testIteration(REF_INDEX_ORDER, view().reverse().index("id").first(first.id), first, null);
227227
}
228228

229229
@Test
230230
public void copyIndexDescendingWithStart() throws Exception {
231-
CustomType1 first = pickFirst();
232-
testIteration(COPY_INDEX_ORDER, view().reverse().index("name").first(first.name),
233-
first);
231+
CustomType1 first = pickLimit();
232+
testIteration(COPY_INDEX_ORDER, view().reverse().index("name").first(first.name), first, null);
234233
}
235234

236235
@Test
237236
public void numericIndexDescendingWithStart() throws Exception {
238-
CustomType1 first = pickFirst();
239-
testIteration(NUMERIC_INDEX_ORDER, view().reverse().index("int").first(first.num),
240-
first);
237+
CustomType1 first = pickLimit();
238+
testIteration(NUMERIC_INDEX_ORDER, view().reverse().index("int").first(first.num), first, null);
241239
}
242240

243241
@Test
244242
public void naturalIndexWithSkip() throws Exception {
245-
testIteration(NATURAL_ORDER, view().skip(RND.nextInt(allEntries.size() / 2)), null);
243+
testIteration(NATURAL_ORDER, view().skip(RND.nextInt(allEntries.size() / 2)), null, null);
246244
}
247245

248246
@Test
249247
public void refIndexWithSkip() throws Exception {
250248
testIteration(REF_INDEX_ORDER, view().index("id").skip(RND.nextInt(allEntries.size() / 2)),
251-
null);
249+
null, null);
252250
}
253251

254252
@Test
255253
public void copyIndexWithSkip() throws Exception {
256254
testIteration(COPY_INDEX_ORDER, view().index("name").skip(RND.nextInt(allEntries.size() / 2)),
257-
null);
255+
null, null);
258256
}
259257

258+
@Test
259+
public void naturalIndexWithMax() throws Exception {
260+
testIteration(NATURAL_ORDER, view().max(RND.nextInt(allEntries.size() / 2)), null, null);
261+
}
262+
263+
@Test
264+
public void copyIndexWithMax() throws Exception {
265+
testIteration(COPY_INDEX_ORDER, view().index("name").max(RND.nextInt(allEntries.size() / 2)),
266+
null, null);
267+
}
268+
269+
@Test
270+
public void naturalIndexWithLast() throws Exception {
271+
CustomType1 last = pickLimit();
272+
testIteration(NATURAL_ORDER, view().last(last.key), null, last);
273+
}
274+
275+
@Test
276+
public void refIndexWithLast() throws Exception {
277+
CustomType1 last = pickLimit();
278+
testIteration(REF_INDEX_ORDER, view().index("id").last(last.id), null, last);
279+
}
280+
281+
@Test
282+
public void copyIndexWithLast() throws Exception {
283+
CustomType1 last = pickLimit();
284+
testIteration(COPY_INDEX_ORDER, view().index("name").last(last.name), null, last);
285+
}
286+
287+
@Test
288+
public void numericIndexWithLast() throws Exception {
289+
CustomType1 last = pickLimit();
290+
testIteration(NUMERIC_INDEX_ORDER, view().index("int").last(last.num), null, last);
291+
}
292+
293+
@Test
294+
public void naturalIndexDescendingWithLast() throws Exception {
295+
CustomType1 last = pickLimit();
296+
testIteration(NATURAL_ORDER, view().reverse().last(last.key), null, last);
297+
}
298+
299+
@Test
300+
public void refIndexDescendingWithLast() throws Exception {
301+
CustomType1 last = pickLimit();
302+
testIteration(REF_INDEX_ORDER, view().reverse().index("id").last(last.id), null, last);
303+
}
304+
305+
@Test
306+
public void copyIndexDescendingWithLast() throws Exception {
307+
CustomType1 last = pickLimit();
308+
testIteration(COPY_INDEX_ORDER, view().reverse().index("name").last(last.name),
309+
null, last);
310+
}
311+
312+
@Test
313+
public void numericIndexDescendingWithLast() throws Exception {
314+
CustomType1 last = pickLimit();
315+
testIteration(NUMERIC_INDEX_ORDER, view().reverse().index("int").last(last.num),
316+
null, last);
317+
}
318+
260319
@Test
261320
public void testRefWithIntNaturalKey() throws Exception {
262321
LevelDBSuite.IntKeyType i = new LevelDBSuite.IntKeyType();
@@ -272,8 +331,8 @@ public void testRefWithIntNaturalKey() throws Exception {
272331
}
273332
}
274333

275-
private CustomType1 pickFirst() {
276-
// Picks a first element that has clashes with other elements in the given index.
334+
private CustomType1 pickLimit() {
335+
// Picks an element that has clashes with other elements in the given index.
277336
return clashingEntries.get(RND.nextInt(clashingEntries.size()));
278337
}
279338

@@ -297,22 +356,32 @@ private <T extends Comparable<T>> int compareWithFallback(
297356
private void testIteration(
298357
final BaseComparator order,
299358
final KVStoreView<CustomType1> params,
300-
final CustomType1 first) throws Exception {
359+
final CustomType1 first,
360+
final CustomType1 last) throws Exception {
301361
List<CustomType1> indexOrder = sortBy(order.fallback());
302362
if (!params.ascending) {
303363
indexOrder = Lists.reverse(indexOrder);
304364
}
305365

306366
Iterable<CustomType1> expected = indexOrder;
367+
BaseComparator expectedOrder = params.ascending ? order : order.reverse();
368+
307369
if (first != null) {
308-
final BaseComparator expectedOrder = params.ascending ? order : order.reverse();
309370
expected = Iterables.filter(expected, v -> expectedOrder.compare(first, v) <= 0);
310371
}
311372

373+
if (last != null) {
374+
expected = Iterables.filter(expected, v -> expectedOrder.compare(v, last) <= 0);
375+
}
376+
312377
if (params.skip > 0) {
313378
expected = Iterables.skip(expected, (int) params.skip);
314379
}
315380

381+
if (params.max != Long.MAX_VALUE) {
382+
expected = Iterables.limit(expected, (int) params.max);
383+
}
384+
316385
List<CustomType1> actual = collect(params);
317386
compareLists(expected, actual);
318387
}

0 commit comments

Comments
 (0)