Skip to content

Features/refine conversation functionality #361

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class Conversation

public string Channel { get; set; } = ConversationChannel.OpenAPI;

public int DialogCount { get; set; }

public DateTime UpdatedTime { get; set; } = DateTime.UtcNow;
public DateTime CreatedTime { get; set; } = DateTime.UtcNow;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class RichContentJsonConverter : JsonConverter<IRichMessage>
if (root.TryGetProperty("rich_type", out JsonElement element))
{
var richType = element.GetString();
res = parser.ParseRichMessage(richType, jsonText, options);
res = parser.ParseRichMessage(richType, jsonText, root, options);
}

return res;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class TemplateMessageJsonConverter : JsonConverter<ITemplateMessage>
if (root.TryGetProperty("template_type", out JsonElement element))
{
var templateType = element.GetString();
res = parser.ParseTemplateMessage(templateType, jsonText, options);
res = parser.ParseTemplateMessage(templateType, jsonText, root, options);
}

return res;
Expand Down
26 changes: 24 additions & 2 deletions src/Infrastructure/BotSharp.Abstraction/Messaging/MessageParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public MessageParser()
{
}

public IRichMessage? ParseRichMessage(string richType, string jsonText, JsonSerializerOptions options)
public IRichMessage? ParseRichMessage(string richType, string jsonText, JsonElement root, JsonSerializerOptions options)
{
IRichMessage? res = null;

Expand All @@ -36,11 +36,22 @@ public MessageParser()
{
res = JsonSerializer.Deserialize<TextMessage>(jsonText, options);
}
else if (richType == RichTypeEnum.GenericTemplate)
{
if (root.TryGetProperty("element_type", out var element))
{
var elementType = element.GetString();
if (elementType == typeof(GenericElement).Name)
{
res = JsonSerializer.Deserialize<GenericTemplateMessage<GenericElement>>(jsonText, options);
}
}
}

return res;
}

public ITemplateMessage? ParseTemplateMessage(string templateType, string jsonText, JsonSerializerOptions options)
public ITemplateMessage? ParseTemplateMessage(string templateType, string jsonText, JsonElement root, JsonSerializerOptions options)
{
ITemplateMessage? res = null;

Expand All @@ -60,6 +71,17 @@ public MessageParser()
{
res = JsonSerializer.Deserialize<ProductTemplateMessage>(jsonText, options);
}
else if (templateType == TemplateTypeEnum.Generic)
{
if (root.TryGetProperty("element_type", out var element))
{
var elementType = element.GetString();
if (elementType == typeof(GenericElement).Name)
{
res = JsonSerializer.Deserialize<GenericTemplateMessage<GenericElement>>(jsonText, options);
}
}
}

return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void Append(string conversationId, RoleDialogModel dialog)
AgentId = agentId,
MessageId = dialog.MessageId,
FunctionName = dialog.FunctionName,
CreateTime = dialog.CreatedAt
CreateTime = DateTime.UtcNow
};

var content = dialog.Content.RemoveNewLine();
Expand All @@ -65,7 +65,7 @@ public void Append(string conversationId, RoleDialogModel dialog)
MessageId = dialog.MessageId,
SenderId = dialog.SenderId,
FunctionName = dialog.FunctionName,
CreateTime = dialog.CreatedAt
CreateTime = DateTime.UtcNow
};

