Skip to content

AI chat resumption#19047

Draft
lucasbordeau wants to merge 6 commits intomainfrom
fix/ai-chat-thread-switching
Draft

AI chat resumption#19047
lucasbordeau wants to merge 6 commits intomainfrom
fix/ai-chat-thread-switching

Conversation

@lucasbordeau
Copy link
Copy Markdown
Contributor

This PR enables resuming a streaming chat thread in the AI chat.

Copilot AI review requested due to automatic review settings March 27, 2026 16:37
Copy link
Copy Markdown
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

@socket-security
Copy link
Copy Markdown

Review the following changes in direct dependencies. Learn more about Socket for GitHub.

Diff Package Supply Chain
Security
Vulnerability Quality Maintenance License
Addedresumable-stream@​2.2.1210010010092100

View full report

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR enables resuming an in-progress AI chat SSE stream after a client reconnect (e.g., refresh) by persisting an activeStreamId on the chat thread and backing the stream with Redis via resumable-stream.

Changes:

  • Server: store activeStreamId on AgentChatThreadEntity, publish/consume streaming output into a Redis-backed resumable stream, and add GET /rest/agent-chat/:threadId/stream for resumption.
  • Server: include historical thread messages when executing a new streaming turn.
  • Frontend: adopt @ai-sdk/react Chat instances keyed by thread id, enable resume: true, and trigger resume on thread selection.

Reviewed changes

Copilot reviewed 17 out of 19 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
yarn.lock Adds resumable-stream dependency resolution.
packages/twenty-server/package.json Adds resumable-stream dependency for server.
packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/services/agent-chat-streaming.service.ts Persists/resets activeStreamId, streams with history, integrates resumable stream creation.
packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/services/agent-chat-resumable-stream.service.ts New Redis-backed resumable stream helper service.
packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/controllers/agent-chat.controller.ts Adds resume-stream REST endpoint.
packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/entities/agent-chat-thread.entity.ts Adds nullable activeStreamId column.
packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/ai-chat.module.ts Registers the new resumable stream service.
packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapAgentMessageEntitiesToUIMessages.ts New mapper from stored agent messages to UI message format.
packages/twenty-server/src/database/typeorm/core/migrations/common/1774003611071-add-active-stream-id-to-agent-chat-thread.ts Migration adding activeStreamId column.
packages/twenty-front/src/modules/ai/hooks/useAgentChat.ts Switches to per-thread Chat instances and enables resume flow.
packages/twenty-front/src/modules/ai/hooks/useAgentChatInstanceForThread.ts New hook to create/cache Chat instances per thread.
packages/twenty-front/src/modules/ai/states/agentChatInstanceByThreadIdFamilyState.ts New atom family storing Chat instances by thread id.
packages/twenty-front/src/modules/ai/components/AgentChatMessagesFetchEffect.tsx Updates fetch seeding logic to write into the Chat instance.
packages/twenty-front/src/modules/ai/components/AgentChatAiSdkStreamEffect.tsx Updates stream→Jotai bridging; removes fetched/streaming merge logic.
packages/twenty-front/src/modules/ai/components/AIChatLastMessageWithStreamingState.tsx Adjusts last-message lookup; adds debug tracking/logging.
packages/twenty-front/src/modules/ai/utils/mergeAgentChatFetchedAndStreamingMessages.ts Removed (no longer merging fetched + streaming arrays).
packages/twenty-front/src/modules/ai/states/agentChatFetchedMessagesComponentFamilyState.ts Removed (no longer storing fetched messages separately).
handoff.md Adds internal debugging/analysis notes for the streaming flicker issue.
.gitignore Adds .claude/ entry.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +22 to +28
this.streamContext = createResumableStreamContext({
waitUntil: (callback) => {
void Promise.resolve(callback);
},
publisher: this.publisher,
subscriber: this.subscriber,
});
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waitUntil is currently not executing the callback (it wraps the function in a resolved promise but never calls it). This likely prevents resumable-stream from scheduling/finishing background work correctly. Call the callback (and optionally await it) inside waitUntil instead of passing the function reference.

Copilot uses AI. Check for mistakes.
import { AgentChatThreadEntity } from 'src/engine/metadata-modules/ai/ai-chat/entities/agent-chat-thread.entity';

