Skip to content

Commit 7eec03c

Browse files
authored
Merge pull request #4156 from pipecat-ai/mb/mem0-improvements
fix(mem0): improve Mem0 service reliability and add get_memories() method
2 parents 5ad4aa9 + 83911dc commit 7eec03c

File tree

8 files changed

+112
-79
lines changed

8 files changed

+112
-79
lines changed

changelog/4156.added.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Added `Mem0MemoryService.get_memories()` convenience method for retrieving all stored memories outside the pipeline (e.g. to build a personalized greeting at connection time). This avoids the need to manually handle client type branching, filter construction, and async wrapping.

changelog/4156.changed.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- ⚠️ Bumped `mem0ai` dependency from `~=0.1.94` to `>=1.0.8,<2`. Users of the `mem0` extra will need to update their mem0ai package.

changelog/4156.fixed.2.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Fixed `Mem0MemoryService` failing to store messages when the context contained system or developer role messages. The Mem0 API only accepts user and assistant roles, so other roles are now filtered out before storing.

changelog/4156.fixed.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- `Mem0MemoryService` no longer blocks the event loop during memory storage and retrieval. All Mem0 API calls now run in a background thread, and message storage is fire-and-forget so it doesn't delay downstream processing.

examples/foundational/37-mem0.py

Lines changed: 23 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
"""
4343

4444
import os
45-
from typing import Union
4645

4746
from dotenv import load_dotenv
4847
from loguru import logger
@@ -69,58 +68,35 @@
6968

7069
load_dotenv(override=True)
7170

72-
try:
73-
from mem0 import Memory, MemoryClient # noqa: F401
74-
except ModuleNotFoundError as e:
75-
logger.error(f"Exception: {e}")
76-
logger.error(
77-
"In order to use Mem0, you need to `pip install mem0ai`. Also, set the environment variable MEM0_API_KEY."
78-
)
79-
raise Exception(f"Missing module: {e}")
80-
8171

82-
async def get_initial_greeting(
83-
memory_client: Union[MemoryClient, Memory], user_id: str, agent_id: str, run_id: str
84-
) -> str:
72+
async def get_initial_greeting(memory_service: Mem0MemoryService) -> str:
8573
"""Fetch all memories for the user and create a personalized greeting.
8674
75+
Args:
76+
memory_service: The Mem0 memory service instance.
77+
8778
Returns:
88-
A personalized greeting based on user memories
79+
A personalized greeting based on user memories.
8980
"""
9081
try:
91-
if isinstance(memory_client, Memory):
92-
filters = {"user_id": user_id, "agent_id": agent_id, "run_id": run_id}
93-
filters = {k: v for k, v in filters.items() if v is not None}
94-
memories = memory_client.get_all(**filters)
95-
else:
96-
# Create filters based on available IDs
97-
id_pairs = [("user_id", user_id), ("agent_id", agent_id), ("run_id", run_id)]
98-
clauses = [{name: value} for name, value in id_pairs if value is not None]
99-
filters = {"AND": clauses} if clauses else {}
100-
101-
# Get all memories for this user
102-
memories = memory_client.get_all(filters=filters, version="v2", output_format="v1.1")
103-
104-
if not memories or len(memories) == 0:
105-
logger.debug(f"!!! No memories found for this user. {memories}")
82+
results = await memory_service.get_memories()
83+
if not results:
84+
logger.debug("No memories found for this user.")
10685
return "Hello! It's nice to meet you. How can I help you today?"
10786

10887
# Create a personalized greeting based on memories
10988
greeting = "Hello! It's great to see you again. "
89+
greeting += "Based on our previous conversations, I remember: "
90+
for i, memory in enumerate(results[:3], 1):
91+
memory_content = memory.get("memory", "")
92+
# Keep memory references brief
93+
if len(memory_content) > 100:
94+
memory_content = memory_content[:97] + "..."
95+
greeting += f"{memory_content} "
11096

111-
# Add some personalization based on memories (limit to 3 memories for brevity)
112-
if len(memories) > 0:
113-
greeting += "Based on our previous conversations, I remember: "
114-
for i, memory in enumerate(memories["results"][:3], 1):
115-
memory_content = memory.get("memory", "")
116-
# Keep memory references brief
117-
if len(memory_content) > 100:
118-
memory_content = memory_content[:97] + "..."
119-
greeting += f"{memory_content} "
97+
greeting += "How can I help you today?"
12098

121-
greeting += "How can I help you today?"
122-
123-
logger.debug(f"Created personalized greeting from {len(memories)} memories")
99+
logger.debug(f"Created personalized greeting from {len(results)} memories")
124100
return greeting
125101

126102
except Exception as e:
@@ -265,23 +241,18 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
265241
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
266242
)
267243

268-
@task.rtvi.event_handler("on_client_ready")
269-
async def on_client_ready(rtvi):
270-
# Get personalized greeting based on user memories. Can pass agent_id and run_id as per requirement of the application to manage short term memory or agent specific memory.
271-
greeting = await get_initial_greeting(
272-
memory_client=memory.memory_client, user_id=USER_ID, agent_id=None, run_id=None
273-
)
244+
@transport.event_handler("on_client_connected")
245+
async def on_client_connected(transport, client):
246+
logger.info(f"Client connected")
247+
# Get personalized greeting based on user memories
248+
greeting = await get_initial_greeting(memory)
274249

275250
# Add the greeting as an assistant message to start the conversation
276-
context.add_message({"role": "assistant", "content": greeting})
251+
context.add_message({"role": "developer", "content": greeting})
277252

278253
# Queue the context frame to start the conversation
279254
await task.queue_frames([LLMRunFrame()])
280255

281-
@transport.event_handler("on_client_connected")
282-
async def on_client_connected(transport, client):
283-
logger.info(f"Client connected")
284-
285256
@transport.event_handler("on_client_disconnected")
286257
async def on_client_disconnected(transport, client):
287258
logger.info(f"Client disconnected")

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ lmnt = [ "pipecat-ai[websockets-base]" ]
8888
local = [ "pyaudio~=0.2.14" ]
8989
local-smart-turn = [ "coremltools>=8.0", "transformers>=4.48.0,<6", "torch>=2.5.0,<3", "torchaudio>=2.5.0,<3" ]
9090
mcp = [ "mcp[cli]>=1.11.0,<2" ]
91-
mem0 = [ "mem0ai~=0.1.94" ]
91+
mem0 = [ "mem0ai>=1.0.8,<2" ]
9292
mistral = []
9393
mlx-whisper = [ "mlx-whisper~=0.4.2" ]
9494
moondream = [ "accelerate~=1.10.0", "einops~=0.8.0", "pyvips[binary]~=3.0.0", "timm~=1.0.13", "transformers>=4.48.0,<6" ]

src/pipecat/services/mem0/memory.py

Lines changed: 79 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
historical information.
1212
"""
1313

