Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/real-rats-drop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

Add onCancel lifecycle hook
1 change: 1 addition & 0 deletions apps/webapp/app/components/runs/v3/RunIcon.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ export function RunIcon({ name, className, spanName }: TaskIconProps) {
case "task-hook-onResume":
case "task-hook-onComplete":
case "task-hook-cleanup":
case "task-hook-onCancel":
return <FunctionIcon className={cn(className, "text-text-dimmed")} />;
case "task-hook-onFailure":
case "task-hook-catchError":
Expand Down
23 changes: 0 additions & 23 deletions apps/webapp/app/v3/services/cancelTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,6 @@ export class CancelTaskRunService extends BaseService {
tx: this._prisma,
});

const inProgressEvents = await eventRepository.queryIncompleteEvents(
getTaskEventStoreTableForRun(taskRun),
{
runId: taskRun.friendlyId,
},
taskRun.createdAt,
taskRun.completedAt ?? undefined
);

logger.debug("Cancelling in-progress events", {
inProgressEvents: inProgressEvents.map((event) => event.id),
});

await Promise.all(
inProgressEvents.map((event) => {
return eventRepository.cancelEvent(
event,
options?.cancelledAt ?? new Date(),
options?.reason ?? "Run cancelled"
);
})
);

return {
id: result.run.id,
};
Expand Down
56 changes: 48 additions & 8 deletions packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
TaskRunExecution,
timeout,
TriggerConfig,
UsageMeasurement,
waitUntil,
WorkerManifest,
WorkerToExecutorMessageCatalog,
Expand Down Expand Up @@ -232,7 +233,10 @@ async function bootstrap() {

let _execution: TaskRunExecution | undefined;
let _isRunning = false;
let _isCancelled = false;
let _tracingSDK: TracingSDK | undefined;
let _executionMeasurement: UsageMeasurement | undefined;
const cancelController = new AbortController();

const zodIpc = new ZodIpcConnection({
listenSchema: WorkerToExecutorMessageCatalog,
Expand Down Expand Up @@ -403,18 +407,33 @@ const zodIpc = new ZodIpcConnection({
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
);

const measurement = usage.start();
_executionMeasurement = usage.start();

// This lives outside of the executor because this will eventually be moved to the controller level
const signal = execution.run.maxDuration
? timeout.abortAfterTimeout(execution.run.maxDuration)
: undefined;
const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration);

const { result } = await executor.execute(execution, metadata, traceContext, signal);
timeoutController.signal.addEventListener("abort", () => {
if (_isCancelled) {
return;
}

const usageSample = usage.stop(measurement);
if (cancelController.signal.aborted) {
return;
}

cancelController.abort(timeoutController.signal.reason);
});

const { result } = await executor.execute(
execution,
metadata,
traceContext,
cancelController.signal
);

if (_isRunning && !_isCancelled) {
const usageSample = usage.stop(_executionMeasurement);

if (_isRunning) {
return sender.send("TASK_RUN_COMPLETED", {
execution,
result: {
Expand Down Expand Up @@ -458,7 +477,16 @@ const zodIpc = new ZodIpcConnection({
WAIT_COMPLETED_NOTIFICATION: async () => {
await managedWorkerRuntime.completeWaitpoints([]);
},
FLUSH: async ({ timeoutInMs }, sender) => {
CANCEL: async ({ timeoutInMs }) => {
_isCancelled = true;
cancelController.abort("run cancelled");
await callCancelHooks(timeoutInMs);
if (_executionMeasurement) {
usage.stop(_executionMeasurement);
}
await flushAll(timeoutInMs);
},
FLUSH: async ({ timeoutInMs }) => {
await flushAll(timeoutInMs);
},
WAITPOINT_CREATED: async ({ wait, waitpoint }) => {
Expand All @@ -470,6 +498,18 @@ const zodIpc = new ZodIpcConnection({
},
});

async function callCancelHooks(timeoutInMs: number = 10_000) {
const now = performance.now();

try {
await Promise.race([lifecycleHooks.callOnCancelHookListeners(), setTimeout(timeoutInMs)]);
} finally {
const duration = performance.now() - now;

log(`Called cancel hooks in ${duration}ms`);
}
}

async function flushAll(timeoutInMs: number = 10_000) {
const now = performance.now();

Expand Down
54 changes: 47 additions & 7 deletions packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
TaskRunExecution,
timeout,
TriggerConfig,
UsageMeasurement,
waitUntil,
WorkerManifest,
WorkerToExecutorMessageCatalog,
Expand Down Expand Up @@ -229,7 +230,10 @@ async function bootstrap() {

let _execution: TaskRunExecution | undefined;
let _isRunning = false;
let _isCancelled = false;
let _tracingSDK: TracingSDK | undefined;
let _executionMeasurement: UsageMeasurement | undefined;
const cancelController = new AbortController();

const zodIpc = new ZodIpcConnection({
listenSchema: WorkerToExecutorMessageCatalog,
Expand Down Expand Up @@ -398,18 +402,33 @@ const zodIpc = new ZodIpcConnection({
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
);

const measurement = usage.start();
_executionMeasurement = usage.start();

// This lives outside of the executor because this will eventually be moved to the controller level
const signal = execution.run.maxDuration
? timeout.abortAfterTimeout(execution.run.maxDuration)
: undefined;
const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration);

const { result } = await executor.execute(execution, metadata, traceContext, signal);
timeoutController.signal.addEventListener("abort", () => {
if (_isCancelled) {
return;
}

const usageSample = usage.stop(measurement);
if (cancelController.signal.aborted) {
return;
}

cancelController.abort(timeoutController.signal.reason);
});

const { result } = await executor.execute(
execution,
metadata,
traceContext,
cancelController.signal
);

if (_isRunning && !_isCancelled) {
const usageSample = usage.stop(_executionMeasurement);

if (_isRunning) {
return sender.send("TASK_RUN_COMPLETED", {
execution,
result: {
Expand Down Expand Up @@ -454,6 +473,15 @@ const zodIpc = new ZodIpcConnection({
FLUSH: async ({ timeoutInMs }, sender) => {
await flushAll(timeoutInMs);
},
CANCEL: async ({ timeoutInMs }, sender) => {
_isCancelled = true;
cancelController.abort("run cancelled");
await callCancelHooks(timeoutInMs);
if (_executionMeasurement) {
usage.stop(_executionMeasurement);
}
await flushAll(timeoutInMs);
},
WAITPOINT_CREATED: async ({ wait, waitpoint }) => {
managedWorkerRuntime.associateWaitWithWaitpoint(wait.id, waitpoint.id);
},
Expand All @@ -463,6 +491,18 @@ const zodIpc = new ZodIpcConnection({
},
});

async function callCancelHooks(timeoutInMs: number = 10_000) {
const now = performance.now();

try {
await Promise.race([lifecycleHooks.callOnCancelHookListeners(), setTimeout(timeoutInMs)]);
} finally {
const duration = performance.now() - now;

console.log(`Called cancel hooks in ${duration}ms`);
}
}

async function flushAll(timeoutInMs: number = 10_000) {
const now = performance.now();

Expand Down
15 changes: 13 additions & 2 deletions packages/cli-v3/src/executions/taskRunProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ export class TaskRunProcess {
this._isBeingCancelled = true;

try {
await this.#flush();
await this.#cancel();
} catch (err) {
console.error("Error flushing task run process", { err });
console.error("Error cancelling task run process", { err });
}

await this.kill();
Expand All @@ -120,6 +120,10 @@ export class TaskRunProcess {
async cleanup(kill = true) {
this._isPreparedForNextRun = false;

if (this._isBeingCancelled) {
return;
}

try {
await this.#flush();
} catch (err) {
Expand Down Expand Up @@ -224,10 +228,17 @@ export class TaskRunProcess {
await this._ipc?.sendWithAck("FLUSH", { timeoutInMs }, timeoutInMs + 1_000);
}

async #cancel(timeoutInMs: number = 30_000) {
logger.debug("sending cancel message to task run process", { pid: this.pid, timeoutInMs });

await this._ipc?.sendWithAck("CANCEL", { timeoutInMs }, timeoutInMs + 1_000);
}

async execute(
params: TaskRunProcessExecuteParams,
isWarmStart?: boolean
): Promise<TaskRunExecutionResult> {
this._isBeingCancelled = false;
this._isPreparedForNextRun = false;
this._isPreparedForNextAttempt = false;

Expand Down
22 changes: 22 additions & 0 deletions packages/core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,25 @@ export async function tryCatch<T, E = Error>(
return [error as E, null];
}
}

export type Deferred<T> = {
promise: Promise<T>;
resolve: (value: T) => void;
reject: (reason?: any) => void;
};

export function promiseWithResolvers<T>(): Deferred<T> {
let resolve!: (value: T) => void;
let reject!: (reason?: any) => void;

const promise = new Promise<T>((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});

return {
promise,
resolve,
reject,
};
}
3 changes: 3 additions & 0 deletions packages/core/src/v3/lifecycle-hooks-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ export type {
AnyOnCleanupHookFunction,
TaskCleanupHookParams,
TaskWait,
TaskCancelHookParams,
OnCancelHookFunction,
AnyOnCancelHookFunction,
} from "./lifecycleHooks/types.js";
28 changes: 28 additions & 0 deletions packages/core/src/v3/lifecycleHooks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
AnyOnStartHookFunction,
AnyOnSuccessHookFunction,
AnyOnWaitHookFunction,
AnyOnCancelHookFunction,
RegisteredHookFunction,
RegisterHookFunctionParams,
TaskWait,
Expand Down Expand Up @@ -260,6 +261,33 @@ export class LifecycleHooksAPI {
this.#getManager().registerOnResumeHookListener(listener);
}

public registerGlobalCancelHook(hook: RegisterHookFunctionParams<AnyOnCancelHookFunction>): void {
this.#getManager().registerGlobalCancelHook(hook);
}

public registerTaskCancelHook(
taskId: string,
hook: RegisterHookFunctionParams<AnyOnCancelHookFunction>
): void {
this.#getManager().registerTaskCancelHook(taskId, hook);
}

public getTaskCancelHook(taskId: string): AnyOnCancelHookFunction | undefined {
return this.#getManager().getTaskCancelHook(taskId);
}

public getGlobalCancelHooks(): RegisteredHookFunction<AnyOnCancelHookFunction>[] {
return this.#getManager().getGlobalCancelHooks();
}

public callOnCancelHookListeners(): Promise<void> {
return this.#getManager().callOnCancelHookListeners();
}

public registerOnCancelHookListener(listener: () => Promise<void>): void {
this.#getManager().registerOnCancelHookListener(listener);
}

#getManager(): LifecycleHooksManager {
return getGlobal(API_NAME) ?? NOOP_LIFECYCLE_HOOKS_MANAGER;
}
Expand Down
Loading
Loading