Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose.observability.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ services:
ports:
- 127.0.0.1:6380:6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
test: ["CMD-SHELL", "redis-cli -a \"${REDIS_AUTH:-myredissecret}\" ping"]
interval: 3s
timeout: 10s
retries: 10
Expand Down
4 changes: 2 additions & 2 deletions scripts/add_evaluation_results_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))

from d4bl.database import init_db # noqa: E402
from d4bl.infra.database import init_db


async def create_evaluation_results_table():
print("🔧 Ensuring evaluation_results table exists...")
try:
init_db()
from d4bl.database import engine # noqa: E402
from d4bl.infra.database import engine

if engine is None:
print("❌ Database engine not initialized. Check connection settings.")
Expand Down
2 changes: 1 addition & 1 deletion scripts/add_job_id_to_evaluations.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
sys.path.insert(0, str(project_root / "src"))

from sqlalchemy import text
from d4bl import database as db
from d4bl.infra import database as db


async def add_job_id_column():
Expand Down
4 changes: 2 additions & 2 deletions scripts/add_research_data_column.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
sys.path.insert(0, str(project_root / "src"))

from sqlalchemy import text
from d4bl.database import init_db
from d4bl.infra.database import init_db


async def add_research_data_column():
Expand All @@ -30,7 +30,7 @@ async def add_research_data_column():
init_db()

# Import engine AFTER init_db() to ensure it's properly initialized
from d4bl.database import engine
from d4bl.infra.database import engine

# Verify engine is initialized
if engine is None:
Expand Down
4 changes: 3 additions & 1 deletion scripts/add_trace_id_to_research_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
if str(src_path) not in sys.path:
sys.path.insert(0, str(src_path))

from d4bl import database as db # noqa: E402
from d4bl.infra import database as db


async def add_trace_id_column():
Expand Down Expand Up @@ -74,3 +74,5 @@ async def add_trace_id_column():
sys.exit(0 if success else 1)




6 changes: 4 additions & 2 deletions scripts/archive_and_wipe_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
sys.path.insert(0, str(project_root / "src"))

from sqlalchemy import text, select
from d4bl import database as db
from d4bl.database import ResearchJob, EvaluationResult
from d4bl.infra import database as db
from d4bl.infra.database import ResearchJob, EvaluationResult


async def export_data_to_csv():
Expand Down Expand Up @@ -179,8 +179,10 @@ async def main(no_archive=False, skip_confirm=False):
print("1. Run a new research job through the frontend")
print("2. Run evaluations: docker compose exec d4bl-api python /app/scripts/run_evals.py")
print("3. Check that evaluations are linked to the job in the frontend")
return True
else:
print("\n❌ Database wipe failed. Check errors above.")
return False


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion scripts/init_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))

from d4bl.database import init_db, create_tables, close_db
from d4bl.infra.database import init_db, create_tables, close_db


async def main():
Expand Down
7 changes: 5 additions & 2 deletions scripts/test_db_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))

from d4bl.database import init_db, async_session_maker, ResearchJob, get_database_url
import d4bl.infra.database as _db
from d4bl.infra.database import init_db, ResearchJob, get_database_url
from sqlalchemy import select, text


Expand Down Expand Up @@ -51,7 +52,7 @@ async def test_connection():
init_db()
print("\n🔄 Testing database connection...")

async with async_session_maker() as db:
async with _db.async_session_maker() as db:
# Get current database name
result = await db.execute(text("SELECT current_database(), current_user, inet_server_addr(), inet_server_port();"))
row = result.fetchone()
Expand Down Expand Up @@ -92,3 +93,5 @@ async def test_connection():
sys.exit(0 if success else 1)




24 changes: 18 additions & 6 deletions src/d4bl/agents/crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ class D4Bl():
"editor": "editor_task",
"data_visualization_agent": "data_visualization_task",
}

TASK_ORDER = [
"research_task",
"analysis_task",
"writing_task",
"fact_checker_task",
"citation_task",
"bias_detection_task",
"editor_task",
"data_visualization_task",
]

def __init__(self):
"""Initialize crew with optional agent selection"""
Expand Down Expand Up @@ -307,13 +318,13 @@ def crew(self) -> Crew:
if agent_name in agent_methods
]

