@@ -976,21 +976,24 @@ kj::Promise<WorkerInterface::CustomEvent::Result> TailStreamCustomEvent::sendRpc
976976
977977 capFulfiller->fulfill (kj::mv (cap));
978978
979+ // Forked promise for completion of all capabilities associated with the cap stream. This is
980+ // expected to be resolved when the request is canceled or when the client receives the stop
981+ // signal and deallocates cap after the tail worker indicates that it has processed all events
982+ // successfully.
983+ kj::ForkedPromise<void > forked = completionPaf.promise .fork ();
979984 try {
980- // Wait for EITHER:
981- // 1. The RPC to complete successfully (returns the event outcome), OR
982- // 2. The capability to be dropped by the client (returns CANCELED)
983- // Whichever happens first determines our result.
984985 EventOutcome outcome = co_await sent.then ([](auto resp) {
985986 return resp.getResult ();
986- }).exclusiveJoin (completionPaf. promise .then ([]() { return EventOutcome::CANCELED; }));
987+ }).exclusiveJoin (forked. addBranch () .then ([]() { return EventOutcome::CANCELED; }));
987988
989+ // If the sent promise returned first, we still need to wait for the parent process to drop the
990+ // capability (which should happen right after it receives the stop signal) so that no
991+ // capabilities remain in an incomplete state when we return.
992+ co_await forked.addBranch ();
988993 co_return WorkerInterface::CustomEvent::Result{.outcome = outcome};
989994 } catch (...) {
990- // If an exception occurs, capture it and ensure proper cleanup
991995 auto e = kj::getCaughtExceptionAsKj ();
992996 if (revokePaf.fulfiller ->isWaiting ()) {
993- // Reject the revoke promise to trigger capability revocation
994997 revokePaf.fulfiller ->reject (kj::cp (e));
995998 }
996999 kj::throwFatalException (kj::mv (e));
0 commit comments