Skip to content

Commit a6b94d0

Browse files
committed
KCBC-108 Support FIT KV range scan
Change-Id: I2129f3614dfb0e56ab464290835fa4b92fc65fd3 Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/183650 Tested-by: David Nault <[email protected]> Reviewed-by: Graham Pople <[email protected]>
1 parent da53970 commit a6b94d0

File tree

4 files changed

+355
-0
lines changed

4 files changed

+355
-0
lines changed
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Copyright 2022 Couchbase, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.couchbase.client.performer.kotlin
18+
19+
import com.couchbase.client.core.msg.kv.MutationToken
20+
import com.couchbase.client.kotlin.codec.RawStringTranscoder
21+
import com.couchbase.client.kotlin.kv.Expiry
22+
import com.couchbase.client.kotlin.kv.GetResult
23+
import com.couchbase.client.kotlin.kv.KvScanConsistency
24+
import com.couchbase.client.kotlin.kv.MutationState
25+
import com.couchbase.client.kotlin.kv.ScanTerm
26+
import com.couchbase.client.kotlin.kv.ScanType
27+
import com.fasterxml.jackson.databind.node.ObjectNode
28+
import com.google.protobuf.ByteString
29+
import com.couchbase.client.protocol.run.Result as FitRunResult
30+
import com.couchbase.client.protocol.sdk.kv.rangescan.Range as FitRange
31+
import com.couchbase.client.protocol.sdk.kv.rangescan.Scan as FitScan
32+
import com.couchbase.client.protocol.sdk.kv.rangescan.ScanResult as FitScanResult
33+
import com.couchbase.client.protocol.sdk.kv.rangescan.ScanTermChoice as FitScanTermChoice
34+
import com.couchbase.client.protocol.sdk.kv.rangescan.ScanType as FitScanType
35+
import com.couchbase.client.protocol.shared.MutationState as FitMutationState
36+
import com.couchbase.client.protocol.shared.MutationToken as FitMutationToken
37+
import com.couchbase.client.protocol.streams.Error as FitStreamError
38+
import com.couchbase.client.protocol.streams.Signal as FitStreamSignal
39+
40+
fun FitScanType.toKotlin(): ScanType = when {
41+
hasRange() -> with(range) {
42+
when {
43+
hasFromTo() -> fromTo.toKotlin()
44+
hasDocIdPrefix() -> ScanType.prefix(docIdPrefix)
45+
else -> throw UnsupportedOperationException("Unsupported scan range: $this")
46+
}
47+
}
48+
49+
hasSampling() -> with(sampling) {
50+
if (hasSeed()) ScanType.sample(limit, seed)
51+
else ScanType.sample(limit)
52+
}
53+
54+
else -> throw UnsupportedOperationException("Unsupported scan type: $this")
55+
}
56+
57+
fun FitRange.toKotlin(): ScanType {
58+
val from = from.toKotlin()
59+
val to = to.toKotlin()
60+
return when {
61+
from != null && to != null -> ScanType.range(from = from, to = to)
62+
from != null -> ScanType.range(from = from)
63+
to != null -> ScanType.range(to = to)
64+
else -> ScanType.range()
65+
}
66+
}
67+
68+
fun FitScanTermChoice.toKotlin(): ScanTerm? = when {
69+
hasDefault() -> null
70+
hasMaximum() -> ScanTerm.Maximum
71+
hasMinimum() -> ScanTerm.Minimum
72+
hasTerm() -> with(term) {
73+
val exclusive = hasExclusive() && exclusive
74+
when {
75+
hasAsString() -> ScanTerm(asString, exclusive)
76+
hasAsBytes() -> ScanTerm(asBytes.toByteArray(), exclusive)
77+
else -> throw UnsupportedOperationException("Unsupported ScanTermChoice $this")
78+
}
79+
}
80+
81+
else -> throw UnsupportedOperationException("Unsupported ScanTermChoice $this")
82+
}
83+
84+
fun FitMutationState.toKotlin(): KvScanConsistency {
85+
val result = MutationState()
86+
tokensList.forEach { mt: FitMutationToken ->
87+
result.add(
88+
MutationToken(
89+
mt.partitionId.toShort(),
90+
mt.partitionUuid,
91+
mt.sequenceNumber,
92+
mt.bucketName,
93+
)
94+
)
95+
}
96+
return KvScanConsistency.consistentWith(result)
97+
}
98+
99+
fun processScanResult(request: FitScan, documentOrId: Any): FitRunResult = try {
100+
val builder = FitScanResult.newBuilder()
101+
.setStreamId(request.streamConfig.streamId)
102+
103+
when (documentOrId) {
104+
is String -> builder.id = documentOrId
105+
106+
is GetResult -> with(documentOrId) {
107+
builder.id = id
108+
builder.cas = cas
109+
110+
if (expiry is Expiry.Absolute) {
111+
builder.expiryTime = (expiry as Expiry.Absolute).instant.epochSecond
112+
}
113+
114+
if (request.hasContentAs()) {
115+
val bytes: ByteArray = when {
116+
request.contentAs.hasAsString() -> contentAs<String>(RawStringTranscoder).toByteArray()
117+
request.contentAs.hasAsByteArray() -> content.bytes
118+
request.contentAs.hasAsJson() -> contentAs<ObjectNode>().toString().toByteArray()
119+
else -> throw UnsupportedOperationException("Unknown contentAs: ${request.contentAs}")
120+
}
121+
122+
builder.content = ByteString.copyFrom(bytes)
123+
}
124+
}
125+
126+
else -> throw AssertionError("Expected String or GetResult, but got ${documentOrId::class.java}")
127+
}
128+
129+
FitRunResult.newBuilder()
130+
.setSdk(
131+
com.couchbase.client.protocol.sdk.Result.newBuilder()
132+
.setRangeScanResult(builder.build())
133+
)
134+
.build()
135+
136+
} catch (err: RuntimeException) {
137+
FitRunResult.newBuilder()
138+
.setStream(
139+
FitStreamSignal.newBuilder()
140+
.setError(
141+
FitStreamError.newBuilder()
142+
.setException(convertExceptionKt(err))
143+
.setStreamId(request.streamConfig.streamId)
144+
)
145+
)
146+
.build()
147+
}

