Skip to content

Commit fcda0b7

Browse files
authored
Activity reset (#1730)
1 parent 9089bc6 commit fcda0b7

File tree

7 files changed

+143
-23
lines changed

7 files changed

+143
-23
lines changed

packages/client/src/async-completion-client.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ export class ActivityCancelledError extends Error {}
4242
@SymbolBasedInstanceOfError('ActivityPausedError')
4343
export class ActivityPausedError extends Error {}
4444

45+
/**
46+
* Thrown by {@link AsyncCompletionClient.heartbeat} when the reporting Activity
47+
* has been reset.
48+
*/
49+
@SymbolBasedInstanceOfError('ActivityResetError')
50+
export class ActivityResetError extends Error {}
51+
4552
/**
4653
* Options used to configure {@link AsyncCompletionClient}
4754
*/
@@ -219,6 +226,7 @@ export class AsyncCompletionClient extends BaseClient {
219226
const payloads = await encodeToPayloads(this.dataConverter, details);
220227
let cancelRequested = false;
221228
let paused = false;
229+
let reset = false;
222230
try {
223231
if (taskTokenOrFullActivityId instanceof Uint8Array) {
224232
const response = await this.workflowService.recordActivityTaskHeartbeat({
@@ -229,6 +237,7 @@ export class AsyncCompletionClient extends BaseClient {
229237
});
230238
cancelRequested = !!response.cancelRequested;
231239
paused = !!response.activityPaused;
240+
reset = !!response.activityReset;
232241
} else {
233242
const response = await this.workflowService.recordActivityTaskHeartbeatById({
234243
identity: this.options.identity,
@@ -238,14 +247,18 @@ export class AsyncCompletionClient extends BaseClient {
238247
});
239248
cancelRequested = !!response.cancelRequested;
240249
paused = !!response.activityPaused;
250+
reset = !!response.activityReset;
241251
}
242252
} catch (err) {
243253
this.handleError(err);
244254
}
255+
// Note that it is possible for a heartbeat response to have multiple fields
256+
// set as true (i.e. cancelled and pause).
245257
if (cancelRequested) {
246258
throw new ActivityCancelledError('cancelled');
247-
}
248-
if (paused) {
259+
} else if (reset) {
260+
throw new ActivityResetError('reset');
261+
} else if (paused) {
249262
throw new ActivityPausedError('paused');
250263
}
251264
}

packages/core-bridge/src/worker.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -672,10 +672,8 @@ mod config {
672672
self.local_activity_task_slot_supplier
673673
.into_slot_supplier(&mut rbo),
674674
);
675-
tuner_holder.nexus_slot_options(
676-
self.nexus_task_slot_supplier
677-
.into_slot_supplier(&mut rbo)
678-
);
675+
tuner_holder
676+
.nexus_slot_options(self.nexus_task_slot_supplier.into_slot_supplier(&mut rbo));
679677
if let Some(rbo) = rbo {
680678
tuner_holder.resource_based_options(rbo);
681679
}

packages/test/src/helpers-integration.ts

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,11 @@ export function configurableHelpers<T>(
296296
};
297297
}
298298

299-
export async function setActivityPauseState(handle: WorkflowHandle, activityId: string, pause: boolean): Promise<void> {
299+
export async function setActivityState(
300+
handle: WorkflowHandle,
301+
activityId: string,
302+
state: 'pause' | 'unpause' | 'reset' | 'pause & reset'
303+
): Promise<void> {
300304
const desc = await handle.describe();
301305
const req = {
302306
namespace: handle.client.options.namespace,
@@ -306,22 +310,50 @@ export async function setActivityPauseState(handle: WorkflowHandle, activityId:
306310
},
307311
id: activityId,
308312
};
309-
if (pause) {
313+
if (state === 'pause') {
310314
await handle.client.workflowService.pauseActivity(req);
311-
} else {
315+
} else if (state === 'unpause') {
312316
await handle.client.workflowService.unpauseActivity(req);
317+
} else if (state === 'reset') {
318+
await handle.client.workflowService.resetActivity({ ...req, resetHeartbeat: true });
319+
} else {
320+
await Promise.all([
321+
handle.client.workflowService.pauseActivity(req),
322+
handle.client.workflowService.resetActivity({ ...req, resetHeartbeat: true }),
323+
]);
313324
}
314325
await waitUntil(async () => {
315326
const { raw } = await handle.describe();
316327
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId);
317328
// If we are pausing: success when either
318329
// • paused flag is true OR
319330
// • the activity vanished (it completed / retried)
320-
if (pause) return activityInfo ? activityInfo.paused ?? false : true;
321-
// If we are unpausing: success when either
322-
// • paused flag is false OR
323-
// • the activity vanished (already completed)
324-
return activityInfo ? !activityInfo.paused : true;
331+
if (state === 'pause') {
332+
if (!activityInfo) {
333+
return true; // Activity vanished (completed/retried)
334+
}
335+
return activityInfo.paused ?? false;
336+
} else if (state === 'unpause') {
337+
// If we are unpausing: success when either
338+
// • paused flag is false OR
339+
// • the activity vanished (already completed)
340+
return activityInfo ? !activityInfo.paused : true;
341+
} else if (state === 'reset') {
342+
// If we are resetting, success when either
343+
// • heartbeat details have been reset OR
344+
// • the activity vanished (completed / retried)
345+
return activityInfo ? activityInfo.heartbeatDetails === null : true;
346+
} else {
347+
// If we are pausing & resetting, success when either
348+
// • activity is paused AND heartbeat details have been reset OR
349+
// • the activity vanished (completed / retried)
350+
if (!activityInfo) {
351+
return true; // Activity vanished (completed/retried)
352+
}
353+
const isPaused = activityInfo.paused ?? false;
354+
const isHeartbeatReset = activityInfo.heartbeatDetails === null;
355+
return isPaused && isHeartbeatReset;
356+
}
325357
}, 15000);
326358
}
327359

packages/test/src/test-integration-split-three.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ test(
156156
await worker.runUntil(handle.result());
157157
let firstChild = true;
158158
const history = await handle.fetchHistory();
159-
console.log('events');
160159
for (const event of history?.events ?? []) {
161160
switch (event.eventType) {
162161
case temporal.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:

packages/test/src/test-integration-workflows.ts

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ import {
4545
hasActivityHeartbeat,
4646
helpers,
4747
makeTestFunction,
48-
setActivityPauseState,
48+
setActivityState,
4949
} from './helpers-integration';
5050
import { overrideSdkInternalFlag } from './mock-internal-flags';
5151
import { heartbeatCancellationDetailsActivity } from './activities/heartbeat-cancellation-details';
@@ -1459,7 +1459,7 @@ test('Activity pause returns expected cancellation details', async (t) => {
14591459
}, 10000);
14601460

14611461
// Now pause the activity
1462-
await setActivityPauseState(handle, testActivityId, true);
1462+
await setActivityState(handle, testActivityId, 'pause');
14631463
// Get the result - should contain pause cancellation details
14641464
const result = await handle.result();
14651465

@@ -1494,15 +1494,79 @@ test('Activity can be cancelled via pause and retry after unpause', async (t) =>
14941494
return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated')));
14951495
}, 10000);
14961496

1497-
await setActivityPauseState(handle, testActivityId, true);
1497+
await setActivityState(handle, testActivityId, 'pause');
14981498
await waitUntil(async () => hasActivityHeartbeat(handle, testActivityId, 'finally-complete'), 10000);
1499-
await setActivityPauseState(handle, testActivityId, false);
1499+
await setActivityState(handle, testActivityId, 'unpause');
15001500

15011501
const result = await handle.result();
15021502
t.true(result == null);
15031503
});
15041504
});
15051505

1506+
test('Activity reset returns expected cancellation details', async (t) => {
1507+
const { createWorker, startWorkflow } = helpers(t);
1508+
const worker = await createWorker({
1509+
activities: {
1510+
heartbeatCancellationDetailsActivity,
1511+
},
1512+
});
1513+
1514+
await worker.runUntil(async () => {
1515+
const testActivityId = randomUUID();
1516+
const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] });
1517+
1518+
// Wait for it to exist and heartbeat
1519+
await waitUntil(async () => {
1520+
const { raw } = await handle.describe();
1521+
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId);
1522+
return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated')));
1523+
}, 10000);
1524+
1525+
await setActivityState(handle, testActivityId, 'reset');
1526+
const result = await handle.result();
1527+
t.deepEqual(result, {
1528+
cancelRequested: false,
1529+
notFound: false,
1530+
paused: false,
1531+
timedOut: false,
1532+
workerShutdown: false,
1533+
reset: true,
1534+
});
1535+
});
1536+
});
1537+
1538+
test('Activity set as both paused and reset returns expected cancellation details', async (t) => {
1539+
const { createWorker, startWorkflow } = helpers(t);
1540+
const worker = await createWorker({
1541+
activities: {
1542+
heartbeatCancellationDetailsActivity,
1543+
},
1544+
});
1545+
1546+
await worker.runUntil(async () => {
1547+
const testActivityId = randomUUID();
1548+
const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] });
1549+
1550+
// Wait for it to exist and heartbeat
1551+
await waitUntil(async () => {
1552+
const { raw } = await handle.describe();
1553+
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId);
1554+
return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated')));
1555+
}, 10000);
1556+
1557+
await setActivityState(handle, testActivityId, 'pause & reset');
1558+
const result = await handle.result();
1559+
t.deepEqual(result, {
1560+
cancelRequested: false,
1561+
notFound: false,
1562+
paused: true,
1563+
timedOut: false,
1564+
workerShutdown: false,
1565+
reset: true,
1566+
});
1567+
});
1568+
});
1569+
15061570
const reservedNames = [TEMPORAL_RESERVED_PREFIX, STACK_TRACE_QUERY_NAME, ENHANCED_STACK_TRACE_QUERY_NAME];
15071571

