[FLINK-37294][state] Support state migration between disabling and enabling ttl in HeapKeyedStateBackend #26651
Conversation
TypeSerializer<SV> newSerializer,
        TtlTimeProvider ttlTimeProvider) {

    Preconditions.checkArgument(priorSerializer instanceof TtlAwareSerializer);
nit: in the variable names we use prior and new, and previous and current. I suggest being consistent:
previousSerializer, previousAwareSerializer
currentSerializer, currentTtlAwareSerializer
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java (resolved)
+ previousStateSerializer
+ ").");
} else if (stateCompatibility.isCompatibleAfterMigration()) {
    migrateStateValues(stateDesc, previousStateSerializer, newStateSerializer);
What happens if the state is not compatible? I suggest we should at least log in that case, or should there be an error?
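For the incompatible branch, one possible shape is to fail fast instead of silently skipping migration. A standalone sketch; SchemaCompatibility, StateMigrationException, and MigrationCheck here are illustrative stand-ins, not the actual Flink types:

```java
// Illustrative sketch only: these are stand-in types, not Flink's real classes.
enum SchemaCompatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

class StateMigrationException extends RuntimeException {
    StateMigrationException(String message) { super(message); }
}

class MigrationCheck {
    static String resolve(SchemaCompatibility compatibility, String stateName) {
        switch (compatibility) {
            case COMPATIBLE_AS_IS:
                return "no migration needed";
            case COMPATIBLE_AFTER_MIGRATION:
                return "migrating state values";
            default:
                // Surface the failure to the user rather than ignoring it.
                throw new StateMigrationException(
                        "New serializer for state " + stateName
                                + " is incompatible with the previously stored state; cannot restore.");
        }
    }
}
```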
@@ -299,6 +292,56 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
    return stateTable;
}

/** Only triggering state migration when the state TTL is turned on or off is supported. */
Do we need the text "is supported"? I am not sure what this means.
TypeSerializer<List<V>> newSerializer,
        TtlTimeProvider ttlTimeProvider) {

    Preconditions.checkArgument(
It looks like there is duplication of code. Can we rearrange the code so the following is not repeated for the different cases?
priorSerializer instanceof TtlAwareSerializer.TtlAwareListSerializer);
Preconditions.checkArgument(
newSerializer instanceof TtlAwareSerializer.TtlAwareListSerializer);
TtlAwareSerializer<V, ?> priorTtlAwareElementSerializer =
((TtlAwareSerializer.TtlAwareListSerializer<V>) priorSerializer)
.getElementSerializer();
TtlAwareSerializer<V, ?> newTtlAwareElementSerializer =
((TtlAwareSerializer.TtlAwareListSerializer<V>) newSerializer)
.getElementSerializer();
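One way to remove the repetition is to pull the check-and-cast pair into a small generic helper that is called once per serializer. This is a hypothetical standalone sketch (checkedCast is an invented name, not existing Flink code):

```java
class Casts {
    // Consolidates the Preconditions.checkArgument(x instanceof T) plus the
    // explicit cast that is otherwise repeated for each serializer variable.
    static <T> T checkedCast(Object value, Class<T> expectedType) {
        if (!expectedType.isInstance(value)) {
            throw new IllegalArgumentException(
                    "Expected " + expectedType.getSimpleName() + " but got "
                            + (value == null ? "null" : value.getClass().getSimpleName()));
        }
        return expectedType.cast(value);
    }
}
```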
Cool, I've closed another PR.
@Zakelly Kind reminder for review.
"State should be an AbstractRocksDBState but is " + state);
}
AbstractHeapState<K, N, V> heapState = (AbstractHeapState<K, N, V>) state;
TtlAwareSerializer<V, ?> previousTtlAwareSerializer =
Variable previousTtlAwareSerializer is not used. You can safely delete this.
comment resolved
Thanks for the PR, overall looks good
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java (resolved)
while (iterator.hasNext()) {
    final StateEntry<K, N, V> entry = iterator.next();
    stateTable.put(
Is there any concurrency issue when performing stateTable.put() over the iteration?
If so, I'd suggest a new inner transform method to update all the values of the inner entries in place.
I haven't found any concurrency issues in unit tests or my testing jobs yet. Additionally, I debugged the implementations of StateTable.put and CopyOnWriteStateMap.put: they locate the existing StateMapEntry and modify its state data without creating a new StateMapEntry or updating modCount.
Although I haven't identified concurrency issues so far, to avoid potential problems I still changed to using the transform method to update entries. WDYT?
@hejufang Thanks for the update!
I checked your change, but I was thinking: could we remove the iteration? There is no need to iterate the keys out and then perform transform one by one. We may introduce a single internal method that iterates all KVs and transforms them, just like applyToAllKeys. WDYT?
@Zakelly Thanks for your suggestion. I've added a transformAll method to traverse all data and perform transformations. Please take a look.
@hejufang I'm afraid the current implementation does not avoid the potential concurrency issue and will achieve low performance.
To be more specific, the CopyOnWriteStateMap$StateEntryIterator will check the expectedModCount in each next() call, but the modCount will be updated by putEntry() during transform(). So will a ConcurrentModificationException be thrown? Correct me if I'm wrong.
In the current transformAll implementation, we do entry iteration and then invoke transform; getMapForKeyGroup will calculate the key group again and get the state map, which is redundant since the entry was iterated from the corresponding map. I'm worried about the performance.
@Zakelly Thank you for your reply. I have reviewed the implementations of StateTable and StateMap and conducted some tests. Here are my answers to your questions.
Will iterating over StateMap using an iterator and performing transform throw a ConcurrentModificationException?
No, it won't. This is because the putEntry method locates an already existing StateMapEntry instead of creating a new one. It returns before updating modCount (see CopyOnWriteStateMap#putEntry at line 399), so when transforming existing data, modCount does not change, and therefore no ConcurrentModificationException is thrown.
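The same behavior can be observed with a plain java.util.HashMap: put() on an already-present key is a value update rather than a structural modification, does not change modCount, and therefore does not invalidate an in-flight iteration. A minimal self-contained analogue:

```java
import java.util.HashMap;
import java.util.Map;

class InPlaceUpdateDemo {
    static Map<String, Integer> doubleValuesDuringIteration() {
        Map<String, Integer> map = new HashMap<>();
        map.put("a", 1);
        map.put("b", 2);
        map.put("c", 3);
        // put() on existing keys only updates values, so modCount stays
        // unchanged and no ConcurrentModificationException is thrown.
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            map.put(entry.getKey(), entry.getValue() * 2);
        }
        return map;
    }
}
```

Note that this only holds while no key is inserted or removed; a structural change during the same loop would fail fast.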
Are there other concurrency issues?
Yes.
I verified that the following code will iterate over duplicate data:
for (int i = 0; i < keyGroupedStateMaps.length; i++) {
Iterator<StateEntry<K, N, S>> iterator = keyGroupedStateMaps[i].iterator();
while (iterator.hasNext()) {
StateEntry<K, N, S> entry = iterator.next();
keyGroupedStateMaps[i].transform(
entry.getKey(), entry.getNamespace(), value, transformation);
}
}
This happens because when iterating directly over CopyOnWriteStateMap.iterator(), calling transform internally invokes putEntry, which triggers computeHashForOperationAndDoIncrementalRehash. This causes data to be moved between two tables (primaryTable and incrementalRehashTable). If the iterator is traversing primaryTable when transform triggers rehashing, some data will move to incrementalRehashTable. The iterator may later visit these entries in incrementalRehashTable, causing duplicate processing.
This issue is easy to reproduce by adding more data to the StateMap in a unit test, for instance StateBackendMigrationTestBase.testKeyedValueStateUpgrade, with the following code inserting more entries:
for (int i = 0; i < 10000; i++) {
backend.setCurrentKey(i);
valueState.update(new TestType("test" + i, i * 1000));
}
Running HashMapStateBackendMigrationTest.testStateMigrationAfterChangingTTLFromDisablingToEnabling then results in errors.
Why does using StateTable.iterator() not iterate duplicate entries?
Because StateTable.iterator() returns a Spliterator. Before traversal, the Spliterator reads the original iterator (StateMap.iterator()) and preloads the data into a cache (java.util.stream.StreamSpliterators.AbstractWrappingSpliterator.buffer). This effectively creates a snapshot of all entries. Therefore, even if subsequent transforms cause data to move, the iterator will not visit duplicate entries.
Considering that recalculating the key group involves some performance overhead, I optimized the code by referencing the StateTable.iterator() logic: first read all data before executing any transform, then update the data:
for (StateMap<K, N, S> stateMap : keyGroupedStateMaps) {
List<StateEntry<K, N, S>> entries =
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(stateMap.iterator(), 0),
false)
.collect(Collectors.toList());
for (StateEntry<K, N, S> entry : entries) {
stateMap.transform(entry.getKey(), entry.getNamespace(), value, transformation);
}
}
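A self-contained analogue of this snapshot-then-transform pattern, using plain java.util collections instead of Flink's StateMap (transformAll is an illustrative name):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

class SnapshotTransformDemo {
    // Materialize all entries before transforming. The loop runs over the
    // snapshot list, so transformations that restructure the underlying map
    // cannot cause entries to be visited twice or invalidate the iteration.
    static Map<String, Integer> transformAll(
            Map<String, Integer> map, UnaryOperator<Integer> transformation) {
        List<Map.Entry<String, Integer>> snapshot = new ArrayList<>(map.entrySet());
        for (Map.Entry<String, Integer> entry : snapshot) {
            map.put(entry.getKey(), transformation.apply(entry.getValue()));
        }
        return map;
    }
}
```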
Is there any other possible approach?
I think another possibility is to add a transformAll interface to StateMap, which directly iterates over both primaryTable and incrementalRehashTable inside CopyOnWriteStateMap. However, I'm unsure if this introduces other potential issues, and it would also require adding implementations to CopyOnWriteSkipListStateMap, which could be complex. So I prefer the approach above which I have verified in my test jobs. What do you think?
Thanks for the investigation and update! Current implementation LGTM as it avoids the recalculation of key group as well as the concurrency issue.
Force-pushed from 8a983a7 to 819a153.
…abling ttl in HeapKeyedStateBackend
Co-authored-by: hejufang <[email protected]>
Co-authored-by: Xiangyu Feng <[email protected]>
@Zakelly Kind reminder.
Thanks for the update! I'll take a look this week.
[FLINK-37294][state] Support state migration between disabling and enabling ttl in HeapKeyedStateBackend
What is the purpose of the change
Support state migration between disabling and enabling ttl in HeapKeyedStateBackend
Brief change log
Add migrateTtlValue in AbstractHeapState. When the state TTL switch changes, trigger the migration of state data.

Verifying this change
This change is already covered by existing tests, such as StateBackendMigrationTestBase#testStateMigrationAfterChangingTTLFromEnablingToDisabling and StateBackendMigrationTestBase#testStateMigrationAfterChangingTTLFromDisablingToEnabling.

Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation