fix: Iteration 2 — Infrastructure cleanup (imports, task ordering, exit codes, Redis)#13
Conversation
…so run_evals.py imports correctly
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughReorganized database imports to Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Runner as "Eval Runner\n(src/d4bl/evals/runner.py)"
participant DB as "Postgres DB\n(ResearchJob table)"
participant Worker as "Eval Worker\n(run_comprehensive_evaluation)"
Runner->>DB: init_db() / open session\nquery completed ResearchJob rows
Runner->>Runner: acquire semaphore (concurrency limit)
Runner->>Worker: dispatch job (query, research_output, trace_id)\n(executed in threadpool)
Worker-->>Runner: return evaluation result / exception
Runner->>Runner: release semaphore
Runner->>Runner: gather results & log completion
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~22 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 6
🧹 Nitpick comments (9)
src/d4bl/infra/database.py (1)
100-100: Redundant guard —db_host != "host.docker.internal"is alwaysTruehere.The outer condition already constrains
db_hostto"localhost"or"127.0.0.1", so it can never equal"host.docker.internal"at that point. The second clause is dead code.♻️ Simplified condition
- if (db_host == "localhost" or db_host == "127.0.0.1") and db_host != "host.docker.internal": + if db_host in ("localhost", "127.0.0.1"):🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/d4bl/infra/database.py` at line 100, The if condition using db_host contains a redundant check: remove the impossible clause db_host != "host.docker.internal" and simplify the guard so it only checks whether db_host is "localhost" or "127.0.0.1" (e.g., replace the expression in the if that currently reads the three-part boolean with a concise test like db_host in ("localhost", "127.0.0.1")). Keep the surrounding logic unchanged and ensure any downstream code still executes under the same intended local-host-only branch.scripts/add_evaluation_results_table.py (1)
19-19: Consider using the module-alias import pattern forenginefor consistency.This script imports
init_dbat the top and then re-importsengineinside the function body (afterinit_db()) to avoid holding a staleNonereference.scripts/add_job_id_to_evaluations.pyandscripts/add_trace_id_to_research_jobs.pyuse the cleanerfrom d4bl.infra import database as dbalias, accessingdb.enginedirectly — a pattern that's less surprising and consistent across scripts.♻️ Suggested refactor
-from d4bl.infra.database import init_db # noqa: E402 +from d4bl.infra import database as db async def create_evaluation_results_table(): ... - init_db() - from d4bl.infra.database import engine # noqa: E402 - - if engine is None: + db.init_db() + + if db.engine is None: ... - async with engine.begin() as conn: + async with db.engine.begin() as conn:Also applies to: 26-26
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@scripts/add_evaluation_results_table.py` at line 19, Replace the top-level import of init_db with the module-alias pattern used elsewhere: import the database module as db (from d4bl.infra import database as db), then call db.init_db() and access db.engine after initialization; this avoids re-importing engine later and prevents a stale None reference while keeping consistency with scripts like add_job_id_to_evaluations.py and add_trace_id_to_research_jobs.py.scripts/add_research_data_column.py (1)
21-21: Consider the module-alias pattern for consistency with peer scripts.The delayed in-function import of
engine(afterinit_db()) works but is non-idiomatic.scripts/add_job_id_to_evaluations.pyandscripts/add_trace_id_to_research_jobs.pyusefrom d4bl.infra import database as dband accessdb.enginedirectly, which is cleaner and consistent.♻️ Suggested refactor
-from d4bl.infra.database import init_db +from d4bl.infra import database as db async def add_research_data_column(): ... - init_db() - from d4bl.infra.database import engine - if engine is None: + db.init_db() + if db.engine is None: ... - async with engine.begin() as conn: + async with db.engine.begin() as conn:Also applies to: 33-33
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@scripts/add_research_data_column.py` at line 21, Replace the delayed in-function import of engine with the module-alias pattern used elsewhere: import the database module at top-level with "from d4bl.infra import database as db", call init_db() as before, and then reference db.engine (instead of performing a local "from d4bl.infra.database import engine" after init_db()). Update any uses of the late-imported symbol to use db.engine and remove the in-function import so the script matches the pattern used by the other migration scripts (init_db, db.engine).docker-compose.observability.yml (1)
143-143: Add--no-auth-warningto suppress per-probe stderr noise.
redis-cli -a <password>prints a warning to stderr on every invocation. With a 3 s interval this generates ~20 log lines per minute per host in container logs.♻️ One-line fix
- test: ["CMD-SHELL", "redis-cli -a \"${REDIS_AUTH:-myredissecret}\" ping"] + test: ["CMD-SHELL", "redis-cli --no-auth-warning -a \"${REDIS_AUTH:-myredissecret}\" ping"]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docker-compose.observability.yml` at line 143, The readiness probe command using redis-cli (the test line containing redis-cli -a "${REDIS_AUTH:-myredissecret}" ping) emits repetitive stderr auth warnings; update that probe command to include the --no-auth-warning flag so redis-cli -a ... --no-auth-warning ping is used to suppress per-probe stderr noise.src/d4bl/agents/crew.py (1)
65-74: AnnotateTASK_ORDER(andAGENT_TASK_MAP) withClassVarto silence RUF012.Both are intended to be class-level constants, not per-instance mutable defaults. Without
ClassVar, tools (including Ruff and CrewAI's metaclass/@CrewBase) may treat them as instance-field defaults.♻️ Proposed fix
-from typing import List, Optional +from typing import ClassVar, Dict, List, Optional ... # Agent to task mapping - AGENT_TASK_MAP = { + AGENT_TASK_MAP: ClassVar[Dict[str, str]] = { "researcher": "research_task", ... } - TASK_ORDER = [ + TASK_ORDER: ClassVar[List[str]] = [ "research_task", ... ]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/d4bl/agents/crew.py` around lines 65 - 74, TASK_ORDER and AGENT_TASK_MAP are class-level constants but are currently unannotated causing RUF012; update their type annotations to use typing.ClassVar (e.g. TASK_ORDER: ClassVar[List[str]] = [...] and AGENT_TASK_MAP: ClassVar[Dict[..., ...]] = {...}) to mark them as class variables, and add the necessary import (from typing import ClassVar, List, Dict, ...). Locate the names TASK_ORDER and AGENT_TASK_MAP in src/d4bl/agents/crew.py and change their annotations accordingly, choosing the appropriate container element types for AGENT_TASK_MAP's values (e.g., Type[SomeAgentClass] or Callable) to preserve existing typing.tests/test_crew_task_ordering.py (2)
4-15: Movesys.pathmanipulation to module level; use precise type hints.Calling
sys.path.insert(0, 'src')inside the helper means it runs once per test call, accumulating duplicate entries on each invocation. The import can also be hoisted to the top of the file.Additionally,
listbare type hints should beList[str]for consistency with the rest of the codebase.♻️ Proposed refactor
"""Tests that selected agent task ordering is deterministic and sequential.""" +import sys +from typing import List + +sys.path.insert(0, 'src') +from d4bl.agents.crew import D4Bl -def _get_ordered_task_names(selected_agents: list) -> list: +def _get_ordered_task_names(selected_agents: List[str]) -> List[str]: """Return the task names that would run for the given selected agents, in order.""" - import sys - sys.path.insert(0, 'src') - from d4bl.agents.crew import D4Bl selected_task_names = { D4Bl.AGENT_TASK_MAP[name] for name in selected_agents if name in D4Bl.AGENT_TASK_MAP } return [t for t in D4Bl.TASK_ORDER if t in selected_task_names]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_crew_task_ordering.py` around lines 4 - 15, Move the sys.path manipulation and the import of D4Bl out of the helper and to module-level (so sys.path.insert(0, 'src') runs once) and update the helper signature to use precise typing: change selected_agents: list -> selected_agents: List[str] and the return type -> List[str]; ensure you import typing.List at module top and still reference D4Bl, D4Bl.AGENT_TASK_MAP, and D4Bl.TASK_ORDER inside _get_ordered_task_names.
35-41: Redundantsys.pathinsert and re-import intest_all_agents_full_canonical_order.
_get_ordered_task_namesalready importsD4Bl. Lines 36-38 duplicate that setup unnecessarily; using the helper is sufficient here.♻️ Proposed fix (after hoisting import as above)
def test_all_agents_full_canonical_order(): - import sys - sys.path.insert(0, 'src') - from d4bl.agents.crew import D4Bl all_agents = list(D4Bl.AGENT_TASK_MAP.keys()) names = _get_ordered_task_names(all_agents) assert names == D4Bl.TASK_ORDER🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_crew_task_ordering.py` around lines 35 - 41, The test_all_agents_full_canonical_order contains redundant sys.path insertion and local import of D4Bl; remove lines that manipulate sys.path and the from d4bl.agents.crew import D4Bl import, and rely on the helper _get_ordered_task_names which already imports D4Bl internally; keep using D4Bl.AGENT_TASK_MAP to build all_agents and assert against D4Bl.TASK_ORDER as before so the test calls _get_ordered_task_names(all_agents) and asserts names == D4Bl.TASK_ORDER without re-importing or altering sys.path.scripts/archive_and_wipe_db.py (1)
182-185:return True/return Falsefix is correct —sys.exitnow reflects actual outcome.The prior code without explicit
returnhadmain()always resolve toNone(falsy), causingsys.exit(1)unconditionally. This is now fixed.Consider adding return type annotations to match coding guidelines:
🔧 Optional: add return type hints
-async def export_data_to_csv(): +async def export_data_to_csv() -> None: -async def wipe_database(): +async def wipe_database() -> bool: -async def main(no_archive=False, skip_confirm=False): +async def main(no_archive: bool = False, skip_confirm: bool = False) -> bool:As per coding guidelines, type hints should be used in Python code.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@scripts/archive_and_wipe_db.py` around lines 182 - 185, Add explicit return type annotations to the function(s) that now return True/False (e.g., the main function) to follow project typing guidelines; update the signature of main (and any other functions changed in this diff that return a boolean) to include -> bool and run a quick mypy/flake8 check to ensure signatures and any callers match the new annotations.src/d4bl/evals/runner.py (1)
56-79: Ruff B023:semcaptured by closure defined insideasync forloop; restructure to avoid the smell.
semis not the loop variable, and thebreakat line 80 guarantees a single iteration, so the closure is safe in practice. However, defining_evaluate_jobinside theasync for db in get_db():loop triggers B023 and obscures the relationship between the semaphore and the gather. Movingsemand_evaluate_joboutside the loop body (or using the session directly viaasync_session_maker) eliminates the lint warning and makes the control flow clearer:♻️ Suggested restructure
async def run_evals_and_log(...) -> None: ... init_db() + sem = asyncio.Semaphore(concurrency) + + async def _evaluate_job(job: ResearchJob) -> None: + async with sem: + result_dict = job.to_dict() + research_output = str(result_dict.get("result") or "") + if not research_output: + logger.warning("Job %s has no result, skipping.", job.job_id) + return + logger.info("Running evaluations for job %s: %s...", job.job_id, job.query[:60]) + await asyncio.to_thread( + run_comprehensive_evaluation, + query=job.query, + research_output=research_output, + sources=[], + trace_id=str(job.job_id), + ) async for db in get_db(): query = select(ResearchJob).where(ResearchJob.status == "completed") ... jobs = result.scalars().all() if not jobs: ... return logger.info("Evaluating %d research job(s)...", len(jobs)) - sem = asyncio.Semaphore(concurrency) - - async def _evaluate_job(job: ResearchJob) -> None: - async with sem: - ... - - await asyncio.gather(*[_evaluate_job(j) for j in jobs]) + await asyncio.gather(*[_evaluate_job(j) for j in jobs], return_exceptions=True) logger.info("Evaluation run complete.") break🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/d4bl/evals/runner.py` around lines 56 - 79, Move the semaphore and the nested coroutine out of the loop to avoid capturing sem in a closure: create sem = asyncio.Semaphore(concurrency) before entering the async for get_db() loop and define the async function _evaluate_job(job: ResearchJob) at module/function scope (outside the loop) so it references that sem rather than being recreated per iteration; keep the body that checks job.to_dict()/research_output, logs via logger.info/warning, and calls run_comprehensive_evaluation(query=..., research_output=..., sources=[], trace_id=str(job.job_id)), then collect jobs and call await asyncio.gather(*[_evaluate_job(j) for j in jobs]) after the loop (or use async_session_maker to obtain the session outside and pass needed context into _evaluate_job) to eliminate the B023 lint warning.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@scripts/add_evaluation_results_table.py`:
- Line 19: Remove the unused "# noqa: E402" directives from the top-level import
statements; specifically delete the trailing "# noqa: E402" on the line
importing init_db (from d4bl.infra.database import init_db) and any other
imports in this module that include the same directive, leaving the normal
imports intact so Ruff no longer reports RUF100/no-op noqa comments.
In `@scripts/add_trace_id_to_research_jobs.py`:
- Line 13: The import line "from d4bl.infra import database as db # noqa: E402"
contains an unnecessary noqa directive for E402; remove the trailing " # noqa:
E402" so the line becomes a normal import (i.e., keep the import statement using
database as db but drop the unused noqa comment) to satisfy Ruff RUF100.
In `@scripts/test_db_connection.py`:
- Line 14: The import currently binds async_session_maker to None at import time
causing a stale binding; change the file to import the database module instead
of the symbol (e.g., import d4bl.infra.database as database or db) and then call
init_db() and use database.async_session_maker() (and database.ResearchJob /
database.get_database_url if referenced) so the name resolves to the updated
callable set by init_db; alternatively, if you must keep the symbol import,
re-import or rebind async_session_maker from the database module after calling
init_db() so the callable is not None.
In `@src/d4bl/evals/runner.py`:
- Around line 71-76: The call to the synchronous run_comprehensive_evaluation
inside async def _evaluate_job blocks the event loop and prevents concurrency;
update the call to run_comprehensive_evaluation(...) so it is executed via
asyncio.to_thread (or an equivalent thread-executor) to offload its blocking
HTTP calls to a worker thread, e.g., replace the direct invocation with await
asyncio.to_thread(run_comprehensive_evaluation, query=job.query,
research_output=research_output, sources=[], trace_id=str(job.job_id)) so the
coroutine yields control and asyncio.gather can run evaluations concurrently.
- Line 78: Change the concurrent evaluation call to use asyncio.gather(...,
return_exceptions=True) so one failure doesn't cancel the rest: call
asyncio.gather(*[_evaluate_job(j) for j in jobs], return_exceptions=True), then
iterate over the returned results and explicitly log any exceptions (e.g., with
process/logger.error), including context about which job (use the job or its id
from jobs) produced the exception; keep successful results as-is and do not let
a single _evaluate_job exception abort the run.
In `@src/d4bl/infra/database.py`:
- Line 107: The print call in database.py uses an unnecessary f-string prefix
with no placeholders (Ruff F541); change the print(f"...") invocation to a
regular string print("...") for the warning message (locate the print statement
that currently reads the localhost/Docker warning and remove the leading 'f').
---
Nitpick comments:
In `@docker-compose.observability.yml`:
- Line 143: The readiness probe command using redis-cli (the test line
containing redis-cli -a "${REDIS_AUTH:-myredissecret}" ping) emits repetitive
stderr auth warnings; update that probe command to include the --no-auth-warning
flag so redis-cli -a ... --no-auth-warning ping is used to suppress per-probe
stderr noise.
In `@scripts/add_evaluation_results_table.py`:
- Line 19: Replace the top-level import of init_db with the module-alias pattern
used elsewhere: import the database module as db (from d4bl.infra import
database as db), then call db.init_db() and access db.engine after
initialization; this avoids re-importing engine later and prevents a stale None
reference while keeping consistency with scripts like
add_job_id_to_evaluations.py and add_trace_id_to_research_jobs.py.
In `@scripts/add_research_data_column.py`:
- Line 21: Replace the delayed in-function import of engine with the
module-alias pattern used elsewhere: import the database module at top-level
with "from d4bl.infra import database as db", call init_db() as before, and then
reference db.engine (instead of performing a local "from d4bl.infra.database
import engine" after init_db()). Update any uses of the late-imported symbol to
use db.engine and remove the in-function import so the script matches the
pattern used by the other migration scripts (init_db, db.engine).
In `@scripts/archive_and_wipe_db.py`:
- Around line 182-185: Add explicit return type annotations to the function(s)
that now return True/False (e.g., the main function) to follow project typing
guidelines; update the signature of main (and any other functions changed in
this diff that return a boolean) to include -> bool and run a quick mypy/flake8
check to ensure signatures and any callers match the new annotations.
In `@src/d4bl/agents/crew.py`:
- Around line 65-74: TASK_ORDER and AGENT_TASK_MAP are class-level constants but
are currently unannotated causing RUF012; update their type annotations to use
typing.ClassVar (e.g. TASK_ORDER: ClassVar[List[str]] = [...] and
AGENT_TASK_MAP: ClassVar[Dict[..., ...]] = {...}) to mark them as class
variables, and add the necessary import (from typing import ClassVar, List,
Dict, ...). Locate the names TASK_ORDER and AGENT_TASK_MAP in
src/d4bl/agents/crew.py and change their annotations accordingly, choosing the
appropriate container element types for AGENT_TASK_MAP's values (e.g.,
Type[SomeAgentClass] or Callable) to preserve existing typing.
In `@src/d4bl/evals/runner.py`:
- Around line 56-79: Move the semaphore and the nested coroutine out of the loop
to avoid capturing sem in a closure: create sem = asyncio.Semaphore(concurrency)
before entering the async for get_db() loop and define the async function
_evaluate_job(job: ResearchJob) at module/function scope (outside the loop) so
it references that sem rather than being recreated per iteration; keep the body
that checks job.to_dict()/research_output, logs via logger.info/warning, and
calls run_comprehensive_evaluation(query=..., research_output=..., sources=[],
trace_id=str(job.job_id)), then collect jobs and call await
asyncio.gather(*[_evaluate_job(j) for j in jobs]) after the loop (or use
async_session_maker to obtain the session outside and pass needed context into
_evaluate_job) to eliminate the B023 lint warning.
In `@src/d4bl/infra/database.py`:
- Line 100: The if condition using db_host contains a redundant check: remove
the impossible clause db_host != "host.docker.internal" and simplify the guard
so it only checks whether db_host is "localhost" or "127.0.0.1" (e.g., replace
the expression in the if that currently reads the three-part boolean with a
concise test like db_host in ("localhost", "127.0.0.1")). Keep the surrounding
logic unchanged and ensure any downstream code still executes under the same
intended local-host-only branch.
In `@tests/test_crew_task_ordering.py`:
- Around line 4-15: Move the sys.path manipulation and the import of D4Bl out of
the helper and to module-level (so sys.path.insert(0, 'src') runs once) and
update the helper signature to use precise typing: change selected_agents: list
-> selected_agents: List[str] and the return type -> List[str]; ensure you
import typing.List at module top and still reference D4Bl, D4Bl.AGENT_TASK_MAP,
and D4Bl.TASK_ORDER inside _get_ordered_task_names.
- Around line 35-41: The test_all_agents_full_canonical_order contains redundant
sys.path insertion and local import of D4Bl; remove lines that manipulate
sys.path and the from d4bl.agents.crew import D4Bl import, and rely on the
helper _get_ordered_task_names which already imports D4Bl internally; keep using
D4Bl.AGENT_TASK_MAP to build all_agents and assert against D4Bl.TASK_ORDER as
before so the test calls _get_ordered_task_names(all_agents) and asserts names
== D4Bl.TASK_ORDER without re-importing or altering sys.path.
- Fix import-time binding in test_db_connection.py by accessing async_session_maker via module reference after init_db() runs - Wrap synchronous run_comprehensive_evaluation in asyncio.to_thread to avoid blocking the event loop in evals/runner.py - Add return_exceptions=True to asyncio.gather and log per-job failures - Remove spurious f-prefix from non-interpolated print in database.py - Remove unnecessary # noqa: E402 comments from migration scripts - Move sys.path.insert and D4Bl import to module level in test file
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (4)
src/d4bl/evals/runner.py (3)
84-84:zip()missingstrict=True.While
asyncio.gatherguaranteeslen(results) == len(jobs), addingstrict=Truemakes the invariant explicit and catches future regressions.♻️ Proposed fix
- for job, outcome in zip(jobs, results): + for job, outcome in zip(jobs, results, strict=True):🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/d4bl/evals/runner.py` at line 84, Update the zip call to enforce the jobs/results length invariant by using zip(..., strict=True); specifically change the loop that currently reads "for job, outcome in zip(jobs, results):" to "for job, outcome in zip(jobs, results, strict=True):" so any future length mismatch is raised immediately—apply this change where the variables jobs and results are iterated (in the loop in runner.py).
56-88: Ruff B023:_evaluate_jobcapturessemfrom the loop body — refactor to move both outside theasync forblock.
semis assigned inside theasync for db in get_db()loop body, and_evaluate_jobcloses over it by reference. If the loop ever runs more than once, all closure instances would share the final binding ofsem. While the currentbreakprevents this, the structure is fragile.A secondary concern:
async for db in get_db(): ... breakis an unconventional pattern —get_db()is designed as a FastAPI dependency generator, and thebreakis the only reason session cleanup works here. Using_db.async_session_maker()directly (after importing_dbat module level) is cleaner and removes both issues.♻️ Proposed refactor
+import d4bl.infra.database as _db +from d4bl.infra.database import ResearchJob, init_db ... async def run_evals_and_log(...) -> None: ... init_db() + sem = asyncio.Semaphore(concurrency) + + async def _evaluate_job(job: ResearchJob) -> None: + async with sem: + result_dict = job.to_dict() + research_output = str(result_dict.get("result") or "") + if not research_output: + logger.warning("Job %s has no result, skipping.", job.job_id) + return + logger.info("Running evaluations for job %s: %s...", job.job_id, job.query[:60]) + await asyncio.to_thread( + run_comprehensive_evaluation, + query=job.query, + research_output=research_output, + sources=[], + trace_id=str(job.job_id), + ) + - async for db in get_db(): - query = select(ResearchJob).where(ResearchJob.status == "completed") - ... - sem = asyncio.Semaphore(concurrency) - - async def _evaluate_job(job: ResearchJob) -> None: - async with sem: - ... - - results = await asyncio.gather( - *[_evaluate_job(j) for j in jobs], return_exceptions=True - ) - ... - break + async with _db.async_session_maker() as db: + query = select(ResearchJob).where(ResearchJob.status == "completed") + if selected_job_ids: + query = query.where(ResearchJob.job_id.in_(selected_job_ids)) + if max_rows is not None: + query = query.limit(max_rows) + result = await db.execute(query) + jobs = result.scalars().all() + + if not jobs: + logger.info("No completed research jobs found to evaluate.") + return + + logger.info("Evaluating %d research job(s)...", len(jobs)) + results = await asyncio.gather( + *[_evaluate_job(j) for j in jobs], return_exceptions=True + ) + for job, outcome in zip(jobs, results, strict=True): + if isinstance(outcome, BaseException): + logger.error("Evaluation failed for job %s: %s", job.job_id, outcome) + logger.info("Evaluation run complete.")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/d4bl/evals/runner.py` around lines 56 - 88, The semaphore sem and the inner async function _evaluate_job are currently created inside the async for db in get_db() loop and capture sem by closure, which is fragile; move the sem = asyncio.Semaphore(concurrency) and the async def _evaluate_job(job: ResearchJob) definition outside the async for block so they aren’t rebound on each iteration, and replace the async for db in get_db() usage with a direct session maker call (use _db.async_session_maker() imported at module level) to create the session instead of relying on the generator+break pattern; ensure the rest of the logic (await asyncio.gather, result handling, logging) remains unchanged and that session cleanup uses the explicit session maker/context manager.
56-88: Ruff B023:_evaluate_jobcapturessemfrom theasync forloop body — refactor both outside the loop.
semis defined in theasync for db in get_db()loop body and captured by reference in_evaluate_job. While thebreakat line 88 ensures the loop only ever executes once, this is fragile: any future change removingbreakor adding a second yield toget_db()would cause all closures to unexpectedly share the last binding ofsem.A secondary concern:
async for db in get_db(): ... breakis an unconventional direct usage of a FastAPI-style dependency generator. Calling_db.async_session_maker()directly (after aliasing the module) is both idiomatic and removes the B023 concern simultaneously, sincesemand_evaluate_jobcan be hoisted above the session context.♻️ Proposed refactor
+import d4bl.infra.database as _db from d4bl.infra.database import ResearchJob, init_db ... async def run_evals_and_log(...) -> None: ... init_db() + + sem = asyncio.Semaphore(concurrency) + + async def _evaluate_job(job: ResearchJob) -> None: + async with sem: + result_dict = job.to_dict() + research_output = str(result_dict.get("result") or "") + if not research_output: + logger.warning("Job %s has no result, skipping.", job.job_id) + return + logger.info("Running evaluations for job %s: %s...", job.job_id, job.query[:60]) + await asyncio.to_thread( + run_comprehensive_evaluation, + query=job.query, + research_output=research_output, + sources=[], + trace_id=str(job.job_id), + ) - async for db in get_db(): - query = select(ResearchJob).where(ResearchJob.status == "completed") - if selected_job_ids: - query = query.where(ResearchJob.job_id.in_(selected_job_ids)) - if max_rows: - query = query.limit(max_rows) - result = await db.execute(query) - jobs = result.scalars().all() - ... - sem = asyncio.Semaphore(concurrency) - async def _evaluate_job(job: ResearchJob) -> None: - async with sem: - ... - results = await asyncio.gather(...) - ... - break + async with _db.async_session_maker() as db: + query = select(ResearchJob).where(ResearchJob.status == "completed") + if selected_job_ids: + query = query.where(ResearchJob.job_id.in_(selected_job_ids)) + if max_rows is not None: + query = query.limit(max_rows) + result = await db.execute(query) + jobs = result.scalars().all() + + if not jobs: + logger.info("No completed research jobs found to evaluate.") + return + + logger.info("Evaluating %d research job(s)...", len(jobs)) + results = await asyncio.gather( + *[_evaluate_job(j) for j in jobs], return_exceptions=True + ) + for job, outcome in zip(jobs, results, strict=True): + if isinstance(outcome, BaseException): + logger.error("Evaluation failed for job %s: %s", job.job_id, outcome) + logger.info("Evaluation run complete.")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/d4bl/evals/runner.py` around lines 56 - 88, The closure _evaluate_job is capturing sem from inside the async for get_db() loop which is fragile; move the semaphore creation (sem = asyncio.Semaphore(concurrency)) and the async def _evaluate_job(...) definition above the database/session acquisition so they are not bound inside the loop, and replace the async for db in get_db() usage by calling the session factory directly (use _db.async_session_maker() or equivalent to create an async session with "async with") to acquire the DB session; ensure sem and _evaluate_job are hoisted before opening the session and that the session is passed or accessed from inside the loop body so closures no longer capture loop-local bindings.src/d4bl/infra/database.py (1)
100-100: Redundant guarddb_host != "host.docker.internal"is alwaysTruehere.The left side of
andalready constrainsdb_hostto"localhost"or"127.0.0.1", so it can never equal"host.docker.internal". The condition and its associated comment are dead code.♻️ Proposed simplification
- if (db_host == "localhost" or db_host == "127.0.0.1") and db_host != "host.docker.internal": + if db_host in ("localhost", "127.0.0.1"):🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/d4bl/infra/database.py` at line 100, The condition checking db_host contains a redundant clause: in the if statement that tests (db_host == "localhost" or db_host == "127.0.0.1") remove the unnecessary "and db_host != \"host.docker.internal\"" because it can never be true and is dead code; update any associated comment to reflect the simplified check and, if the original intent was to allow "host.docker.internal", replace the condition with an explicit inclusion (e.g., include db_host == "host.docker.internal") instead of the negation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/d4bl/evals/runner.py`:
- Around line 44-45: The guard incorrectly treats max_rows=0 as "no limit";
update the condition around the call to query.limit(max_rows) (in the function
containing variables `query` and `max_rows` in src/d4bl/evals/runner.py) to
explicitly check for a None value (e.g., use `if max_rows is not None:`) so that
zero is honored and `.limit(0)` is invoked when requested.
- Around line 44-45: The current truthy check skips applying query.limit when
max_rows is 0; in the function run_evals_and_log replace the condition "if
max_rows:" with an explicit presence check (e.g., "if max_rows is not None") so
query.limit(max_rows) is called for 0, and optionally add a simple validation to
ensure max_rows is a non-negative integer before calling query.limit(max_rows)
to avoid negative values. Ensure you update the branch that calls
query.limit(max_rows) and keep the variable names max_rows and query.limit
exactly as used.
In `@src/d4bl/infra/database.py`:
- Around line 105-107: The two long warning print statements in
src/d4bl/infra/database.py exceed the 100-char limit; replace each long
single-line print with shorter messages under 100 chars (for example split into
two prints: a short "⚠ Warning: Detected Docker environment" header and a second
f-string containing "using 'postgres' as hostname instead of
'{original_host}'"), and do the same for the else branch (e.g. "⚠ Warning: Using
'localhost' as database host." followed by "In Docker, use 'postgres' or
'host.docker.internal'") so both lines stay within the limit while preserving
original_host interpolation.
---
Nitpick comments:
In `@src/d4bl/evals/runner.py`:
- Line 84: Update the zip call to enforce the jobs/results length invariant by
using zip(..., strict=True); specifically change the loop that currently reads
"for job, outcome in zip(jobs, results):" to "for job, outcome in zip(jobs,
results, strict=True):" so any future length mismatch is raised
immediately—apply this change where the variables jobs and results are iterated
(in the loop in runner.py).
- Around line 56-88: The semaphore sem and the inner async function
_evaluate_job are currently created inside the async for db in get_db() loop and
capture sem by closure, which is fragile; move the sem =
asyncio.Semaphore(concurrency) and the async def _evaluate_job(job: ResearchJob)
definition outside the async for block so they aren’t rebound on each iteration,
and replace the async for db in get_db() usage with a direct session maker call
(use _db.async_session_maker() imported at module level) to create the session
instead of relying on the generator+break pattern; ensure the rest of the logic
(await asyncio.gather, result handling, logging) remains unchanged and that
session cleanup uses the explicit session maker/context manager.
- Around line 56-88: The closure _evaluate_job is capturing sem from inside the
async for get_db() loop which is fragile; move the semaphore creation (sem =
asyncio.Semaphore(concurrency)) and the async def _evaluate_job(...) definition
above the database/session acquisition so they are not bound inside the loop,
and replace the async for db in get_db() usage by calling the session factory
directly (use _db.async_session_maker() or equivalent to create an async session
with "async with") to acquire the DB session; ensure sem and _evaluate_job are
hoisted before opening the session and that the session is passed or accessed
from inside the loop body so closures no longer capture loop-local bindings.
In `@src/d4bl/infra/database.py`:
- Line 100: The condition checking db_host contains a redundant clause: in the
if statement that tests (db_host == "localhost" or db_host == "127.0.0.1")
remove the unnecessary "and db_host != \"host.docker.internal\"" because it can
never be true and is dead code; update any associated comment to reflect the
simplified check and, if the original intent was to allow
"host.docker.internal", replace the condition with an explicit inclusion (e.g.,
include db_host == "host.docker.internal") instead of the negation.
| if max_rows: | ||
| query = query.limit(max_rows) |
There was a problem hiding this comment.
if max_rows: incorrectly treats max_rows=0 as "no limit".
0 is falsy, so passing max_rows=0 skips the .limit() clause and returns all rows instead of none.
🐛 Proposed fix
- if max_rows:
+ if max_rows is not None:🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/d4bl/evals/runner.py` around lines 44 - 45, The guard incorrectly treats
max_rows=0 as "no limit"; update the condition around the call to
query.limit(max_rows) (in the function containing variables `query` and
`max_rows` in src/d4bl/evals/runner.py) to explicitly check for a None value
(e.g., use `if max_rows is not None:`) so that zero is honored and `.limit(0)`
is invoked when requested.
if max_rows: silently skips the limit when max_rows=0.
0 is falsy, so calling run_evals_and_log(max_rows=0) bypasses .limit() and returns all rows.
🐛 Proposed fix
- if max_rows:
+ if max_rows is not None:🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/d4bl/evals/runner.py` around lines 44 - 45, The current truthy check
skips applying query.limit when max_rows is 0; in the function run_evals_and_log
replace the condition "if max_rows:" with an explicit presence check (e.g., "if
max_rows is not None") so query.limit(max_rows) is called for 0, and optionally
add a simple validation to ensure max_rows is a non-negative integer before
calling query.limit(max_rows) to avoid negative values. Ensure you update the
branch that calls query.limit(max_rows) and keep the variable names max_rows and
query.limit exactly as used.
| print(f"⚠ Warning: Detected Docker environment, using 'postgres' as hostname instead of '{original_host}'") | ||
| else: | ||
| print(f"⚠ Warning: Using 'localhost' as database host. In Docker, this should be 'postgres'") | ||
| print("⚠ Warning: Using 'localhost' as database host. In Docker, this should be 'postgres' or 'host.docker.internal'") |
There was a problem hiding this comment.
Both print strings exceed the 100-character line limit.
Lines 105 and 107 are approximately 119 and 128 characters, respectively.
♻️ Proposed fix
- print(f"⚠ Warning: Detected Docker environment, using 'postgres' as hostname instead of '{original_host}'")
+ print(
+ f"⚠ Warning: Detected Docker environment, "
+ f"using 'postgres' as hostname instead of '{original_host}'"
+ )- print("⚠ Warning: Using 'localhost' as database host. In Docker, this should be 'postgres' or 'host.docker.internal'")
+ print(
+ "⚠ Warning: Using 'localhost' as database host. "
+ "In Docker, this should be 'postgres' or 'host.docker.internal'"
+ )As per coding guidelines: "Enforce 100 character maximum line length".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/d4bl/infra/database.py` around lines 105 - 107, The two long warning
print statements in src/d4bl/infra/database.py exceed the 100-char limit;
replace each long single-line print with shorter messages under 100 chars (for
example split into two prints: a short "⚠ Warning: Detected Docker environment"
header and a second f-string containing "using 'postgres' as hostname instead of
'{original_host}'"), and do the same for the else branch (e.g. "⚠ Warning: Using
'localhost' as database host." followed by "In Docker, use 'postgres' or
'host.docker.internal'") so both lines stay within the limit while preserving
original_host interpolation.
- Refactor evals/runner.py: hoist sem and _evaluate_job outside the
session block, replace async-for-get_db()+break with direct
async_session_maker context manager (fixes Ruff B023 closure capture)
- Fix max_rows=0 treated as "no limit": use `if max_rows is not None:`
- Add strict=True to zip(jobs, results) for invariant enforcement
- Simplify database.py guard: replace redundant 3-part boolean with
`db_host in ("localhost", "127.0.0.1")`
- Break long warning print strings to stay within 100-char line limit
* fix: Iteration 2 — Infrastructure cleanup (imports, task ordering, exit codes, Redis) (#13) * fix: Correct import paths in migration scripts (d4bl.database → d4bl.infra.database) * fix: Fix remaining broken import paths in test_db_connection and add_trace_id scripts * fix: Correct import path in crew.py (d4bl.tools → d4bl.agents.tools) * fix: Return True from archive_and_wipe_db.py main() on success so exit code is 0 * fix: Log original db_host before overwriting it in Docker detection * fix: Authenticate redis-cli in healthcheck when requirepass is set * fix: Preserve sequential task order when filtering by selected_agents in crew.py * fix: Create d4bl.evals.runner with Langfuse-backed run_evals_and_log so run_evals.py imports correctly * fix: Address CodeRabbit PR #13 review issues - Fix import-time binding in test_db_connection.py by accessing async_session_maker via module reference after init_db() runs - Wrap synchronous run_comprehensive_evaluation in asyncio.to_thread to avoid blocking the event loop in evals/runner.py - Add return_exceptions=True to asyncio.gather and log per-job failures - Remove spurious f-prefix from non-interpolated print in database.py - Remove unnecessary # noqa: E402 comments from migration scripts - Move sys.path.insert and D4Bl import to module level in test file * fix: Address CodeRabbit PR #13 second-round review issues - Refactor evals/runner.py: hoist sem and _evaluate_job outside the session block, replace async-for-get_db()+break with direct async_session_maker context manager (fixes Ruff B023 closure capture) - Fix max_rows=0 treated as "no limit": use `if max_rows is not None:` - Add strict=True to zip(jobs, results) for invariant enforcement - Simplify database.py guard: replace redundant 3-part boolean with `db_host in ("localhost", "127.0.0.1")` - Break long warning print strings to stay within 100-char line limit * docs: Add Explore Data feature design document * docs: Add Explore Data page implementation plan * feat: Add CensusIndicator and PolicyBill database models * feat: Add Explore Data page with census indicators, policy bills, and interactive dashboard Implement the full /explore feature: three new API endpoints (indicators, policies, states), Pydantic schemas, Census ACS and OpenStates ingestion scripts, and a Next.js dashboard with StateMap, RacialGapChart, MetricFilterPanel, and PolicyTable components. Add CodeRabbit config for automated PR reviews. * fix: Address CodeRabbit review feedback - Fix .coderabbit.yaml nesting (auto_incremental_review under auto_review) - Add AbortController to explore page fetches to prevent stale responses - Replace catch(e: any) with catch(e: unknown) and proper type narrowing - Add keyboard accessibility to StateMap (tabIndex, role, onKeyDown) - Add docstrings to IndicatorItem, PolicyBillItem, StateSummaryItem - Map introduced_date and last_action_date in OpenStates ingestion upsert - Isolate metric-required test to omit only the metric field - Add language tag to fenced code block in design doc - Split long metadata description line in layout.tsx
Summary
Closes all Iteration 2 items tracked in epic #12.
d4bl.database→d4bl.infra.databaseacrossinit_db.py,add_evaluation_results_table.py,add_job_id_to_evaluations.py,add_research_data_column.py,archive_and_wipe_db.py,test_db_connection.py,add_trace_id_to_research_jobs.pysrc/d4bl/crew.py:d4bl.tools→d4bl.agents.toolsarchive_and_wipe_db.py:main()now returnsTrueon success so the script exits 0 instead of always 1src/d4bl/agents/crew.py: AddTASK_ORDERlist; filterselected_tasksthrough it so sequentialProcessalways runsresearch_taskbeforeanalysis_taskbeforewriting_task(was non-deterministic set iteration)src/d4bl/infra/database.py: Captureoriginal_hostbefore reassigningdb_hostso the Docker detection log prints the correct original valuedocker-compose.observability.yml: Switch Redis healthcheck fromCMD ping(unauthenticated) toCMD-SHELLwith-aflag so it passes whenrequirepassis setsrc/d4bl/evals/runner.py(new): Langfuse-backedrun_evals_and_logsoscripts/run_evals.pyimports correctly instead of hittingModuleNotFoundErrorTest plan
pytest tests/ -v)python -c "from d4bl.evals.runner import run_evals_and_log"→ OKpython scripts/run_evals.py --help→ shows argparse help, no ImportErrortests/test_crew_task_ordering.pyverifyresearch_taskalways precedesanalysis_taskSummary by CodeRabbit
New Features
Bug Fixes
Tests
Chores