From b74c247d01eeec64ce887eb6418039fa623208f1 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Fri, 28 Feb 2025 16:32:37 -0500 Subject: [PATCH 1/2] update python examples for workflow; update conversation quickstart to python sdk Signed-off-by: Hannah Hunter --- .../workflow/howto-author-workflow.md | 181 ++++++++++-------- .../quickstarts/conversation-quickstart.md | 57 +++--- .../quickstarts/workflow-quickstart.md | 9 - 3 files changed, 121 insertions(+), 126 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md index 3345b97b2a8..ae2285d3720 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md @@ -36,16 +36,31 @@ The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar si -Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called `hello_act` that notifies users of the current counter value. `hello_act` is a function derived from a class called `WorkflowActivityContext`. +Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates task chaining activities that receive input ```python -def hello_act(ctx: WorkflowActivityContext, input): - global counter - counter += input - print(f'New counter value is: {counter}!', flush=True) +@wfr.activity(name='step10') +def step1(ctx, activity_input): + print(f'Step 1: Received input: {activity_input}.') + # Do some work + return activity_input + 1 + + +@wfr.activity +def step2(ctx, activity_input): + print(f'Step 2: Received input: {activity_input}.') + # Do some work + return activity_input * 2 + + +@wfr.activity +def step3(ctx, activity_input): + print(f'Step 3: Received input: {activity_input}.') + # Do some work + return activity_input ^ 2 ``` -[See the `hello_act` workflow activity in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL40C1-L43C59) +[See the task chaining workflow activity in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/task_chaining.py) {{% /codetab %}} @@ -226,16 +241,19 @@ Next, register and call the activites in a workflow. -The `hello_world_wf` function is derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities. +The `random_workflow` function is a task chaining workflow pattern derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities. ```python -def hello_world_wf(ctx: DaprWorkflowContext, input): - print(f'{input}') - yield ctx.call_activity(hello_act, input=1) - yield ctx.call_activity(hello_act, input=10) - yield ctx.wait_for_external_event("event1") - yield ctx.call_activity(hello_act, input=100) - yield ctx.call_activity(hello_act, input=1000) +@wfr.workflow(name='random_workflow') +def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): + try: + result1 = yield ctx.call_activity(step1, input=wf_input) + result2 = yield ctx.call_activity(step2, input=result1) + result3 = yield ctx.call_activity(step3, input=result2) + except Exception as e: + yield ctx.call_activity(error_handler, input=str(e)) + raise + return [result1, result2, result3] ``` [See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51) @@ -409,82 +427,77 @@ Finally, compose the application using the workflow. - A Python package called `DaprClient` to receive the Python SDK capabilities. - A builder with extensions called: - - `WorkflowRuntime`: Allows you to register workflows and workflow activities - - `DaprWorkflowContext`: Allows you to [create workflows]({{< ref "#write-the-workflow" >}}) + - `WorkflowRuntime`: Allows you to register the workflow runtime. + - `DaprWorkflowContext`: Allows you to [create workflows and workflow activities]({{< ref "#write-the-workflow" >}}) - `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}}) - API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow. ```python -from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext -from dapr.clients import DaprClient - -# ... - -def main(): - with DaprClient() as d: - host = settings.DAPR_RUNTIME_HOST - port = settings.DAPR_GRPC_PORT - workflowRuntime = WorkflowRuntime(host, port) - workflowRuntime = WorkflowRuntime() - workflowRuntime.register_workflow(hello_world_wf) - workflowRuntime.register_activity(hello_act) - workflowRuntime.start() - - # Start workflow - print("==========Start Counter Increase as per Input:==========") - start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent, - workflow_name=workflowName, input=inputData, workflow_options=workflowOptions) - print(f"start_resp {start_resp.instance_id}") - - # ... - - # Pause workflow - d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}") - - # Resume workflow - d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}") - - sleep(1) - # Raise workflow - d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent, - event_name=eventName, event_data=eventData) - - sleep(5) - # Purge workflow - d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent) - try: - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - except DaprInternalError as err: - if nonExistentIDError in err._message: - print("Instance Successfully Purged") - - # Kick off another workflow for termination purposes - start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent, - workflow_name=workflowName, input=inputData, workflow_options=workflowOptions) - print(f"start_resp {start_resp.instance_id}") - - # Terminate workflow - d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent) - sleep(1) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}") - - # Purge workflow - d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent) - try: - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - except DaprInternalError as err: - if nonExistentIDError in err._message: - print("Instance Successfully Purged") - - workflowRuntime.shutdown() +from durabletask import worker, task + +from dapr.ext.workflow.workflow_context import Workflow +from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext +from dapr.ext.workflow.workflow_activity_context import Activity, WorkflowActivityContext +from dapr.ext.workflow.util import getAddress + +from dapr.clients import DaprInternalError +from dapr.clients.http.client import DAPR_API_TOKEN_HEADER +from dapr.conf import settings +from dapr.conf.helpers import GrpcEndpoint +from dapr.ext.workflow.logger import LoggerOptions, Logger + +wfr = wf.WorkflowRuntime() + + @wfr.workflow(name='hello_world_wf') + def hello_world_wf(ctx: DaprWorkflowContext, wf_input): + # Workflow definition... + + @wfr.activity(name='hello_act') + def hello_act(ctx: WorkflowActivityContext, wf_input): + # Activity definition... + + # Start workflow + wfr = WorkflowRuntime() + wfr.start() + wf_client = DaprWorkflowClient() + + # ... + + # Pause workflow + wf_client.pause_workflow(instance_id=instance_id) + metadata = wf_client.get_workflow_state(instance_id=instance_id) + # ... check status ... + wf_client.resume_workflow(instance_id=instance_id) + + sleep(1) + + # Raise workflow + wf_client.raise_workflow_event( + instance_id=instance_id, + event_name=event_name, + data=event_data + ) + + # Purge workflow + state = wf_client.wait_for_workflow_completion( + instance_id, + timeout_in_seconds=30 + ) + wf_client.purge_workflow(instance_id=instance_id) + + workflowRuntime.shutdown() if __name__ == '__main__': - main() + wfr.start() + sleep(10) # wait for workflow runtime to start + + wf_client = wf.DaprWorkflowClient() + instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42) + print(f'Workflow started. Instance ID: {instance_id}') + state = wf_client.wait_for_workflow_completion(instance_id) + print(f'Workflow completed! Status: {state.runtime_status}') + + wfr.shutdown() ``` diff --git a/daprdocs/content/en/getting-started/quickstarts/conversation-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/conversation-quickstart.md index 8abed6f58d3..e38701bfadd 100644 --- a/daprdocs/content/en/getting-started/quickstarts/conversation-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/conversation-quickstart.md @@ -10,7 +10,7 @@ description: Get started with the Dapr conversation building block The conversation building block is currently in **alpha**. {{% /alert %}} -Let's take a look at how the [Dapr conversation building block]({{< ref conversation-overview.md >}}) makes interacting with Large Language Models (LLMs) easier. In this quickstart, you use the echo component to communicate with the mock LLM and ask it for a poem about Dapr. +Let's take a look at how the [Dapr conversation building block]({{< ref conversation-overview.md >}}) makes interacting with Large Language Models (LLMs) easier. In this quickstart, you use the echo component to communicate with the mock LLM and ask it to define Dapr. You can try out this conversation quickstart by either: @@ -18,7 +18,7 @@ You can try out this conversation quickstart by either: - [Running the application without the template]({{< ref "#run-the-app-without-the-template" >}}) {{% alert title="Note" color="primary" %}} -Currently, only the HTTP quickstart sample is available in Python and JavaScript. +Currently, you can only use JavaScript for the quickstart sample using HTTP, not the JavaScript SDK. {{% /alert %}} ## Run the app with the template file @@ -50,7 +50,7 @@ git clone https://github.com/dapr/quickstarts.git From the root of the Quickstarts directory, navigate into the conversation directory: ```bash -cd conversation/python/http/conversation +cd conversation/python/sdk/conversation ``` Install the dependencies: @@ -61,7 +61,7 @@ pip3 install -r requirements.txt ### Step 3: Launch the conversation service -Navigate back to the `http` directory and start the conversation service with the following command: +Navigate back to the `sdk` directory and start the conversation service with the following command: ```bash dapr run -f . @@ -117,37 +117,28 @@ In the application code: - The mock LLM echoes "What is dapr?". ```python -import logging -import requests -import os - -logging.basicConfig(level=logging.INFO) - -base_url = os.getenv('BASE_URL', 'http://localhost') + ':' + os.getenv( - 'DAPR_HTTP_PORT', '3500') - -CONVERSATION_COMPONENT_NAME = 'echo' - -input = { - 'name': 'echo', - 'inputs': [{'message':'What is dapr?'}], - 'parameters': {}, - 'metadata': {} +from dapr.clients import DaprClient +from dapr.clients.grpc._request import ConversationInput + +with DaprClient() as d: + inputs = [ + ConversationInput(content="What is dapr?", role='user', scrub_pii=True), + ] + + metadata = { + 'model': 'modelname', + 'key': 'authKey', + 'cacheTTL': '10m', } -# Send input to conversation endpoint -result = requests.post( - url='%s/v1.0-alpha1/conversation/%s/converse' % (base_url, CONVERSATION_COMPONENT_NAME), - json=input -) - -logging.info('Input sent: What is dapr?') + print('Input sent: What is dapr?') -# Parse conversation output -data = result.json() -output = data["outputs"][0]["result"] + response = d.converse_alpha1( + name='echo', inputs=inputs, temperature=0.7, context_id='chat-123', metadata=metadata + ) -logging.info('Output response: ' + output) + for output in response.outputs: + print(f'Output response: {output.result}') ``` {{% /codetab %}} @@ -575,7 +566,7 @@ git clone https://github.com/dapr/quickstarts.git From the root of the Quickstarts directory, navigate into the conversation directory: ```bash -cd conversation/python/http/conversation +cd conversation/python/sdk/conversation ``` Install the dependencies: @@ -586,7 +577,7 @@ pip3 install -r requirements.txt ### Step 3: Launch the conversation service -Navigate back to the `http` directory and start the conversation service with the following command: +Navigate back to the `sdk` directory and start the conversation service with the following command: ```bash dapr run --app-id conversation --resources-path ../../../components -- python3 app.py diff --git a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md index 5f50c6a9909..02929b31da4 100644 --- a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md @@ -251,7 +251,6 @@ class WorkflowConsoleApp: if __name__ == '__main__': app = WorkflowConsoleApp() app.main() - ``` #### `order-processor/workflow.py` @@ -276,7 +275,6 @@ wfr = WorkflowRuntime() logging.basicConfig(level=logging.INFO) - @wfr.workflow(name="order_processing_workflow") def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: str): """Defines the order processing workflow. @@ -343,7 +341,6 @@ def notify_activity(ctx: WorkflowActivityContext, input: Notification): logger = logging.getLogger('NotifyActivity') logger.info(input.message) - @wfr.activity(name="process_payment_activity") def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest): """Defines Process Payment Activity.This is used by the workflow to process a payment""" @@ -353,7 +350,6 @@ def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest +' USD') logger.info(f'Payment for request ID {input.request_id} processed successfully') - @wfr.activity(name="verify_inventory_activity") def verify_inventory_activity(ctx: WorkflowActivityContext, input: InventoryRequest) -> InventoryResult: @@ -377,8 +373,6 @@ def verify_inventory_activity(ctx: WorkflowActivityContext, return InventoryResult(True, inventory_item) return InventoryResult(False, None) - - @wfr.activity(name="update_inventory_activity") def update_inventory_activity(ctx: WorkflowActivityContext, input: PaymentRequest) -> InventoryResult: @@ -401,8 +395,6 @@ def update_inventory_activity(ctx: WorkflowActivityContext, client.save_state(store_name, input.item_being_purchased, new_val) logger.info(f'There are now {new_quantity} {input.item_being_purchased} left in stock') - - @wfr.activity(name="request_approval_activity") def request_approval_activity(ctx: WorkflowActivityContext, input: OrderPayload): @@ -413,7 +405,6 @@ def request_approval_activity(ctx: WorkflowActivityContext, logger.info('Requesting approval for payment of '+f'{input["total_cost"]}'+' USD for ' +f'{input["quantity"]}' +' ' +f'{input["item_name"]}') - ``` {{% /codetab %}} From 60d5332d1aaa1413296eead40cbb49afdd076624 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Mon, 17 Mar 2025 13:04:21 -0400 Subject: [PATCH 2/2] update author and manage workflow how-tos Signed-off-by: Hannah Hunter --- .../workflow/howto-author-workflow.md | 283 ++++++++++++------ .../workflow/howto-manage-workflow.md | 27 +- 2 files changed, 200 insertions(+), 110 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md index ae2285d3720..009850fae7c 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md @@ -36,31 +36,17 @@ The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar si -Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates task chaining activities that receive input +Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called `hello_act` that notifies users of the current counter value. `hello_act` is a function derived from a class called `WorkflowActivityContext`. ```python -@wfr.activity(name='step10') -def step1(ctx, activity_input): - print(f'Step 1: Received input: {activity_input}.') - # Do some work - return activity_input + 1 - - -@wfr.activity -def step2(ctx, activity_input): - print(f'Step 2: Received input: {activity_input}.') - # Do some work - return activity_input * 2 - - -@wfr.activity -def step3(ctx, activity_input): - print(f'Step 3: Received input: {activity_input}.') - # Do some work - return activity_input ^ 2 +@wfr.activity(name='hello_act') +def hello_act(ctx: WorkflowActivityContext, wf_input): + global counter + counter += wf_input + print(f'New counter value is: {counter}!', flush=True) ``` -[See the task chaining workflow activity in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/task_chaining.py) +[See the task chaining workflow activity in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py) {{% /codetab %}} @@ -241,22 +227,32 @@ Next, register and call the activites in a workflow. -The `random_workflow` function is a task chaining workflow pattern derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities. +The `hello_world_wf` function is a function derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities. ```python -@wfr.workflow(name='random_workflow') -def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): - try: - result1 = yield ctx.call_activity(step1, input=wf_input) - result2 = yield ctx.call_activity(step2, input=result1) - result3 = yield ctx.call_activity(step3, input=result2) - except Exception as e: - yield ctx.call_activity(error_handler, input=str(e)) - raise - return [result1, result2, result3] +@wfr.workflow(name='hello_world_wf') +def hello_world_wf(ctx: DaprWorkflowContext, wf_input): + print(f'{wf_input}') + yield ctx.call_activity(hello_act, input=1) + yield ctx.call_activity(hello_act, input=10) + yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy) + yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy) + + # Change in event handling: Use when_any to handle both event and timeout + event = ctx.wait_for_external_event(event_name) + timeout = ctx.create_timer(timedelta(seconds=30)) + winner = yield when_any([event, timeout]) + + if winner == timeout: + print('Workflow timed out waiting for event') + return 'Timeout' + + yield ctx.call_activity(hello_act, input=100) + yield ctx.call_activity(hello_act, input=1000) + return 'Completed' ``` -[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51) +[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py) {{% /codetab %}} @@ -423,84 +419,177 @@ Finally, compose the application using the workflow. -[In the following example](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py), for a basic Python hello world application using the Python SDK, your project code would include: +[In the following example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py), for a basic Python hello world application using the Python SDK, your project code would include: - A Python package called `DaprClient` to receive the Python SDK capabilities. - A builder with extensions called: - `WorkflowRuntime`: Allows you to register the workflow runtime. - - `DaprWorkflowContext`: Allows you to [create workflows and workflow activities]({{< ref "#write-the-workflow" >}}) + - `DaprWorkflowContext`: Allows you to [create workflows]({{< ref "#write-the-workflow" >}}) - `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}}) -- API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow. +- API calls. In the example below, these calls start, pause, resume, purge, and completing the workflow. ```python -from durabletask import worker, task - -from dapr.ext.workflow.workflow_context import Workflow -from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext -from dapr.ext.workflow.workflow_activity_context import Activity, WorkflowActivityContext -from dapr.ext.workflow.util import getAddress - -from dapr.clients import DaprInternalError -from dapr.clients.http.client import DAPR_API_TOKEN_HEADER -from dapr.conf import settings -from dapr.conf.helpers import GrpcEndpoint -from dapr.ext.workflow.logger import LoggerOptions, Logger - -wfr = wf.WorkflowRuntime() - - @wfr.workflow(name='hello_world_wf') - def hello_world_wf(ctx: DaprWorkflowContext, wf_input): - # Workflow definition... - - @wfr.activity(name='hello_act') - def hello_act(ctx: WorkflowActivityContext, wf_input): - # Activity definition... - - # Start workflow - wfr = WorkflowRuntime() - wfr.start() - wf_client = DaprWorkflowClient() - - # ... - - # Pause workflow - wf_client.pause_workflow(instance_id=instance_id) - metadata = wf_client.get_workflow_state(instance_id=instance_id) - # ... check status ... - wf_client.resume_workflow(instance_id=instance_id) - - sleep(1) - - # Raise workflow - wf_client.raise_workflow_event( - instance_id=instance_id, - event_name=event_name, - data=event_data - ) - - # Purge workflow - state = wf_client.wait_for_workflow_completion( - instance_id, - timeout_in_seconds=30 - ) - wf_client.purge_workflow(instance_id=instance_id) - - workflowRuntime.shutdown() +from datetime import timedelta +from time import sleep +from dapr.ext.workflow import ( + WorkflowRuntime, + DaprWorkflowContext, + WorkflowActivityContext, + RetryPolicy, + DaprWorkflowClient, + when_any, +) +from dapr.conf import Settings +from dapr.clients.exceptions import DaprInternalError + +settings = Settings() + +counter = 0 +retry_count = 0 +child_orchestrator_count = 0 +child_orchestrator_string = '' +child_act_retry_count = 0 +instance_id = 'exampleInstanceID' +child_instance_id = 'childInstanceID' +workflow_name = 'hello_world_wf' +child_workflow_name = 'child_wf' +input_data = 'Hi Counter!' +event_name = 'event1' +event_data = 'eventData' +non_existent_id_error = 'no such instance exists' + +retry_policy = RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=3, + backoff_coefficient=2, + max_retry_interval=timedelta(seconds=10), + retry_timeout=timedelta(seconds=100), +) -if __name__ == '__main__': +wfr = WorkflowRuntime() + + +@wfr.workflow(name='hello_world_wf') +def hello_world_wf(ctx: DaprWorkflowContext, wf_input): + print(f'{wf_input}') + yield ctx.call_activity(hello_act, input=1) + yield ctx.call_activity(hello_act, input=10) + yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy) + yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy) + + # Change in event handling: Use when_any to handle both event and timeout + event = ctx.wait_for_external_event(event_name) + timeout = ctx.create_timer(timedelta(seconds=30)) + winner = yield when_any([event, timeout]) + + if winner == timeout: + print('Workflow timed out waiting for event') + return 'Timeout' + + yield ctx.call_activity(hello_act, input=100) + yield ctx.call_activity(hello_act, input=1000) + return 'Completed' + + +@wfr.activity(name='hello_act') +def hello_act(ctx: WorkflowActivityContext, wf_input): + global counter + counter += wf_input + print(f'New counter value is: {counter}!', flush=True) + + +@wfr.activity(name='hello_retryable_act') +def hello_retryable_act(ctx: WorkflowActivityContext): + global retry_count + if (retry_count % 2) == 0: + print(f'Retry count value is: {retry_count}!', flush=True) + retry_count += 1 + raise ValueError('Retryable Error') + print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True) + retry_count += 1 + + +@wfr.workflow(name='child_retryable_wf') +def child_retryable_wf(ctx: DaprWorkflowContext): + global child_orchestrator_string, child_orchestrator_count + if not ctx.is_replaying: + child_orchestrator_count += 1 + print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True) + child_orchestrator_string += str(child_orchestrator_count) + yield ctx.call_activity( + act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy + ) + if child_orchestrator_count < 3: + raise ValueError('Retryable Error') + + +@wfr.activity(name='act_for_child_wf') +def act_for_child_wf(ctx: WorkflowActivityContext, inp): + global child_orchestrator_string, child_act_retry_count + inp_char = chr(96 + inp) + print(f'Appending {inp_char} to child_orchestrator_string!', flush=True) + child_orchestrator_string += inp_char + if child_act_retry_count % 2 == 0: + child_act_retry_count += 1 + raise ValueError('Retryable Error') + child_act_retry_count += 1 + + +def main(): wfr.start() - sleep(10) # wait for workflow runtime to start + wf_client = DaprWorkflowClient() + + print('==========Start Counter Increase as per Input:==========') + wf_client.schedule_new_workflow( + workflow=hello_world_wf, input=input_data, instance_id=instance_id + ) + + wf_client.wait_for_workflow_start(instance_id) - wf_client = wf.DaprWorkflowClient() - instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42) - print(f'Workflow started. Instance ID: {instance_id}') - state = wf_client.wait_for_workflow_completion(instance_id) - print(f'Workflow completed! Status: {state.runtime_status}') + # Sleep to let the workflow run initial activities + sleep(12) + + assert counter == 11 + assert retry_count == 2 + assert child_orchestrator_string == '1aa2bb3cc' + + # Pause Test + wf_client.pause_workflow(instance_id=instance_id) + metadata = wf_client.get_workflow_state(instance_id=instance_id) + print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}') + + # Resume Test + wf_client.resume_workflow(instance_id=instance_id) + metadata = wf_client.get_workflow_state(instance_id=instance_id) + print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}') + + sleep(2) # Give the workflow time to reach the event wait state + wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data) + + print('========= Waiting for Workflow completion', flush=True) + try: + state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30) + if state.runtime_status.name == 'COMPLETED': + print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"'))) + else: + print(f'Workflow failed! Status: {state.runtime_status.name}') + except TimeoutError: + print('*** Workflow timed out!') + + wf_client.purge_workflow(instance_id=instance_id) + try: + wf_client.get_workflow_state(instance_id=instance_id) + except DaprInternalError as err: + if non_existent_id_error in err._message: + print('Instance Successfully Purged') wfr.shutdown() -``` +if __name__ == '__main__': + main() +``` + {{% /codetab %}} {{% codetab %}} diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md index f03f4a4c471..c9e847ebe15 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md @@ -14,13 +14,13 @@ Now that you've [authored the workflow and its activities in your application]({ {{% codetab %}} Manage your workflow within your code. In the workflow example from the [Author a workflow]({{< ref "howto-author-workflow.md#write-the-application" >}}) guide, the workflow is registered in the code using the following APIs: -- **start_workflow**: Start an instance of a workflow -- **get_workflow**: Get information on the status of the workflow +- **schedule_new_workflow**: Start an instance of a workflow +- **get_workflow_state**: Get information on the status of the workflow - **pause_workflow**: Pauses or suspends a workflow instance that can later be resumed - **resume_workflow**: Resumes a paused workflow instance - **raise_workflow_event**: Raise an event on a workflow - **purge_workflow**: Removes all metadata related to a specific workflow instance -- **terminate_workflow**: Terminate or stop a particular instance of a workflow +- **wait_for_workflow_completion**: Complete a particular instance of a workflow ```python from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext @@ -34,27 +34,28 @@ eventName = "event1" eventData = "eventData" # Start the workflow -start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent, - workflow_name=workflowName, input=inputData, workflow_options=workflowOptions) +wf_client.schedule_new_workflow( + workflow=hello_world_wf, input=input_data, instance_id=instance_id + ) # Get info on the workflow -getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) +wf_client.get_workflow_state(instance_id=instance_id) # Pause the workflow -d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent) +wf_client.pause_workflow(instance_id=instance_id) + metadata = wf_client.get_workflow_state(instance_id=instance_id) # Resume the workflow -d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent) +wf_client.resume_workflow(instance_id=instance_id) # Raise an event on the workflow. - d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent, - event_name=eventName, event_data=eventData) +wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data) # Purge the workflow -d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent) +wf_client.purge_workflow(instance_id=instance_id) -# Terminate the workflow -d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent) +# Wait for workflow completion +wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30) ``` {{% /codetab %}}