Skip to content

Commit a11da86

Browse files
authored
Make realtime audio test deterministic (#12959)
## Summary
- add a websocket test-server request waiter so tests can synchronize on recorded client messages
- use that waiter in the realtime delegation test instead of a fixed audio timeout
- add temporary timing logs in the test and websocket mock to inspect where the flake stalls
1 parent 90cc4e7 commit a11da86

File tree

2 files changed

+111
-11
lines changed

2 files changed

+111
-11
lines changed

codex-rs/core/tests/common/responses.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use futures::SinkExt;
1212
use futures::StreamExt;
1313
use serde_json::Value;
1414
use tokio::net::TcpListener;
15+
use tokio::sync::Notify;
1516
use tokio::sync::oneshot;
1617
use tokio_tungstenite::accept_hdr_async_with_config;
1718
use tokio_tungstenite::tungstenite::Message;
@@ -335,6 +336,7 @@ pub struct WebSocketTestServer {
335336
uri: String,
336337
connections: Arc<Mutex<Vec<Vec<WebSocketRequest>>>>,
337338
handshakes: Arc<Mutex<Vec<WebSocketHandshake>>>,
339+
request_log_updated: Arc<Notify>,
338340
shutdown: oneshot::Sender<()>,
339341
task: tokio::task::JoinHandle<()>,
340342
}
@@ -356,6 +358,26 @@ impl WebSocketTestServer {
356358
connections.first().cloned().unwrap_or_default()
357359
}
358360

361+
pub async fn wait_for_request(
362+
&self,
363+
connection_index: usize,
364+
request_index: usize,
365+
) -> WebSocketRequest {
366+
loop {
367+
if let Some(request) = self
368+
.connections
369+
.lock()
370+
.unwrap()
371+
.get(connection_index)
372+
.and_then(|connection| connection.get(request_index))
373+
.cloned()
374+
{
375+
return request;
376+
}
377+
self.request_log_updated.notified().await;
378+
}
379+
}
380+
359381
pub fn handshakes(&self) -> Vec<WebSocketHandshake> {
360382
self.handshakes.lock().unwrap().clone()
361383
}
@@ -1069,15 +1091,18 @@ pub async fn start_websocket_server(connections: Vec<Vec<Vec<Value>>>) -> WebSoc
10691091
pub async fn start_websocket_server_with_headers(
10701092
connections: Vec<WebSocketConnectionConfig>,
10711093
) -> WebSocketTestServer {
1094+
let start = std::time::Instant::now();
10721095
let listener = TcpListener::bind("127.0.0.1:0")
10731096
.await
10741097
.expect("bind websocket server");
10751098
let addr = listener.local_addr().expect("websocket server address");
10761099
let uri = format!("ws://{addr}");
10771100
let connections_log = Arc::new(Mutex::new(Vec::new()));
10781101
let handshakes_log = Arc::new(Mutex::new(Vec::new()));
1102+
let request_log_updated = Arc::new(Notify::new());
10791103
let requests = Arc::clone(&connections_log);
10801104
let handshakes = Arc::clone(&handshakes_log);
1105+
let request_log = Arc::clone(&request_log_updated);
10811106
let connections = Arc::new(Mutex::new(VecDeque::from(connections)));
10821107
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
10831108

@@ -1159,9 +1184,51 @@ pub async fn start_websocket_server_with_headers(
11591184
let mut log = requests.lock().unwrap();
11601185
if let Some(connection_log) = log.get_mut(connection_index) {
11611186
connection_log.push(WebSocketRequest { body });
1187+
let request_index = connection_log.len() - 1;
1188+
let request = &connection_log[request_index];
1189+
let request_body = request.body_json();
1190+
eprintln!(
1191+
"[ws test server +{}ms] connection={} received request={} type={:?} role={:?} text={:?} data={:?}",
1192+
start.elapsed().as_millis(),
1193+
connection_index,
1194+
request_index,
1195+
request_body.get("type").and_then(Value::as_str),
1196+
request_body
1197+
.get("item")
1198+
.and_then(|item| item.get("role"))
1199+
.and_then(Value::as_str),
1200+
request_body
1201+
.get("item")
1202+
.and_then(|item| item.get("content"))
1203+
.and_then(Value::as_array)
1204+
.and_then(|content| content.first())
1205+
.and_then(|content| content.get("text"))
1206+
.and_then(Value::as_str),
1207+
request_body
1208+
.get("item")
1209+
.and_then(|item| item.get("content"))
1210+
.and_then(Value::as_array)
1211+
.and_then(|content| content.first())
1212+
.and_then(|content| content.get("data"))
1213+
.and_then(Value::as_str),
1214+
);
11621215
}
1216+
request_log.notify_waiters();
11631217
}
11641218

1219+
eprintln!(
1220+
"[ws test server +{}ms] connection={} sending batch_size={} event_types={:?} audio_data={:?}",
1221+
start.elapsed().as_millis(),
1222+
connection_index,
1223+
request_events.len(),
1224+
request_events
1225+
.iter()
1226+
.map(|event| event.get("type").and_then(Value::as_str))
1227+
.collect::<Vec<_>>(),
1228+
request_events
1229+
.iter()
1230+
.find_map(|event| event.get("delta").and_then(Value::as_str)),
1231+
);
11651232
for event in &request_events {
11661233
let Ok(payload) = serde_json::to_string(event) else {
11671234
continue;
@@ -1184,6 +1251,7 @@ pub async fn start_websocket_server_with_headers(
11841251
uri,
11851252
connections: connections_log,
11861253
handshakes: handshakes_log,
1254+
request_log_updated,
11871255
shutdown: shutdown_tx,
11881256
task,
11891257
}

codex-rs/core/tests/suite/realtime_conversation.rs

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,7 @@ async fn inbound_realtime_text_ignores_user_role_and_still_forwards_audio() -> R
715715
async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_audio() -> Result<()>
716716
{
717717
skip_if_no_network!(Ok(()));
718+
let start = std::time::Instant::now();
718719

719720
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
720721
let first_chunks = vec![
@@ -806,18 +807,45 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
806807
_ => None,
807808
})
808809
.await;
810+
eprintln!(
811+
"[realtime test +{}ms] saw trigger text={:?}",
812+
start.elapsed().as_millis(),
813+
"delegate now"
814+
);
809815

810-
let audio_out = tokio::time::timeout(
811-
Duration::from_millis(500),
812-
wait_for_event_match(&test.codex, |msg| match msg {
813-
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
814-
payload: RealtimeEvent::AudioOut(frame),
815-
}) => Some(frame.clone()),
816-
_ => None,
817-
}),
818-
)
819-
.await
820-
.expect("timed out waiting for realtime audio after echoed user-role message");
816+
let mirrored_request = realtime_server.wait_for_request(0, 1).await;
817+
let mirrored_request_body = mirrored_request.body_json();
818+
eprintln!(
819+
"[realtime test +{}ms] saw mirrored request type={:?} role={:?} text={:?} data={:?}",
820+
start.elapsed().as_millis(),
821+
mirrored_request_body["type"].as_str(),
822+
mirrored_request_body["item"]["role"].as_str(),
823+
mirrored_request_body["item"]["content"][0]["text"].as_str(),
824+
mirrored_request_body["item"]["content"][0]["data"].as_str(),
825+
);
826+
assert_eq!(
827+
mirrored_request_body["type"].as_str(),
828+
Some("conversation.item.create")
829+
);
830+
assert_eq!(
831+
mirrored_request_body["item"]["content"][0]["text"].as_str(),
832+
Some("assistant says hi")
833+
);
834+
835+
let audio_out = wait_for_event_match(&test.codex, |msg| match msg {
836+
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
837+
payload: RealtimeEvent::AudioOut(frame),
838+
}) => Some(frame.clone()),
839+
_ => None,
840+
})
841+
.await;
842+
eprintln!(
843+
"[realtime test +{}ms] saw audio out data={} sample_rate={} num_channels={}",
844+
start.elapsed().as_millis(),
845+
audio_out.data,
846+
audio_out.sample_rate,
847+
audio_out.num_channels
848+
);
821849
assert_eq!(audio_out.data, "AQID");
822850

823851
let completion = completions
@@ -828,6 +856,10 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
828856
completion
829857
.await
830858
.expect("delegated turn request did not complete");
859+
eprintln!(
860+
"[realtime test +{}ms] delegated completion resolved",
861+
start.elapsed().as_millis()
862+
);
831863
wait_for_event(&test.codex, |event| {
832864
matches!(event, EventMsg::TurnComplete(_))
833865
})

0 commit comments

Comments
 (0)