Skip to content

Commit d0a1dc7

Browse files
committed
app-server: Replay pending item requests on thread/resume
Replay pending item-scoped client requests after `thread/resume` so per-request state stays in sync across all connections when a client reconnects/resumes.

Affected RPCs:
- `item/fileChange/requestApproval`
- `item/tool/requestUserInput`
- `item/commandExecution/requestApproval`

Motivation:
- Maintain per-request state in sync across all connections when a client reconnects/resumes.

Implementation notes:
- Track pending item requests in `ThreadState` (item id, request id, payload, delivered connections) so they can be replayed to resumed connections.
- Replay uses the original request id and only targets connections that have not received the request yet.
- Clear/serialize per-item pending request state around responses and item completion notifications to avoid stale or duplicate replays.
- Wire replay into both resume paths so pending requests are resent consistently when a connection is reattached.

High-level test plan:
- Added automated coverage for pending request tracking/delivery bookkeeping in thread state.
- Added automated coverage for replaying an existing pending request to another connection and ensuring it stops replaying after the response resolves.
- Verified app-server resume/pending request handling behavior in automated tests.

Manual testing:
- Tested reconnect/resume with multiple connections.
- Confirmed state stayed in sync between connections.
1 parent e8949f4 commit d0a1dc7

File tree

4 files changed

+533
-28
lines changed

4 files changed

+533
-28
lines changed

codex-rs/app-server/src/bespoke_event_handling.rs