import { toDisplayCredits } from 'src/engine/core-modules/usage/utils/to-display-credits.util';
import { AIModelConfig } from 'src/engine/metadata-modules/ai/ai-models/types/ai-model-config.type';
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AIModelConfig is a type export; importing it as a value here breaks the prevailing import { type ... } convention and can introduce unnecessary runtime imports / lint failures. Switch back to a type-only import for AIModelConfig.

Suggested change
import { AIModelConfig } from 'src/engine/metadata-modules/ai/ai-models/types/ai-model-config.type';
import { type AIModelConfig } from 'src/engine/metadata-modules/ai/ai-models/types/ai-model-config.type';

Copilot uses AI. Check for mistakes.
Comment on lines +128 to +132
console.log('Received request to resume stream for threadId:', threadId);

const uuidRegex =
/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid console.log in server controllers (it bypasses Nest logging config and can leak request data in production). Use the injected Logger (already present on this controller) or remove this debug log entirely.

Copilot uses AI. Check for mistakes.
Comment on lines +80 to +94
const existingInstance = store.get(threadAtom);

if (existingInstance !== null) {
return { agentChatInstanceForThread: existingInstance };
}

const newInstance = createAgentChatInstanceForThread({
threadId,
onFinish,
retryFetchWithRenewedToken,
});

store.set(threadAtom, newInstance);

return { agentChatInstanceForThread: newInstance };
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This hook mutates the Jotai store (store.set(...)) during render when there is no existing Chat instance. Updating state during render can trigger React warnings and subtle render loops. Create/cache the Chat instance in an effect/memoized initializer (e.g. useMemo + useEffect for store.set) or via an atom that initializes the instance for a given threadId.

