Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 27 additions & 21 deletions src/exoscale/vinyl/cursor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,31 @@
(as-list [this] "Transform a cursor or cursor future to a list")
(as-iterator [this] "Transform a cursor or cursor future to an iterator"))

(defn apply-transduce-with-reducer
"Apply reducer (transducer) over cursor. Set completion? to true if final
transducing call needs to be done (stateful transducer)."
[^RecordCursor cursor completion? cont-fn reducer acc]
(.thenApply
(AsyncUtil/whileTrue
(reify Supplier
(get [_]
(-> cursor
.onNext
(.thenApply
(fn/make-fun
(fn [^RecordCursorResult result]
(when (ifn? cont-fn)
(-> result .getContinuation .toBytes cont-fn))
(let [next? (.hasNext result)
new-acc (when next? (swap! acc reducer (.get result)))]
(and (not (reduced? new-acc)) next?))))))))
(.getExecutor cursor))
(fn/make-fun (fn [_]
(unreduced
(cond-> @acc
completion?
reducer))))))

(defn apply-transduce
"A variant of `RecordCursor::reduce` that honors `reduced?` and supports
transducers.
Expand All @@ -23,31 +48,12 @@

When `cont-fn` is given, it will be called on the last seen continuation
byte array for every new element."
([^RecordCursor cursor xform f init cont-fn]
([cursor xform f init cont-fn]
(let [reducer (if (some? xform) (xform f) f)
acc (if (instance? clojure.lang.Atom init)
init
(atom (or init (f))))]
(.thenApply
(AsyncUtil/whileTrue
(reify Supplier
(get [_]
(-> cursor
.onNext
(.thenApply
(fn/make-fun
(fn [^RecordCursorResult result]
(when (ifn? cont-fn)
(-> result .getContinuation .toBytes cont-fn))
(let [next? (.hasNext result)
new-acc (when next? (swap! acc reducer (.get result)))]
(and (not (reduced? new-acc)) next?))))))))
(.getExecutor cursor))
(fn/make-fun (fn [_]
(unreduced
(cond-> @acc
(some? xform)
reducer)))))))
(apply-transduce-with-reducer cursor (some? xform) cont-fn reducer acc)))
([cursor f init cont-fn]
(apply-transduce cursor nil f init cont-fn))
([cursor f init]
Expand Down
48 changes: 44 additions & 4 deletions test/exoscale/vinyl/cursor_test.clj
Original file line number Diff line number Diff line change
@@ -1,14 +1,39 @@
(ns exoscale.vinyl.cursor-test
(:require [clojure.test :refer [deftest are]]
[exoscale.vinyl.cursor :refer [apply-transforms]]
[exoscale.vinyl.store :as store])
(:import com.apple.foundationdb.record.RecordCursor))
(:require [clojure.test :refer [deftest are is]]
[exoscale.vinyl.cursor :refer [apply-transforms apply-transduce-with-reducer]]
[exoscale.vinyl.demostore :as ds :refer [*db*]]
[exoscale.vinyl.store :as store])
(:import [com.apple.foundationdb.record RecordCursor]
[com.apple.foundationdb FDBException]
[java.util Iterator]))

(defn from-list
"Transform a list of items to a `RecordCursor` instance"
[items]
(RecordCursor/fromList (seq items)))

(defn from-iterator
[iterator]
(RecordCursor/fromIterator iterator))

(defrecord FaultyIterator [items pos error-index]
Iterator
(forEachRemaining [_ _action])
(hasNext [_]
(< @pos (count items)))
(next [_]
(if (= @pos @error-index)
(do
(vreset! error-index -1)
(throw (FDBException. "Retryable error" 1007)))
(let [result (nth items @pos)]
(vswap! pos inc)
result)))
(remove [_]))

(defn make-faulty-iterator [items error-index]
(->FaultyIterator items (volatile! 0) (volatile! error-index)))

(defn reduce-plus
[x y]
(let [acc (+ x y)]
Expand All @@ -30,3 +55,18 @@
::store/reduce-init init}))
[0 1 2 3 4 5 6] + 0 28
[0 1 2 3 4 5 6] (completing reduce-plus) 0 15))

(deftest stateful-transducer-test
(is (= [[1 2 3] [4 5 6] [7 8 9] [10]]
(ds/with-build-fdb
(fn [] (let [cursor (from-iterator (make-faulty-iterator [1 2 3 4 5 6 7 8 9 10] 5))
transducer (partition-all 3)
reducer-fn (completing (fn [acc items] (conj acc items)))
reducer (transducer reducer-fn)
acc (atom [])]
@(store/run-async *db* (fn [_store] (apply-transduce-with-reducer
cursor
transducer
nil
reducer
acc)))))))))