14+
import asyncio
1415
from typing import Any, Dict, List, Optional
1516

1617
from loguru import logger
@@ -112,9 +113,51 @@ def __init__(
112113
self.last_query = None
113114
logger.info(f"Initialized Mem0MemoryService with {user_id=}, {agent_id=}, {run_id=}")
114115

115-
def _store_messages(self, messages: List[Dict[str, Any]]):
116+
async def get_memories(self) -> List[Dict[str, Any]]:
117+
"""Retrieve all stored memories for the configured user/agent/run IDs.
118+
119+
This is a convenience method for accessing memories outside the pipeline,
120+
e.g. to build a personalized greeting at connection time. It wraps the
121+
blocking Mem0 ``get_all()`` call in a background thread.
122+
123+
Returns:
124+
List of memory dictionaries. Each dict contains at least a
125+
``"memory"`` key with the memory text. Returns an empty list on
126+
error.
127+
"""
128+
try:
129+
if isinstance(self.memory_client, Memory):
130+
params = {
131+
"user_id": self.user_id,
132+
"agent_id": self.agent_id,
133+
"run_id": self.run_id,
134+
}
135+
params = {k: v for k, v in params.items() if v is not None}
136+
memories = await asyncio.to_thread(lambda: self.memory_client.get_all(**params))
137+
else:
138+
id_pairs = [
139+
("user_id", self.user_id),
140+
("agent_id", self.agent_id),
141+
("run_id", self.run_id),
142+
]
143+
clauses = [{name: value} for name, value in id_pairs if value is not None]
144+
filters = {"OR": clauses} if clauses else {}
145+
memories = await asyncio.to_thread(
146+
lambda: self.memory_client.get_all(filters=filters)
147+
)
148+
149+
results = memories.get("results", []) if isinstance(memories, dict) else memories
150+
return results
151+
except Exception as e:
152+
logger.error(f"Error retrieving memories from Mem0: {e}")
153+
return []
154+
155+
async def _store_messages(self, messages: List[Dict[str, Any]]):
116156
"""Store messages in Mem0.
117157
158+
Runs the blocking Mem0 API call in a background thread to avoid
159+
blocking the event loop.
160+
118161
Args:
119162
messages: List of message dictionaries to store in memory.
120163
"""
@@ -131,14 +174,16 @@ def _store_messages(self, messages: List[Dict[str, Any]]):
131174

132175
if isinstance(self.memory_client, Memory):
133176
del params["output_format"]
134-
# Note: You can run this in background to avoid blocking the conversation
135-
self.memory_client.add(**params)
177+
await asyncio.to_thread(lambda: self.memory_client.add(**params))
136178
except Exception as e:
137179
logger.error(f"Error storing messages in Mem0: {e}")
138180

139-
def _retrieve_memories(self, query: str) -> List[Dict[str, Any]]:
181+
async def _retrieve_memories(self, query: str) -> List[Dict[str, Any]]:
140182
"""Retrieve relevant memories from Mem0.
141183
184+
Runs the blocking Mem0 API call in a background thread to avoid
185+
blocking the event loop.
186+
142187
Args:
143188
query: The query to search for relevant memories.
144189
@@ -156,7 +201,7 @@ def _retrieve_memories(self, query: str) -> List[Dict[str, Any]]:
156201
"limit": self.search_limit,
157202
}
158203
params = {k: v for k, v in params.items() if v is not None}
159-
results = self.memory_client.search(**params)
204+
results = await asyncio.to_thread(lambda: self.memory_client.search(**params))
160205
else:
161206
id_pairs = [
162207
("user_id", self.user_id),
@@ -165,13 +210,15 @@ def _retrieve_memories(self, query: str) -> List[Dict[str, Any]]:
165210
]
166211
clauses = [{name: value} for name, value in id_pairs if value is not None]
167212
filters = {"OR": clauses} if clauses else {}
168-
results = self.memory_client.search(
169-
query=query,
170-
filters=filters,
171-
version=self.api_version,
172-
top_k=self.search_limit,
173-
threshold=self.search_threshold,
174-
output_format="v1.1",
213+
results = await asyncio.to_thread(
214+
lambda: self.memory_client.search(
215+
query=query,
216+
filters=filters,
217+
version=self.api_version,
218+
top_k=self.search_limit,
219+
threshold=self.search_threshold,
220+
output_format="v1.1",
221+
)
175222
)
176223

177224
logger.debug(f"Retrieved {len(results)} memories from Mem0")
@@ -180,7 +227,9 @@ def _retrieve_memories(self, query: str) -> List[Dict[str, Any]]:
180227
logger.error(f"Error retrieving memories from Mem0: {e}")
181228
return []
182229

183-
def _enhance_context_with_memories(self, context: LLMContext | OpenAILLMContext, query: str):
230+
async def _enhance_context_with_memories(
231+
self, context: LLMContext | OpenAILLMContext, query: str
232+
):
184233
"""Enhance the LLM context with relevant memories.
185234
186235
Args:
@@ -193,7 +242,7 @@ def _enhance_context_with_memories(self, context: LLMContext | OpenAILLMContext,
193242

194243
self.last_query = query
195244

196-
memories = self._retrieve_memories(query)
245+
memories = await self._retrieve_memories(query)
197246
if not memories:
198247
return
199248

@@ -203,11 +252,14 @@ def _enhance_context_with_memories(self, context: LLMContext | OpenAILLMContext,
203252
memory_text += f"{i}. {memory.get('memory', '')}\n\n"
204253

205254
# Add memories as a system message or user message based on configuration
206-
if self.add_as_system_message:
207-
context.add_message({"role": "system", "content": memory_text})
208-
else:
209-
# Add as a user message that provides context
210-
context.add_message({"role": "user", "content": memory_text})
255+
role = "system" if self.add_as_system_message else "user"
256+
memory_message = {"role": role, "content": memory_text}
257+
258+
messages = context.get_messages()
259+
position = max(0, min(self.position, len(messages)))
260+
messages.insert(position, memory_message)
261+
context.set_messages(messages)
262+
211263
logger.debug(f"Enhanced context with {len(memories)} memories")
212264

213265
async def process_frame(self, frame: Frame, direction: FrameDirection):
@@ -240,10 +292,15 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
240292
break
241293

242294
if latest_user_message:
295+
# Filter to only user/assistant messages — Mem0 API
296+
# doesn't accept other roles (system, developer, etc.)
297+
messages_to_store = [
298+
m for m in context_messages if m.get("role") in ("user", "assistant")
299+
]
243300
# Enhance context with memories before passing it downstream
244-
self._enhance_context_with_memories(context, latest_user_message)
245-
# Store the conversation in Mem0. Only call this when user message is detected
246-
self._store_messages(context_messages)
301+
await self._enhance_context_with_memories(context, latest_user_message)
302+
# Store the conversation in Mem0 as a background task
303+
self.create_task(self._store_messages(messages_to_store), name="mem0_store")
247304

248305
# If we received an LLMMessagesFrame, create a new one with the enhanced messages
249306
if messages is not None:

uv.lock

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments (0)