Copilot uses AI. Check for mistakes.
Comment on lines 12 to +31
@@ -14,6 +19,17 @@ export const AIChatLastMessageWithStreamingState = () => {
const agentChatIsStreaming = useAtomStateValue(agentChatIsStreamingState);
const agentChatError = useAtomStateValue(agentChatErrorState);

const agentChatMessage = useAtomComponentFamilySelectorValue(
agentChatMessageComponentFamilySelector,
{ messageId: lastMessageId },
);

messageMap.set(new Date().toISOString(), agentChatMessage!);

console.log({
messageMap,
});
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

messageMap + console.log(...) are global debug artifacts that will grow without bound as the component re-renders, and they can log message content to the console. Please remove them or guard them behind a dev-only flag with bounded storage to avoid memory leaks and accidental data exposure.

Copilot uses AI. Check for mistakes.
Comment on lines +264 to +275
await this.resumableStreamService.createResumableStream(
streamId,
() => stream,
);
this.logger.log(
'createResumableStream done, writing activeStreamId to DB',
);

await this.threadRepository.update(thread.id, {
activeStreamId: streamId,
});
this.logger.log(`activeStreamId written to DB: ${streamId}`);
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumeSseStream updates activeStreamId only after awaiting createResumableStream(...). This creates a window where the client can refresh/disconnect before the DB is updated, making resumption impossible, and it can also race with the onFinish update that clears activeStreamId back to null. Consider persisting activeStreamId immediately after generating it (and ensuring it gets cleared on finish), then starting the resumable stream in the background.

Suggested change
await this.resumableStreamService.createResumableStream(
streamId,
() => stream,
);
this.logger.log(
'createResumableStream done, writing activeStreamId to DB',
);
await this.threadRepository.update(thread.id, {
activeStreamId: streamId,
});
this.logger.log(`activeStreamId written to DB: ${streamId}`);
this.logger.log(
'Writing activeStreamId to DB before creating resumable stream',
);
await this.threadRepository.update(thread.id, {
activeStreamId: streamId,
});
this.logger.log(`activeStreamId written to DB: ${streamId}`);
// Start creating the resumable stream in the background to avoid
// a window where the client disconnects before the DB is updated
// and to reduce races with onFinish clearing activeStreamId.
void this.resumableStreamService
.createResumableStream(streamId, () => stream)
.then(() => {
this.logger.log(
`createResumableStream done for streamId: ${streamId}`,
);
})
.catch((error) => {
this.logger.error(
`Error creating resumable stream for streamId ${streamId}:`,
error,
);
});

Copilot uses AI. Check for mistakes.
});
this.logger.log(`activeStreamId written to DB: ${streamId}`);
} catch (error) {
this.logger.error('consumeSseStream error:', error);
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nest's Logger.error signature is (message, stack?, context?); passing the raw error object as the second argument will be logged as a stack string like [object Object]. Log error instanceof Error ? error.stack : String(error) (or include the error message in the first arg) to keep server logs useful.

Suggested change
this.logger.error('consumeSseStream error:', error);
this.logger.error(
'consumeSseStream error:',
error instanceof Error ? error.stack : String(error),
);

Copilot uses AI. Check for mistakes.
Comment on lines +52 to +60
const threadId = store.get(currentAIChatThreadState.atom);

if (!isDefined(threadId)) {
return;
}

const threadAtom = agentChatInstanceByThreadIdFamilyState.atomFamily({
threadId,
});
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleDataLoaded uses store.get(currentAIChatThreadState.atom) (the current thread) instead of the thread id that the GraphQL query was executed for. If the user switches threads while a request is in-flight, messages from the old thread can be applied to the new thread's Chat instance. Use the currentAIChatThread value captured when starting the query (e.g. close over it / pass it through) rather than reading the latest value from the store inside the callback.

Copilot uses AI. Check for mistakes.
return messageEntities.map((message) => ({
id: message.id,
role: message.role as ExtendedUIMessage['role'],
parts: (message.parts ?? [])
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(message.parts ?? []).sort(...) sorts the entity array in place. Since these are TypeORM entity instances, mutating the relation array can have unintended side effects if the same entity is reused later in the request lifecycle. Consider sorting a shallow copy instead (e.g. copy then sort) to keep mapping side-effect free.

Suggested change
parts: (message.parts ?? [])
parts: [...(message.parts ?? [])]

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

9 issues found across 19 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapAgentMessageEntitiesToUIMessages.ts">

<violation number="1" location="packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapAgentMessageEntitiesToUIMessages.ts:36">
P1: This drops persisted `file`, `source-document`, and `data-routing-status` parts from resumed chat history, so resumed threads can lose attachments and other saved context.</violation>

<violation number="2" location="packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapAgentMessageEntitiesToUIMessages.ts:48">
P2: `.sort()` mutates `message.parts` in place. Since these are TypeORM entity relation arrays, use `[...(message.parts ?? [])].sort(...)` to avoid side effects if the entity is accessed again later in the request lifecycle.</violation>
</file>

<file name="packages/twenty-front/src/modules/ai/components/AgentChatAiSdkStreamEffect.tsx">

<violation number="1" location="packages/twenty-front/src/modules/ai/components/AgentChatAiSdkStreamEffect.tsx:70">
P2: The new module-scoped `lastMessageMap` retains every streamed message and is never cleared or consumed by the feature.</violation>
</file>

<file name="packages/twenty-front/src/modules/ai/hooks/useAgentChatInstanceForThread.ts">

<violation number="1" location="packages/twenty-front/src/modules/ai/hooks/useAgentChatInstanceForThread.ts:44">
P1: Re-validate the retried response before returning it; failed retries currently bypass the transport's error handling.</violation>
</file>

<file name="packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/services/agent-chat-resumable-stream.service.ts">

<violation number="1" location="packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/services/agent-chat-resumable-stream.service.ts:24">
P1: `waitUntil` never invokes its callback — `Promise.resolve(callback)` wraps the function reference in a resolved promise without calling it. This should be `Promise.resolve(callback())` so the `resumable-stream` library can schedule its background cleanup/finalization work correctly.</violation>
</file>

<file name="packages/twenty-front/src/modules/ai/components/AIChatLastMessageWithStreamingState.tsx">

<violation number="1" location="packages/twenty-front/src/modules/ai/components/AIChatLastMessageWithStreamingState.tsx:27">
P2: This writes a new entry into a module-scoped `Map` on every render, so streaming replies will steadily leak memory instead of updating a single cached message.</violation>
</file>

<file name="packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/controllers/agent-chat.controller.ts">

<violation number="1" location="packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/controllers/agent-chat.controller.ts:128">
P2: Use `this.logger.log(...)` instead of `console.log(...)` — the controller already has an injected `Logger`. Raw `console.log` bypasses Nest's logging configuration and can leak request data in production.</violation>
</file>

<file name="packages/twenty-front/src/modules/ai/components/AgentChatMessagesFetchEffect.tsx">

<violation number="1" location="packages/twenty-front/src/modules/ai/components/AgentChatMessagesFetchEffect.tsx:52">
P1: This reads `currentAIChatThreadState` at callback execution time, not when the query was initiated. If the user switches threads while the fetch is in-flight, the response from the old thread will be written to the new thread's Chat instance. Capture the thread ID at query initiation time and use that captured value inside the callback instead.</violation>
</file>

<file name="packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/services/agent-chat-streaming.service.ts">

<violation number="1" location="packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/services/agent-chat-streaming.service.ts:264">
P2: `activeStreamId` is persisted to the DB only after `createResumableStream` completes. If the client disconnects during that await, the stream ID is never saved and resumption becomes impossible. Persist `activeStreamId` before starting the resumable stream to close this timing window.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

state: part.state ?? undefined,
} as ToolUIPart;
}
return null;
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: This drops persisted file, source-document, and data-routing-status parts from resumed chat history, so resumed threads can lose attachments and other saved context.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapAgentMessageEntitiesToUIMessages.ts, line 36:

<comment>This drops persisted `file`, `source-document`, and `data-routing-status` parts from resumed chat history, so resumed threads can lose attachments and other saved context.</comment>

<file context>
@@ -0,0 +1,53 @@
+          state: part.state ?? undefined,
+        } as ToolUIPart;
+      }
+      return null;
+    }
+  }
</file context>
Fix with Cubic

