Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.

Commit 1cbee55

Browse files
FIM related fixes for alerts and DB (#1147)
There are a few fixes in this PR: 1. Return conversations with empty answers in the special case of FIM, since sometimes FIM doesn't give us an answer. 2. Use the deduplication function for all types of alerts. 3. Fix a SQL query that was filtering out some messages. 4. Record FIM interactions in the DB; they were being skipped.
1 parent 02d21c0 commit 1cbee55

File tree

3 files changed

+29
-10
lines changed

3 files changed

+29
-10
lines changed

src/codegate/api/v1_processing.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
PartialQuestionAnswer,
1717
PartialQuestions,
1818
QuestionAnswer,
19+
QuestionType,
1920
TokenUsageAggregate,
2021
TokenUsageByModel,
2122
)
@@ -384,8 +385,13 @@ async def match_conversations(
384385
selected_partial_qa = partial_qa
385386
break
386387

387-
# check if we have a question and answer, otherwise do not add it
388-
if selected_partial_qa and selected_partial_qa.answer is not None:
388+
# check if we have a question and answer, otherwise do not add it
389+
# if the question is a FIM question, we should add it even if there is no answer
390+
# do not add Chat questions without answers
391+
if selected_partial_qa and (
392+
selected_partial_qa.answer is not None
393+
or selected_partial_qa.partial_questions.type == QuestionType.fim
394+
):
389395
# if we don't have a first question, set it. We will use it
390396
# to set the conversation timestamp and provider
391397
first_partial_qa = first_partial_qa or selected_partial_qa
@@ -396,7 +402,7 @@ async def match_conversations(
396402
alerts.extend(deduped_alerts)
397403
token_usage_agg.add_model_token_usage(selected_partial_qa.model_token_usage)
398404

399-
# only add conversation if we have some answers
405+
# if we have a conversation with at least one question and answer
400406
if len(questions_answers) > 0 and first_partial_qa is not None:
401407
if token_usage_agg.token_usage.input_tokens == 0:
402408
token_usage_agg = None
@@ -435,7 +441,6 @@ async def parse_messages_in_conversations(
435441
Get all the messages from the database and return them as a list of conversations.
436442
"""
437443
partial_question_answers = await _process_prompt_output_to_partial_qa(prompts_outputs)
438-
439444
conversations, map_q_id_to_conversation = await match_conversations(partial_question_answers)
440445
return conversations, map_q_id_to_conversation
441446

@@ -510,9 +515,6 @@ async def remove_duplicate_alerts(alerts: List[v1_models.Alert]) -> List[v1_mode
510515
for alert in sorted(
511516
alerts, key=lambda x: x.timestamp, reverse=True
512517
): # Sort alerts by timestamp descending
513-
if alert.trigger_type != "codegate-secrets":
514-
unique_alerts.append(alert)
515-
continue
516518

517519
# Extract trigger string content until "Context"
518520
trigger_string_content = alert.trigger_string.split("Context")[0]

src/codegate/db/connection.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,7 @@ async def get_prompts_with_output_alerts_usage_by_workspace_id(
610610
LEFT JOIN outputs o ON p.id = o.prompt_id
611611
LEFT JOIN alerts a ON p.id = a.prompt_id
612612
WHERE p.workspace_id = :workspace_id
613-
AND a.trigger_category LIKE :trigger_category
613+
AND (a.trigger_category = :trigger_category OR a.trigger_category is NULL)
614614
ORDER BY o.timestamp DESC, a.timestamp DESC
615615
""" # noqa: E501
616616
)
@@ -622,7 +622,6 @@ async def get_prompts_with_output_alerts_usage_by_workspace_id(
622622
IntermediatePromptWithOutputUsageAlerts, sql, conditions, should_raise=True
623623
)
624624
)
625-
626625
prompts_dict: Dict[str, GetPromptWithOutputsRow] = {}
627626
for row in rows:
628627
prompt_id = row.prompt_id

src/codegate/providers/base.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,24 @@ def _get_base_url(self) -> str:
9696
config = Config.get_config()
9797
return config.provider_urls.get(self.provider_route_name) if config else ""
9898

99+
async def process_stream_no_pipeline(
100+
self, stream: AsyncIterator[ModelResponse], context: PipelineContext
101+
) -> AsyncIterator[ModelResponse]:
102+
"""
103+
Process a stream when there is no pipeline.
104+
This is needed to record the output stream chunks for FIM.
105+
"""
106+
try:
107+
async for chunk in stream:
108+
context.add_output(chunk)
109+
yield chunk
110+
except Exception as e:
111+
# Log exception and stop processing
112+
logger.error(f"Error processing stream: {e}")
113+
raise e
114+
finally:
115+
await self._db_recorder.record_context(context)
116+
99117
async def _run_output_stream_pipeline(
100118
self,
101119
input_context: PipelineContext,
@@ -121,7 +139,7 @@ async def _run_output_stream_pipeline(
121139
and self.provider_route_name != "anthropic"
122140
):
123141
logger.info("No output pipeline steps configured, passing through")
124-
return model_stream
142+
return self.process_stream_no_pipeline(model_stream, input_context)
125143

126144
normalized_stream = self._output_normalizer.normalize_streaming(model_stream)
127145

0 commit comments

Comments (0)