Lines changed: 120 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::error_code::INTERNAL_ERROR_CODE;
66
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
77
use crate::outgoing_message::ClientRequestResult;
88
use crate::outgoing_message::ThreadScopedOutgoingMessageSender;
9+
use crate::thread_state::PendingItemRequest;
910
use crate::thread_state::ThreadState;
1011
use crate::thread_state::TurnSummary;
1112
use crate::thread_status::ThreadWatchActiveGuard;
@@ -229,9 +230,13 @@ pub(crate) async fn apply_bespoke_event_handling(
229230
reason,
230231
grant_root,
231232
};
232-
let rx = outgoing
233-
.send_request(ServerRequestPayload::FileChangeRequestApproval(params))
234-
.await;
233+
let rx = send_and_track_pending_item_request(
234+
item_id.clone(),
235+
ServerRequestPayload::FileChangeRequestApproval(params),
236+
&outgoing,
237+
&thread_state,
238+
)
239+
.await;
235240
tokio::spawn(async move {
236241
on_file_change_request_approval_response(
237242
event_turn_id,
@@ -338,11 +343,13 @@ pub(crate) async fn apply_bespoke_event_handling(
338343
command_actions,
339344
proposed_execpolicy_amendment: proposed_execpolicy_amendment_v2,
340345
};
341-
let rx = outgoing
342-
.send_request(ServerRequestPayload::CommandExecutionRequestApproval(
343-
params,
344-
))
345-
.await;
346+
let rx = send_and_track_pending_item_request(
347+
call_id.clone(),
348+
ServerRequestPayload::CommandExecutionRequestApproval(params),
349+
&outgoing,
350+
&thread_state,
351+
)
352+
.await;
346353
tokio::spawn(async move {
347354
on_command_execution_request_approval_response(
348355
event_turn_id,
@@ -389,17 +396,24 @@ pub(crate) async fn apply_bespoke_event_handling(
389396
let params = ToolRequestUserInputParams {
390397
thread_id: conversation_id.to_string(),
391398
turn_id: request.turn_id,
392-
item_id: request.call_id,
399+
item_id: request.call_id.clone(),
393400
questions,
394401
};
395-
let rx = outgoing
396-
.send_request(ServerRequestPayload::ToolRequestUserInput(params))
397-
.await;
402+
let item_id = request.call_id;
403+
let rx = send_and_track_pending_item_request(
404+
item_id.clone(),
405+
ServerRequestPayload::ToolRequestUserInput(params),
406+
&outgoing,
407+
&thread_state,
408+
)
409+
.await;
398410
tokio::spawn(async move {
399411
on_request_user_input_response(
400412
event_turn_id,
413+
item_id,
401414
rx,
402415
conversation,
416+
thread_state,
403417
user_input_guard,
404418
)
405419
.await;
@@ -477,9 +491,17 @@ pub(crate) async fn apply_bespoke_event_handling(
477491
event_turn_id.clone(),
478492
)
479493
.await;
480-
outgoing
481-
.send_server_notification(ServerNotification::ItemCompleted(notification))
482-
.await;
494+
let ThreadItem::McpToolCall { id: item_id, .. } = &notification.item else {
495+
unreachable!();
496+
};
497+
let item_id = item_id.clone();
498+
ThreadState::with_pending_item_request(&thread_state, &item_id, |_| async move {
499+
outgoing
500+
.send_server_notification(ServerNotification::ItemCompleted(notification))
501+
.await;
502+
(None, ())
503+
})
504+
.await;
483505
}
484506
EventMsg::CollabAgentSpawnBegin(begin_event) => {
485507
let item = ThreadItem::CollabAgentToolCall {
@@ -1166,9 +1188,17 @@ pub(crate) async fn apply_bespoke_event_handling(
11661188
turn_id: event_turn_id.clone(),
11671189
item,
11681190
};
1169-
outgoing
1170-
.send_server_notification(ServerNotification::ItemCompleted(notification))
1171-
.await;
1191+
let item_id = match &notification.item {
1192+
ThreadItem::CommandExecution { id, .. } => id.clone(),
1193+
_ => unreachable!(),
1194+
};
1195+
ThreadState::with_pending_item_request(&thread_state, &item_id, |_| async move {
1196+
outgoing
1197+
.send_server_notification(ServerNotification::ItemCompleted(notification))
1198+
.await;
1199+
(None, ())
1200+
})
1201+
.await;
11721202
}
11731203
// If this is a TurnAborted, reply to any pending interrupt requests.
11741204
EventMsg::TurnAborted(turn_aborted_event) => {
@@ -1377,6 +1407,46 @@ async fn emit_turn_completed_with_status(
13771407
.await;
13781408
}
13791409

1410+
/// Builds a receiver whose sender half has already been dropped.
///
/// Used as the fallback return value when a request could not be sent, so
/// callers still get a `oneshot::Receiver<ClientRequestResult>` to await.
fn closed_request_receiver() -> oneshot::Receiver<ClientRequestResult> {
1411+
// `_sender` is a named binding, so it lives until the end of this function
// and is dropped on return; no value can ever be sent on this channel.
// NOTE(review): assumes `oneshot` is `tokio::sync::oneshot`, where awaiting a
// receiver with a dropped sender resolves to an error instead of hanging —
// confirm against the file's imports.
let (_sender, receiver) = oneshot::channel();
1412+
receiver
1413+
}
1414+
1415+
/// Sends an item-scoped server request and records it as the pending request
/// for `item_id`.
///
/// The send happens inside `ThreadState::with_pending_item_request`, and the
/// closure returns a tuple of (pending request entry, receiver).
/// NOTE(review): presumably `with_pending_item_request` stores the first tuple
/// element as the item's new pending state and hands back the second — confirm
/// in `thread_state.rs`.
///
/// * `item_id` - id of the item the request belongs to; used both as the key
///   for the pending state and inside the recorded `PendingItemRequest`.
/// * `payload` - request to send; a clone is sent and the original is kept in
///   the recorded `PendingItemRequest`.
/// * `outgoing` - sender used to issue the request and to read the connection
///   ids recorded alongside it.
/// * `thread_state` - shared thread state holding the pending item requests.
///
/// Returns the receiver for the client's response. If
/// `send_request_with_id` yields `None`, the receiver from
/// `closed_request_receiver()` is returned instead.
async fn send_and_track_pending_item_request(
1416+
item_id: String,
1417+
payload: ServerRequestPayload,
1418+
outgoing: &ThreadScopedOutgoingMessageSender,
1419+
thread_state: &Arc<Mutex<ThreadState>>,
1420+
) -> oneshot::Receiver<ClientRequestResult> {
1421+
// Separate copy for the lookup key: `item_id` itself is moved into the
// `async move` closure below.
let item_id_for_state = item_id.clone();
1422+
ThreadState::with_pending_item_request(
1423+
thread_state,
1424+
&item_id_for_state,
1425+
|pending_item_request| async move {
1426+
// Send a clone so the original payload can be kept for the pending entry.
let payload_for_send = payload.clone();
1427+
let Some((request_id, receiver)) =
1428+
outgoing.send_request_with_id(payload_for_send).await
1429+
else {
1430+
// Nothing was sent: hand back the existing pending entry unchanged and
// a receiver that has no live sender.
return (pending_item_request, closed_request_receiver());
1431+
};
1432+
(
1433+
// Record the request id, the payload, and the connection ids reported
// by `outgoing` at this point.
Some(PendingItemRequest::new(
1434+
item_id,
1435+
request_id,
1436+
payload,
1437+
outgoing.connection_ids(),
1438+
)),
1439+
receiver,
1440+
)
1441+
},
1442+
)
1443+
.await
1444+
}
1445+
1446+
/// Resets the pending request entry for `item_id` by running
/// `ThreadState::with_pending_item_request` with a closure that ignores the
/// current entry and returns `None`.
/// NOTE(review): presumably a returned `None` removes the stored pending
/// request so it is no longer replayed — confirm in `thread_state.rs`.
async fn clear_pending_item_request(item_id: &str, thread_state: &Arc<Mutex<ThreadState>>) {
1447+
ThreadState::with_pending_item_request(thread_state, item_id, |_| async { (None, ()) }).await;
1448+
}
1449+
13801450
async fn complete_file_change_item(
13811451
conversation_id: ThreadId,
13821452
item_id: String,
@@ -1386,10 +1456,9 @@ async fn complete_file_change_item(
13861456
outgoing: &ThreadScopedOutgoingMessageSender,
13871457
thread_state: &Arc<Mutex<ThreadState>>,
13881458
) {
1389-
let mut state = thread_state.lock().await;
1390-
state.turn_summary.file_change_started.remove(&item_id);
1391-
drop(state);
1392-
1459+
let item_id_for_state = item_id.clone();
1460+
let item_id_for_turn_summary = item_id.clone();
1461+
let thread_state_for_turn_summary = Arc::clone(thread_state);
13931462
let item = ThreadItem::FileChange {
13941463
id: item_id,
13951464
changes,
@@ -1400,9 +1469,20 @@ async fn complete_file_change_item(
14001469
turn_id,
14011470
item,
14021471
};
1403-
outgoing
1404-
.send_server_notification(ServerNotification::ItemCompleted(notification))
1405-
.await;
1472+
ThreadState::with_pending_item_request(thread_state, &item_id_for_state, |_| async move {
1473+
{
1474+
let mut state = thread_state_for_turn_summary.lock().await;
1475+
state
1476+
.turn_summary
1477+
.file_change_started
1478+
.remove(&item_id_for_turn_summary);
1479+
}
1480+
outgoing
1481+
.send_server_notification(ServerNotification::ItemCompleted(notification))
1482+
.await;
1483+
(None, ())
1484+
})
1485+
.await;
14061486
}
14071487

14081488
#[allow(clippy::too_many_arguments)]
@@ -1416,7 +1496,9 @@ async fn complete_command_execution_item(
14161496
command_actions: Vec<V2ParsedCommand>,
14171497
status: CommandExecutionStatus,
14181498
outgoing: &ThreadScopedOutgoingMessageSender,
1499+
thread_state: &Arc<Mutex<ThreadState>>,
14191500
) {
1501+
let item_id_for_state = item_id.clone();
14201502
let item = ThreadItem::CommandExecution {
14211503
id: item_id,
14221504
command,
@@ -1433,9 +1515,13 @@ async fn complete_command_execution_item(
14331515
turn_id,
14341516
item,
14351517
};
1436-
outgoing
1437-
.send_server_notification(ServerNotification::ItemCompleted(notification))
1438-
.await;
1518+
ThreadState::with_pending_item_request(thread_state, &item_id_for_state, |_| async move {
1519+
outgoing
1520+
.send_server_notification(ServerNotification::ItemCompleted(notification))
1521+
.await;
1522+
(None, ())
1523+
})
1524+
.await;
14391525
}
14401526

14411527
async fn maybe_emit_raw_response_item_completed(
@@ -1659,12 +1745,15 @@ async fn on_exec_approval_response(
16591745

16601746
async fn on_request_user_input_response(
16611747
event_turn_id: String,
1748+
item_id: String,
16621749
receiver: oneshot::Receiver<ClientRequestResult>,
16631750
conversation: Arc<CodexThread>,
1751+
thread_state: Arc<Mutex<ThreadState>>,
16641752
user_input_guard: ThreadWatchActiveGuard,
16651753
) {
16661754
let response = receiver.await;
16671755
drop(user_input_guard);
1756+
clear_pending_item_request(&item_id, &thread_state).await;
16681757
let value = match response {
16691758
Ok(Ok(value)) => value,
16701759
Ok(Err(err)) => {
@@ -1785,6 +1874,7 @@ async fn on_file_change_request_approval_response(
17851874
) {
17861875
let response = receiver.await;
17871876
drop(permission_guard);
1877+
clear_pending_item_request(&item_id, &thread_state).await;
17881878
let (decision, completion_status) = match response {
17891879
Ok(Ok(value)) => {
17901880
let response = serde_json::from_value::<FileChangeRequestApprovalResponse>(value)
@@ -1850,6 +1940,7 @@ async fn on_command_execution_request_approval_response(
18501940
) {
18511941
let response = receiver.await;
18521942
drop(permission_guard);
1943+
clear_pending_item_request(&item_id, &thread_state).await;
18531944
let (decision, completion_status) = match response {
18541945
Ok(Ok(value)) => {
18551946
let response = serde_json::from_value::<CommandExecutionRequestApprovalResponse>(value)
@@ -1925,6 +2016,7 @@ async fn on_command_execution_request_approval_response(
19252016
completion_item.command_actions,
19262017
status,
19272018
&outgoing,
2019+
&thread_state,
19282020
)
19292021
.await;
19302022
}

codex-rs/app-server/src/codex_message_processor.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2938,6 +2938,12 @@ impl CodexMessageProcessor {
29382938
thread_id,
29392939
err.message
29402940
);
2941+
} else {
2942+
self.replay_pending_item_requests_to_connection(
2943+
thread_id,
2944+
request_id.connection_id,
2945+
)
2946+
.await;
29412947
}
29422948

29432949
let Some(mut thread) = self
@@ -5985,6 +5991,48 @@ impl CodexMessageProcessor {
59855991
}
59865992
});
59875993
}
5994+
5995+
/// Re-sends pending item requests of `thread_id` to `connection_id`.
///
/// For every item id returned by `ThreadState::pending_item_ids_for_connection`,
/// the stored request is sent to that single connection under its original
/// request id, and the connection is marked as delivered when the send
/// reports success.
///
/// NOTE(review): the per-item loop below is repeated verbatim at the end of
/// `handle_pending_thread_resume_request`; consider sharing one helper.
async fn replay_pending_item_requests_to_connection(
5996+
&mut self,
5997+
thread_id: ThreadId,
5998+
connection_id: ConnectionId,
5999+
) {
6000+
let thread_state = self.thread_state_manager.thread_state(thread_id);
6001+
let pending_item_ids =
6002+
ThreadState::pending_item_ids_for_connection(&thread_state, connection_id).await;
6003+
6004+
for item_id in pending_item_ids {
6005+
// Fresh clone per iteration because the `async move` closure takes ownership.
let outgoing = self.outgoing.clone();
6006+
ThreadState::with_pending_item_request(
6007+
&thread_state,
6008+
&item_id,
6009+
|pending_item_request| async move {
6010+
// The entry is re-checked here rather than trusted from the id list.
// NOTE(review): presumably because it can be cleared or delivered
// between listing the ids and running this closure — confirm.
let Some(mut pending_request) = pending_item_request else {
6011+
return (None, ());
6012+
};
6013+
if !pending_request.needs_delivery_to(connection_id) {
6014+
// Keep the entry as it is; this connection needs no replay.
return (Some(pending_request), ());
6015+
}
6016+
6017+
// Reuse the original request id and target only this connection.
let request_id = pending_request.request_id.clone();
6018+
let sent = outgoing
6019+
.send_existing_request_to_connections(
6020+
&[connection_id],
6021+
request_id.clone(),
6022+
pending_request.payload.clone(),
6023+
)
6024+
.await;
6025+
// Only record delivery when the send reported success.
if sent {
6026+
pending_request
6027+
.mark_delivered_if_request_matches(&request_id, connection_id);
6028+
}
6029+
// The request stays pending either way; only its delivery bookkeeping
// may have changed.
(Some(pending_request), ())
6030+
},
6031+
)
6032+
.await;
6033+
}
6034+
}
6035+
59886036
async fn git_diff_to_origin(&self, request_id: ConnectionRequestId, cwd: PathBuf) {
59896037
let diff = git_diff_to_remote(&cwd).await;
59906038
match diff {
@@ -6373,6 +6421,38 @@ async fn handle_pending_thread_resume_request(
63736421
};
63746422
outgoing.send_response(request_id, response).await;
63756423
thread_state.lock().await.add_connection(connection_id);
6424+
6425+
let pending_item_ids =
6426+
ThreadState::pending_item_ids_for_connection(thread_state, connection_id).await;
6427+
for item_id in pending_item_ids {
6428+
let outgoing = outgoing.clone();
6429+
ThreadState::with_pending_item_request(
6430+
thread_state,
6431+
&item_id,
6432+
|pending_item_request| async move {
6433+
let Some(mut pending_request) = pending_item_request else {
6434+
return (None, ());
6435+
};
6436+
if !pending_request.needs_delivery_to(connection_id) {
6437+
return (Some(pending_request), ());
6438+
}
6439+
6440+
let request_id = pending_request.request_id.clone();
6441+
let sent = outgoing
6442+
.send_existing_request_to_connections(
6443+
&[connection_id],
6444+
request_id.clone(),
6445+
pending_request.payload.clone(),
6446+
)
6447+
.await;
6448+
if sent {
6449+
pending_request.mark_delivered_if_request_matches(&request_id, connection_id);
6450+
}
6451+
(Some(pending_request), ())
6452+
},
6453+
)
6454+
.await;
6455+
}
63766456
}
63776457

63786458
async fn load_thread_for_running_resume_response(

0 commit comments

Comments
 (0)