From 4433bce1dc0b8088caab9c05206bd30dbf833abb Mon Sep 17 00:00:00 2001 From: Nurlan Mikayilov Date: Fri, 28 Mar 2025 12:54:50 +0400 Subject: [PATCH 1/5] fix: #1344 convert destObj to the correct property type before setting value --- src/WorkflowCore.DSL/Services/DefinitionLoader.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WorkflowCore.DSL/Services/DefinitionLoader.cs b/src/WorkflowCore.DSL/Services/DefinitionLoader.cs index c5cc5e083..13ad32174 100644 --- a/src/WorkflowCore.DSL/Services/DefinitionLoader.cs +++ b/src/WorkflowCore.DSL/Services/DefinitionLoader.cs @@ -405,7 +405,7 @@ void acn(IStepBody pStep, object pData, IStepExecutionContext pContext) stack.Push(child); } - stepProperty.SetValue(pStep, destObj); + stepProperty.SetValue(pStep, destObj.ToObject(stepProperty.PropertyType)); } return acn; } From 8508e54329e9857ec0e7d80c9f4eb82db71baa2d Mon Sep 17 00:00:00 2001 From: Nurlan Mikayilov Date: Sat, 29 Mar 2025 11:27:24 +0400 Subject: [PATCH 2/5] test: add integration test for handling complex input property in workflow - Added `should_execute_json_workflow_with_complex_input_type` test to verify handling of complex input properties. - Ensured `AssignTask` step correctly sets `Assignee` information in `FlowData`. - Included JSON workflow definition with complex input property. - Verified `Assignee` and `UnitInfo` properties are correctly set in the test. --- .../Scenarios/StoredJsonScenario.cs | 18 +++++++++++++ .../DataTypes/FlowData.cs | 10 +++++++ .../Steps/AssignTask.cs | 27 +++++++++++++++++++ .../Steps/AssigneeInfo.cs | 20 ++++++++++++++ test/WorkflowCore.TestAssets/Utils.cs | 6 +++++ .../WorkflowCore.TestAssets.csproj | 4 +++ .../def-complex-input-property.json | 24 +++++++++++++++++ 7 files changed, 109 insertions(+) create mode 100644 test/WorkflowCore.TestAssets/DataTypes/FlowData.cs create mode 100644 test/WorkflowCore.TestAssets/Steps/AssignTask.cs create mode 100644 test/WorkflowCore.TestAssets/Steps/AssigneeInfo.cs create mode 100644 test/WorkflowCore.TestAssets/def-complex-input-property.json diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs index 383eb39a4..a7bb49718 100644 --- a/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs +++ b/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs @@ -84,5 +84,23 @@ public void should_execute_json_workflow_with_dynamic_data() data["Counter6"].Should().Be(1); data["Counter10"].Should().Be(1); } + + + [Fact] + public void should_execute_json_workflow_with_complex_input_type() + { + var initialData = new FlowData(); + var workflowId = StartWorkflow(TestAssets.Utils.GetTestDefinitionJsonComplexInputProperty(), initialData); + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); + + var data = GetData(workflowId); + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + UnhandledStepErrors.Count.Should().Be(0); + data.Assignee.Should().NotBeNull(); + data.Assignee.Name.Should().Be("John Doe"); + data.Assignee.UnitInfo.Should().NotBeNull(); + data.Assignee.UnitInfo.Name.Should().Be("IT Department"); + + } } } diff --git a/test/WorkflowCore.TestAssets/DataTypes/FlowData.cs b/test/WorkflowCore.TestAssets/DataTypes/FlowData.cs new file mode 100644 index 000000000..6d4ab9368 --- /dev/null +++ b/test/WorkflowCore.TestAssets/DataTypes/FlowData.cs @@ -0,0 +1,10 @@ +using System; +using System.Linq; +using WorkflowCore.TestAssets.Steps; + +namespace WorkflowCore.TestAssets.DataTypes; + +public class FlowData +{ + public AssigneeInfo Assignee { get; set; } = new(); +} \ No newline at end of file diff --git a/test/WorkflowCore.TestAssets/Steps/AssignTask.cs b/test/WorkflowCore.TestAssets/Steps/AssignTask.cs new file mode 100644 index 000000000..3128c7613 --- /dev/null +++ b/test/WorkflowCore.TestAssets/Steps/AssignTask.cs @@ -0,0 +1,27 @@ +using System; +using System.Linq; +using WorkflowCore.Interface; +using WorkflowCore.Models; +using WorkflowCore.TestAssets.DataTypes; + +namespace WorkflowCore.TestAssets.Steps; + +public class AssignTask : StepBody +{ + public AssigneeInfo Assignee { get; set; } + + public override ExecutionResult Run(IStepExecutionContext context) + { + if (context.Workflow.Data is FlowData flowData) + { + flowData.Assignee = new AssigneeInfo + { + Id = Assignee.Id, + Name = Assignee.Name, + MemberType = Assignee.MemberType, + UnitInfo = Assignee.UnitInfo + }; + } + return ExecutionResult.Next(); + } +} \ No newline at end of file diff --git a/test/WorkflowCore.TestAssets/Steps/AssigneeInfo.cs b/test/WorkflowCore.TestAssets/Steps/AssigneeInfo.cs new file mode 100644 index 000000000..42901dbab --- /dev/null +++ b/test/WorkflowCore.TestAssets/Steps/AssigneeInfo.cs @@ -0,0 +1,20 @@ +using System; +using System.Linq; + +namespace WorkflowCore.TestAssets.Steps; + +public class AssigneeInfo +{ + public int Id { get; set; } + public string Name { get; set; } + public int MemberType { get; set; } + + public UnitInfo UnitInfo { get; set; } +} + +public class UnitInfo +{ + public int Id { get; set; } + public string Name { get; set; } + public int UnitType { get; set; } +} \ No newline at end of file diff --git a/test/WorkflowCore.TestAssets/Utils.cs b/test/WorkflowCore.TestAssets/Utils.cs index b7a7919cd..7fbbe562a 100644 --- a/test/WorkflowCore.TestAssets/Utils.cs +++ b/test/WorkflowCore.TestAssets/Utils.cs @@ -39,6 +39,12 @@ public static string GetTestDefinitionJsonMissingInputProperty() { return File.ReadAllText("stored-def-missing-input-property.json"); } + + + public static string GetTestDefinitionJsonComplexInputProperty() + { + return File.ReadAllText("def-complex-input-property.json"); + } } } diff --git a/test/WorkflowCore.TestAssets/WorkflowCore.TestAssets.csproj b/test/WorkflowCore.TestAssets/WorkflowCore.TestAssets.csproj index cbba0dccb..f4d354184 100644 --- a/test/WorkflowCore.TestAssets/WorkflowCore.TestAssets.csproj +++ b/test/WorkflowCore.TestAssets/WorkflowCore.TestAssets.csproj @@ -13,6 +13,7 @@ + @@ -31,6 +32,9 @@ Always + + Always + diff --git a/test/WorkflowCore.TestAssets/def-complex-input-property.json b/test/WorkflowCore.TestAssets/def-complex-input-property.json new file mode 100644 index 000000000..7a2583c0c --- /dev/null +++ b/test/WorkflowCore.TestAssets/def-complex-input-property.json @@ -0,0 +1,24 @@ +{ + "Id": "FlowWithComplexInputType", + "Version": 1, + "DataType": "WorkflowCore.TestAssets.DataTypes.FlowData, WorkflowCore.TestAssets", + "Steps": [ + { + "Id": "AssignTask", + "StepType": "WorkflowCore.TestAssets.Steps.AssignTask, WorkflowCore.TestAssets", + "Inputs": { + "Assignee": { + "id": 1, + "name": "John Doe", + "memberType": 1, + "unitInfo": { + "id": 1, + "name": "IT Department", + "unitType": 1 + } + } + }, + "Reason": "3" + } + ] +} \ No newline at end of file From 4be6bddd061a0cb06f2ab17121196597bfabf122 Mon Sep 17 00:00:00 2001 From: Nurlan Mikayilov Date: Wed, 9 Apr 2025 15:34:35 +0400 Subject: [PATCH 3/5] feat: add support for list and array of complex input types in workflow --- .../Services/DefinitionLoader.cs | 88 ++++++++++++++++++- .../Scenarios/StoredJsonScenario.cs | 30 +++++++ .../DataTypes/FlowData.cs | 3 + .../Steps/AssignTask.cs | 24 +++-- test/WorkflowCore.TestAssets/Utils.cs | 5 ++ .../WorkflowCore.TestAssets.csproj | 3 + .../def-complex-input-property.json | 3 +- .../def-list-of-complex-input-property.json | 57 ++++++++++++ 8 files changed, 201 insertions(+), 12 deletions(-) create mode 100644 test/WorkflowCore.TestAssets/def-list-of-complex-input-property.json diff --git a/src/WorkflowCore.DSL/Services/DefinitionLoader.cs b/src/WorkflowCore.DSL/Services/DefinitionLoader.cs index 13ad32174..bff2b1725 100644 --- a/src/WorkflowCore.DSL/Services/DefinitionLoader.cs +++ b/src/WorkflowCore.DSL/Services/DefinitionLoader.cs @@ -201,13 +201,21 @@ private void AttachInputs(StepSourceV1 source, Type dataType, Type stepType, Wor continue; } - if ((input.Value is IDictionary) || (input.Value is IDictionary)) + if (input.Value is IDictionary || input.Value is IDictionary) { var acn = BuildObjectInputAction(input, dataParameter, contextParameter, environmentVarsParameter, stepProperty); step.Inputs.Add(new ActionParameter(acn)); continue; } + if (input.Value is IEnumerable list) + { + var acn = BuildListInputAction(list, dataParameter, contextParameter, environmentVarsParameter, + stepProperty); + step.Inputs.Add(new ActionParameter(acn)); + continue; + } + throw new ArgumentException($"Unknown type for input {input.Key} on {source.Id}"); } } @@ -253,7 +261,7 @@ private void AttachDirectlyOutput(KeyValuePair output, WorkflowS Action acn = (pStep, pData) => { - object resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); ; + object resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); propertyInfo.SetValue(pData, resolvedValue, new object[] { output.Key }); }; @@ -306,7 +314,7 @@ private void AttachNestedOutput(KeyValuePair output, WorkflowSte { var targetExpr = Expression.Lambda(memberExpression, dataParameter); object data = targetExpr.Compile().DynamicInvoke(pData); - object resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); ; + object resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); propertyInfo.SetValue(data, resolvedValue, new object[] { items[1] }); }; @@ -379,6 +387,80 @@ void acn(IStepBody pStep, object pData, IStepExecutionContext pContext) return acn; } + private static Action BuildListInputAction( + IEnumerable input, + ParameterExpression dataParameter, + ParameterExpression contextParameter, + ParameterExpression environmentVarsParameter, + PropertyInfo stepProperty) + { + void acn(IStepBody pStep, object pData, IStepExecutionContext pContext) + { + if (input == null) + throw new ArgumentNullException(nameof(input)); + + var itemType = stepProperty.PropertyType.IsArray + ? stepProperty.PropertyType.GetElementType() + : stepProperty.PropertyType.GetGenericArguments().FirstOrDefault(); + + if (itemType == null) + throw new InvalidOperationException("Unable to determine the item type for stepProperty."); + + var processedItems = new List(); + + foreach (var item in input) + { + var obj = JObject.FromObject(item); + var stack = new Stack(); + stack.Push(obj); + + while (stack.Count > 0) + { + var current = stack.Pop(); + foreach (var prop in current.Properties().ToList()) + { + if (prop.Name.StartsWith("@")) + { + var expr = DynamicExpressionParser.ParseLambda( + new[] { dataParameter, contextParameter, environmentVarsParameter }, + typeof(object), + prop.Value.ToString()); + + var resolved = expr.Compile().DynamicInvoke(pData, pContext, + Environment.GetEnvironmentVariables()); + current.Remove(prop.Name); + current.Add(prop.Name.TrimStart('@'), JToken.FromObject(resolved)); + } + } + + foreach (var child in current.Children()) + stack.Push(child); + } + + processedItems.Add(obj.ToObject(itemType)); + } + + if (stepProperty.PropertyType.IsArray) + { + var array = Array.CreateInstance(itemType, processedItems.Count); + for (var i = 0; i < processedItems.Count; i++) + array.SetValue(processedItems[i], i); + stepProperty.SetValue(pStep, array); + } + else + { + var listInstance = Activator.CreateInstance(typeof(List<>).MakeGenericType(itemType)); + var addMethod = listInstance.GetType().GetMethod("Add"); + foreach (var item in processedItems) + addMethod?.Invoke(listInstance, new[] { item }); + + stepProperty.SetValue(pStep, listInstance); + } + } + + return acn; + } + private static Action BuildObjectInputAction(KeyValuePair input, ParameterExpression dataParameter, ParameterExpression contextParameter, ParameterExpression environmentVarsParameter, PropertyInfo stepProperty) { void acn(IStepBody pStep, object pData, IStepExecutionContext pContext) diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs index a7bb49718..f1e35ee98 100644 --- a/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs +++ b/test/WorkflowCore.IntegrationTests/Scenarios/StoredJsonScenario.cs @@ -102,5 +102,35 @@ public void should_execute_json_workflow_with_complex_input_type() data.Assignee.UnitInfo.Name.Should().Be("IT Department"); } + + [Fact] + public void should_execute_json_workflow_with_list_of_complex_input_type() + { + var initialData = new FlowData(); + var workflowId = StartWorkflow(TestAssets.Utils.GetTestDefinitionJsonListOfComplexInputProperty(), initialData); + WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30)); + + var data = GetData(workflowId); + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + UnhandledStepErrors.Count.Should().Be(0); + + data.AssigneeList.Should().NotBeNullOrEmpty(); + data.AssigneeList.Count.Should().Be(2); + + data.Assignee.Should().NotBeNull(); + data.Assignee.Name.Should().Be("John Doe"); + data.Assignee.UnitInfo?.Name.Should().Be("IT Department"); + + data.AssigneeList[0]?.Name.Should().Be(@"Nurlan Mikayilov"); + data.AssigneeList[0]?.UnitInfo?.Name.Should().Be("IT Department"); + + data.AssigneeList[1]?.Name.Should().Be(@"Jala Mammadova"); + data.AssigneeList[1]?.UnitInfo?.Name.Should().Be("General Department"); + + data.AssigneeArray[0]?.Name.Should().Be(@"Amin Nabiyev"); + data.AssigneeArray[0]?.UnitInfo?.Name.Should().Be("IT Department"); + + + } } } diff --git a/test/WorkflowCore.TestAssets/DataTypes/FlowData.cs b/test/WorkflowCore.TestAssets/DataTypes/FlowData.cs index 6d4ab9368..eaac2981b 100644 --- a/test/WorkflowCore.TestAssets/DataTypes/FlowData.cs +++ b/test/WorkflowCore.TestAssets/DataTypes/FlowData.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Linq; using WorkflowCore.TestAssets.Steps; @@ -7,4 +8,6 @@ namespace WorkflowCore.TestAssets.DataTypes; public class FlowData { public AssigneeInfo Assignee { get; set; } = new(); + public List AssigneeList { get; set; } = []; + public AssigneeInfo[] AssigneeArray { get; set; } = []; } \ No newline at end of file diff --git a/test/WorkflowCore.TestAssets/Steps/AssignTask.cs b/test/WorkflowCore.TestAssets/Steps/AssignTask.cs index 3128c7613..3f977939e 100644 --- a/test/WorkflowCore.TestAssets/Steps/AssignTask.cs +++ b/test/WorkflowCore.TestAssets/Steps/AssignTask.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Linq; using WorkflowCore.Interface; using WorkflowCore.Models; @@ -8,19 +9,28 @@ namespace WorkflowCore.TestAssets.Steps; public class AssignTask : StepBody { - public AssigneeInfo Assignee { get; set; } + public AssigneeInfo? Assignee { get; set; } + public List AssigneeList { get; set; } + + public AssigneeInfo[] AssigneeArray { get; set; } = []; public override ExecutionResult Run(IStepExecutionContext context) { if (context.Workflow.Data is FlowData flowData) { - flowData.Assignee = new AssigneeInfo + if (Assignee != null) { - Id = Assignee.Id, - Name = Assignee.Name, - MemberType = Assignee.MemberType, - UnitInfo = Assignee.UnitInfo - }; + flowData.Assignee = new AssigneeInfo + { + Id = Assignee.Id, + Name = Assignee.Name, + MemberType = Assignee.MemberType, + UnitInfo = Assignee.UnitInfo + }; + } + + flowData.AssigneeList.AddRange(AssigneeList); + flowData.AssigneeArray = AssigneeArray.ToArray(); } return ExecutionResult.Next(); } diff --git a/test/WorkflowCore.TestAssets/Utils.cs b/test/WorkflowCore.TestAssets/Utils.cs index 7fbbe562a..adf6df2c9 100644 --- a/test/WorkflowCore.TestAssets/Utils.cs +++ b/test/WorkflowCore.TestAssets/Utils.cs @@ -45,6 +45,11 @@ public static string GetTestDefinitionJsonComplexInputProperty() { return File.ReadAllText("def-complex-input-property.json"); } + + public static string GetTestDefinitionJsonListOfComplexInputProperty() + { + return File.ReadAllText("def-list-of-complex-input-property.json"); + } } } diff --git a/test/WorkflowCore.TestAssets/WorkflowCore.TestAssets.csproj b/test/WorkflowCore.TestAssets/WorkflowCore.TestAssets.csproj index f4d354184..4c7a862a8 100644 --- a/test/WorkflowCore.TestAssets/WorkflowCore.TestAssets.csproj +++ b/test/WorkflowCore.TestAssets/WorkflowCore.TestAssets.csproj @@ -17,6 +17,9 @@ + + Always + Always diff --git a/test/WorkflowCore.TestAssets/def-complex-input-property.json b/test/WorkflowCore.TestAssets/def-complex-input-property.json index 7a2583c0c..8849f0b27 100644 --- a/test/WorkflowCore.TestAssets/def-complex-input-property.json +++ b/test/WorkflowCore.TestAssets/def-complex-input-property.json @@ -17,8 +17,7 @@ "unitType": 1 } } - }, - "Reason": "3" + } } ] } \ No newline at end of file diff --git a/test/WorkflowCore.TestAssets/def-list-of-complex-input-property.json b/test/WorkflowCore.TestAssets/def-list-of-complex-input-property.json new file mode 100644 index 000000000..f4508fba7 --- /dev/null +++ b/test/WorkflowCore.TestAssets/def-list-of-complex-input-property.json @@ -0,0 +1,57 @@ +{ + "Id": "FlowWithListOfComplexInputType", + "Version": 1, + "DataType": "WorkflowCore.TestAssets.DataTypes.FlowData, WorkflowCore.TestAssets", + "Steps": [ + { + "Id": "AssignTask", + "StepType": "WorkflowCore.TestAssets.Steps.AssignTask, WorkflowCore.TestAssets", + "Inputs": { + "AssigneeList": [ + { + "id": 1, + "name": "Nurlan Mikayilov", + "memberType": 1, + "unitInfo": { + "id": 2, + "name": "IT Department", + "unitType": 1 + } + }, + { + "id": 2, + "name": "Jala Mammadova", + "memberType": 2, + "unitInfo": { + "id": 1, + "name": "General Department", + "unitType": 1 + } + } + ], + "AssigneeArray": [ + { + "id": 3, + "name": "Amin Nabiyev", + "memberType": 1, + "unitInfo": { + "id": 2, + "name": "IT Department", + "unitType": 1 + } + } + ], + "Assignee": { + "id": 1, + "name": "John Doe", + "memberType": 1, + "unitInfo": { + "id": 1, + "name": "IT Department", + "unitType": 1 + } + } + } + } + ] +} \ No newline at end of file From 176a4b4e0425ba12cd26f6fcc062f4844a503fb3 Mon Sep 17 00:00:00 2001 From: Nurlan Mikayilov Date: Wed, 9 Apr 2025 15:54:42 +0400 Subject: [PATCH 4/5] fix: initialize AssigneeList and AssigneeArray to empty collections --- test/WorkflowCore.TestAssets/Steps/AssignTask.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/WorkflowCore.TestAssets/Steps/AssignTask.cs b/test/WorkflowCore.TestAssets/Steps/AssignTask.cs index 3f977939e..eb2159494 100644 --- a/test/WorkflowCore.TestAssets/Steps/AssignTask.cs +++ b/test/WorkflowCore.TestAssets/Steps/AssignTask.cs @@ -9,8 +9,8 @@ namespace WorkflowCore.TestAssets.Steps; public class AssignTask : StepBody { - public AssigneeInfo? Assignee { get; set; } - public List AssigneeList { get; set; } + public AssigneeInfo Assignee { get; set; } + public List AssigneeList { get; set; } = []; public AssigneeInfo[] AssigneeArray { get; set; } = []; From 9caaf03b91c968016319adc0abc797abaa3b5cd4 Mon Sep 17 00:00:00 2001 From: Nurlan Mikayilov Date: Fri, 4 Jul 2025 09:07:12 +0400 Subject: [PATCH 5/5] feat: upgrade RabbitMQ client to version 7.1.2 and refactor connection methods to use async/await --- .../ServiceCollectionExtensions.cs | 19 ++++-- .../Services/RabbitMQProvider.cs | 60 +++++++++---------- ...orkflowCore.QueueProviders.RabbitMQ.csproj | 2 +- 3 files changed, 44 insertions(+), 37 deletions(-) diff --git a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/ServiceCollectionExtensions.cs b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/ServiceCollectionExtensions.cs index 4474e403a..86a1c5b7c 100644 --- a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/ServiceCollectionExtensions.cs +++ b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/ServiceCollectionExtensions.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection.Extensions; using WorkflowCore.Interface; using WorkflowCore.Models; @@ -19,8 +20,7 @@ public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, IConnect if (options == null) throw new ArgumentNullException(nameof(options)); if (connectionFactory == null) throw new ArgumentNullException(nameof(connectionFactory)); - return options - .UseRabbitMQ((sp, name) => connectionFactory.CreateConnection(name)); + return options.UseRabbitMQ(async (sp, name) => await connectionFactory.CreateConnectionAsync(name)); } public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, @@ -31,16 +31,23 @@ public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, if (connectionFactory == null) throw new ArgumentNullException(nameof(connectionFactory)); if (hostnames == null) throw new ArgumentNullException(nameof(hostnames)); - return options - .UseRabbitMQ((sp, name) => connectionFactory.CreateConnection(hostnames.ToList(), name)); + return options.UseRabbitMQ(async (sp, name) => await connectionFactory.CreateConnectionAsync(hostnames.ToList(), name)); } - - public static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, RabbitMqConnectionFactory rabbitMqConnectionFactory) + + private static WorkflowOptions UseRabbitMQ(this WorkflowOptions options, Func> rabbitMqConnectionFactory) { if (options == null) throw new ArgumentNullException(nameof(options)); if (rabbitMqConnectionFactory == null) throw new ArgumentNullException(nameof(rabbitMqConnectionFactory)); options.Services.AddSingleton(rabbitMqConnectionFactory); + + options.Services.AddSingleton( + sp => (provider, name) => + { + var connection = rabbitMqConnectionFactory(provider, name).GetAwaiter().GetResult(); + return connection; + }); + options.Services.TryAddSingleton(); options.UseQueueProvider(RabbitMqQueueProviderFactory); diff --git a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs index 8a4ab33bb..199c308d7 100644 --- a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs +++ b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/Services/RabbitMQProvider.cs @@ -1,5 +1,4 @@ -using Newtonsoft.Json; -using RabbitMQ.Client; +using RabbitMQ.Client; using System; using System.Linq; using System.Text; @@ -17,9 +16,8 @@ public class RabbitMQProvider : IQueueProvider private readonly IRabbitMqQueueNameProvider _queueNameProvider; private readonly RabbitMqConnectionFactory _rabbitMqConnectionFactory; private readonly IServiceProvider _serviceProvider; - - private IConnection _connection = null; - private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }; + + private IConnection _connection; public bool IsDequeueBlocking => false; @@ -37,11 +35,13 @@ public async Task QueueWork(string id, QueueType queue) if (_connection == null) throw new InvalidOperationException("RabbitMQ provider not running"); - using (var channel = _connection.CreateModel()) + using (var channel = await _connection.CreateChannelAsync()) { - channel.QueueDeclare(queue: _queueNameProvider.GetQueueName(queue), durable: true, exclusive: false, autoDelete: false, arguments: null); + await channel.QueueDeclareAsync(queue: _queueNameProvider.GetQueueName(queue), durable: true, exclusive: false, + autoDelete: false, arguments: null); var body = Encoding.UTF8.GetBytes(id); - channel.BasicPublish(exchange: "", routingKey: _queueNameProvider.GetQueueName(queue), basicProperties: null, body: body); + + await channel.BasicPublishAsync("", _queueNameProvider.GetQueueName(queue), false,body); } } @@ -50,34 +50,35 @@ public async Task DequeueWork(QueueType queue, CancellationToken cancell if (_connection == null) throw new InvalidOperationException("RabbitMQ provider not running"); - using (var channel = _connection.CreateModel()) + using (var channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken)) { - channel.QueueDeclare(queue: _queueNameProvider.GetQueueName(queue), - durable: true, - exclusive: false, - autoDelete: false, - arguments: null); + await channel.QueueDeclareAsync(queue: _queueNameProvider.GetQueueName(queue), + durable: true, + exclusive: false, + autoDelete: false, + arguments: null, cancellationToken: cancellationToken); + + await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false, + cancellationToken: cancellationToken); - channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); + var msg = await channel.BasicGetAsync(_queueNameProvider.GetQueueName(queue), false, cancellationToken); - var msg = channel.BasicGet(_queueNameProvider.GetQueueName(queue), false); - if (msg != null) + if (msg == null) { - var data = Encoding.UTF8.GetString(msg.Body.ToArray()); - channel.BasicAck(msg.DeliveryTag, false); - return data; + return null; } - return null; + + var data = Encoding.UTF8.GetString(msg.Body.ToArray()); + await channel.BasicAckAsync(msg.DeliveryTag, false, cancellationToken); + return data; } } - + public void Dispose() { - if (_connection != null) - { - if (_connection.IsOpen) - _connection.Close(); - } + if (_connection == null) return; + if (_connection.IsOpen) + _connection.CloseAsync(); } public async Task Start() @@ -89,11 +90,10 @@ public async Task Stop() { if (_connection != null) { - _connection.Close(); + await _connection.CloseAsync(); _connection = null; } } - } #pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously -} +} \ No newline at end of file diff --git a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/WorkflowCore.QueueProviders.RabbitMQ.csproj b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/WorkflowCore.QueueProviders.RabbitMQ.csproj index df02daa76..8c7707439 100644 --- a/src/providers/WorkflowCore.QueueProviders.RabbitMQ/WorkflowCore.QueueProviders.RabbitMQ.csproj +++ b/src/providers/WorkflowCore.QueueProviders.RabbitMQ/WorkflowCore.QueueProviders.RabbitMQ.csproj @@ -23,7 +23,7 @@ - +