Skip to content

Commit 4a5ec1c

Browse files
authored
Merge pull request #915 from hchen2020/master
refactor realtime code.
2 parents a205b9b + 89fc99f commit 4a5ec1c

File tree

10 files changed

+254
-195
lines changed

10 files changed

+254
-195
lines changed

src/Infrastructure/BotSharp.Abstraction/MLTasks/IRealTimeCompletion.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ Task Connect(RealtimeHubConnection conn,
2424
Task Disconnect();
2525

2626
Task<RealtimeSession> CreateSession(Agent agent, List<RoleDialogModel> conversations);
27-
Task UpdateSession(RealtimeHubConnection conn);
27+
Task UpdateSession(RealtimeHubConnection conn, bool turnDetection = true);
2828
Task InsertConversationItem(RoleDialogModel message);
2929
Task RemoveConversationItem(string itemId);
3030
Task TriggerModelInference(string? instructions = null);

src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
1+
using System.Collections.Concurrent;
2+
13
namespace BotSharp.Abstraction.Realtime.Models;
24

35
public class RealtimeHubConnection
46
{
57
public string Event { get; set; } = null!;
68
public string StreamId { get; set; } = null!;
9+
public string? LastAssistantItem { get; set; } = null!;
10+
public long LatestMediaTimestamp { get; set; }
11+
public long? ResponseStartTimestamp { get; set; }
12+
public string KeypadInputBuffer { get; set; } = string.Empty;
13+
public ConcurrentQueue<string> MarkQueue { get; set; } = new();
714
public string CurrentAgentId { get; set; } = null!;
815
public string ConversationId { get; set; } = null!;
916
public string Data { get; set; } = string.Empty;

src/Infrastructure/BotSharp.Core/Realtime/RealtimeHub.cs

Lines changed: 91 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
using BotSharp.Abstraction.MLTasks;
55
using BotSharp.Abstraction.Conversations.Enums;
66
using BotSharp.Abstraction.Routing.Models;
7+
using NetTopologySuite.Index.HPRtree;
8+
using BotSharp.Abstraction.Agents.Models;
9+
using Microsoft.Identity.Client.Extensions.Msal;
10+
using Microsoft.AspNetCore.Cors.Infrastructure;
711

812
namespace BotSharp.Core.Realtime;
913

@@ -46,9 +50,14 @@ public async Task Listen(WebSocket userWebSocket,
4650
{
4751
await completer.AppenAudioBuffer(conn.Data);
4852
}
53+
else if (conn.Event == "user_dtmf_received")
54+
{
55+
await HandleUserDtmfReceived(completer, conn);
56+
}
4957
else if (conn.Event == "user_disconnected")
5058
{
5159
await completer.Disconnect();
60+
await HandleUserDisconnected(conn);
5261
}
5362
} while (!result.CloseStatus.HasValue);
5463

@@ -58,8 +67,6 @@ public async Task Listen(WebSocket userWebSocket,
5867
private async Task ConnectToModel(IRealTimeCompletion completer, WebSocket userWebSocket, RealtimeHubConnection conn)
5968
{
6069
var hookProvider = _services.GetRequiredService<ConversationHookProvider>();
61-
var storage = _services.GetRequiredService<IConversationStorage>();
62-
6370
var convService = _services.GetRequiredService<IConversationService>();
6471
convService.SetConversationId(conn.ConversationId, []);
6572
var conversation = await convService.GetConversation(conn.ConversationId);
@@ -92,8 +99,8 @@ private async Task ConnectToModel(IRealTimeCompletion completer, WebSocket userW
9299
await completer.Connect(conn,
93100
onModelReady: async () =>
94101
{
95-
// Control initial session
96-
await completer.UpdateSession(conn);
102+
// Control initial session, prevent initial response interruption
103+
await completer.UpdateSession(conn, turnDetection: false);
97104

98105
// Add dialog history
99106
foreach (var item in dialogs)
@@ -103,17 +110,41 @@ await completer.Connect(conn,
103110

104111
if (dialogs.LastOrDefault()?.Role == AgentRole.Assistant)
105112
{
106-
// await completer.TriggerModelInference($"Rephase your last response:\r\n{dialogs.LastOrDefault()?.Content}");
113+
await completer.TriggerModelInference($"Rephase your last response:\r\n{dialogs.LastOrDefault()?.Content}");
107114
}
108115
else
109116
{
110117
await completer.TriggerModelInference("Reply based on the conversation context.");
111118
}
119+
120+
// Start turn detection
121+
await Task.Delay(1000 * 8);
122+
await completer.UpdateSession(conn, turnDetection: true);
112123
},
113124
onModelAudioDeltaReceived: async audioDeltaData =>
114125
{
126+
// If this is the first delta of a new response, set the start timestamp
127+
if (!conn.ResponseStartTimestamp.HasValue)
128+
{
129+
conn.ResponseStartTimestamp = conn.LatestMediaTimestamp;
130+
_logger.LogDebug($"Setting start timestamp for new response: {conn.ResponseStartTimestamp}ms");
131+
}
132+
115133
var data = conn.OnModelMessageReceived(audioDeltaData);
116134
await SendEventToUser(userWebSocket, data);
135+
136+
// Send mark messages to Media Streams so we know if and when AI response playback is finished
137+
if (!string.IsNullOrEmpty(conn.StreamId))
138+
{
139+
var markEvent = new
140+
{
141+
@event = "mark",
142+
streamSid = conn.StreamId,
143+
mark = new { name = "responsePart" }
144+
};
145+
await SendEventToUser(userWebSocket, markEvent);
146+
conn.MarkQueue.Enqueue("responsePart");
147+
}
117148
},
118149
onModelAudioResponseDone: async () =>
119150
{
@@ -160,16 +191,18 @@ await completer.Connect(conn,
160191
await completer.TriggerModelInference("Reply based on the function's output.");
161192
}
162193
}
163-
// append output audio transcript to conversation
164-
storage.Append(conn.ConversationId, message);
165-
dialogs.Add(message);
166-
167-
foreach (var hook in hookProvider.HooksOrderByPriority)
194+
else
168195
{
169-
hook.SetAgent(agent)
170-
.SetConversation(conversation);
196+
// append output audio transcript to conversation
197+
dialogs.Add(message);
198+
199+
foreach (var hook in hookProvider.HooksOrderByPriority)
200+
{
201+
hook.SetAgent(agent)
202+
.SetConversation(conversation);
171203

172-
await hook.OnResponseGenerated(message);
204+
await hook.OnResponseGenerated(message);
205+
}
173206
}
174207
}
175208
},
@@ -180,7 +213,6 @@ await completer.Connect(conn,
180213
onInputAudioTranscriptionCompleted: async message =>
181214
{
182215
// append input audio transcript to conversation
183-
storage.Append(conn.ConversationId, message);
184216
dialogs.Add(message);
185217

186218
foreach (var hook in hookProvider.HooksOrderByPriority)
@@ -193,11 +225,56 @@ await completer.Connect(conn,
193225
},
194226
onUserInterrupted: async () =>
195227
{
228+
// Reset states
229+
conn.MarkQueue.Clear();
230+
conn.LastAssistantItem = null;
231+
conn.ResponseStartTimestamp = null;
232+
196233
var data = conn.OnModelUserInterrupted();
197234
await SendEventToUser(userWebSocket, data);
198235
});
199236
}
200237

238+
/// <summary>
/// Handles a "user_dtmf_received" event: wraps <c>conn.Data</c> in a user-role message,
/// adds it to the routing context's dialogs, notifies every conversation hook, then
/// forwards the message to the realtime model and asks it to respond.
/// </summary>
/// <param name="completer">Realtime completion client that receives the message and the inference trigger.</param>
/// <param name="conn">Connection supplying <c>Data</c>, <c>CurrentAgentId</c> and <c>ConversationId</c>.</param>
private async Task HandleUserDtmfReceived(IRealTimeCompletion completer, RealtimeHubConnection conn)
{
    // Resolve collaborators and load the agent / conversation the hooks are bound to.
    var routingService = _services.GetRequiredService<IRoutingService>();
    var hooks = _services.GetRequiredService<ConversationHookProvider>();
    var agents = _services.GetRequiredService<IAgentService>();
    var currentAgent = await agents.LoadAgent(conn.CurrentAgentId);
    var history = routingService.Context.GetDialogs();
    var conversations = _services.GetRequiredService<IConversationService>();
    var currentConversation = await conversations.GetConversation(conn.ConversationId);

    // The event payload becomes a user message attributed to the routing context's current agent.
    var userInput = new RoleDialogModel(AgentRole.User, conn.Data)
    {
        CurrentAgentId = routingService.Context.GetCurrentAgentId()
    };
    history.Add(userInput);

    // Hooks run sequentially, in priority order.
    foreach (var listener in hooks.HooksOrderByPriority)
    {
        listener.SetAgent(currentAgent).SetConversation(currentConversation);
        await listener.OnMessageReceived(userInput);
    }

    // Hand the message to the model, then explicitly request a reply.
    await completer.InsertConversationItem(userInput);
    await completer.TriggerModelInference("Reply based on the user input");
}
265+
266+
/// <summary>
/// Saves the dialog history when the user disconnects by appending every dialog of the
/// routing context to conversation storage under <c>conn.ConversationId</c>.
/// </summary>
/// <param name="conn">Connection whose <c>ConversationId</c> identifies the conversation to append to.</param>
/// <returns>An already-completed task; all work is performed synchronously.</returns>
private Task HandleUserDisconnected(RealtimeHubConnection conn)
{
    var routing = _services.GetRequiredService<IRoutingService>();
    var storage = _services.GetRequiredService<IConversationStorage>();

    foreach (var dialog in routing.Context.GetDialogs())
    {
        storage.Append(conn.ConversationId, dialog);
    }

    // Nothing here is awaited, so the method is not marked async (avoids CS1998 and a
    // needless state machine). The caller awaits the returned task immediately, so any
    // exception thrown above still surfaces at that await.
    return Task.CompletedTask;
}
277+
201278
private async Task SendEventToUser(WebSocket webSocket, object message)
202279
{
203280
var data = JsonSerializer.Serialize(message);

src/Infrastructure/BotSharp.Core/Translation/TranslationResponseHook.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using BotSharp.Abstraction.Agents;
2+
using BotSharp.Abstraction.Conversations.Enums;
23
using BotSharp.Abstraction.Infrastructures.Enums;
34
using BotSharp.Abstraction.Translation;
45
using System;
@@ -26,6 +27,11 @@ public override async Task OnResponseGenerated(RoleDialogModel message)
2627
{
2728
return;
2829
}
30+
31+
if (_states.GetState("channel") == ConversationChannel.Phone)
32+
{
33+
return;
34+
}
2935

3036
// Handle multi-language for output
3137
var agentService = _services.GetRequiredService<IAgentService>();

src/Infrastructure/BotSharp.Logger/Hooks/VerboseLogHook.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public async Task BeforeGenerating(Agent agent, List<RoleDialogModel> conversati
3535

3636
public async Task AfterGenerated(RoleDialogModel message, TokenStatsModel tokenStats)
3737
{
38-
if (!_convSettings.ShowVerboseLog) return;
38+
if (!_convSettings.ShowVerboseLog || string.IsNullOrEmpty(tokenStats.Prompt)) return;
3939

4040
var agentService = _services.GetRequiredService<IAgentService>();
4141
var agent = await agentService.LoadAgent(message.CurrentAgentId);

src/Plugins/BotSharp.Plugin.OpenAI/Models/Realtime/RealtimeSessionBody.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class RealtimeSessionBody
4747
public FunctionDef[] Tools { get; set; } = [];
4848

4949
[JsonPropertyName("turn_detection")]
50-
public RealtimeSessionTurnDetection TurnDetection { get; set; } = new();
50+
public RealtimeSessionTurnDetection? TurnDetection { get; set; } = new();
5151
}
5252

5353
public class RealtimeSessionTurnDetection

src/Plugins/BotSharp.Plugin.OpenAI/Models/Realtime/RealtimeSessionRequest.cs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,27 @@
1+
using BotSharp.Abstraction.Functions.Models;
2+
13
namespace BotSharp.Plugin.OpenAI.Models.Realtime;
24

3-
public class RealtimeSessionCreationRequest : RealtimeSessionBody
5+
/// <summary>
/// Body of a realtime session creation request. Each property is serialized under the
/// name given by its <c>JsonPropertyName</c> attribute.
/// </summary>
public class RealtimeSessionCreationRequest
{
    /// <summary>Serialized as "model"; left out of the JSON when the value is null.</summary>
    [JsonPropertyName("model"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string Model { get; set; } = null!;

    /// <summary>Serialized as "modalities"; defaults to "audio" and "text".</summary>
    [JsonPropertyName("modalities")]
    public string[] Modalities { get; set; } = new[] { "audio", "text" };

    /// <summary>Serialized as "instructions"; has no usable default and must be assigned by the caller.</summary>
    [JsonPropertyName("instructions")]
    public string Instructions { get; set; } = null!;

    /// <summary>Serialized as "tool_choice"; defaults to "auto".</summary>
    [JsonPropertyName("tool_choice")]
    public string ToolChoice { get; set; } = "auto";

    /// <summary>Serialized as "tools"; defaults to an empty array.</summary>
    [JsonPropertyName("tools")]
    public FunctionDef[] Tools { get; set; } = [];

    /// <summary>Serialized as "turn_detection"; defaults to a freshly constructed instance.</summary>
    [JsonPropertyName("turn_detection")]
    public RealtimeSessionTurnDetection TurnDetection { get; set; } = new RealtimeSessionTurnDetection();
}
726

827
/// <summary>

0 commit comments

Comments
 (0)