Skip to content

Use dapr/durabletask-java #1336

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ private void registerWorkflowsAndActivities(ApplicationContext applicationContex
workflowRuntimeBuilder.registerActivity(activity);
}

try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) {
LOGGER.info("Starting workflow runtime ... ");
runtime.start(false);
}
WorkflowRuntime runtime = workflowRuntimeBuilder.build();
LOGGER.info("Starting workflow runtime ... ");
runtime.start(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

package io.dapr.examples.unittesting;

import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import io.dapr.durabletask.Task;
import io.dapr.durabletask.TaskCanceledException;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowStub;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ public static void main(String[] args) throws Exception {
builder.registerActivity(ToUpperCaseActivity.class);

// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
System.out.println("Start workflow runtime");
runtime.start();
}
WorkflowRuntime runtime = builder.build();
System.out.println("Start workflow runtime");
runtime.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ public static void main(String[] args) throws Exception {
builder.registerActivity(ReverseActivity.class);

// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
System.out.println("Start workflow runtime");
runtime.start();
}
WorkflowRuntime runtime = builder.build();
System.out.println("Start workflow runtime");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import io.dapr.workflows.runtime.WorkflowRuntime;
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DemoContinueAsNewWorker {
/**
* The main method of this app.
Expand All @@ -25,13 +28,14 @@ public class DemoContinueAsNewWorker {
*/
public static void main(String[] args) throws Exception {
// Register the Workflow with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoContinueAsNewWorkflow.class);
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().
registerWorkflow(DemoContinueAsNewWorkflow.class)
.withExecutorService(Executors.newFixedThreadPool(3));
builder.registerActivity(CleanUpActivity.class);

// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
System.out.println("Start workflow runtime");
runtime.start();
}
WorkflowRuntime runtime = builder.build();
System.out.println("Start workflow runtime");
runtime.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ public static void main(String[] args) throws Exception {
builder.registerActivity(DenyActivity.class);

// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
System.out.println("Start workflow runtime");
runtime.start();
}
WorkflowRuntime runtime = builder.build();
System.out.println("Start workflow runtime");
runtime.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ public static void main(String[] args) throws Exception {
builder.registerActivity(CountWordsActivity.class);

// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
System.out.println("Start workflow runtime");
runtime.start();
}
WorkflowRuntime runtime = builder.build();
System.out.println("Start workflow runtime");
runtime.start(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package io.dapr.examples.workflows.faninout;

import com.microsoft.durabletask.Task;
import io.dapr.durabletask.Task;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<!--
manually declare durabletask-client's jackson dependencies for workflows sdk
which conflict with dapr-sdk's jackson dependencies
https://github.com/microsoft/durabletask-java/blob/main/client/build.gradle#L16
https://github.com/dapr/durabletask-java/blob/main/client/build.gradle#L16
-->
<jackson.version>2.16.1</jackson.version>
<gpg.skip>true</gpg.skip>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,9 @@ static void daprProperties(DynamicPropertyRegistry registry) {
*/
@BeforeEach
public void init() {
try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) {
System.out.println("Start workflow runtime");
runtime.start(false);
}
WorkflowRuntime runtime = workflowRuntimeBuilder.build();
System.out.println("Start workflow runtime");
runtime.start(false);
}

@Test
Expand Down
6 changes: 3 additions & 3 deletions sdk-workflows/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.microsoft</groupId>
<groupId>io.dapr</groupId>
<artifactId>durabletask-client</artifactId>
<version>1.5.0</version>
<version>1.5.2</version>
</dependency>
<!--
manually declare durabletask-client's jackson dependencies
which conflict with dapr-sdk's jackson dependencies
https://github.com/microsoft/durabletask-java/blob/main/client/build.gradle#L16
https://github.com/dapr/durabletask-java/blob/main/client/build.gradle#L16
-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@

package io.dapr.workflows;

import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskFailedException;
import io.dapr.durabletask.CompositeTaskFailedException;
import io.dapr.durabletask.Task;
import io.dapr.durabletask.TaskCanceledException;
import io.dapr.durabletask.TaskFailedException;
import org.slf4j.Logger;

import javax.annotation.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@

package io.dapr.workflows.client;

import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.DurableTaskGrpcClientBuilder;
import com.microsoft.durabletask.NewOrchestrationInstanceOptions;
import com.microsoft.durabletask.OrchestrationMetadata;
import com.microsoft.durabletask.PurgeResult;
import io.dapr.config.Properties;
import io.dapr.durabletask.DurableTaskClient;
import io.dapr.durabletask.DurableTaskGrpcClientBuilder;
import io.dapr.durabletask.NewOrchestrationInstanceOptions;
import io.dapr.durabletask.OrchestrationMetadata;
import io.dapr.durabletask.PurgeResult;
import io.dapr.utils.NetworkUtils;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.internal.ApiTokenClientInterceptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package io.dapr.workflows.runtime;

import com.microsoft.durabletask.TaskActivityContext;
import io.dapr.durabletask.TaskActivityContext;
import io.dapr.workflows.WorkflowActivityContext;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@

package io.dapr.workflows.runtime;

import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.RetryPolicy;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskOptions;
import com.microsoft.durabletask.TaskOrchestrationContext;
import io.dapr.durabletask.CompositeTaskFailedException;
import io.dapr.durabletask.RetryPolicy;
import io.dapr.durabletask.Task;
import io.dapr.durabletask.TaskCanceledException;
import io.dapr.durabletask.TaskOptions;
import io.dapr.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryPolicy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package io.dapr.workflows.runtime;

import com.microsoft.durabletask.FailureDetails;
import io.dapr.durabletask.FailureDetails;
import io.dapr.workflows.client.WorkflowFailureDetails;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

package io.dapr.workflows.runtime;

import com.microsoft.durabletask.FailureDetails;
import com.microsoft.durabletask.OrchestrationMetadata;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import io.dapr.durabletask.FailureDetails;
import io.dapr.durabletask.OrchestrationMetadata;
import io.dapr.durabletask.OrchestrationRuntimeStatus;
import io.dapr.workflows.client.WorkflowFailureDetails;
import io.dapr.workflows.client.WorkflowInstanceStatus;
import io.dapr.workflows.client.WorkflowRuntimeStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

package io.dapr.workflows.runtime;

import com.microsoft.durabletask.TaskActivity;
import com.microsoft.durabletask.TaskActivityFactory;
import io.dapr.durabletask.TaskActivity;
import io.dapr.durabletask.TaskActivityFactory;
import io.dapr.workflows.WorkflowActivity;

import java.lang.reflect.Constructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

package io.dapr.workflows.runtime;

import com.microsoft.durabletask.TaskActivity;
import com.microsoft.durabletask.TaskActivityFactory;
import io.dapr.durabletask.TaskActivity;
import io.dapr.durabletask.TaskActivityFactory;
import io.dapr.workflows.WorkflowActivity;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

package io.dapr.workflows.runtime;

import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationFactory;
import io.dapr.durabletask.TaskOrchestration;
import io.dapr.durabletask.TaskOrchestrationFactory;
import io.dapr.workflows.Workflow;

import java.lang.reflect.Constructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

package io.dapr.workflows.runtime;

import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationFactory;
import io.dapr.durabletask.TaskOrchestration;
import io.dapr.durabletask.TaskOrchestrationFactory;
import io.dapr.workflows.Workflow;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,34 @@

package io.dapr.workflows.runtime;

import com.microsoft.durabletask.DurableTaskGrpcWorker;
import io.dapr.durabletask.DurableTaskGrpcWorker;
import io.grpc.ManagedChannel;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Contains methods to register workflows and activities.
*/
public class WorkflowRuntime implements AutoCloseable {

private DurableTaskGrpcWorker worker;
private final DurableTaskGrpcWorker worker;
private final ManagedChannel managedChannel;
private final ExecutorService executorService;

public WorkflowRuntime(DurableTaskGrpcWorker worker) {
/**
* Constructor.
*
* @param worker grpcWorker processing activities.
* @param managedChannel grpc channel.
* @param executorService executor service responsible for running the threads.
*/
public WorkflowRuntime(DurableTaskGrpcWorker worker,
ManagedChannel managedChannel,
ExecutorService executorService) {
this.worker = worker;
this.managedChannel = managedChannel;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artur-ciocanu @siri-varma the DaprWorkflowsIT (in the sdk-tests) is failing to connect and it is trying to access the wrong port: https://github.com/dapr/java-sdk/actions/runs/14909864380/job/41881277939?pr=1336 I have the feeling that having this managedChannel here is causing the mappings to go wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checking right now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

70c0141
496f918

@salaboy found the issue. These two commits will fix it.

this.executorService = executorService;
}

/**
Expand All @@ -50,11 +67,31 @@
/**
* {@inheritDoc}
*/
@Override
public void close() {
if (this.worker != null) {
this.worker.close();
this.worker = null;
this.shutDownWorkerPool();
this.closeSideCarChannel();
}

private void closeSideCarChannel() {
this.managedChannel.shutdown();

try {
if (!this.managedChannel.awaitTermination(60, TimeUnit.SECONDS)) {
this.managedChannel.shutdownNow();

Check warning on line 80 in sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java

View check run for this annotation

Codecov / codecov/patch

sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java#L80

Added line #L80 was not covered by tests
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();

Check warning on line 83 in sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java

View check run for this annotation

Codecov / codecov/patch

sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java#L82-L83

Added lines #L82 - L83 were not covered by tests
}
}

private void shutDownWorkerPool() {
this.executorService.shutdown();
try {
if (!this.executorService.awaitTermination(60, TimeUnit.SECONDS)) {
this.executorService.shutdownNow();

Check warning on line 91 in sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java

View check run for this annotation

Codecov / codecov/patch

sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java#L91

Added line #L91 was not covered by tests
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();

Check warning on line 94 in sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java

View check run for this annotation

Codecov / codecov/patch

sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java#L93-L94

Added lines #L93 - L94 were not covered by tests
}
}
}
Loading
Loading