Skip to content

Commit 86338c4

Browse files
authored
Support reusing orchestration id (#46)
* support reuse orchestration id * add test * fix tests * implement new design * clean up * refactory * minor updates * minor updates * refactor * improve tests * correct variable name * refactory more refactory refactory * minor refactory * refactor option pattern * formatting - clean up * update changelog.md * clean up
1 parent ba82393 commit 86338c4

File tree

14 files changed

+1616
-974
lines changed

14 files changed

+1616
-974
lines changed

.github/workflows/pr-validation.yml

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ on:
1313
# Allows you to run this workflow manually from the Actions tab
1414
workflow_dispatch:
1515

16+
env:
17+
# Configure protoc and go grpc plugin
18+
PROTOC_VERSION: "25.x"
19+
PROTOC_GEN_GO: "v1.28"
20+
PROTOC_GEN_GO_GRPC: "v1.2"
21+
1622
# A workflow run is made up of one or more jobs that can run sequentially or in parallel
1723
jobs:
1824
# This workflow contains a single job called "build"
@@ -36,6 +42,19 @@ jobs:
3642

3743
- name: Install dependencies
3844
run: go get .
39-
45+
46+
- name: Install Protoc
47+
uses: arduino/setup-protoc@v2
48+
with:
49+
version: ${{ env.PROTOC_VERSION }}
50+
51+
- name: Installing protoc-gen-go
52+
run: |
53+
go install google.golang.org/protobuf/cmd/protoc-gen-go@$PROTOC_GEN_GO
54+
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@$PROTOC_GEN_GO_GRPC
55+
56+
- name: Generate grpc code
57+
run: protoc --go_out=. --go-grpc_out=. -I ./submodules/durabletask-protobuf/protos orchestrator_service.proto
58+
4059
- name: Run integration tests
4160
run: go test ./tests/... -coverpkg ./api,./task,./client,./backend/...,./internal/helpers

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [Unreleased]
9+
10+
### Changed
11+
12+
- Support reusing orchestration id ([#46](https://github.com/microsoft/durabletask-go/pull/46)) - contributed by [@kaibocai](https://github.com/kaibocai)
13+
814
## [v0.3.1] - 2023-09-08
915

1016
### Fixed

api/orchestration.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ import (
1313
)
1414

1515
var (
16-
ErrInstanceNotFound = errors.New("no such instance exists")
17-
ErrNotStarted = errors.New("orchestration has not started")
18-
ErrNotCompleted = errors.New("orchestration has not yet completed")
19-
ErrNoFailures = errors.New("orchestration did not report failure details")
16+
ErrInstanceNotFound = errors.New("no such instance exists")
17+
ErrNotStarted = errors.New("orchestration has not started")
18+
ErrNotCompleted = errors.New("orchestration has not yet completed")
19+
ErrNoFailures = errors.New("orchestration did not report failure details")
20+
ErrDuplicateInstance = errors.New("orchestration instance already exists")
21+
ErrIgnoreInstance = errors.New("ignore creating orchestration instance")
2022

2123
EmptyInstanceID = InstanceID("")
2224
)
@@ -57,6 +59,18 @@ func WithInstanceID(id InstanceID) NewOrchestrationOptions {
5759
}
5860
}
5961

62+
// WithOrchestrationIdReusePolicy configures Orchestration ID reuse policy.
63+
func WithOrchestrationIdReusePolicy(policy *protos.OrchestrationIdReusePolicy) NewOrchestrationOptions {
64+
return func(req *protos.CreateInstanceRequest) error {
65+
// initialize CreateInstanceOption
66+
req.OrchestrationIdReusePolicy = &protos.OrchestrationIdReusePolicy{
67+
Action: policy.Action,
68+
OperationStatus: policy.OperationStatus,
69+
}
70+
return nil
71+
}
72+
}
73+
6074
// WithInput configures an input for the orchestration. The specified input must be serializable.
6175
func WithInput(input any) NewOrchestrationOptions {
6276
return func(req *protos.CreateInstanceRequest) error {

backend/backend.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,18 @@ type (
2323
TaskFailureDetails = protos.TaskFailureDetails
2424
)
2525

26+
type OrchestrationIdReusePolicyOptions func(*protos.OrchestrationIdReusePolicy) error
27+
28+
func WithOrchestrationIdReusePolicy(policy *protos.OrchestrationIdReusePolicy) OrchestrationIdReusePolicyOptions {
29+
return func(po *protos.OrchestrationIdReusePolicy) error {
30+
if policy != nil {
31+
po.Action = policy.Action
32+
po.OperationStatus = policy.OperationStatus
33+
}
34+
return nil
35+
}
36+
}
37+
2638
type Backend interface {
2739
// CreateTaskHub creates a new task hub for the current backend. Task hub creation must be idempotent.
2840
//
@@ -43,7 +55,7 @@ type Backend interface {
4355

4456
// CreateOrchestrationInstance creates a new orchestration instance with a history event that
4557
// wraps a ExecutionStarted event.
46-
CreateOrchestrationInstance(context.Context, *HistoryEvent) error
58+
CreateOrchestrationInstance(context.Context, *HistoryEvent, ...OrchestrationIdReusePolicyOptions) error
4759

4860
// AddNewEvent adds a new orchestration event to the specified orchestration instance.
4961
AddNewOrchestrationEvent(context.Context, api.InstanceID, *HistoryEvent) error

backend/executor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst
311311
defer span.End()
312312

313313
e := helpers.NewExecutionStartedEvent(req.Name, instanceID, req.Input, nil, helpers.TraceContextFromSpan(span))
314-
if err := g.backend.CreateOrchestrationInstance(ctx, e); err != nil {
314+
if err := g.backend.CreateOrchestrationInstance(ctx, e, WithOrchestrationIdReusePolicy(req.OrchestrationIdReusePolicy)); err != nil {
315315
return nil, err
316316
}
317317

backend/sqlite/sqlite.go

Lines changed: 142 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ func NewSqliteBackend(opts *SqliteOptions, logger backend.Logger) backend.Backen
7979
be.dsn = opts.FilePath
8080
}
8181

82+
// used for local debug
83+
// be.dsn = "file:file.sqlite"
84+
8285
return be
8386
}
8487

@@ -333,8 +336,7 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi *
333336
for _, msg := range wi.State.PendingMessages() {
334337
if es := msg.HistoryEvent.GetExecutionStarted(); es != nil {
335338
// Need to insert a new row into the DB
336-
var instanceID string
337-
if err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx, &instanceID); err != nil {
339+
if _, err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx); err != nil {
338340
if err == backend.ErrDuplicateEvent {
339341
be.logger.Warnf(
340342
"%v: dropping sub-orchestration creation event because an instance with the target ID (%v) already exists.",
@@ -390,7 +392,7 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi *
390392
}
391393

392394
// CreateOrchestrationInstance implements backend.Backend
393-
func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent) error {
395+
func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent, opts ...backend.OrchestrationIdReusePolicyOptions) error {
394396
if err := be.ensureDB(); err != nil {
395397
return err
396398
}
@@ -402,7 +404,10 @@ func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *bac
402404
defer tx.Rollback()
403405

404406
var instanceID string
405-
if err := be.createOrchestrationInstanceInternal(ctx, e, tx, &instanceID); err != nil {
407+
if instanceID, err = be.createOrchestrationInstanceInternal(ctx, e, tx, opts...); errors.Is(err, api.ErrIgnoreInstance) {
408+
// choose to ignore, do nothing
409+
return nil
410+
} else if err != nil {
406411
return err
407412
}
408413

@@ -429,19 +434,45 @@ func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *bac
429434
return nil
430435
}
431436

432-
func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context, e *backend.HistoryEvent, tx *sql.Tx, instanceID *string) error {
437+
func buildStatusSet(statuses []protos.OrchestrationStatus) map[protos.OrchestrationStatus]struct{} {
438+
statusSet := make(map[protos.OrchestrationStatus]struct{})
439+
for _, status := range statuses {
440+
statusSet[status] = struct{}{}
441+
}
442+
return statusSet
443+
}
444+
445+
func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context, e *backend.HistoryEvent, tx *sql.Tx, opts ...backend.OrchestrationIdReusePolicyOptions) (string, error) {
433446
if e == nil {
434-
return errors.New("HistoryEvent must be non-nil")
447+
return "", errors.New("HistoryEvent must be non-nil")
435448
} else if e.Timestamp == nil {
436-
return errors.New("HistoryEvent must have a non-nil timestamp")
449+
return "", errors.New("HistoryEvent must have a non-nil timestamp")
437450
}
438451

439452
startEvent := e.GetExecutionStarted()
440453
if startEvent == nil {
441-
return errors.New("HistoryEvent must be an ExecutionStartedEvent")
454+
return "", errors.New("HistoryEvent must be an ExecutionStartedEvent")
455+
}
456+
instanceID := startEvent.OrchestrationInstance.InstanceId
457+
458+
policy := &protos.OrchestrationIdReusePolicy{}
459+
460+
for _, opt := range opts {
461+
opt(policy)
442462
}
443463

444-
// TODO: Support for re-using orchestration instance IDs
464+
rows, err := insertOrIgnoreInstanceTableInternal(ctx, tx, e, startEvent)
465+
if err != nil {
466+
return "", err
467+
}
468+
469+
if rows <= 0 {
470+
return instanceID, be.handleInstanceExists(ctx, tx, startEvent, policy, e)
471+
}
472+
return instanceID, nil
473+
}
474+
475+
func insertOrIgnoreInstanceTableInternal(ctx context.Context, tx *sql.Tx, e *backend.HistoryEvent, startEvent *protos.ExecutionStartedEvent) (int64, error) {
445476
res, err := tx.ExecContext(
446477
ctx,
447478
`INSERT OR IGNORE INTO [Instances] (
@@ -462,19 +493,114 @@ func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context
462493
e.Timestamp.AsTime(),
463494
)
464495
if err != nil {
465-
return fmt.Errorf("failed to insert into [Instances] table: %w", err)
496+
return -1, fmt.Errorf("failed to insert into [Instances] table: %w", err)
466497
}
467498

468499
rows, err := res.RowsAffected()
469500
if err != nil {
470-
return fmt.Errorf("failed to count the rows affected: %w", err)
501+
return -1, fmt.Errorf("failed to count the rows affected: %w", err)
471502
}
503+
return rows, nil;
504+
}
472505

473-
if rows <= 0 {
474-
return backend.ErrDuplicateEvent
506+
func (be *sqliteBackend) handleInstanceExists(ctx context.Context, tx *sql.Tx, startEvent *protos.ExecutionStartedEvent, policy *protos.OrchestrationIdReusePolicy, e *backend.HistoryEvent) error {
507+
// query RuntimeStatus for the existing instance
508+
queryRow := tx.QueryRowContext(
509+
ctx,
510+
`SELECT [RuntimeStatus] FROM Instances WHERE [InstanceID] = ?`,
511+
startEvent.OrchestrationInstance.InstanceId,
512+
)
513+
var runtimeStatus *string
514+
err := queryRow.Scan(&runtimeStatus)
515+
if errors.Is(err, sql.ErrNoRows) {
516+
return api.ErrInstanceNotFound
517+
} else if err != nil {
518+
return fmt.Errorf("failed to scan the Instances table result: %w", err)
519+
}
520+
521+
// instance already exists
522+
targetStatusValues := buildStatusSet(policy.OperationStatus)
523+
// status not match, return instance duplicate error
524+
if _, ok := targetStatusValues[helpers.FromRuntimeStatusString(*runtimeStatus)]; !ok {
525+
return api.ErrDuplicateInstance
526+
}
527+
528+
// status match
529+
switch policy.Action {
530+
case protos.CreateOrchestrationAction_IGNORE:
531+
// Log an warning message and ignore creating new instance
532+
be.logger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", startEvent.OrchestrationInstance.InstanceId)
533+
return api.ErrIgnoreInstance
534+
case protos.CreateOrchestrationAction_TERMINATE:
535+
// terminate existing instance
536+
if err := be.cleanupOrchestrationStateInternal(ctx, tx, api.InstanceID(startEvent.OrchestrationInstance.InstanceId),false); err != nil {
537+
return err
538+
}
539+
// create a new instance
540+
var rows int64
541+
if rows, err = insertOrIgnoreInstanceTableInternal(ctx, tx, e, startEvent); err != nil {
542+
return err
543+
}
544+
545+
// should never happen, because we clean up instance before create new one
546+
if rows <= 0 {
547+
return fmt.Errorf("failed to insert into [Instances] table because entry already exists.")
548+
}
549+
return nil
550+
}
551+
// default behavior
552+
return api.ErrDuplicateInstance
553+
}
554+
555+
func (be *sqliteBackend) cleanupOrchestrationStateInternal(ctx context.Context, tx *sql.Tx, id api.InstanceID, onlyIfCompleted bool) error {
556+
row := tx.QueryRowContext(ctx, "SELECT 1 FROM Instances WHERE [InstanceID] = ?", string(id))
557+
if err := row.Err(); err != nil {
558+
return fmt.Errorf("failed to query for instance existence: %w", err)
559+
}
560+
561+
var unused int
562+
if err := row.Scan(&unused); errors.Is(err, sql.ErrNoRows) {
563+
return api.ErrInstanceNotFound
564+
} else if err != nil {
565+
return fmt.Errorf("failed to scan instance existence: %w", err)
475566
}
476567

477-
*instanceID = startEvent.OrchestrationInstance.InstanceId
568+
if onlyIfCompleted {
569+
// purge orchestration in ['COMPLETED', 'FAILED', 'TERMINATED']
570+
dbResult, err := tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ? AND [RuntimeStatus] IN ('COMPLETED', 'FAILED', 'TERMINATED')", string(id))
571+
if err != nil {
572+
return fmt.Errorf("failed to delete from the Instances table: %w", err)
573+
}
574+
575+
rowsAffected, err := dbResult.RowsAffected()
576+
if err != nil {
577+
return fmt.Errorf("failed to get rows affected in Instances delete operation: %w", err)
578+
}
579+
if rowsAffected == 0 {
580+
return api.ErrNotCompleted
581+
}
582+
} else {
583+
// clean up orchestration in all [RuntimeStatus]
584+
_, err := tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ?", string(id))
585+
if err != nil {
586+
return fmt.Errorf("failed to delete from the Instances table: %w", err)
587+
}
588+
}
589+
590+
_, err := tx.ExecContext(ctx, "DELETE FROM History WHERE [InstanceID] = ?", string(id))
591+
if err != nil {
592+
return fmt.Errorf("failed to delete from History table: %w", err)
593+
}
594+
595+
_, err = tx.ExecContext(ctx, "DELETE FROM NewEvents WHERE [InstanceID] = ?", string(id))
596+
if err != nil {
597+
return fmt.Errorf("failed to delete from NewEvents table: %w", err)
598+
}
599+
600+
_, err = tx.ExecContext(ctx, "DELETE FROM NewTasks WHERE [InstanceID] = ?", string(id))
601+
if err != nil {
602+
return fmt.Errorf("failed to delete from NewTasks table: %w", err)
603+
}
478604
return nil
479605
}
480606

@@ -837,34 +963,8 @@ func (be *sqliteBackend) PurgeOrchestrationState(ctx context.Context, id api.Ins
837963
}
838964
defer tx.Rollback()
839965

840-
row := tx.QueryRowContext(ctx, "SELECT 1 FROM Instances WHERE [InstanceID] = ?", string(id))
841-
if err := row.Err(); err != nil {
842-
return fmt.Errorf("failed to query for instance existence: %w", err)
843-
}
844-
845-
var unused int
846-
if err := row.Scan(&unused); err == sql.ErrNoRows {
847-
return api.ErrInstanceNotFound
848-
} else if err != nil {
849-
return fmt.Errorf("failed to scan instance existence: %w", err)
850-
}
851-
852-
dbResult, err := tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ? AND [RuntimeStatus] IN ('COMPLETED', 'FAILED', 'TERMINATED')", string(id))
853-
if err != nil {
854-
return fmt.Errorf("failed to delete from the Instances table: %w", err)
855-
}
856-
857-
rowsAffected, err := dbResult.RowsAffected()
858-
if err != nil {
859-
return fmt.Errorf("failed to get rows affected in Instances delete operation: %w", err)
860-
}
861-
if rowsAffected == 0 {
862-
return api.ErrNotCompleted
863-
}
864-
865-
_, err = tx.ExecContext(ctx, "DELETE FROM History WHERE [InstanceID] = ?", string(id))
866-
if err != nil {
867-
return fmt.Errorf("failed to delete from History table: %w", err)
966+
if err := be.cleanupOrchestrationStateInternal(ctx, tx, id, true); err != nil {
967+
return err
868968
}
869969

870970
if err = tx.Commit(); err != nil {

0 commit comments

Comments
 (0)