Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions codex-rs/core/tests/common/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use futures::SinkExt;
use futures::StreamExt;
use serde_json::Value;
use tokio::net::TcpListener;
use tokio::sync::Notify;
use tokio::sync::oneshot;
use tokio_tungstenite::accept_hdr_async_with_config;
use tokio_tungstenite::tungstenite::Message;
Expand Down Expand Up @@ -335,6 +336,7 @@ pub struct WebSocketTestServer {
uri: String,
connections: Arc<Mutex<Vec<Vec<WebSocketRequest>>>>,
handshakes: Arc<Mutex<Vec<WebSocketHandshake>>>,
request_log_updated: Arc<Notify>,
shutdown: oneshot::Sender<()>,
task: tokio::task::JoinHandle<()>,
}
Expand All @@ -356,6 +358,26 @@ impl WebSocketTestServer {
connections.first().cloned().unwrap_or_default()
}

pub async fn wait_for_request(
&self,
connection_index: usize,
request_index: usize,
) -> WebSocketRequest {
loop {
if let Some(request) = self
.connections
.lock()
.unwrap()
.get(connection_index)
.and_then(|connection| connection.get(request_index))
.cloned()
{
return request;
}
self.request_log_updated.notified().await;
}
}

pub fn handshakes(&self) -> Vec<WebSocketHandshake> {
self.handshakes.lock().unwrap().clone()
}
Expand Down Expand Up @@ -1076,8 +1098,10 @@ pub async fn start_websocket_server_with_headers(
let uri = format!("ws://{addr}");
let connections_log = Arc::new(Mutex::new(Vec::new()));
let handshakes_log = Arc::new(Mutex::new(Vec::new()));
let request_log_updated = Arc::new(Notify::new());
let requests = Arc::clone(&connections_log);
let handshakes = Arc::clone(&handshakes_log);
let request_log = Arc::clone(&request_log_updated);
let connections = Arc::new(Mutex::new(VecDeque::from(connections)));
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();

Expand Down Expand Up @@ -1160,6 +1184,7 @@ pub async fn start_websocket_server_with_headers(
if let Some(connection_log) = log.get_mut(connection_index) {
connection_log.push(WebSocketRequest { body });
}
request_log.notify_waiters();
}

for event in &request_events {
Expand All @@ -1184,6 +1209,7 @@ pub async fn start_websocket_server_with_headers(
uri,
connections: connections_log,
handshakes: handshakes_log,
request_log_updated,
shutdown: shutdown_tx,
task,
}
Expand Down
28 changes: 17 additions & 11 deletions codex-rs/core/tests/suite/realtime_conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -807,17 +807,23 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
})
.await;

let audio_out = tokio::time::timeout(
Duration::from_millis(500),
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::AudioOut(frame),
}) => Some(frame.clone()),
_ => None,
}),
)
.await
.expect("timed out waiting for realtime audio after echoed user-role message");
let mirrored_request = realtime_server.wait_for_request(0, 1).await;
assert_eq!(
mirrored_request.body_json()["type"].as_str(),
Some("conversation.item.create")
);
assert_eq!(
mirrored_request.body_json()["item"]["content"][0]["text"].as_str(),
Some("assistant says hi")
);

let audio_out = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::AudioOut(frame),
}) => Some(frame.clone()),
_ => None,
})
.await;
assert_eq!(audio_out.data, "AQID");

let completion = completions
Expand Down
Loading