Skip to content

Commit 0dfe919

Browse files
committed
Implement prefix sort for strings (albeit inefficiently).
1 parent 66a813e commit 0dfe919

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,56 @@
1717

1818
package org.apache.spark.util.collection.unsafe.sort;
1919

20+
import com.google.common.base.Charsets;
21+
import com.google.common.primitives.Longs;
2022
import org.apache.spark.annotation.Private;
23+
import org.apache.spark.unsafe.types.UTF8String;
2124

2225
@Private
2326
public class PrefixComparators {
2427
private PrefixComparators() {}
2528

29+
public static final StringPrefixComparator STRING = new StringPrefixComparator();
2630
public static final IntPrefixComparator INTEGER = new IntPrefixComparator();
2731
public static final LongPrefixComparator LONG = new LongPrefixComparator();
2832
public static final FloatPrefixComparator FLOAT = new FloatPrefixComparator();
2933
public static final DoublePrefixComparator DOUBLE = new DoublePrefixComparator();
3034

35+
public static final class StringPrefixComparator extends PrefixComparator {
36+
@Override
37+
public int compare(long aPrefix, long bPrefix) {
38+
// TODO: this can certainly be done more efficiently
39+
byte[] a = Longs.toByteArray(aPrefix);
40+
byte[] b = Longs.toByteArray(bPrefix);
41+
for (int i = 0; i < 8; i++) {
42+
if (a[i] == b[i]) continue;
43+
if (a[i] > b[i]) return -1;
44+
else if (a[i] < b[i]) return 1;
45+
}
46+
return 0;
47+
}
48+
49+
public long computePrefix(UTF8String value) {
50+
// TODO: this can certainly be done more efficiently
51+
return value == null ? 0L : computePrefix(value.toString());
52+
}
53+
54+
public long computePrefix(String value) {
55+
// TODO: this can certainly be done more efficiently
56+
if (value == null || value.length() == 0) {
57+
return 0L;
58+
} else {
59+
String first4Chars = value.substring(0, Math.min(3, value.length() - 1));
60+
byte[] utf16Bytes = first4Chars.getBytes(Charsets.UTF_16);
61+
byte[] padded = new byte[8];
62+
if (utf16Bytes.length < 8) {
63+
System.arraycopy(utf16Bytes, 0, padded, 0, utf16Bytes.length);
64+
}
65+
return Longs.fromByteArray(padded);
66+
}
67+
}
68+
}
69+
3170
public static final class IntPrefixComparator extends PrefixComparator {
3271
@Override
3372
public int compare(long aPrefix, long bPrefix) {

sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ package org.apache.spark.sql.execution
2020

2121
import org.apache.spark.sql.catalyst.InternalRow
2222
import org.apache.spark.sql.catalyst.expressions.SortOrder
23-
import org.apache.spark.sql.types.{DoubleType, FloatType, LongType, IntegerType}
23+
import org.apache.spark.sql.types._
24+
import org.apache.spark.unsafe.types.UTF8String
2425
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, PrefixComparator}
2526

2627

@@ -36,6 +37,7 @@ object SortPrefixUtils {
3637

3738
def getPrefixComparator(sortOrder: SortOrder): PrefixComparator = {
3839
sortOrder.dataType match {
40+
case StringType => PrefixComparators.STRING
3941
case IntegerType => PrefixComparators.INTEGER
4042
case LongType => PrefixComparators.LONG
4143
case FloatType => PrefixComparators.FLOAT
@@ -46,6 +48,8 @@ object SortPrefixUtils {
4648

4749
def getPrefixComputer(sortOrder: SortOrder): InternalRow => Long = {
4850
sortOrder.dataType match {
51+
case StringType => (row: InternalRow) =>
52+
PrefixComparators.STRING.computePrefix(sortOrder.child.eval(row).asInstanceOf[UTF8String])
4953
case IntegerType => (row: InternalRow) =>
5054
PrefixComparators.INTEGER.computePrefix(sortOrder.child.eval(row).asInstanceOf[Int])
5155
case LongType => (row: InternalRow) => sortOrder.child.eval(row).asInstanceOf[Long]

0 commit comments

Comments
 (0)