Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
3c5a843
SPARK-47547 BloomFilter fpp degradation: addressing the int32 truncation
ishnagy May 12, 2025
08cbfeb
SPARK-47547 BloomFilter fpp degradation: fixing test data repetition …
ishnagy May 13, 2025
e3cb08e
SPARK-47547 BloomFilter fpp degradation: scrambling the high 32bytes …
ishnagy May 13, 2025
c4e3f58
SPARK-47547 BloomFilter fpp degradation: random distribution fpp test
ishnagy May 13, 2025
1a0b66f
SPARK-47547 BloomFilter fpp degradation: javadoc for test methods, ch…
ishnagy May 19, 2025
d912b66
SPARK-47547 BloomFilter fpp degradation: make seed serialization back…
ishnagy May 19, 2025
f589e2c
SPARK-47547 BloomFilter fpp degradation: counting discarded odd items…
ishnagy May 19, 2025
f597c76
SPARK-47547 BloomFilter fpp degradation: refactoring FPP counting log…
ishnagy May 19, 2025
4ea633d
SPARK-47547 BloomFilter fpp degradation: checkstyle fix
ishnagy May 19, 2025
6696106
SPARK-47547 BloomFilter fpp degradation: fix test bug
ishnagy May 21, 2025
b75e187
SPARK-47547 BloomFilter fpp degradation: parallelization friendly tes…
ishnagy May 26, 2025
2d8a9f1
SPARK-47547 BloomFilter fpp degradation: parallelization friendly tes…
ishnagy May 26, 2025
4a30794
SPARK-47547 BloomFilter fpp degradation: parallelization friendly tes…
ishnagy May 26, 2025
d9d6980
SPARK-47547 BloomFilter fpp degradation: addressing concerns around d…
ishnagy Jun 16, 2025
39a46c9
SPARK-47547 BloomFilter fpp degradation: cut down test cases to decre…
ishnagy Jun 17, 2025
7f235e7
Merge branch 'master' into SPARK-47547_bloomfilter_fpp_degradation
ishnagy Jun 17, 2025
16be3a9
SPARK-47547 BloomFilter fpp degradation: revert creating a new SlowTe…
ishnagy Jun 17, 2025
e91b5ca
SPARK-47547 BloomFilter fpp degradation: disable progress logging by …
ishnagy Jun 17, 2025
897c1d4
SPARK-47547 BloomFilter fpp degradation: adjust tolerance and fail on…
ishnagy Jun 18, 2025
013bfe4
SPARK-47547 BloomFilter fpp degradation: make V1/V2 distinction in Bl…
ishnagy Jul 6, 2025
6d44c1e
SPARK-47547 BloomFilter fpp degradation: scrambling test input withou…
ishnagy Jul 6, 2025
925bf12
SPARK-47547 BloomFilter fpp degradation: parallelizing BloomFilter re…
ishnagy Jul 6, 2025
6f28882
SPARK-47547 BloomFilter fpp degradation: add seed to equals/hashCode
ishnagy Jul 7, 2025
ed6caac
SPARK-47547 BloomFilter fpp degradation: checkstyle fix
ishnagy Jul 7, 2025
7d4ef74
SPARK-47547 BloomFilter fpp degradation: remove dependency between lo…
ishnagy Jul 7, 2025
c52ead3
Merge branch 'master' into SPARK-47547_bloomfilter_fpp_degradation
ishnagy Jul 7, 2025
0ab8276
SPARK-47547 BloomFilter fpp degradation: running /dev/scalafmt
ishnagy Jul 7, 2025
d2477bf
SPARK-47547 BloomFilter fpp degradation: javadoc comment for the V2 enum
ishnagy Jul 7, 2025
413c4fe
SPARK-47547 BloomFilter fpp degradation: reindent with 2 spaces
ishnagy Jul 7, 2025
4599fcb
SPARK-47547 BloomFilter fpp degradation: (recover empty line in Bloom…
ishnagy Jul 7, 2025
1ee2e13
SPARK-47547 BloomFilter fpp degradation: JEP-361 style switches
ishnagy Jul 7, 2025
c501b2a
SPARK-47547 BloomFilter fpp degradation: removing Objects::equals
ishnagy Jul 7, 2025
1f5cfb6
SPARK-47547 BloomFilter fpp degradation: add missing seed comparison …
ishnagy Jul 7, 2025
f60d55f
SPARK-47547 BloomFilter fpp degradation: checkstyle
ishnagy Jul 8, 2025
0314963
SPARK-47547 BloomFilter fpp degradation: BloomFilterBase abstract par…
ishnagy Jul 11, 2025
f2df338
SPARK-47547 BloomFilter fpp degradation: pull up long and byte hashin…
ishnagy Jul 11, 2025
4aaff83
SPARK-47547 BloomFilter fpp degradation: checkstyle
ishnagy Jul 13, 2025
e214bd7
SPARK-47547 BloomFilter fpp degradation: removing unnecessary line wr…
ishnagy Jul 15, 2025
99f7343
SPARK-47547 BloomFilter fpp degradation: moving junit-pioneer version…
ishnagy Jul 15, 2025
58e3066
SPARK-47547 BloomFilter fpp degradation: (empty line juggling)
ishnagy Jul 15, 2025
c06cb38
SPARK-47547 BloomFilter fpp degradation: pull up common hash scatteri…
ishnagy Jul 15, 2025
b99ef3a
SPARK-47547 BloomFilter fpp degradation: (empty line juggling)
ishnagy Jul 15, 2025
ce3ad76
SPARK-47547 BloomFilter fpp degradation: remove redundant default cas…
ishnagy Jul 15, 2025
626e459
SPARK-47547 BloomFilter fpp degradation: properly capitalize InputStr…
ishnagy Jul 15, 2025
b0f5b45
SPARK-47547 BloomFilter fpp degradation: indenting method parameters …
ishnagy Jul 15, 2025
6849dbe
SPARK-47547 BloomFilter fpp degradation: removing junit-pioneer from …
ishnagy Jul 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.apache.spark.util.sketch;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
* A Bloom filter is a space-efficient probabilistic data structure that offers an approximate
Expand Down Expand Up @@ -51,7 +54,22 @@ public enum Version {
* <li>The words/longs (numWords * 64 bit)</li>
* </ul>
*/
V1(1);
V1(1),

/**
* {@code BloomFilter} binary format version 2.
* Fixes the int32 truncation issue with V1 indexes, but by changing the bit pattern,
* it will become incompatible with V1 serializations.
* All values written in big-endian order:
* <ul>
* <li>Version number, always 2 (32 bit)</li>
* <li>Number of hash functions (32 bit)</li>
* <li>Integer seed to initialize hash functions (32 bit) </li>
* <li>Total number of words of the underlying bit array (32 bit)</li>
* <li>The words/longs (numWords * 64 bit)</li>
* </ul>
*/
V2(2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to add V2, we need to add a new comment block for V2 because the above comment is for V1 only.

* <li>Version number, always 1 (32 bit)</li>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added some comments in d2477bf


private final int versionNumber;

Expand Down Expand Up @@ -175,14 +193,26 @@ public long cardinality() {
* the stream.
*/
public static BloomFilter readFrom(InputStream in) throws IOException {
return BloomFilterImpl.readFrom(in);
// peek into the InputStream so we can determine the version
BufferedInputStream bin = new BufferedInputStream(in);
bin.mark(4);
int version = ByteBuffer.wrap(bin.readNBytes(4)).getInt();
bin.reset();

return switch (version) {
case 1 -> BloomFilterImpl.readFrom(bin);
case 2 -> BloomFilterImplV2.readFrom(bin);
default -> throw new IllegalArgumentException("Unknown BloomFilter version: " + version);
};
}

/**
* Reads in a {@link BloomFilter} from a byte array.
*/
public static BloomFilter readFrom(byte[] bytes) throws IOException {
return BloomFilterImpl.readFrom(bytes);
try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
return readFrom(bis);
}
}

/**
Expand Down Expand Up @@ -256,6 +286,19 @@ public static BloomFilter create(long expectedNumItems, double fpp) {
* pick an optimal {@code numHashFunctions} which can minimize {@code fpp} for the bloom filter.
*/
public static BloomFilter create(long expectedNumItems, long numBits) {
return create(Version.V2, expectedNumItems, numBits, BloomFilterImplV2.DEFAULT_SEED);
}

public static BloomFilter create(long expectedNumItems, long numBits, int seed) {
return create(Version.V2, expectedNumItems, numBits, seed);
}

public static BloomFilter create(
Version version,
long expectedNumItems,
long numBits,
int seed
) {
if (expectedNumItems <= 0) {
throw new IllegalArgumentException("Expected insertions must be positive");
}
Expand All @@ -264,6 +307,11 @@ public static BloomFilter create(long expectedNumItems, long numBits) {
throw new IllegalArgumentException("Number of bits must be positive");
}

return new BloomFilterImpl(optimalNumOfHashFunctions(expectedNumItems, numBits), numBits);
int numHashFunctions = optimalNumOfHashFunctions(expectedNumItems, numBits);

return switch (version) {
case V1 -> new BloomFilterImpl(numHashFunctions, numBits);
case V2 -> new BloomFilterImplV2(numHashFunctions, numBits, seed);
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.sketch;

import java.util.Objects;

abstract class BloomFilterBase extends BloomFilter {

public static final int DEFAULT_SEED = 0;

protected int seed;
protected int numHashFunctions;
protected BitArray bits;

protected BloomFilterBase(int numHashFunctions, long numBits) {
this(numHashFunctions, numBits, DEFAULT_SEED);
}

protected BloomFilterBase(int numHashFunctions, long numBits, int seed) {
this(new BitArray(numBits), numHashFunctions, seed);
}

protected BloomFilterBase(BitArray bits, int numHashFunctions, int seed) {
this.bits = bits;
this.numHashFunctions = numHashFunctions;
this.seed = seed;
}

protected BloomFilterBase() {}

@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}

if (!(other instanceof BloomFilterBase that)) {
return false;
}

return
this.getClass() == that.getClass()
&& this.numHashFunctions == that.numHashFunctions
&& this.seed == that.seed
// TODO: this.bits can be null temporarily, during deserialization,
// should we worry about this?
&& this.bits.equals(that.bits);
}

@Override
public int hashCode() {
return Objects.hash(numHashFunctions, seed, bits);
}

@Override
public double expectedFpp() {
return Math.pow((double) bits.cardinality() / bits.bitSize(), numHashFunctions);
}

@Override
public long bitSize() {
return bits.bitSize();
}

@Override
public boolean put(Object item) {
if (item instanceof String str) {
return putString(str);
} else if (item instanceof byte[] bytes) {
return putBinary(bytes);
} else {
return putLong(Utils.integralToLong(item));
}
}

protected HiLoHash hashLongToIntPair(long item, int seed) {
// Here we first hash the input long element into 2 int hash values, h1 and h2, then produce n
// hash values by `h1 + i * h2` with 1 <= i <= numHashFunctions.
// Note that `CountMinSketch` use a different strategy, it hash the input long element with
// every i to produce n hash values.
// TODO: the strategy of `CountMinSketch` looks more advanced, should we follow it here?
int h1 = Murmur3_x86_32.hashLong(item, seed);
int h2 = Murmur3_x86_32.hashLong(item, h1);
return new HiLoHash(h1, h2);
}

protected HiLoHash hashBytesToIntPair(byte[] item, int seed) {
int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, seed);
int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);
return new HiLoHash(h1, h2);
}

protected abstract boolean scatterHashAndSetAllBits(HiLoHash inputHash);

protected abstract boolean scatterHashAndGetAllBits(HiLoHash inputHash);

@Override
public boolean putString(String item) {
return putBinary(Utils.getBytesFromUTF8String(item));
}

@Override
public boolean putBinary(byte[] item) {
HiLoHash hiLoHash = hashBytesToIntPair(item, seed);
return scatterHashAndSetAllBits(hiLoHash);
}

@Override
public boolean mightContainString(String item) {
return mightContainBinary(Utils.getBytesFromUTF8String(item));
}

@Override
public boolean mightContainBinary(byte[] item) {
HiLoHash hiLoHash = hashBytesToIntPair(item, seed);
return scatterHashAndGetAllBits(hiLoHash);
}

public boolean putLong(long item) {
HiLoHash hiLoHash = hashLongToIntPair(item, seed);
return scatterHashAndSetAllBits(hiLoHash);
}

@Override
public boolean mightContainLong(long item) {
HiLoHash hiLoHash = hashLongToIntPair(item, seed);
return scatterHashAndGetAllBits(hiLoHash);
}

@Override
public boolean mightContain(Object item) {
if (item instanceof String str) {
return mightContainString(str);
} else if (item instanceof byte[] bytes) {
return mightContainBinary(bytes);
} else {
return mightContainLong(Utils.integralToLong(item));
}
}

@Override
public boolean isCompatible(BloomFilter other) {
if (other == null) {
return false;
}

if (!(other instanceof BloomFilterBase that)) {
return false;
}

return
this.getClass() == that.getClass()
&& this.bitSize() == that.bitSize()
&& this.numHashFunctions == that.numHashFunctions
&& this.seed == that.seed;
}

@Override
public BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException {
BloomFilterBase otherImplInstance = checkCompatibilityForMerge(other);

this.bits.putAll(otherImplInstance.bits);
return this;
}

@Override
public BloomFilter intersectInPlace(BloomFilter other) throws IncompatibleMergeException {
BloomFilterBase otherImplInstance = checkCompatibilityForMerge(other);

this.bits.and(otherImplInstance.bits);
return this;
}

@Override
public long cardinality() {
return this.bits.cardinality();
}

protected abstract BloomFilterBase checkCompatibilityForMerge(BloomFilter other)
throws IncompatibleMergeException;

public record HiLoHash(int hi, int lo) {}

}
Loading