Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions packages/adapters/postgres/src/amfs_postgres/async_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,23 @@ async def __aenter__(self) -> Any:
team_id = get_request_tenant_team_id()
is_admin = get_request_is_account_admin()

guc_account = tid if tid else ""
guc_team = team_id if team_id else ""
guc_admin = "true" if is_admin else "false"

if not tid:
logger.warning(
"async pool checkout: tenant account_id is EMPTY "
"(tid=%r, team=%r, admin=%r) — RLS reads will return no rows",
tid, team_id, is_admin,
)

async with conn.cursor() as cur:
await cur.execute(
"SELECT set_config('amfs.current_account_id', %s, false),"
" set_config('amfs.current_team_id', %s, false),"
" set_config('amfs.is_account_admin', %s, false)",
(
tid if tid else "",
team_id if team_id else "",
"true" if is_admin else "false",
),
(guc_account, guc_team, guc_admin),
)
return conn

Expand Down
50 changes: 44 additions & 6 deletions packages/http-server/src/amfs_http/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,23 @@ async def read_entry(
branch: str = Query("main"),
_auth: str | None = Depends(verify_api_key),
) -> dict[str, Any]:
mem = _get_memory()
if _async_adapter is not None:
entry = await _async_adapter.read(entity_path, key, branch=branch)
try:
entry = await _async_adapter.read(entity_path, key, branch=branch)
except Exception:
logger.warning("Async read failed for %s/%s — falling back to sync", entity_path, key, exc_info=True)
entry = None
if entry is None:
entry = mem.read(entity_path, key, branch=branch)
if entry is not None:
logger.warning(
"Async adapter missed entry %s/%s but sync found it — RLS context mismatch",
entity_path, key,
)
if entry is not None:
asyncio.create_task(_async_adapter.increment_recall_count(entity_path, key, branch=branch))
else:
mem = _get_memory()
entry = mem.read(entity_path, key, branch=branch)
if entry is None:
return {"status": "not_found", "entity_path": entity_path, "key": key}
Expand Down Expand Up @@ -659,10 +670,22 @@ async def list_entries(
"[TLS-DIAG] /entries tls_account=%s state_account=%s state_user=%s has_tenant_ctx=%s",
_tls_acct, _state_acct, _state_user, _has_ctx,
)
mem = _get_memory()
if _async_adapter is not None:
entries = await _async_adapter.list(entity_path, branch=branch, include_superseded=include_superseded)
try:
entries = await _async_adapter.list(entity_path, branch=branch, include_superseded=include_superseded)
except Exception:
logger.warning("Async list failed for %s — falling back to sync", entity_path, exc_info=True)
entries = []
if not entries:
sync_entries = mem.list(entity_path, branch=branch, include_superseded=include_superseded)
if sync_entries:
logger.warning(
"Async adapter returned 0 entries for %s but sync found %d — RLS context mismatch",
entity_path, len(sync_entries),
)
entries = sync_entries
else:
mem = _get_memory()
entries = mem.list(entity_path, branch=branch, include_superseded=include_superseded)
total_before = len(entries)

Expand Down Expand Up @@ -707,10 +730,25 @@ async def search_entries(
limit=req.limit,
depth=req.depth,
)
mem = _get_memory()
if _async_adapter is not None:
results = await _async_adapter.search(sq, branch=branch)
try:
results = await _async_adapter.search(sq, branch=branch)
except Exception:
logger.warning("Async search failed — falling back to sync", exc_info=True)
results = []
if not results:
try:
sync_results = mem._adapter.search(sq, branch=branch)
except TypeError:
sync_results = mem._adapter.search(sq)
if sync_results:
logger.warning(
"Async search returned 0 results but sync found %d — RLS context mismatch",
len(sync_results),
)
results = sync_results
else:
mem = _get_memory()
try:
results = mem._adapter.search(sq, branch=branch)
except TypeError:
Expand Down
Loading