15081572
test('Cannot register activities using reserved prefixes', async (t) => {

packages/worker/src/activity.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,14 @@ export class Activity {
149149
(error instanceof CancelledFailure || isAbortError(error)) &&
150150
this.context.cancellationSignal.aborted
151151
) {
152-
if (this.context.cancellationDetails?.paused) {
152+
if (this.context.cancellationDetails?.cancelRequested) {
153+
this.workerLogger.debug('Activity completed as cancelled', { durationMs });
154+
} else if (this.context.cancellationDetails?.reset) {
155+
this.workerLogger.debug('Activity reset', { durationMs });
156+
} else if (this.context.cancellationDetails?.paused) {
153157
this.workerLogger.debug('Activity paused', { durationMs });
154158
} else {
159+
// Fallback log - completed as cancelled.
155160
this.workerLogger.debug('Activity completed as cancelled', { durationMs });
156161
}
157162
} else if (error instanceof CompleteAsyncError) {
@@ -204,8 +209,17 @@ export class Activity {
204209
} else if (this.cancelReason) {
205210
// Either a CancelledFailure that we threw or AbortError from AbortController
206211
if (err instanceof CancelledFailure) {
207-
// If cancel due to activity pause, emit an application failure for the pause.
208-
if (this.context.cancellationDetails?.paused) {
212+
// If cancel due to activity pause or reset, emit an application failure.
213+
if (this.context.cancellationDetails?.reset) {
214+
return {
215+
failed: {
216+
failure: await encodeErrorToFailure(
217+
this.dataConverter,
218+
new ApplicationFailure('Activity reset', 'ActivityReset')
219+
),
220+
},
221+
};
222+
} else if (this.context.cancellationDetails?.paused) {
209223
return {
210224
failed: {
211225
failure: await encodeErrorToFailure(

packages/worker/src/worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1036,7 +1036,7 @@ export class Worker {
10361036
details,
10371037
onError() {
10381038
// activity must be defined
1039-
// empty cancellation details, not corresponding detail for heartbeat detail conversion failure
1039+
// empty cancellation details, no corresponding detail for heartbeat detail conversion failure
10401040
activity?.cancel(
10411041
'HEARTBEAT_DETAILS_CONVERSION_FAILED',
10421042
ActivityCancellationDetails.fromProto(undefined)

0 commit comments

Comments
 (0)