Skip to content

Commit f26854a

Browse files
committed
Not atomic?
1 parent 8a572ed commit f26854a

File tree

1 file changed

+10
-12
lines changed

1 file changed

+10
-12
lines changed

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RunLengthEncoder.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@
99

1010
import static io.confluent.csid.utils.StringUtils.msg;
1111
import static io.confluent.parallelconsumer.OffsetEncoding.*;
12-
//import static io.confluent.parallelconsumer.OffsetEncoding.RunLengthCompressed;
1312

1413
class RunLengthEncoder extends OffsetEncoder {
1514

1615
// todo do these need to be atomic?
17-
private final AtomicInteger currentRunLengthCount;
18-
private final AtomicBoolean previousRunLengthState;
16+
private int currentRunLengthCount = 0;
17+
private boolean previousRunLengthState = false;
1918

2019
private final List<Integer> runLengthEncodingIntegers;
2120

@@ -28,8 +27,6 @@ class RunLengthEncoder extends OffsetEncoder {
2827
public RunLengthEncoder(OffsetSimultaneousEncoder offsetSimultaneousEncoder, Version newVersion) {
2928
super(offsetSimultaneousEncoder);
3029
// run length setup
31-
currentRunLengthCount = new AtomicInteger();
32-
previousRunLengthState = new AtomicBoolean(false);
3330
runLengthEncodingIntegers = new ArrayList<>();
3431
version = newVersion;
3532
}
@@ -62,7 +59,7 @@ public void encodeCompletedOffset(final int rangeIndex) {
6259

6360
@Override
6461
public byte[] serialise() throws EncodingNotSupportedException {
65-
runLengthEncodingIntegers.add(currentRunLengthCount.get()); // add tail
62+
runLengthEncodingIntegers.add(currentRunLengthCount); // add tail
6663

6764
int entryWidth = switch (version) {
6865
case v1 -> Short.BYTES;
@@ -81,7 +78,8 @@ public byte[] serialise() throws EncodingNotSupportedException {
8178
case v2 -> {
8279
runLengthEncodedByteBuffer.putInt(runlength);
8380
}
84-
};
81+
}
82+
;
8583
}
8684

8785
byte[] array = runLengthEncodedByteBuffer.array();
@@ -101,13 +99,13 @@ protected byte[] getEncodedBytes() {
10199

102100
private void encodeRunLength(final boolean currentIsComplete) {
103101
// run length
104-
boolean currentOffsetMatchesOurRunLengthState = previousRunLengthState.get() == currentIsComplete;
102+
boolean currentOffsetMatchesOurRunLengthState = previousRunLengthState == currentIsComplete;
105103
if (currentOffsetMatchesOurRunLengthState) {
106-
currentRunLengthCount.getAndIncrement();
104+
currentRunLengthCount++;
107105
} else {
108-
previousRunLengthState.set(currentIsComplete);
109-
runLengthEncodingIntegers.add(currentRunLengthCount.get());
110-
currentRunLengthCount.set(1); // reset to 1
106+
previousRunLengthState = currentIsComplete;
107+
runLengthEncodingIntegers.add(currentRunLengthCount);
108+
currentRunLengthCount = 1; // reset to 1
111109
}
112110
}
113111
}

0 commit comments

Comments
 (0)