var content = dialog.Content.RemoveNewLine();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public void AppendConversationDialogs(string conversationId, List<DialogElement>
var conv = JsonSerializer.Deserialize<Conversation>(json, _options);
if (conv != null)
{
conv.DialogCount += dialogs.Count();
conv.UpdatedTime = DateTime.UtcNow;
File.WriteAllText(convFile, JsonSerializer.Serialize(conv, _options));
}
Expand Down Expand Up @@ -353,23 +354,40 @@ public List<string> GetIdleConversations(int batchSize, int messageLimit, int bu
batchSize = batchLimit;
}

if (bufferHours <= 0)
{
bufferHours = 12;
}

if (messageLimit <= 0)
{
messageLimit = 2;
}

foreach (var d in Directory.GetDirectories(dir))
{
var convFile = Path.Combine(d, CONVERSATION_FILE);
if (!File.Exists(convFile))
{
Directory.Delete(d, true);
continue;
}

var json = File.ReadAllText(convFile);
var conv = JsonSerializer.Deserialize<Conversation>(json, _options);
if (conv == null || conv.UpdatedTime > utcNow.AddHours(-bufferHours))

if (conv == null)
{
Directory.Delete(d, true);
continue;
}

var dialogs = GetConversationDialogs(conv.Id);
if (dialogs.Count <= messageLimit)
if (conv.UpdatedTime > utcNow.AddHours(-bufferHours))
{
continue;
}

if (conv.DialogCount <= messageLimit)
{
ids.Add(conv.Id);
if (ids.Count >= batchSize)
Expand Down Expand Up @@ -398,7 +416,7 @@ public bool TruncateConversation(string conversationId, string messageId, bool c
if (foundIdx < 0) return false;

// Handle truncated dialogs
var isSaved = HandleTruncatedDialogs(dialogDir, dialogs, foundIdx);
var isSaved = HandleTruncatedDialogs(convDir, dialogDir, dialogs, foundIdx);
if (!isSaved) return false;

// Handle truncated states
Expand Down Expand Up @@ -489,10 +507,18 @@ private List<StateKeyValue> CollectConversationStates(string stateFile)
return states ?? new List<StateKeyValue>();
}

private bool HandleTruncatedDialogs(string dialogDir, List<DialogElement> dialogs, int foundIdx)
private bool HandleTruncatedDialogs(string convDir, string dialogDir, List<DialogElement> dialogs, int foundIdx)
{
var truncatedDialogs = dialogs.Where((x, idx) => idx < foundIdx).ToList();
var isSaved = SaveTruncatedDialogs(dialogDir, truncatedDialogs);
var convFile = Path.Combine(convDir, CONVERSATION_FILE);
var convJson = File.ReadAllText(convFile);
var conv = JsonSerializer.Deserialize<Conversation>(convJson, _options);
if (conv != null)
{
conv.DialogCount = truncatedDialogs.Count;
File.WriteAllText(convFile, JsonSerializer.Serialize(conv, _options));
}
return isSaved;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,18 @@ public ConversationTimeoutService(IServiceProvider services, ILogger<Conversatio

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Conversation Timeout Service is running.");
_logger.LogInformation("Conversation Timeout Service is running...");

_ = Task.Run(async () =>
{
await DoWork(stoppingToken);
});
}

private async Task DoWork(CancellationToken stoppingToken)
{
_logger.LogInformation("Conversation Timeout Service is doing work...");

try
{
while (true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public class ConversationDocument : MongoBase
public string Title { get; set; }
public string Channel { get; set; }
public string Status { get; set; }
public int DialogCount { get; set; }
public DateTime CreatedTime { get; set; }
public DateTime UpdatedTime { get; set; }
public DateTime Breakpoint { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public void AppendConversationDialogs(string conversationId, List<DialogElement>
var filterDialog = Builders<ConversationDialogDocument>.Filter.Eq(x => x.ConversationId, conversationId);
var dialogElements = dialogs.Select(x => DialogMongoElement.ToMongoElement(x)).ToList();
var updateDialog = Builders<ConversationDialogDocument>.Update.PushEach(x => x.Dialogs, dialogElements);
var updateConv = Builders<ConversationDocument>.Update.Set(x => x.UpdatedTime, DateTime.UtcNow);
var updateConv = Builders<ConversationDocument>.Update.Set(x => x.UpdatedTime, DateTime.UtcNow)
.Inc(x => x.DialogCount, dialogs.Count);

_dc.ConversationDialogs.UpdateOne(filterDialog, updateDialog);
_dc.Conversations.UpdateOne(filterConv, updateConv);
Expand Down Expand Up @@ -168,13 +169,16 @@ public ConversationState GetConversationStates(string conversationId)

public void UpdateConversationStates(string conversationId, List<StateKeyValue> states)
{
if (string.IsNullOrEmpty(conversationId) || states.IsNullOrEmpty()) return;
if (string.IsNullOrEmpty(conversationId) || states == null) return;

var filterConv = Builders<ConversationDocument>.Filter.Eq(x => x.Id, conversationId);
var filterStates = Builders<ConversationStateDocument>.Filter.Eq(x => x.ConversationId, conversationId);
var saveStates = states.Select(x => StateMongoElement.ToMongoElement(x)).ToList();
var updateStates = Builders<ConversationStateDocument>.Update.Set(x => x.States, saveStates);
var updateConv = Builders<ConversationDocument>.Update.Set(x => x.UpdatedTime, DateTime.UtcNow);

_dc.ConversationStates.UpdateOne(filterStates, updateStates);
_dc.Conversations.UpdateOne(filterConv, updateConv);
}

public void UpdateConversationStatus(string conversationId, string status)
Expand Down Expand Up @@ -220,6 +224,7 @@ public Conversation GetConversation(string conversationId)
Status = conv.Status,
Dialogs = dialogElements,
States = curStates,
DialogCount = conv.DialogCount,
CreatedTime = conv.CreatedTime,
UpdatedTime = conv.UpdatedTime,
Breakpoint = conv.Breakpoint
Expand Down Expand Up @@ -308,6 +313,7 @@ public PagedItems<Conversation> GetConversations(ConversationFilter filter)
Title = conv.Title,
Channel = conv.Channel,
Status = conv.Status,
DialogCount = conv.DialogCount,
CreatedTime = conv.CreatedTime,
UpdatedTime = conv.UpdatedTime,
Breakpoint = conv.Breakpoint
Expand Down Expand Up @@ -335,6 +341,7 @@ public List<Conversation> GetLastConversations()
Title = c.Title,
Channel = c.Channel,
Status = c.Status,
DialogCount = c.DialogCount,
CreatedTime = c.CreatedTime,
UpdatedTime = c.UpdatedTime,
Breakpoint = c.Breakpoint
Expand All @@ -353,11 +360,21 @@ public List<string> GetIdleConversations(int batchSize, int messageLimit, int bu
batchSize = batchLimit;
}

if (bufferHours <= 0)
{
bufferHours = 12;
}

if (messageLimit <= 0)
{
messageLimit = 2;
}

while (true)
{
var skip = (page - 1) * batchSize;
var candidates = _dc.Conversations.AsQueryable()
.Where(x => x.UpdatedTime <= utcNow.AddHours(-bufferHours))
.Where(x => (x.DialogCount <= messageLimit) && x.UpdatedTime <= utcNow.AddHours(-bufferHours))
.Skip(skip)
.Take(batchSize)
.Select(x => x.Id)
Expand All @@ -367,13 +384,8 @@ public List<string> GetIdleConversations(int batchSize, int messageLimit, int bu
{
break;
}

var targets = _dc.ConversationDialogs.AsQueryable()
.Where(x => candidates.Contains(x.ConversationId) && x.Dialogs != null && x.Dialogs.Count <= messageLimit)
.Select(x => x.ConversationId)
.ToList();

conversationIds = conversationIds.Concat(targets).ToList();

conversationIds = conversationIds.Concat(candidates).Distinct().ToList();
if (conversationIds.Count >= batchSize)
{
break;
Expand Down Expand Up @@ -420,10 +432,16 @@ public bool TruncateConversation(string conversationId, string messageId, bool c
_dc.ConversationStates.ReplaceOne(stateFilter, foundStates);
}

// Save
// Save dialogs
foundDialog.Dialogs = truncatedDialogs;
_dc.ConversationDialogs.ReplaceOne(dialogFilter, foundDialog);

// Update conversation
var convFilter = Builders<ConversationDocument>.Filter.Eq(x => x.Id, conversationId);
var updateConv = Builders<ConversationDocument>.Update.Set(x => x.UpdatedTime, DateTime.UtcNow)
.Set(x => x.DialogCount, truncatedDialogs.Count);
_dc.Conversations.UpdateOne(convFilter, updateConv);

// Remove logs
if (cleanLog)
{
Expand Down