# Filter tasks based on selected agents
selected_tasks = {
# Build selected task names as a set for O(1) lookup
selected_task_names = {
self.AGENT_TASK_MAP[agent_name]
for agent_name in self.selected_agents
if agent_name in self.AGENT_TASK_MAP
}

# Get task method names
task_methods = {
'research_task': self.research_task,
Expand All @@ -325,11 +336,12 @@ def crew(self) -> Crew:
'editor_task': self.editor_task,
'data_visualization_task': self.data_visualization_task,
}


# Iterate TASK_ORDER to preserve deterministic sequential order
tasks_to_use = [
task_methods[task_name]()
for task_name in selected_tasks
if task_name in task_methods
for task_name in self.TASK_ORDER
if task_name in selected_task_names and task_name in task_methods
]

logger.info(
Expand Down
2 changes: 1 addition & 1 deletion src/d4bl/crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# Import error handling utilities
from d4bl.services.error_handling import retry_with_backoff, safe_execute, ErrorRecoveryStrategy
from d4bl.settings import get_settings
from d4bl.tools import Crawl4AISearchTool, FirecrawlSearchWrapper
from d4bl.agents.tools import Crawl4AISearchTool, FirecrawlSearchWrapper
from d4bl.llm import get_ollama_llm, reset_ollama_llm
from d4bl.observability import get_langfuse_client

Expand Down
1 change: 1 addition & 0 deletions src/d4bl/evals/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""D4BL evaluation runner package."""
88 changes: 88 additions & 0 deletions src/d4bl/evals/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""Run LLM evaluations against completed research jobs stored in PostgreSQL."""
from __future__ import annotations

import asyncio
import logging
from pathlib import Path
from typing import List, Optional
from uuid import UUID

from sqlalchemy import select

import d4bl.infra.database as _db
from d4bl.infra.database import ResearchJob, init_db
from d4bl.services.langfuse.runner import run_comprehensive_evaluation

logger = logging.getLogger(__name__)


async def run_evals_and_log(
max_rows: Optional[int] = None,
eval_types: Optional[List[str]] = None,
concurrency: int = 1,
interactive: bool = False,
selected_job_ids: Optional[List[UUID]] = None,
output_csv_path: Optional[Path] = None,
) -> None:
"""Run evaluations on completed research jobs and log results.

Args:
max_rows: Limit number of jobs to evaluate. Default: all.
eval_types: Which evaluator categories to run (unused; all run by default).
concurrency: Number of concurrent evaluation requests.
interactive: Unused; kept for CLI compatibility.
selected_job_ids: Restrict evaluation to specific job UUIDs.
output_csv_path: Unused; kept for CLI compatibility.
"""
init_db()

sem = asyncio.Semaphore(concurrency)

async def _evaluate_job(job: ResearchJob) -> None:
async with sem:
result_dict = job.to_dict()
research_output = str(result_dict.get("result") or "")
if not research_output:
logger.warning("Job %s has no result, skipping.", job.job_id)
return

logger.info(
"Running evaluations for job %s: %s...",
job.job_id,
job.query[:60],
)
# run_comprehensive_evaluation is synchronous — run it in a
# thread pool so it doesn't block the event loop.
await asyncio.to_thread(
run_comprehensive_evaluation,
query=job.query,
research_output=research_output,
sources=[],
trace_id=str(job.job_id),
)

async with _db.async_session_maker() as db:
query = select(ResearchJob).where(ResearchJob.status == "completed")

if selected_job_ids:
query = query.where(ResearchJob.job_id.in_(selected_job_ids))

if max_rows is not None:
query = query.limit(max_rows)

result = await db.execute(query)
jobs = result.scalars().all()

if not jobs:
logger.info("No completed research jobs found to evaluate.")
return

logger.info("Evaluating %d research job(s)...", len(jobs))

results = await asyncio.gather(
*[_evaluate_job(j) for j in jobs], return_exceptions=True
)
for job, outcome in zip(jobs, results, strict=True):
if isinstance(outcome, BaseException):
logger.error("Evaluation failed for job %s: %s", job.job_id, outcome)
logger.info("Evaluation run complete.")
17 changes: 12 additions & 5 deletions src/d4bl/infra/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,22 @@ def get_database_url() -> str:
db_name = os.getenv("POSTGRES_DB", "postgres")

# CRITICAL: In Docker, we MUST use 'postgres' as the hostname (Docker service name)
# If POSTGRES_HOST is not set or is 'localhost', we're likely in Docker and should use 'postgres'
# Check if we're in a Docker container by looking for common indicators
if db_host == "localhost" or db_host == "127.0.0.1":
# OR use 'host.docker.internal' to reach services on the host machine (like Supabase)
# Only override if host is localhost/127.0.0.1 AND we're running inside Docker
if db_host in ("localhost", "127.0.0.1"):
# Check if we're in Docker (common indicators)
if os.path.exists("/.dockerenv") or os.getenv("DOCKER_CONTAINER"):
original_host = db_host
db_host = "postgres"
print(f"⚠ Warning: Detected Docker environment, using 'postgres' as hostname instead of '{db_host}'")
print(
f"⚠ Warning: Detected Docker environment, "
f"using 'postgres' as hostname instead of '{original_host}'"
)
else:
print(f"⚠ Warning: Using 'localhost' as database host. In Docker, this should be 'postgres'")
print(
"⚠ Warning: Using 'localhost' as database host. "
"In Docker, this should be 'postgres' or 'host.docker.internal'"
)

# Ensure we're using the correct database name (not the username)
if not db_name or db_name == db_user:
Expand Down
41 changes: 41 additions & 0 deletions tests/test_crew_task_ordering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Tests that selected agent task ordering is deterministic and sequential."""
import sys

sys.path.insert(0, 'src')
from d4bl.agents.crew import D4Bl


def _get_ordered_task_names(selected_agents: list) -> list:
"""Return the task names that would run for the given selected agents, in order."""
selected_task_names = {
D4Bl.AGENT_TASK_MAP[name]
for name in selected_agents
if name in D4Bl.AGENT_TASK_MAP
}
return [t for t in D4Bl.TASK_ORDER if t in selected_task_names]


def test_researcher_before_analyst():
names = _get_ordered_task_names(["researcher", "data_analyst"])
assert "research_task" in names
assert "analysis_task" in names
assert names.index("research_task") < names.index("analysis_task")


def test_analyst_before_writer():
names = _get_ordered_task_names(["data_analyst", "writer"])
assert names.index("analysis_task") < names.index("writing_task")


def test_single_agent_returns_one_task():
names = _get_ordered_task_names(["researcher"])
assert names == ["research_task"]


def test_all_agents_full_canonical_order():
import sys
sys.path.insert(0, 'src')
from d4bl.agents.crew import D4Bl
all_agents = list(D4Bl.AGENT_TASK_MAP.keys())
names = _get_ordered_task_names(all_agents)
assert names == D4Bl.TASK_ORDER