Skip to content

Commit 695957a

Browse files
authored
Unify rollout reconstruction with resume/fork TurnContext hydration (#12612)
## Summary This PR unifies rollout history reconstruction and resume/fork metadata hydration under a single `Session::reconstruct_history_from_rollout` implementation. The key change from main is that replay metadata now comes from the same reconstruction pass that rebuilds model-visible history, instead of doing a second bespoke rollout scan to recover `previous_model` / `reference_context_item`. ## What Changed ### Unified reconstruction output `reconstruct_history_from_rollout` now returns a single `RolloutReconstruction` bundle containing: - rebuilt `history` - `previous_model` - `reference_context_item` Resume and fork both consume that shared output directly. ### Reverse replay core The reconstruction logic moved into `codex-rs/core/src/codex/rollout_reconstruction.rs` and now scans rollout items newest-to-oldest. That reverse pass: - derives `previous_model` - derives whether `reference_context_item` is preserved or cleared - stops early once it has both resume metadata and a surviving `replacement_history` checkpoint History materialization is still bridged eagerly for now by replaying only the surviving suffix forward, which keeps the history result stable while moving the control flow toward the future lazy reverse loader design. ### Removed bespoke context lookup This deletes `last_rollout_regular_turn_context_lookup` and its separate compaction-aware scan. The previous model / baseline metadata is now computed from the same replay state that rebuilds history, so resume/fork cannot drift from the reconstructed transcript view. ### `TurnContextItem` persistence contract `TurnContextItem` is now treated as the replay source of truth for durable model-visible baselines. This PR keeps the following contract explicit: - persist `TurnContextItem` for the first real user turn so resume can recover `previous_model` - persist it for later turns that emit model-visible context updates - if mid-turn compaction reinjects full initial context into replacement history, persist a fresh `TurnContextItem` after `Compacted` so resume/fork can re-establish the baseline from the rewritten history - do not treat manual compaction or pre-sampling compaction as creating a new durable baseline on their own ## Behavior Preserved - rollback replay stays aligned with `drop_last_n_user_turns` - rollback skips only user turns - incomplete active user turns are dropped before older finalized turns when rollback applies - unmatched aborts do not consume the current active turn - missing abort IDs still conservatively clear stale compaction state - compaction clears `reference_context_item` until a later `TurnContextItem` re-establishes it - `previous_model` still comes from the newest surviving user turn that established one ## Tests Targeted validation run for the current branch shape: - `cd codex-rs && cargo test -p codex-core --lib codex::rollout_reconstruction_tests -- --nocapture` - `cd codex-rs && just fmt` The branch also extracts the rollout reconstruction tests into `codex-rs/core/src/codex/rollout_reconstruction_tests.rs` so this logic has a dedicated home instead of living inline in `codex.rs`.
1 parent 6046ca1 commit 695957a

File tree

7 files changed

+1796
-463
lines changed

7 files changed

+1796
-463
lines changed

codex-rs/core/src/codex.rs

Lines changed: 242 additions & 437 deletions
Large diffs are not rendered by default.
Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
use super::*;
2+
3+
// Return value of `Session::reconstruct_history_from_rollout`, bundling the rebuilt history with
4+
// the resume/fork hydration metadata derived from the same replay.
5+
#[derive(Debug)]
6+
pub(super) struct RolloutReconstruction {
7+
pub(super) history: Vec<ResponseItem>,
8+
pub(super) previous_model: Option<String>,
9+
pub(super) reference_context_item: Option<TurnContextItem>,
10+
}
11+
12+
#[derive(Debug, Default)]
13+
enum TurnReferenceContextItem {
14+
/// No `TurnContextItem` has been seen for this replay span yet.
15+
///
16+
/// This differs from `Cleared`: `NeverSet` means there is no evidence this turn ever
17+
/// established a baseline, while `Cleared` means a baseline existed and a later compaction
18+
/// invalidated it. Only the latter must emit an explicit clearing segment for resume/fork
19+
/// hydration.
20+
#[default]
21+
NeverSet,
22+
/// A previously established baseline was invalidated by later compaction.
23+
Cleared,
24+
/// The latest baseline established by this replay span.
25+
Latest(Box<TurnContextItem>),
26+
}
27+
28+
#[derive(Debug, Default)]
29+
struct ActiveReplaySegment<'a> {
30+
turn_id: Option<String>,
31+
counts_as_user_turn: bool,
32+
previous_model: Option<String>,
33+
reference_context_item: TurnReferenceContextItem,
34+
base_replacement_history: Option<&'a [ResponseItem]>,
35+
}
36+
37+
fn turn_ids_are_compatible(active_turn_id: Option<&str>, item_turn_id: Option<&str>) -> bool {
38+
active_turn_id
39+
.is_none_or(|turn_id| item_turn_id.is_none_or(|item_turn_id| item_turn_id == turn_id))
40+
}
41+
42+
fn finalize_active_segment<'a>(
43+
active_segment: ActiveReplaySegment<'a>,
44+
base_replacement_history: &mut Option<&'a [ResponseItem]>,
45+
previous_model: &mut Option<String>,
46+
reference_context_item: &mut TurnReferenceContextItem,
47+
pending_rollback_turns: &mut usize,
48+
) {
49+
// Thread rollback drops the newest surviving real user-message boundaries. In replay, that
50+
// means skipping the next finalized segments that contain a non-contextual
51+
// `EventMsg::UserMessage`.
52+
if *pending_rollback_turns > 0 {
53+
if active_segment.counts_as_user_turn {
54+
*pending_rollback_turns -= 1;
55+
}
56+
return;
57+
}
58+
59+
// A surviving replacement-history checkpoint is a complete history base. Once we
60+
// know the newest surviving one, older rollout items do not affect rebuilt history.
61+
if base_replacement_history.is_none()
62+
&& let Some(segment_base_replacement_history) = active_segment.base_replacement_history
63+
{
64+
*base_replacement_history = Some(segment_base_replacement_history);
65+
}
66+
67+
// `previous_model` comes from the newest surviving user turn that established one.
68+
if previous_model.is_none() && active_segment.counts_as_user_turn {
69+
*previous_model = active_segment.previous_model;
70+
}
71+
72+
// `reference_context_item` comes from the newest surviving user turn baseline, or
73+
// from a surviving compaction that explicitly cleared that baseline.
74+
if matches!(reference_context_item, TurnReferenceContextItem::NeverSet)
75+
&& (active_segment.counts_as_user_turn
76+
|| matches!(
77+
active_segment.reference_context_item,
78+
TurnReferenceContextItem::Cleared
79+
))
80+
{
81+
*reference_context_item = active_segment.reference_context_item;
82+
}
83+
}
84+
85+
impl Session {
86+
pub(super) async fn reconstruct_history_from_rollout(
87+
&self,
88+
turn_context: &TurnContext,
89+
rollout_items: &[RolloutItem],
90+
) -> RolloutReconstruction {
91+
// Replay metadata should already match the shape of the future lazy reverse loader, even
92+
// while history materialization still uses an eager bridge. Scan newest-to-oldest,
93+
// stopping once a surviving replacement-history checkpoint and the required resume metadata
94+
// are both known; then replay only the buffered surviving tail forward to preserve exact
95+
// history semantics.
96+
let mut base_replacement_history: Option<&[ResponseItem]> = None;
97+
let mut previous_model = None;
98+
let mut reference_context_item = TurnReferenceContextItem::NeverSet;
99+
// Rollback is "drop the newest N user turns". While scanning in reverse, that becomes
100+
// "skip the next N user-turn segments we finalize".
101+
let mut pending_rollback_turns = 0usize;
102+
// Borrowed suffix of rollout items newer than the newest surviving replacement-history
103+
// checkpoint. If no such checkpoint exists, this remains the full rollout.
104+
let mut rollout_suffix = rollout_items;
105+
// Reverse replay accumulates rollout items into the newest in-progress turn segment until
106+
// we hit its matching `TurnStarted`, at which point the segment can be finalized.
107+
let mut active_segment: Option<ActiveReplaySegment<'_>> = None;
108+
109+
for (index, item) in rollout_items.iter().enumerate().rev() {
110+
match item {
111+
RolloutItem::Compacted(compacted) => {
112+
let active_segment =
113+
active_segment.get_or_insert_with(ActiveReplaySegment::default);
114+
// Looking backward, compaction clears any older baseline unless a newer
115+
// `TurnContextItem` in this same segment has already re-established it.
116+
if matches!(
117+
active_segment.reference_context_item,
118+
TurnReferenceContextItem::NeverSet
119+
) {
120+
active_segment.reference_context_item = TurnReferenceContextItem::Cleared;
121+
}
122+
if active_segment.base_replacement_history.is_none()
123+
&& let Some(replacement_history) = &compacted.replacement_history
124+
{
125+
active_segment.base_replacement_history = Some(replacement_history);
126+
rollout_suffix = &rollout_items[index + 1..];
127+
}
128+
}
129+
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
130+
pending_rollback_turns = pending_rollback_turns
131+
.saturating_add(usize::try_from(rollback.num_turns).unwrap_or(usize::MAX));
132+
}
133+
RolloutItem::EventMsg(EventMsg::TurnComplete(event)) => {
134+
let active_segment =
135+
active_segment.get_or_insert_with(ActiveReplaySegment::default);
136+
// Reverse replay often sees `TurnComplete` before any turn-scoped metadata.
137+
// Capture the turn id early so later `TurnContext` / abort items can match it.
138+
if active_segment.turn_id.is_none() {
139+
active_segment.turn_id = Some(event.turn_id.clone());
140+
}
141+
}
142+
RolloutItem::EventMsg(EventMsg::TurnAborted(event)) => {
143+
if let Some(active_segment) = active_segment.as_mut() {
144+
if active_segment.turn_id.is_none()
145+
&& let Some(turn_id) = &event.turn_id
146+
{
147+
active_segment.turn_id = Some(turn_id.clone());
148+
}
149+
} else if let Some(turn_id) = &event.turn_id {
150+
active_segment = Some(ActiveReplaySegment {
151+
turn_id: Some(turn_id.clone()),
152+
..Default::default()
153+
});
154+
}
155+
}
156+
RolloutItem::EventMsg(EventMsg::UserMessage(_)) => {
157+
let active_segment =
158+
active_segment.get_or_insert_with(ActiveReplaySegment::default);
159+
active_segment.counts_as_user_turn = true;
160+
}
161+
RolloutItem::TurnContext(ctx) => {
162+
let active_segment =
163+
active_segment.get_or_insert_with(ActiveReplaySegment::default);
164+
// `TurnContextItem` can attach metadata to an existing segment, but only a
165+
// real `UserMessage` event should make the segment count as a user turn.
166+
if active_segment.turn_id.is_none() {
167+
active_segment.turn_id = ctx.turn_id.clone();
168+
}
169+
if turn_ids_are_compatible(
170+
active_segment.turn_id.as_deref(),
171+
ctx.turn_id.as_deref(),
172+
) {
173+
active_segment.previous_model = Some(ctx.model.clone());
174+
if matches!(
175+
active_segment.reference_context_item,
176+
TurnReferenceContextItem::NeverSet
177+
) {
178+
active_segment.reference_context_item =
179+
TurnReferenceContextItem::Latest(Box::new(ctx.clone()));
180+
}
181+
}
182+
}
183+
RolloutItem::EventMsg(EventMsg::TurnStarted(event)) => {
184+
// `TurnStarted` is the oldest boundary of the active reverse segment.
185+
if active_segment.as_ref().is_some_and(|active_segment| {
186+
turn_ids_are_compatible(
187+
active_segment.turn_id.as_deref(),
188+
Some(event.turn_id.as_str()),
189+
)
190+
}) && let Some(active_segment) = active_segment.take()
191+
{
192+
finalize_active_segment(
193+
active_segment,
194+
&mut base_replacement_history,
195+
&mut previous_model,
196+
&mut reference_context_item,
197+
&mut pending_rollback_turns,
198+
);
199+
}
200+
}
201+
RolloutItem::ResponseItem(_)
202+
| RolloutItem::EventMsg(_)
203+
| RolloutItem::SessionMeta(_) => {}
204+
}
205+
206+
if base_replacement_history.is_some()
207+
&& previous_model.is_some()
208+
&& !matches!(reference_context_item, TurnReferenceContextItem::NeverSet)
209+
{
210+
// At this point we have both eager resume metadata values and the replacement-
211+
// history base for the surviving tail, so older rollout items cannot affect this
212+
// result.
213+
break;
214+
}
215+
}
216+
217+
if let Some(active_segment) = active_segment.take() {
218+
finalize_active_segment(
219+
active_segment,
220+
&mut base_replacement_history,
221+
&mut previous_model,
222+
&mut reference_context_item,
223+
&mut pending_rollback_turns,
224+
);
225+
}
226+
227+
let mut history = ContextManager::new();
228+
let mut saw_legacy_compaction_without_replacement_history = false;
229+
if let Some(base_replacement_history) = base_replacement_history {
230+
history.replace(base_replacement_history.to_vec());
231+
}
232+
// Materialize exact history semantics from the replay-derived suffix. The eventual lazy
233+
// design should keep this same replay shape, but drive it from a resumable reverse source
234+
// instead of an eagerly loaded `&[RolloutItem]`.
235+
for item in rollout_suffix {
236+
match item {
237+
RolloutItem::ResponseItem(response_item) => {
238+
history.record_items(
239+
std::iter::once(response_item),
240+
turn_context.truncation_policy,
241+
);
242+
}
243+
RolloutItem::Compacted(compacted) => {
244+
if let Some(replacement_history) = &compacted.replacement_history {
245+
// This should actually never happen, because the reverse loop above (to build rollout_suffix)
246+
// should stop before any compaction that has Some replacement_history
247+
history.replace(replacement_history.clone());
248+
} else {
249+
saw_legacy_compaction_without_replacement_history = true;
250+
// Legacy rollouts without `replacement_history` should rebuild the
251+
// historical TurnContext at the correct insertion point from persisted
252+
// `TurnContextItem`s. These are rare enough that we currently just clear
253+
// `reference_context_item`, reinject canonical context at the end of the
254+
// resumed conversation, and accept the temporary out-of-distribution
255+
// prompt shape.
256+
// TODO(ccunningham): if we drop support for None replacement_history compaction items,
257+
// we can get rid of this second loop entirely and just build `history` directly in the first loop.
258+
let user_messages = collect_user_messages(history.raw_items());
259+
let rebuilt = compact::build_compacted_history(
260+
Vec::new(),
261+
&user_messages,
262+
&compacted.message,
263+
);
264+
history.replace(rebuilt);
265+
}
266+
}
267+
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
268+
history.drop_last_n_user_turns(rollback.num_turns);
269+
}
270+
RolloutItem::EventMsg(_)
271+
| RolloutItem::TurnContext(_)
272+
| RolloutItem::SessionMeta(_) => {}
273+
}
274+
}
275+
276+
let reference_context_item = match reference_context_item {
277+
TurnReferenceContextItem::NeverSet | TurnReferenceContextItem::Cleared => None,
278+
TurnReferenceContextItem::Latest(turn_reference_context_item) => {
279+
Some(*turn_reference_context_item)
280+
}
281+
};
282+
let reference_context_item = if saw_legacy_compaction_without_replacement_history {
283+
None
284+
} else {
285+
reference_context_item
286+
};
287+
288+
RolloutReconstruction {
289+
history: history.raw_items().to_vec(),
290+
previous_model,
291+
reference_context_item,
292+
}
293+
}
294+
}

0 commit comments

Comments
 (0)