if (response.status === 401) {
const retriedResponse = await retryFetchWithRenewedToken(input, init);

return retriedResponse ?? response;
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Re-validate the retried response before returning it; failed retries currently bypass the transport's error handling.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/twenty-front/src/modules/ai/hooks/useAgentChatInstanceForThread.ts, line 44:

<comment>Re-validate the retried response before returning it; failed retries currently bypass the transport's error handling.</comment>

<file context>
@@ -0,0 +1,95 @@
+        if (response.status === 401) {
+          const retriedResponse = await retryFetchWithRenewedToken(input, init);
+
+          return retriedResponse ?? response;
+        }
+
</file context>
Fix with Cubic


this.streamContext = createResumableStreamContext({
waitUntil: (callback) => {
void Promise.resolve(callback);
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: waitUntil never invokes its callback — Promise.resolve(callback) wraps the function reference in a resolved promise without calling it. This should be Promise.resolve(callback()) so the resumable-stream library can schedule its background cleanup/finalization work correctly.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/services/agent-chat-resumable-stream.service.ts, line 24:

<comment>`waitUntil` never invokes its callback — `Promise.resolve(callback)` wraps the function reference in a resolved promise without calling it. This should be `Promise.resolve(callback())` so the `resumable-stream` library can schedule its background cleanup/finalization work correctly.</comment>

<file context>
@@ -0,0 +1,77 @@
+
+    this.streamContext = createResumableStreamContext({
+      waitUntil: (callback) => {
+        void Promise.resolve(callback);
+      },
+      publisher: this.publisher,
</file context>
Suggested change
void Promise.resolve(callback);
void Promise.resolve(callback());
Fix with Cubic

const uiMessages = mapDBMessagesToUIMessages(data.chatMessages ?? []);
setAgentChatFetchedMessages(uiMessages);

const threadId = store.get(currentAIChatThreadState.atom);
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: This reads currentAIChatThreadState at callback execution time, not when the query was initiated. If the user switches threads while the fetch is in-flight, the response from the old thread will be written to the new thread's Chat instance. Capture the thread ID at query initiation time and use that captured value inside the callback instead.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/twenty-front/src/modules/ai/components/AgentChatMessagesFetchEffect.tsx, line 52:

<comment>This reads `currentAIChatThreadState` at callback execution time, not when the query was initiated. If the user switches threads while the fetch is in-flight, the response from the old thread will be written to the new thread's Chat instance. Capture the thread ID at query initiation time and use that captured value inside the callback instead.</comment>

<file context>
@@ -52,9 +48,34 @@ export const AgentChatMessagesFetchEffect = () => {
       const uiMessages = mapDBMessagesToUIMessages(data.chatMessages ?? []);
-      setAgentChatFetchedMessages(uiMessages);
+
+      const threadId = store.get(currentAIChatThreadState.atom);
+
+      if (!isDefined(threadId)) {
</file context>
Fix with Cubic

setAgentChatMessages(chatState.messages);

const lastMessage = chatState.messages.at(-1);
lastMessageMap.set(new Date().toISOString(), lastMessage!);
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: The new module-scoped lastMessageMap retains every streamed message and is never cleared or consumed by the feature.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/twenty-front/src/modules/ai/components/AgentChatAiSdkStreamEffect.tsx, line 70:

<comment>The new module-scoped `lastMessageMap` retains every streamed message and is never cleared or consumed by the feature.</comment>

<file context>
@@ -72,20 +64,24 @@ export const AgentChatAiSdkStreamEffect = () => {
+    setAgentChatMessages(chatState.messages);
+
+    const lastMessage = chatState.messages.at(-1);
+    lastMessageMap.set(new Date().toISOString(), lastMessage!);
+
+    console.log('AgentChatAiSdkStreamEffect - Updated messages', {
</file context>
Fix with Cubic

{ messageId: lastMessageId },
);

messageMap.set(new Date().toISOString(), agentChatMessage!);
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: This writes a new entry into a module-scoped Map on every render, so streaming replies will steadily leak memory instead of updating a single cached message.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/twenty-front/src/modules/ai/components/AIChatLastMessageWithStreamingState.tsx, line 27:

<comment>This writes a new entry into a module-scoped `Map` on every render, so streaming replies will steadily leak memory instead of updating a single cached message.</comment>

<file context>
@@ -14,6 +19,17 @@ export const AIChatLastMessageWithStreamingState = () => {
+    { messageId: lastMessageId },
+  );
+
+  messageMap.set(new Date().toISOString(), agentChatMessage!);
+
+  console.log({
</file context>
Fix with Cubic

@AuthUserWorkspaceId() userWorkspaceId: string,
@Res() response: Response,
) {
console.log('Received request to resume stream for threadId:', threadId);
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Use this.logger.log(...) instead of console.log(...) — the controller already has an injected Logger. Raw console.log bypasses Nest's logging configuration and can leak request data in production.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/controllers/agent-chat.controller.ts, line 128:

<comment>Use `this.logger.log(...)` instead of `console.log(...)` — the controller already has an injected `Logger`. Raw `console.log` bypasses Nest's logging configuration and can leak request data in production.</comment>

<file context>
@@ -103,4 +117,57 @@ export class AgentChatController {
+    @AuthUserWorkspaceId() userWorkspaceId: string,
+    @Res() response: Response,
+  ) {
+    console.log('Received request to resume stream for threadId:', threadId);
+
+    const uuidRegex =
</file context>
Suggested change
console.log('Received request to resume stream for threadId:', threadId);
this.logger.log(`Received request to resume stream for threadId: ${threadId}`);
Fix with Cubic

const streamId = generateId();

this.logger.log(`consumeSseStream called, streamId: ${streamId}`);
await this.resumableStreamService.createResumableStream(
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: activeStreamId is persisted to the DB only after createResumableStream completes. If the client disconnects during that await, the stream ID is never saved and resumption becomes impossible. Persist activeStreamId before starting the resumable stream to close this timing window.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/twenty-server/src/engine/metadata-modules/ai/ai-chat/services/agent-chat-streaming.service.ts, line 264:

<comment>`activeStreamId` is persisted to the DB only after `createResumableStream` completes. If the client disconnects during that await, the stream ID is never saved and resumption becomes impossible. Persist `activeStreamId` before starting the resumable stream to close this timing window.</comment>

<file context>
@@ -233,10 +256,26 @@ export class AgentChatStreamingService {
+            const streamId = generateId();
+
+            this.logger.log(`consumeSseStream called, streamId: ${streamId}`);
+            await this.resumableStreamService.createResumableStream(
+              streamId,
+              () => stream,
</file context>
Fix with Cubic

id: message.id,
role: message.role as ExtendedUIMessage['role'],
parts: (message.parts ?? [])
.sort((partA, partB) => partA.orderIndex - partB.orderIndex)
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: .sort() mutates message.parts in place. Since these are TypeORM entity relation arrays, use [...(message.parts ?? [])].sort(...) to avoid side effects if the entity is accessed again later in the request lifecycle.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapAgentMessageEntitiesToUIMessages.ts, line 48:

<comment>`.sort()` mutates `message.parts` in place. Since these are TypeORM entity relation arrays, use `[...(message.parts ?? [])].sort(...)` to avoid side effects if the entity is accessed again later in the request lifecycle.</comment>

<file context>
@@ -0,0 +1,53 @@
+    id: message.id,
+    role: message.role as ExtendedUIMessage['role'],
+    parts: (message.parts ?? [])
+      .sort((partA, partB) => partA.orderIndex - partB.orderIndex)
+      .map(mapAgentMessagePartEntityToUIMessagePart)
+      .filter((part): part is ExtendedUIMessagePart => part !== null),
</file context>
Fix with Cubic

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants