-
Notifications
You must be signed in to change notification settings - Fork 9.1k
Unify rollout reconstruction with resume/fork TurnContext hydration #12612
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f1ec573
e519dec
6a4dfc4
098717f
92c85b2
60456cb
d884e02
252106a
b2fb999
69a171c
53f012b
f746620
f5005ae
0e091a8
922aaf8
b0e59da
904093e
25d5ca8
fc1e245
b6270c2
4b3dee6
51dacc5
dd6a25f
912fefe
b9da1e7
687e137
6113ae8
491a532
aa404bd
12744e7
1a0b2f3
bee596b
931fcb2
6d03380
01141fd
5900705
e94181f
c52dcf0
54b1e27
1d32c1e
edcb754
3cd2b57
87362f2
e11a7f2
9710a98
cc25272
023ceda
7fa5958
1f67e04
41fe131
a128d20
1e7fc32
addaed8
e9828f6
be8c465
9b62fe1
aa7b019
ff0efdb
1159a52
c6d346a
5f8e9dd
4aa32da
5d6ca57
25e8ea4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,294 @@ | ||
| use super::*; | ||
|
|
||
| /// Return value of `Session::reconstruct_history_from_rollout`, bundling the rebuilt history with | ||
| /// the resume/fork hydration metadata derived from the same replay. | ||
| #[derive(Debug)] | ||
| pub(super) struct RolloutReconstruction { | ||
|     /// Conversation history rebuilt by replaying the rollout items. | ||
|     pub(super) history: Vec<ResponseItem>, | ||
|     /// Model recorded by a `TurnContextItem` in the newest surviving user-turn segment that | ||
|     /// carried one; `None` if the replay found no such segment. | ||
|     pub(super) previous_model: Option<String>, | ||
|     /// Newest surviving `TurnContextItem` baseline. `None` when no baseline was found, when a | ||
|     /// compaction cleared it, or when the forward replay encountered a compaction without | ||
|     /// `replacement_history`. | ||
|     pub(super) reference_context_item: Option<TurnContextItem>, | ||
| } | ||
|
|
||
| /// Replay-time state of the reference `TurnContextItem` baseline, tracked both per segment and | ||
| /// for the replay as a whole. It is collapsed into an `Option<TurnContextItem>` before being | ||
| /// returned: only `Latest` yields `Some`. | ||
| #[derive(Debug, Default)] | ||
| enum TurnReferenceContextItem { | ||
|     /// No `TurnContextItem` has been seen for this replay span yet. | ||
|     /// | ||
|     /// This differs from `Cleared`: `NeverSet` means there is no evidence this turn ever | ||
|     /// established a baseline, while `Cleared` means a baseline existed and a later compaction | ||
|     /// invalidated it. Only the latter must emit an explicit clearing segment for resume/fork | ||
|     /// hydration. | ||
|     #[default] | ||
|     NeverSet, | ||
|     /// A previously established baseline was invalidated by later compaction. | ||
|     Cleared, | ||
|     /// The latest baseline established by this replay span. | ||
|     Latest(Box<TurnContextItem>), | ||
| } | ||
|
|
||
| /// Metadata accumulated for the turn segment currently being walked by the reverse scan. A | ||
| /// segment is opened lazily by the first turn-scoped item seen and is handed to | ||
| /// `finalize_active_segment` once a compatible `TurnStarted` is reached or the scan ends. | ||
| #[derive(Debug, Default)] | ||
| struct ActiveReplaySegment<'a> { | ||
|     /// Turn id taken from the first `TurnComplete`, `TurnAborted` or `TurnContextItem` seen in | ||
|     /// this segment that carried one; used to match later items and the closing `TurnStarted`. | ||
|     turn_id: Option<String>, | ||
|     /// Set once an `EventMsg::UserMessage` is seen in this segment. | ||
|     counts_as_user_turn: bool, | ||
|     /// Model copied from a `TurnContextItem` whose turn id is compatible with this segment. | ||
|     previous_model: Option<String>, | ||
|     /// Baseline state contributed by this segment (see `TurnReferenceContextItem`). | ||
|     reference_context_item: TurnReferenceContextItem, | ||
|     /// Borrowed `replacement_history` of the first compaction seen in this segment (i.e. the | ||
|     /// newest one in rollout order) that carried one. | ||
|     base_replacement_history: Option<&'a [ResponseItem]>, | ||
| } | ||
|
|
||
| /// Returns `true` unless both ids are present and differ. | ||
| /// | ||
| /// A missing id on either side is treated as compatible, so items without a turn id can still | ||
| /// attach to the active segment. | ||
| fn turn_ids_are_compatible(active_turn_id: Option<&str>, item_turn_id: Option<&str>) -> bool { | ||
|     active_turn_id | ||
|         .is_none_or(|turn_id| item_turn_id.is_none_or(|item_turn_id| item_turn_id == turn_id)) | ||
| } | ||
|
|
||
| /// Folds one finished reverse-replay segment into the accumulated replay results. | ||
| /// | ||
| /// Segments are finalized newest-first, so each accumulator is only written while it is still | ||
| /// unset: the first surviving segment that can provide a value wins. | ||
| /// | ||
| /// * `active_segment` - the segment being finalized; consumed by this call. | ||
| /// * `base_replacement_history` - newest surviving replacement-history checkpoint, if any. | ||
| /// * `previous_model` - model from the newest surviving user turn that recorded one. | ||
| /// * `reference_context_item` - newest surviving baseline, or an explicit `Cleared` marker. | ||
| /// * `pending_rollback_turns` - number of user-turn segments that still have to be skipped. | ||
| fn finalize_active_segment<'a>( | ||
|     active_segment: ActiveReplaySegment<'a>, | ||
|     base_replacement_history: &mut Option<&'a [ResponseItem]>, | ||
|     previous_model: &mut Option<String>, | ||
|     reference_context_item: &mut TurnReferenceContextItem, | ||
|     pending_rollback_turns: &mut usize, | ||
| ) { | ||
|     // Thread rollback drops the newest surviving real user-message boundaries. In replay, that | ||
|     // means skipping the next finalized segments that contain a non-contextual | ||
|     // `EventMsg::UserMessage`. | ||
|     if *pending_rollback_turns > 0 { | ||
|         // Every segment finalized while a rollback is pending is discarded; only user-turn | ||
|         // segments consume one of the pending rollback slots. | ||
|         if active_segment.counts_as_user_turn { | ||
|             *pending_rollback_turns -= 1; | ||
|         } | ||
|         return; | ||
|     } | ||
|
|
||
|     // A surviving replacement-history checkpoint is a complete history base. Once we | ||
|     // know the newest surviving one, older rollout items do not affect rebuilt history. | ||
|     if base_replacement_history.is_none() | ||
|         && let Some(segment_base_replacement_history) = active_segment.base_replacement_history | ||
|     { | ||
|         *base_replacement_history = Some(segment_base_replacement_history); | ||
|     } | ||
|
|
||
|     // `previous_model` comes from the newest surviving user turn that established one. | ||
|     // The assigned value may still be `None` (user turn without a compatible `TurnContextItem`), | ||
|     // in which case an older segment can fill it later. | ||
|     if previous_model.is_none() && active_segment.counts_as_user_turn { | ||
|         *previous_model = active_segment.previous_model; | ||
|     } | ||
|
|
||
|     // `reference_context_item` comes from the newest surviving user turn baseline, or | ||
|     // from a surviving compaction that explicitly cleared that baseline. | ||
|     // The copied value may itself still be `NeverSet`, which leaves the slot open for older | ||
|     // segments. | ||
|     if matches!(reference_context_item, TurnReferenceContextItem::NeverSet) | ||
|         && (active_segment.counts_as_user_turn | ||
|             || matches!( | ||
|                 active_segment.reference_context_item, | ||
|                 TurnReferenceContextItem::Cleared | ||
|             )) | ||
|     { | ||
|         *reference_context_item = active_segment.reference_context_item; | ||
|     } | ||
| } | ||
|
|
||
| impl Session { | ||
|     /// Rebuilds conversation history and resume/fork metadata from persisted rollout items. | ||
|     /// | ||
|     /// Works in two passes over `rollout_items`: | ||
|     /// 1. A newest-to-oldest scan groups items into turn segments, applies `ThreadRolledBack` | ||
|     ///    events by skipping finalized user-turn segments, and records the newest surviving | ||
|     ///    replacement-history checkpoint, `previous_model` and reference context item. | ||
|     /// 2. A forward replay of `rollout_suffix` on top of that checkpoint (if any) materializes | ||
|     ///    the history. | ||
|     /// | ||
|     /// `turn_context` is only read for the truncation policy used when recording response items. | ||
|     /// The returned `reference_context_item` is forced to `None` when the forward replay saw a | ||
|     /// compaction without `replacement_history`. | ||
|     /// | ||
|     /// NOTE(review): nothing in this body awaits; presumably `async` is kept for callers - confirm. | ||
|     pub(super) async fn reconstruct_history_from_rollout( | ||
|         &self, | ||
|         turn_context: &TurnContext, | ||
|         rollout_items: &[RolloutItem], | ||
|     ) -> RolloutReconstruction { | ||
|         // Replay metadata should already match the shape of the future lazy reverse loader, even | ||
|         // while history materialization still uses an eager bridge. Scan newest-to-oldest, | ||
|         // stopping once a surviving replacement-history checkpoint and the required resume metadata | ||
|         // are both known; then replay only the buffered surviving tail forward to preserve exact | ||
|         // history semantics. | ||
|         let mut base_replacement_history: Option<&[ResponseItem]> = None; | ||
|         let mut previous_model = None; | ||
|         let mut reference_context_item = TurnReferenceContextItem::NeverSet; | ||
|         // Rollback is "drop the newest N user turns". While scanning in reverse, that becomes | ||
|         // "skip the next N user-turn segments we finalize". | ||
|         let mut pending_rollback_turns = 0usize; | ||
|         // Borrowed suffix of rollout items newer than the newest surviving replacement-history | ||
|         // checkpoint. If no such checkpoint exists, this remains the full rollout. | ||
|         let mut rollout_suffix = rollout_items; | ||
|         // Reverse replay accumulates rollout items into the newest in-progress turn segment until | ||
|         // we hit its matching `TurnStarted`, at which point the segment can be finalized. | ||
|         let mut active_segment: Option<ActiveReplaySegment<'_>> = None; | ||
|
|
||
|         for (index, item) in rollout_items.iter().enumerate().rev() { | ||
|             match item { | ||
|                 RolloutItem::Compacted(compacted) => { | ||
|                     let active_segment = | ||
|                         active_segment.get_or_insert_with(ActiveReplaySegment::default); | ||
|                     // Looking backward, compaction clears any older baseline unless a newer | ||
|                     // `TurnContextItem` in this same segment has already re-established it. | ||
|                     if matches!( | ||
|                         active_segment.reference_context_item, | ||
|                         TurnReferenceContextItem::NeverSet | ||
|                     ) { | ||
|                         active_segment.reference_context_item = TurnReferenceContextItem::Cleared; | ||
|                     } | ||
|                     if active_segment.base_replacement_history.is_none() | ||
|                         && let Some(replacement_history) = &compacted.replacement_history | ||
|                     { | ||
|                         active_segment.base_replacement_history = Some(replacement_history); | ||
|                         // NOTE(review): the suffix is narrowed as soon as the checkpoint is seen, | ||
|                         // before this segment is finalized. If a pending rollback later discards | ||
|                         // the segment, `finalize_active_segment` does not adopt its checkpoint | ||
|                         // while `rollout_suffix` stays narrowed - confirm this case is intended. | ||
|                         rollout_suffix = &rollout_items[index + 1..]; | ||
charley-oai marked this conversation as resolved.
Show resolved
Hide resolved
charley-oai marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|                     } | ||
|                 } | ||
|                 RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => { | ||
|                     // A `num_turns` that does not fit in `usize` is treated as `usize::MAX`, and | ||
|                     // the running total saturates instead of overflowing. | ||
|                     pending_rollback_turns = pending_rollback_turns | ||
|                         .saturating_add(usize::try_from(rollback.num_turns).unwrap_or(usize::MAX)); | ||
|                 } | ||
|                 RolloutItem::EventMsg(EventMsg::TurnComplete(event)) => { | ||
|                     let active_segment = | ||
|                         active_segment.get_or_insert_with(ActiveReplaySegment::default); | ||
|                     // Reverse replay often sees `TurnComplete` before any turn-scoped metadata. | ||
|                     // Capture the turn id early so later `TurnContext` / abort items can match it. | ||
|                     if active_segment.turn_id.is_none() { | ||
|                         active_segment.turn_id = Some(event.turn_id.clone()); | ||
|                     } | ||
|                 } | ||
|                 RolloutItem::EventMsg(EventMsg::TurnAborted(event)) => { | ||
|                     // An abort without a turn id never opens a new segment; it can only be | ||
|                     // absorbed by a segment that is already active. | ||
|                     if let Some(active_segment) = active_segment.as_mut() { | ||
|                         if active_segment.turn_id.is_none() | ||
|                             && let Some(turn_id) = &event.turn_id | ||
|                         { | ||
|                             active_segment.turn_id = Some(turn_id.clone()); | ||
|                         } | ||
|                     } else if let Some(turn_id) = &event.turn_id { | ||
|                         active_segment = Some(ActiveReplaySegment { | ||
|                             turn_id: Some(turn_id.clone()), | ||
|                             ..Default::default() | ||
|                         }); | ||
|                     } | ||
|                 } | ||
|                 RolloutItem::EventMsg(EventMsg::UserMessage(_)) => { | ||
|                     let active_segment = | ||
|                         active_segment.get_or_insert_with(ActiveReplaySegment::default); | ||
|                     active_segment.counts_as_user_turn = true; | ||
|                 } | ||
|                 RolloutItem::TurnContext(ctx) => { | ||
|                     let active_segment = | ||
|                         active_segment.get_or_insert_with(ActiveReplaySegment::default); | ||
|                     // `TurnContextItem` can attach metadata to an existing segment, but only a | ||
|                     // real `UserMessage` event should make the segment count as a user turn. | ||
|                     if active_segment.turn_id.is_none() { | ||
|                         active_segment.turn_id = ctx.turn_id.clone(); | ||
|                     } | ||
|                     if turn_ids_are_compatible( | ||
|                         active_segment.turn_id.as_deref(), | ||
|                         ctx.turn_id.as_deref(), | ||
|                     ) { | ||
|                         // NOTE(review): this overwrites on every compatible `TurnContextItem`, so | ||
|                         // in reverse order the oldest one in the segment wins, while the baseline | ||
|                         // below keeps the newest one - confirm the asymmetry is intended. | ||
|                         active_segment.previous_model = Some(ctx.model.clone()); | ||
|                         if matches!( | ||
|                             active_segment.reference_context_item, | ||
|                             TurnReferenceContextItem::NeverSet | ||
|                         ) { | ||
|                             active_segment.reference_context_item = | ||
|                                 TurnReferenceContextItem::Latest(Box::new(ctx.clone())); | ||
|                         } | ||
|                     } | ||
|                 } | ||
|                 RolloutItem::EventMsg(EventMsg::TurnStarted(event)) => { | ||
|                     // `TurnStarted` is the oldest boundary of the active reverse segment. | ||
|                     // If the ids are incompatible, or no segment is open, nothing is finalized | ||
|                     // and any open segment keeps accumulating. | ||
|                     if active_segment.as_ref().is_some_and(|active_segment| { | ||
|                         turn_ids_are_compatible( | ||
|                             active_segment.turn_id.as_deref(), | ||
|                             Some(event.turn_id.as_str()), | ||
|                         ) | ||
|                     }) && let Some(active_segment) = active_segment.take() | ||
|                     { | ||
|                         finalize_active_segment( | ||
|                             active_segment, | ||
|                             &mut base_replacement_history, | ||
|                             &mut previous_model, | ||
|                             &mut reference_context_item, | ||
|                             &mut pending_rollback_turns, | ||
|                         ); | ||
|                     } | ||
|                 } | ||
|                 // Remaining item kinds contribute nothing to the reverse metadata scan. | ||
|                 RolloutItem::ResponseItem(_) | ||
|                 | RolloutItem::EventMsg(_) | ||
|                 | RolloutItem::SessionMeta(_) => {} | ||
|             } | ||
|
|
||
|             if base_replacement_history.is_some() | ||
|                 && previous_model.is_some() | ||
|                 && !matches!(reference_context_item, TurnReferenceContextItem::NeverSet) | ||
|             { | ||
|                 // At this point we have both eager resume metadata values and the replacement- | ||
|                 // history base for the surviving tail, so older rollout items cannot affect this | ||
|                 // result. | ||
|                 break; | ||
|             } | ||
|         } | ||
|
|
||
|         // A segment still open after the scan (no compatible `TurnStarted` was reached) is | ||
|         // finalized as well. | ||
|         if let Some(active_segment) = active_segment.take() { | ||
|             finalize_active_segment( | ||
|                 active_segment, | ||
|                 &mut base_replacement_history, | ||
|                 &mut previous_model, | ||
|                 &mut reference_context_item, | ||
|                 &mut pending_rollback_turns, | ||
|             ); | ||
|         } | ||
|
|
||
|         let mut history = ContextManager::new(); | ||
|         let mut saw_legacy_compaction_without_replacement_history = false; | ||
|         if let Some(base_replacement_history) = base_replacement_history { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we have
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's needed to keep track of whether we can stop consuming backwards, and then at the beginning of the forward pass we use it to start off the |
||
|             history.replace(base_replacement_history.to_vec()); | ||
|         } | ||
|         // Materialize exact history semantics from the replay-derived suffix. The eventual lazy | ||
|         // design should keep this same replay shape, but drive it from a resumable reverse source | ||
|         // instead of an eagerly loaded `&[RolloutItem]`. | ||
|         for item in rollout_suffix { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Considering we have a separate loop anyway (ideally we won't), can we drop rollout_suffix and add
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, is it because the history is append to the end only?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's hard to do the second as part of the first because of None replacement_history compacts which want the whole history so far (when going left to right) |
||
|             match item { | ||
|                 RolloutItem::ResponseItem(response_item) => { | ||
|                     history.record_items( | ||
|                         std::iter::once(response_item), | ||
|                         turn_context.truncation_policy, | ||
|                     ); | ||
|                 } | ||
|                 RolloutItem::Compacted(compacted) => { | ||
|                     if let Some(replacement_history) = &compacted.replacement_history { | ||
|                         // This should actually never happen, because the reverse loop above (to build rollout_suffix) | ||
|                         // should stop before any compaction that has Some replacement_history | ||
|                         history.replace(replacement_history.clone()); | ||
|                     } else { | ||
charley-oai marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|                         saw_legacy_compaction_without_replacement_history = true; | ||
|                         // Legacy rollouts without `replacement_history` should rebuild the | ||
|                         // historical TurnContext at the correct insertion point from persisted | ||
|                         // `TurnContextItem`s. These are rare enough that we currently just clear | ||
|                         // `reference_context_item`, reinject canonical context at the end of the | ||
|                         // resumed conversation, and accept the temporary out-of-distribution | ||
|                         // prompt shape. | ||
|                         // TODO(ccunningham): if we drop support for None replacement_history compaction items, | ||
|                         // we can get rid of this second loop entirely and just build `history` directly in the first loop. | ||
|                         let user_messages = collect_user_messages(history.raw_items()); | ||
|                         let rebuilt = compact::build_compacted_history( | ||
|                             Vec::new(), | ||
|                             &user_messages, | ||
|                             &compacted.message, | ||
|                         ); | ||
|                         history.replace(rebuilt); | ||
|                     } | ||
|                 } | ||
|                 RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => { | ||
|                     history.drop_last_n_user_turns(rollback.num_turns); | ||
|                 } | ||
|                 RolloutItem::EventMsg(_) | ||
|                 | RolloutItem::TurnContext(_) | ||
|                 | RolloutItem::SessionMeta(_) => {} | ||
|             } | ||
|         } | ||
|
|
||
|         // Collapse the replay-time tri-state into the public `Option`: only `Latest` survives. | ||
|         let reference_context_item = match reference_context_item { | ||
|             TurnReferenceContextItem::NeverSet | TurnReferenceContextItem::Cleared => None, | ||
|             TurnReferenceContextItem::Latest(turn_reference_context_item) => { | ||
|                 Some(*turn_reference_context_item) | ||
|             } | ||
|         }; | ||
|         // A legacy compaction seen during the forward replay always clears the baseline. | ||
|         let reference_context_item = if saw_legacy_compaction_without_replacement_history { | ||
|             None | ||
|         } else { | ||
|             reference_context_item | ||
|         }; | ||
|
|
||
|         RolloutReconstruction { | ||
|             history: history.raw_items().to_vec(), | ||
|             previous_model, | ||
|             reference_context_item, | ||
|         } | ||
|     } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`drop_last_n_user_turns` is based on user messages, while this logic is based on turns. Do they align?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this logic is based on user messages too;
`counts_as_user_turn` is carefully designed to align with the `drop_last_n_user_turns` logic (and I added a bunch of tests to ensure they align).