diff --git a/docker-compose.observability.yml b/docker-compose.observability.yml index 28229952..24276d1e 100644 --- a/docker-compose.observability.yml +++ b/docker-compose.observability.yml @@ -140,7 +140,7 @@ services: ports: - 127.0.0.1:6380:6379 healthcheck: - test: ["CMD", "redis-cli", "ping"] + test: ["CMD-SHELL", "redis-cli -a \"${REDIS_AUTH:-myredissecret}\" ping"] interval: 3s timeout: 10s retries: 10 diff --git a/scripts/add_evaluation_results_table.py b/scripts/add_evaluation_results_table.py index 77b17829..9e805ad6 100644 --- a/scripts/add_evaluation_results_table.py +++ b/scripts/add_evaluation_results_table.py @@ -16,14 +16,14 @@ project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root / "src")) -from d4bl.database import init_db # noqa: E402 +from d4bl.infra.database import init_db async def create_evaluation_results_table(): print("šŸ”§ Ensuring evaluation_results table exists...") try: init_db() - from d4bl.database import engine # noqa: E402 + from d4bl.infra.database import engine if engine is None: print("āŒ Database engine not initialized. Check connection settings.") diff --git a/scripts/add_job_id_to_evaluations.py b/scripts/add_job_id_to_evaluations.py index 01ef8784..317ab08d 100644 --- a/scripts/add_job_id_to_evaluations.py +++ b/scripts/add_job_id_to_evaluations.py @@ -10,7 +10,7 @@ sys.path.insert(0, str(project_root / "src")) from sqlalchemy import text -from d4bl import database as db +from d4bl.infra import database as db async def add_job_id_column(): diff --git a/scripts/add_research_data_column.py b/scripts/add_research_data_column.py index bab861aa..1a7f95be 100755 --- a/scripts/add_research_data_column.py +++ b/scripts/add_research_data_column.py @@ -18,7 +18,7 @@ sys.path.insert(0, str(project_root / "src")) from sqlalchemy import text -from d4bl.database import init_db +from d4bl.infra.database import init_db async def add_research_data_column(): @@ -30,7 +30,7 @@ async def add_research_data_column(): init_db() # Import engine AFTER init_db() to ensure it's properly initialized - from d4bl.database import engine + from d4bl.infra.database import engine # Verify engine is initialized if engine is None: diff --git a/scripts/add_trace_id_to_research_jobs.py b/scripts/add_trace_id_to_research_jobs.py index 520577b0..cbd8af76 100644 --- a/scripts/add_trace_id_to_research_jobs.py +++ b/scripts/add_trace_id_to_research_jobs.py @@ -10,7 +10,7 @@ if str(src_path) not in sys.path: sys.path.insert(0, str(src_path)) -from d4bl import database as db # noqa: E402 +from d4bl.infra import database as db async def add_trace_id_column(): @@ -74,3 +74,5 @@ async def add_trace_id_column(): sys.exit(0 if success else 1) + + diff --git a/scripts/archive_and_wipe_db.py b/scripts/archive_and_wipe_db.py index e3a7fb8d..9ebc8d3b 100644 --- a/scripts/archive_and_wipe_db.py +++ b/scripts/archive_and_wipe_db.py @@ -24,8 +24,8 @@ sys.path.insert(0, str(project_root / "src")) from sqlalchemy import text, select -from d4bl import database as db -from d4bl.database import ResearchJob, EvaluationResult +from d4bl.infra import database as db +from d4bl.infra.database import ResearchJob, EvaluationResult async def export_data_to_csv(): @@ -179,8 +179,10 @@ async def main(no_archive=False, skip_confirm=False): print("1. Run a new research job through the frontend") print("2. Run evaluations: docker compose exec d4bl-api python /app/scripts/run_evals.py") print("3. Check that evaluations are linked to the job in the frontend") + return True else: print("\nāŒ Database wipe failed. Check errors above.") + return False if __name__ == "__main__": diff --git a/scripts/init_db.py b/scripts/init_db.py index e5a6ffb1..70f01b83 100644 --- a/scripts/init_db.py +++ b/scripts/init_db.py @@ -10,7 +10,7 @@ project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root / "src")) -from d4bl.database import init_db, create_tables, close_db +from d4bl.infra.database import init_db, create_tables, close_db async def main(): diff --git a/scripts/test_db_connection.py b/scripts/test_db_connection.py index 3f159d9d..c137276c 100644 --- a/scripts/test_db_connection.py +++ b/scripts/test_db_connection.py @@ -11,7 +11,8 @@ project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root / "src")) -from d4bl.database import init_db, async_session_maker, ResearchJob, get_database_url +import d4bl.infra.database as _db +from d4bl.infra.database import init_db, ResearchJob, get_database_url from sqlalchemy import select, text @@ -51,7 +52,7 @@ async def test_connection(): init_db() print("\nšŸ”„ Testing database connection...") - async with async_session_maker() as db: + async with _db.async_session_maker() as db: # Get current database name result = await db.execute(text("SELECT current_database(), current_user, inet_server_addr(), inet_server_port();")) row = result.fetchone() @@ -92,3 +93,5 @@ async def test_connection(): sys.exit(0 if success else 1) + + diff --git a/src/d4bl/agents/crew.py b/src/d4bl/agents/crew.py index b6e71254..4b164ece 100644 --- a/src/d4bl/agents/crew.py +++ b/src/d4bl/agents/crew.py @@ -61,6 +61,17 @@ class D4Bl(): "editor": "editor_task", "data_visualization_agent": "data_visualization_task", } + + TASK_ORDER = [ + "research_task", + "analysis_task", + "writing_task", + "fact_checker_task", + "citation_task", + "bias_detection_task", + "editor_task", + "data_visualization_task", + ] def __init__(self): """Initialize crew with optional agent selection""" @@ -307,13 +318,13 @@ def crew(self) -> Crew: if agent_name in agent_methods ] - # Filter tasks based on selected agents - selected_tasks = { + # Build selected task names as a set for O(1) lookup + selected_task_names = { self.AGENT_TASK_MAP[agent_name] for agent_name in self.selected_agents if agent_name in self.AGENT_TASK_MAP } - + # Get task method names task_methods = { 'research_task': self.research_task, @@ -325,11 +336,12 @@ def crew(self) -> Crew: 'editor_task': self.editor_task, 'data_visualization_task': self.data_visualization_task, } - + + # Iterate TASK_ORDER to preserve deterministic sequential order tasks_to_use = [ task_methods[task_name]() - for task_name in selected_tasks - if task_name in task_methods + for task_name in self.TASK_ORDER + if task_name in selected_task_names and task_name in task_methods ] logger.info( diff --git a/src/d4bl/crew.py b/src/d4bl/crew.py index 38196ea5..693e6ccd 100644 --- a/src/d4bl/crew.py +++ b/src/d4bl/crew.py @@ -12,7 +12,7 @@ # Import error handling utilities from d4bl.services.error_handling import retry_with_backoff, safe_execute, ErrorRecoveryStrategy from d4bl.settings import get_settings -from d4bl.tools import Crawl4AISearchTool, FirecrawlSearchWrapper +from d4bl.agents.tools import Crawl4AISearchTool, FirecrawlSearchWrapper from d4bl.llm import get_ollama_llm, reset_ollama_llm from d4bl.observability import get_langfuse_client diff --git a/src/d4bl/evals/__init__.py b/src/d4bl/evals/__init__.py new file mode 100644 index 00000000..4c2b6880 --- /dev/null +++ b/src/d4bl/evals/__init__.py @@ -0,0 +1 @@ +"""D4BL evaluation runner package.""" diff --git a/src/d4bl/evals/runner.py b/src/d4bl/evals/runner.py new file mode 100644 index 00000000..a267e9f6 --- /dev/null +++ b/src/d4bl/evals/runner.py @@ -0,0 +1,88 @@ +"""Run LLM evaluations against completed research jobs stored in PostgreSQL.""" +from __future__ import annotations + +import asyncio +import logging +from pathlib import Path +from typing import List, Optional +from uuid import UUID + +from sqlalchemy import select + +import d4bl.infra.database as _db +from d4bl.infra.database import ResearchJob, init_db +from d4bl.services.langfuse.runner import run_comprehensive_evaluation + +logger = logging.getLogger(__name__) + + +async def run_evals_and_log( + max_rows: Optional[int] = None, + eval_types: Optional[List[str]] = None, + concurrency: int = 1, + interactive: bool = False, + selected_job_ids: Optional[List[UUID]] = None, + output_csv_path: Optional[Path] = None, +) -> None: + """Run evaluations on completed research jobs and log results. + + Args: + max_rows: Limit number of jobs to evaluate. Default: all. + eval_types: Which evaluator categories to run (unused; all run by default). + concurrency: Number of concurrent evaluation requests. + interactive: Unused; kept for CLI compatibility. + selected_job_ids: Restrict evaluation to specific job UUIDs. + output_csv_path: Unused; kept for CLI compatibility. + """ + init_db() + + sem = asyncio.Semaphore(concurrency) + + async def _evaluate_job(job: ResearchJob) -> None: + async with sem: + result_dict = job.to_dict() + research_output = str(result_dict.get("result") or "") + if not research_output: + logger.warning("Job %s has no result, skipping.", job.job_id) + return + + logger.info( + "Running evaluations for job %s: %s...", + job.job_id, + job.query[:60], + ) + # run_comprehensive_evaluation is synchronous — run it in a + # thread pool so it doesn't block the event loop. + await asyncio.to_thread( + run_comprehensive_evaluation, + query=job.query, + research_output=research_output, + sources=[], + trace_id=str(job.job_id), + ) + + async with _db.async_session_maker() as db: + query = select(ResearchJob).where(ResearchJob.status == "completed") + + if selected_job_ids: + query = query.where(ResearchJob.job_id.in_(selected_job_ids)) + + if max_rows is not None: + query = query.limit(max_rows) + + result = await db.execute(query) + jobs = result.scalars().all() + + if not jobs: + logger.info("No completed research jobs found to evaluate.") + return + + logger.info("Evaluating %d research job(s)...", len(jobs)) + + results = await asyncio.gather( + *[_evaluate_job(j) for j in jobs], return_exceptions=True + ) + for job, outcome in zip(jobs, results, strict=True): + if isinstance(outcome, BaseException): + logger.error("Evaluation failed for job %s: %s", job.job_id, outcome) + logger.info("Evaluation run complete.") diff --git a/src/d4bl/infra/database.py b/src/d4bl/infra/database.py index efffa39c..4fa70361 100644 --- a/src/d4bl/infra/database.py +++ b/src/d4bl/infra/database.py @@ -95,15 +95,22 @@ def get_database_url() -> str: db_name = os.getenv("POSTGRES_DB", "postgres") # CRITICAL: In Docker, we MUST use 'postgres' as the hostname (Docker service name) - # If POSTGRES_HOST is not set or is 'localhost', we're likely in Docker and should use 'postgres' - # Check if we're in a Docker container by looking for common indicators - if db_host == "localhost" or db_host == "127.0.0.1": + # OR use 'host.docker.internal' to reach services on the host machine (like Supabase) + # Only override if host is localhost/127.0.0.1 AND we're running inside Docker + if db_host in ("localhost", "127.0.0.1"): # Check if we're in Docker (common indicators) if os.path.exists("/.dockerenv") or os.getenv("DOCKER_CONTAINER"): + original_host = db_host db_host = "postgres" - print(f"⚠ Warning: Detected Docker environment, using 'postgres' as hostname instead of '{db_host}'") + print( + f"⚠ Warning: Detected Docker environment, " + f"using 'postgres' as hostname instead of '{original_host}'" + ) else: - print(f"⚠ Warning: Using 'localhost' as database host. In Docker, this should be 'postgres'") + print( + "⚠ Warning: Using 'localhost' as database host. " + "In Docker, this should be 'postgres' or 'host.docker.internal'" + ) # Ensure we're using the correct database name (not the username) if not db_name or db_name == db_user: diff --git a/tests/test_crew_task_ordering.py b/tests/test_crew_task_ordering.py new file mode 100644 index 00000000..bb98b9c3 --- /dev/null +++ b/tests/test_crew_task_ordering.py @@ -0,0 +1,41 @@ +"""Tests that selected agent task ordering is deterministic and sequential.""" +import sys + +sys.path.insert(0, 'src') +from d4bl.agents.crew import D4Bl + + +def _get_ordered_task_names(selected_agents: list) -> list: + """Return the task names that would run for the given selected agents, in order.""" + selected_task_names = { + D4Bl.AGENT_TASK_MAP[name] + for name in selected_agents + if name in D4Bl.AGENT_TASK_MAP + } + return [t for t in D4Bl.TASK_ORDER if t in selected_task_names] + + +def test_researcher_before_analyst(): + names = _get_ordered_task_names(["researcher", "data_analyst"]) + assert "research_task" in names + assert "analysis_task" in names + assert names.index("research_task") < names.index("analysis_task") + + +def test_analyst_before_writer(): + names = _get_ordered_task_names(["data_analyst", "writer"]) + assert names.index("analysis_task") < names.index("writing_task") + + +def test_single_agent_returns_one_task(): + names = _get_ordered_task_names(["researcher"]) + assert names == ["research_task"] + + +def test_all_agents_full_canonical_order(): + import sys + sys.path.insert(0, 'src') + from d4bl.agents.crew import D4Bl + all_agents = list(D4Bl.AGENT_TASK_MAP.keys()) + names = _get_ordered_task_names(all_agents) + assert names == D4Bl.TASK_ORDER