kotlin-fit-performer/src/main/kotlin/com/couchbase/client/performer/kotlin/KotlinPerformer.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class KotlinPerformer : CorePerformer() {
4444
override fun customisePerformerCaps(response: PerformerCapsFetchResponse.Builder) {
4545
response.setPerformerUserAgent("kotlin")
4646
.addSdkImplementationCaps(Caps.SDK_PRESERVE_EXPIRY)
47+
.addSdkImplementationCaps(Caps.SDK_KV_RANGE_SCAN)
4748
}
4849

4950
override fun clusterConnectionCreate(

kotlin-fit-performer/src/main/kotlin/com/couchbase/client/performer/kotlin/KotlinSdkCommandExecutor.kt

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,35 +17,49 @@
1717
package com.couchbase.client.performer.kotlin
1818

1919
import com.couchbase.client.core.error.CouchbaseException
20+
import com.couchbase.client.core.error.InvalidArgumentException
2021
import com.couchbase.client.kotlin.CommonOptions
2122
import com.couchbase.client.kotlin.codec.JacksonJsonSerializer
2223
import com.couchbase.client.kotlin.codec.JsonTranscoder
2324
import com.couchbase.client.kotlin.codec.RawBinaryTranscoder
2425
import com.couchbase.client.kotlin.codec.RawJsonTranscoder
2526
import com.couchbase.client.kotlin.codec.RawStringTranscoder
27+
import com.couchbase.client.kotlin.kv.DEFAULT_SCAN_BATCH_ITEM_LIMIT
28+
import com.couchbase.client.kotlin.kv.DEFAULT_SCAN_BATCH_SIZE_LIMIT
2629
import com.couchbase.client.kotlin.kv.Durability
2730
import com.couchbase.client.kotlin.kv.Expiry
2831
import com.couchbase.client.kotlin.kv.GetResult
32+
import com.couchbase.client.kotlin.kv.KvScanConsistency
2933
import com.couchbase.client.kotlin.kv.MutationResult
3034
import com.couchbase.client.kotlin.kv.PersistTo
3135
import com.couchbase.client.kotlin.kv.ReplicateTo
36+
import com.couchbase.client.kotlin.kv.ScanSort
37+
import com.couchbase.client.kotlin.util.StorageSize.Companion.bytes
3238
import com.couchbase.client.performer.core.commands.SdkCommandExecutor
3339
import com.couchbase.client.performer.core.perf.Counters
3440
import com.couchbase.client.performer.core.perf.PerRun
3541
import com.couchbase.client.performer.core.util.ErrorUtil
3642
import com.couchbase.client.performer.core.util.TimeUtil
3743
import com.couchbase.client.performer.kotlin.util.ClusterConnection
44+
import com.couchbase.client.protocol.sdk.kv.rangescan.ScanOptions
45+
import com.couchbase.client.protocol.sdk.kv.rangescan.ScanSort.KV_RANGE_SCAN_SORT_ASCENDING
46+
import com.couchbase.client.protocol.sdk.kv.rangescan.ScanSort.KV_RANGE_SCAN_SORT_NONE
3847
import com.couchbase.client.protocol.shared.CouchbaseExceptionEx
3948
import com.couchbase.client.protocol.shared.Exception
4049
import com.couchbase.client.protocol.shared.ExceptionOther
4150
import com.couchbase.client.protocol.shared.MutationToken
4251
import com.couchbase.client.protocol.shared.Transcoder
52+
import com.couchbase.client.protocol.streams.Created
53+
import com.couchbase.client.protocol.streams.Type.STREAM_KV_RANGE_SCAN
54+
import com.couchbase.stream.FluxStreamer
4355
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module
4456
import com.fasterxml.jackson.module.kotlin.KotlinModule
4557
import com.fasterxml.jackson.module.kotlin.jacksonTypeRef
4658
import com.fasterxml.jackson.module.kotlin.jsonMapper
4759
import com.google.protobuf.ByteString
60+
import kotlinx.coroutines.reactive.asPublisher
4861
import kotlinx.coroutines.runBlocking
62+
import reactor.core.publisher.Flux
4963
import java.time.Instant
5064
import kotlin.time.Duration.Companion.milliseconds
5165
import kotlin.time.Duration.Companion.seconds
@@ -58,6 +72,7 @@ import com.couchbase.client.protocol.shared.Durability as FitDurability
5872
import com.couchbase.client.protocol.shared.Expiry as FitExpiry
5973
import com.couchbase.client.protocol.shared.PersistTo as FitPersistTo
6074
import com.couchbase.client.protocol.shared.ReplicateTo as FitReplicateTo
75+
import com.couchbase.client.protocol.streams.Signal as FitSignal
6176

6277
/**
6378
* Performs each requested SDK operation
@@ -223,6 +238,73 @@ class KotlinSdkCommandExecutor(
223238
result.elapsedNanos = System.nanoTime() - start
224239
if (op.returnResult) populateResult(result, r)
225240
else setSuccess(result)
241+
} else if (op.hasRangeScan()) {
242+
val request = op.rangeScan
243+
val collection = connection.collection(request.collection)
244+
result.initiated = TimeUtil.getTimeNow()
245+
246+
val options = request.options
247+
val idsOnly = options.hasIdsOnly() && options.idsOnly
248+
249+
fun ScanOptions.ktCommon() = createCommon(hasTimeoutMsecs(), timeoutMsecs)
250+
251+
fun ScanOptions.ktSort() =
252+
if (!hasSort()) ScanSort.NONE
253+
else when (sort) {
254+
KV_RANGE_SCAN_SORT_NONE -> ScanSort.NONE
255+
KV_RANGE_SCAN_SORT_ASCENDING -> ScanSort.ASCENDING
256+
else -> throw UnsupportedOperationException("Unsupported scan sort: $sort")
257+
}
258+
259+
fun ScanOptions.ktConsistency() =
260+
if (!hasConsistentWith()) KvScanConsistency.notBounded()
261+
else options.consistentWith.toKotlin()
262+
263+
fun ScanOptions.ktBatchItemLimit() = if (hasBatchItemLimit()) batchItemLimit else DEFAULT_SCAN_BATCH_ITEM_LIMIT
264+
265+
fun ScanOptions.ktBatchSizeLimit() = if (hasBatchByteLimit()) batchByteLimit.bytes else DEFAULT_SCAN_BATCH_SIZE_LIMIT
266+
267+
val start = System.nanoTime()
268+
val flow =
269+
if (idsOnly) collection.scanIds(
270+
type = request.scanType.toKotlin(),
271+
common = options.ktCommon(),
272+
sort = options.ktSort(),
273+
consistency = options.ktConsistency(),
274+
batchItemLimit = options.ktBatchItemLimit(),
275+
batchSizeLimit = options.ktBatchSizeLimit(),
276+
)
277+
else collection.scanDocuments(
278+
type = request.scanType.toKotlin(),
279+
common = options.ktCommon(),
280+
sort = options.ktSort(),
281+
consistency = options.ktConsistency(),
282+
batchItemLimit = options.ktBatchItemLimit(),
283+
batchSizeLimit = options.ktBatchSizeLimit(),
284+
)
285+
result.elapsedNanos = System.nanoTime() - start
286+
287+
val results = Flux.from(flow.asPublisher())
288+
289+
val streamer: FluxStreamer<Any> = // "Any" is GetResult or String (document ID)
290+
FluxStreamer(
291+
results,
292+
perRun,
293+
request.streamConfig.streamId,
294+
request.streamConfig,
295+
) { documentOrId ->
296+
processScanResult(request, documentOrId)
297+
}
298+
299+
perRun.streamerOwner().addAndStart(streamer)
300+
result.setStream(
301+
FitSignal.newBuilder()
302+
.setCreated(
303+
Created.newBuilder()
304+
.setType(STREAM_KV_RANGE_SCAN)
305+
.setStreamId(streamer.streamId())
306+
)
307+
)
226308
} else {
227309
throw UnsupportedOperationException(IllegalArgumentException("Unknown operation"))
228310
}
@@ -314,6 +396,14 @@ class KotlinSdkCommandExecutor(
314396
}
315397

316398
fun convertExceptionKt(raw: Throwable): Exception {
399+
if (raw is IllegalArgumentException) {
400+
// When there's no meaningful error context, the Kotlin SDK sometimes throws
401+
// a standard IllegalArgumentException. "Promote" these to the
402+
// InvalidArgumentException expected by the FIT driver.
403+
// TODO: Reconsider throwing standard IllegalArgumentException
404+
return convertExceptionKt(InvalidArgumentException(raw.message, raw.cause, null))
405+
}
406+
317407
if (raw is CouchbaseException || raw is UnsupportedOperationException) {
318408
val out = CouchbaseExceptionEx.newBuilder()
319409
.setName(raw.javaClass.simpleName)

0 commit comments

Comments
 (0)