refactor: Simplify services/ module — dedup, enums, parallel evals#32
Phase 3-5 of issue #19 code simplification:
- Extract duplicated _keyword_relevance to parsers.keyword_relevance()
- Remove evals.py compatibility shim (update import in research_runner)
- Centralize Docker detection via Settings.is_docker
- Add EvalStatus(str, Enum) replacing stringly-typed status values
- Parallelize evaluations with ThreadPoolExecutor (finding 3.1)
- Extract notify_progress() helper deduplicating 3 identical blocks
- Verify JSON parse dedup is complete (finding 1.4: no action needed)

Intentionally deferred: 1.2 (retry reuse: different semantics), 2.5 (param sprawl: single caller), 3.4 (parallel URL evals: marginal)

17/20 findings now addressed for services/ module.
📝 Walkthrough
Centralizes Docker detection in Settings.is_docker, introduces an EvalStatus enum and replaces string statuses across langfuse evaluators, adds a keyword_relevance parser, parallelizes evaluations in the runner with ThreadPoolExecutor, removes a redundant evals shim, and centralizes progress notifications in research_runner.
Sequence Diagram(s)
sequenceDiagram
participant Client
participant Runner as runner.py
participant Executor as ThreadPoolExecutor
participant Eval as Evaluator
participant Langfuse as LangfuseClient
Client->>Runner: run_comprehensive_evaluation(...)
Runner->>Runner: build eval_specs (name, func, kwargs)
Runner->>Executor: submit(_run_eval) for each spec
par Parallel evaluations
Executor->>Eval: execute evaluator function
Eval->>Langfuse: call LLM / Langfuse client
Langfuse-->>Eval: return evaluation result
Eval-->>Executor: return result dict (with EvalStatus)
end
Executor->>Runner: collect results / handle exceptions / timeouts
Runner->>Runner: aggregate EvalStatus -> overall status
Runner-->>Client: return aggregated results with EvalStatus
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 7
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/d4bl/services/langfuse/_base.py`:
- Around line 12-17: Add a concise docstring to the public EvalStatus enum
describing its purpose and the meaning of each member (SUCCESS, FAILED, SKIPPED,
PARTIAL_SUCCESS) so users of the API understand what the status values
represent; modify the EvalStatus class definition to include this explanatory
string literal immediately under the class declaration.
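For illustration, a documented enum along the lines this comment asks for might look like the sketch below. The member names come from the comment; the string values and the per-member descriptions are assumptions, not taken from the PR.

```python
from enum import Enum


class EvalStatus(str, Enum):
    """Outcome of a single evaluation or of an aggregated evaluation run.

    The descriptions below are illustrative assumptions:
        SUCCESS: the evaluator ran and produced a result.
        FAILED: the evaluator raised, timed out, or rejected its inputs.
        SKIPPED: the evaluator was not run (for example, optional input missing).
        PARTIAL_SUCCESS: some evaluators succeeded and others did not.
    """

    # String values are assumed for this sketch.
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    PARTIAL_SUCCESS = "partial_success"


# Because the enum mixes in str, comparisons against plain strings still hold,
# which is what keeps existing string checks backward-compatible.
is_ok = EvalStatus.SUCCESS == "success"
parsed = EvalStatus("failed")
```

Mixing in `str` means callers that still compare against literal strings keep working while new code can switch to the enum members.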
In `@src/d4bl/services/langfuse/bias.py`:
- Around line 17-20: The return statements for validation of research_output and
query are too long; refactor the long dict literal returned when research_output
or query is empty (in the validation block referencing variables
research_output, query and the EvalStatus enum) so it fits under 100 chars per
line — e.g., build the response dict in multiple lines or assign it to a local
variable (e.g., response = {...} with each key on its own line) and then return
that variable for both checks to keep lines short and avoid exceeding the
100-character limit.
In `@src/d4bl/services/langfuse/client.py`:
- Around line 39-40: Move the inline import of get_settings out of the function
and place it at module level: remove the local "from d4bl.settings import
get_settings" inside the code that checks "if get_settings().is_docker and
'localhost' in langfuse_host" and instead import get_settings once at the top of
src/d4bl/services/langfuse/client.py; update any references to get_settings() in
that module to use the module-level import so the check using
get_settings().is_docker and langfuse_host remains unchanged but avoids repeated
inline imports.
In `@src/d4bl/services/langfuse/hallucination.py`:
- Around line 24-29: The three validation return statements for query, answer,
and context in hallucination.py exceed the 100-char line limit; split each
return dict across multiple lines so keys and values are on their own lines
(e.g., break the return {"error": ..., "status": EvalStatus.FAILED,
"error_type": "validation"} into a multi-line dict) for the checks that use if
not query or not query.strip(), if not answer or not answer.strip(), and if not
context or not context.strip(), keeping EvalStatus.FAILED and "error_type":
"validation" intact.
In `@src/d4bl/services/langfuse/parsers.py`:
- Around line 8-15: Update keyword_relevance: replace the EN DASH in the
docstring with a normal hyphen-minus and move the empty-query check before
computing matches to avoid confusing flow and potential division by zero;
specifically, in function keyword_relevance ensure you return a default score
(e.g., 3.0) if query_words is empty before computing matches, then compute
matches and the final scaled score using query_words and text_lower as currently
done.
In `@src/d4bl/services/langfuse/reference.py`:
- Around line 25-30: The validation return statements for the
query/answer/context checks exceed the 100-character line limit; refactor each
conditional (the checks that use EvalStatus and return the error dict) to break
the returned dict across multiple lines or assign the error message and the dict
to short-named variables before returning so no single line exceeds 100 chars
(e.g., split the dict keys/value pairs onto separate lines or use a small helper
like _validation_error(status, error_type, message) and call it from the three
places). Ensure you still return the same keys ("error", "status", "error_type")
and use EvalStatus.FAILED.
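A rough sketch of the helper approach mentioned in this comment follows. The `EvalStatus` class is a stand-in for the one in `_base.py` with an assumed value, and `validate_inputs` plus the error message wording are hypothetical names introduced only for the example.

```python
from enum import Enum
from typing import Any, Optional


class EvalStatus(str, Enum):
    # Stand-in for the enum in _base.py; the value is an assumption.
    FAILED = "failed"


def _validation_error(message: str) -> dict[str, Any]:
    """Build the shared validation-error payload, one key per line."""
    return {
        "error": message,
        "status": EvalStatus.FAILED,
        "error_type": "validation",
    }


def validate_inputs(query: str, answer: str, context: str) -> Optional[dict[str, Any]]:
    """Return an error dict for the first empty field, or None if all are set.

    Hypothetical wrapper; the real evaluators check each field inline.
    """
    fields = (("query", query), ("answer", answer), ("context", context))
    for name, value in fields:
        if not value or not value.strip():
            return _validation_error(f"{name} must be a non-empty string")
    return None
```

Routing all three checks through one helper keeps every line short and guarantees the `"error"`, `"status"`, and `"error_type"` keys stay identical across evaluators.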
In `@src/d4bl/services/langfuse/runner.py`:
- Around line 152-160: The parallel evaluation loop currently waits indefinitely
for futures; wrap as_completed or future.result with a timeout and handle
TimeoutError: use as_completed(futures, timeout=...) or call
future.result(timeout=...) inside the loop, catch
concurrent.futures.TimeoutError, then cancel outstanding futures (call
future.cancel() for any unfinished futures in the futures dict), record a
timeout failure into results["evaluations"][name] (or a generic timeout entry)
and log the timeout; update the block that creates the ThreadPoolExecutor,
futures, and the loop over as_completed so that _run_eval calls are bounded and
hung evaluators don't block the whole job.
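The bounded loop this comment describes could be sketched roughly as below. `run_parallel`, the `(name, func, kwargs)` spec shape, the plain-string statuses, and the worker count are assumptions made for a self-contained example; the real runner would use its own `_run_eval` and `EvalStatus`. `cancel_futures` requires Python 3.9 or later.

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeoutError

logger = logging.getLogger(__name__)


def run_parallel(eval_specs, timeout_s=120.0):
    """Run (name, func, kwargs) specs concurrently with one batch-wide timeout."""
    results = {}
    executor = ThreadPoolExecutor(max_workers=4)
    futures = {
        executor.submit(func, **kwargs): name for name, func, kwargs in eval_specs
    }
    try:
        # as_completed raises once timeout_s has elapsed for the whole batch.
        for future in as_completed(futures, timeout=timeout_s):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception as exc:  # one failing evaluator must not sink the batch
                results[name] = {"status": "failed", "error": str(exc)}
    except FuturesTimeoutError:
        logger.error("Evaluation batch timed out after %ss", timeout_s)
        for future, name in futures.items():
            if name not in results:
                future.cancel()  # only has an effect on futures that have not started
                results[name] = {"status": "failed", "error_type": "timeout"}
    finally:
        # Do not block on hung workers; drop anything still queued.
        executor.shutdown(wait=False, cancel_futures=True)
    return results
```

Note that `future.cancel()` cannot interrupt a thread that is already running, so a timed-out evaluator is only recorded as failed here; its thread continues in the background until it returns.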
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (16)
- src/d4bl/observability/langfuse.py
- src/d4bl/services/langfuse/__init__.py
- src/d4bl/services/langfuse/_base.py
- src/d4bl/services/langfuse/bias.py
- src/d4bl/services/langfuse/client.py
- src/d4bl/services/langfuse/content_relevance.py
- src/d4bl/services/langfuse/evals.py
- src/d4bl/services/langfuse/hallucination.py
- src/d4bl/services/langfuse/parsers.py
- src/d4bl/services/langfuse/quality.py
- src/d4bl/services/langfuse/reference.py
- src/d4bl/services/langfuse/report_relevance.py
- src/d4bl/services/langfuse/runner.py
- src/d4bl/services/langfuse/source_relevance.py
- src/d4bl/services/research_runner.py
- src/d4bl/settings.py
💤 Files with no reviewable changes (1)
- src/d4bl/services/langfuse/evals.py
- Add docstring to EvalStatus enum
- Fix EN DASH in keyword_relevance docstring, reorder early return
- Break long validation return lines in bias, hallucination, reference
- Move get_settings import to module level in client.py
- Add 120s timeout guard for parallel evaluation batch
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/d4bl/services/langfuse/parsers.py`:
- Around line 10-15: The scoring divides by len(query_words) while matches only
count tokens with len > 3; fix by creating a filtered token set (e.g.,
filtered_query_words = [w for w in query_words if len(w) > 3]) and use that for
both matching and as the denominator: compute matches over filtered_query_words
against text_lower, and return max(1.0, min(5.0, (matches /
len(filtered_query_words)) * 5)). If filtered_query_words is empty (query had
only short tokens), preserve current fallback behavior (return 3.0) to avoid
division-by-zero and keep neutral score.
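As a minimal sketch of the fix described in this comment, the function below uses one filtered word set for both the match count and the denominator. Whitespace tokenization, lowercasing, and the exact signature are assumptions; only the length filter, the 1.0 to 5.0 clamp, and the 3.0 fallback come from the comment.

```python
def keyword_relevance(query: str, text: str) -> float:
    """Score 1.0-5.0 by the share of significant query words found in text."""
    text_lower = text.lower()
    # Assumed tokenization: lowercase, split on whitespace, keep words longer than 3.
    significant_words = {w for w in query.lower().split() if len(w) > 3}
    if not significant_words:
        # Only short tokens (or an empty query): neutral score, no division by zero.
        return 3.0
    matches = sum(1 for w in significant_words if w in text_lower)
    return max(1.0, min(5.0, (matches / len(significant_words)) * 5))
```

With the same set on both sides of the ratio, a query made mostly of short words can no longer drag the score down just because those words were never eligible to match.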
📒 Files selected for processing (7)
- src/d4bl/services/langfuse/_base.py
- src/d4bl/services/langfuse/bias.py
- src/d4bl/services/langfuse/client.py
- src/d4bl/services/langfuse/hallucination.py
- src/d4bl/services/langfuse/parsers.py
- src/d4bl/services/langfuse/reference.py
- src/d4bl/services/langfuse/runner.py
- Use significant_words set for both matching and denominator in keyword_relevance() to avoid score inflation from short words
- Replace ThreadPoolExecutor context manager with explicit shutdown(wait=False, cancel_futures=True) to prevent blocking when futures are still running after timeout
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/d4bl/services/langfuse/runner.py (1)
152-179: ⚠️ Potential issue | 🟠 Major
Avoid flushing Langfuse immediately after timed-out non-blocking shutdown.
When the timeout is hit (Line 162), `executor.shutdown(wait=False, cancel_futures=True)` returns immediately without waiting for running worker threads to complete. Those workers may still execute `langfuse.score_current_trace()` while Line 179 calls `flush()`, creating a race condition that can lead to incomplete or corrupted telemetry data.
🔧 Proposed fix
-    try:
+    timed_out = False
+    try:
         for future in as_completed(futures, timeout=eval_timeout_s):
             name, result = future.result()
             results["evaluations"][name] = result
     except TimeoutError:
+        timed_out = True
         logger.error(
             "Evaluation batch timed out after %ss", eval_timeout_s,
         )
@@
     finally:
         executor.shutdown(wait=False, cancel_futures=True)

     # --- Flush Langfuse once at the end (finding 3.6) ---
-    if langfuse:
+    if langfuse and not timed_out:
         try:
             langfuse.flush()
         except Exception as flush_err:
             logger.warning("Langfuse flush failed: %s", flush_err)
+    elif timed_out:
+        eval_logger.warning(
+            "Skipping immediate Langfuse flush after timeout; worker threads may still be running."
+        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/d4bl/services/langfuse/runner.py` around lines 152 - 179, The executor is shut down with wait=False and cancel_futures=True which returns immediately while worker threads may still call langfuse.score_current_trace(), causing a race when langfuse.flush() runs; in the finally block change shutdown to wait for running tasks to finish (e.g., executor.shutdown(wait=True, cancel_futures=False) or call concurrent.futures.wait(futures.keys()) before calling langfuse.flush()) so that _run_eval workers (and any calls to langfuse.score_current_trace()) complete before invoking langfuse.flush().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@src/d4bl/services/langfuse/runner.py`:
- Around line 152-179: The executor is shut down with wait=False and
cancel_futures=True which returns immediately while worker threads may still
call langfuse.score_current_trace(), causing a race when langfuse.flush() runs;
in the finally block change shutdown to wait for running tasks to finish (e.g.,
executor.shutdown(wait=True, cancel_futures=False) or call
concurrent.futures.wait(futures.keys()) before calling langfuse.flush()) so that
_run_eval workers (and any calls to langfuse.score_current_trace()) complete
before invoking langfuse.flush().
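One way to picture the ordering this comment asks for is the sketch below, where the flush runs only after running workers have returned. `run_then_flush`, the `flush` callback, and the worker count are hypothetical; the sketch also uses `cancel_futures=True` (Python 3.9+) to drop work that has not started, which is a variation on the comment's suggestion rather than what it literally proposes.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def run_then_flush(tasks, flush, timeout_s, max_workers=4):
    """Run tasks with a bounded wait, then flush once no worker is still running."""
    executor = ThreadPoolExecutor(max_workers=max_workers)
    futures = [executor.submit(task) for task in tasks]
    try:
        # Bounded wait: returns after timeout_s even if some tasks are unfinished.
        done, not_done = wait(futures, timeout=timeout_s)
    finally:
        # wait=True blocks until already-running workers return, so nothing can
        # write telemetry concurrently with the flush that follows.
        executor.shutdown(wait=True, cancel_futures=True)
    flush()
    return len(done), len(not_done)
```

The trade-off is that `wait=True` reintroduces blocking on a truly hung evaluator, which is the situation the non-blocking shutdown was meant to avoid; skipping the flush after a timeout, as in the proposed diff, is the alternative.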
📒 Files selected for processing (2)
- src/d4bl/services/langfuse/parsers.py
- src/d4bl/services/langfuse/runner.py
Summary
Continues issue #19 (services/ module simplification) with Phases 3-5, completing 8 more findings (17/20 total).
Phase 3: Safe/Quick wins
- Extract duplicated `_keyword_relevance` to `parsers.keyword_relevance()` (was in 3 files)
- Remove `evals.py` compatibility shim, update import in `research_runner.py`
- Centralize Docker detection via `Settings.is_docker` (replaces `os.path.exists("/.dockerenv")` in `client.py` + `observability/langfuse.py`)

Phase 4: Low risk
- Add `EvalStatus(str, Enum)` replacing stringly-typed status values across all evaluators and runner
- Extract `notify_progress()` helper deduplicating 3 identical set_status + send_websocket_update blocks (~30 LOC saved)

Phase 5: Parallel evaluations
- Parallelize evaluations with `ThreadPoolExecutor`: all evaluations (including optional content/report relevance) run concurrently

Intentionally deferred (3 findings):
- 1.2 (retry reuse): `call_llm_text` uses linear delay vs `retry_with_backoff`'s exponential; different semantics, low ROI
- 2.5 (param sprawl): `update_job_status` has a single caller, and its params mirror DB columns
- 3.4 (parallel URL evals): `content_relevance.py`, marginal gain within a single evaluator

Test plan
- `EvalStatus` extends `str` so existing string comparisons remain backward-compatible

Closes #19