Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 140 additions & 65 deletions camel/societies/workforce/workforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
from camel.responses import ChatAgentResponse
from camel.utils.context_utils import ContextUtility, WorkflowSummary

from colorama import Fore

from camel.agents import ChatAgent
from camel.logger import get_logger
Expand Down Expand Up @@ -4373,11 +4372,18 @@ async def _handle_failed_task(self, task: Task) -> bool:
f"{task.failure_count}/{max_retries}): {detailed_error}"
)

print(
f"{Fore.RED}❌ Task {task.id} failed "
f"(attempt {task.failure_count}/{max_retries}): "
f"{failure_reason}{Fore.RESET}"
)
for cb in self._callbacks:
cb.log_message(
LogEvent(
message=(
f"❌ Task {task.id} failed "
f"(attempt {task.failure_count}/{max_retries}): "
f"{failure_reason}"
),
level="error",
color="red",
)
)

task_failed_event = TaskFailedEvent(
task_id=task.id,
Expand Down Expand Up @@ -4813,7 +4819,14 @@ def dump_workforce_logs(self, file_path: str) -> None:
cb for cb in self._callbacks if isinstance(cb, WorkforceMetrics)
]
if len(metrics_cb) == 0:
print("Logger not initialized. Cannot dump logs.")
for cb in self._callbacks:
cb.log_message(
LogEvent(
message="Logger not initialized. Cannot dump logs.",
level="warning",
color="yellow",
)
)
return
metrics_cb[0].dump_to_json(file_path)
# Use logger.info or print, consistent with existing style
Expand Down Expand Up @@ -5108,28 +5121,42 @@ async def _listen_to_channel(self) -> None:

# Do not halt if we have main tasks in queue
if len(self.get_main_task_queue()) > 0:
print(
f"{Fore.RED}Task {returned_task.id} has "
f"failed for "
f"{self.failure_handling_config.max_retries}"
f" "
f"times after insufficient results, "
f"skipping that task. Final error: "
f"{returned_task.result or 'Unknown err'}"
f"{Fore.RESET}"
)
cfg = self.failure_handling_config
max_r = cfg.max_retries
err = returned_task.result or 'Unknown err'
for cb in self._callbacks:
cb.log_message(
LogEvent(
message=(
f"Task {returned_task.id} has "
f"failed for {max_r} times "
f"after insufficient results, "
f"skipping that task. "
f"Final error: {err}"
),
level="error",
color="red",
)
)
self._skip_requested = True
continue

print(
f"{Fore.RED}Task {returned_task.id} has "
f"failed for "
f"{self.failure_handling_config.max_retries} "
f"times after insufficient results, halting "
f"the workforce. Final error: "
f"{returned_task.result or 'Unknown error'}"
f"{Fore.RESET}"
)
max_r = self.failure_handling_config.max_retries
err = returned_task.result or 'Unknown error'
for cb in self._callbacks:
cb.log_message(
LogEvent(
message=(
f"Task {returned_task.id} has "
f"failed for {max_r} times "
f"after insufficient results, "
f"halting the workforce. "
f"Final error: {err}"
),
level="error",
color="red",
)
)
await self._graceful_shutdown(returned_task)
break
except Exception as e:
Expand All @@ -5147,11 +5174,19 @@ async def _listen_to_channel(self) -> None:
self.failure_handling_config.enabled_strategies
== []
):
print(
f"{Fore.CYAN}Task {returned_task.id} "
f"completed (quality check skipped - "
f"no recovery strategies enabled).{Fore.RESET}"
)
for cb in self._callbacks:
cb.log_message(
LogEvent(
message=(
f"Task {returned_task.id} "
f"completed (quality check "
f"skipped - no recovery "
f"strategies)."
),
level="info",
color="cyan",
)
)
await self._handle_completed_task(returned_task)
continue

Expand All @@ -5176,31 +5211,48 @@ async def _listen_to_channel(self) -> None:
returned_task.failure_count
>= quality_retry_limit
):
print(
f"{Fore.YELLOW}Task {returned_task.id} "
f"completed with low quality score: "
f"{quality_eval.quality_score} "
f"(retry limit reached){Fore.RESET}"
)
score = quality_eval.quality_score
for cb in self._callbacks:
cb.log_message(
LogEvent(
message=(
f"Task {returned_task.id} "
f"completed with low quality "
f"score: {score} "
f"(retry limit reached)"
),
level="warning",
color="yellow",
)
)
await self._handle_completed_task(
returned_task
)
continue

# Print visual feedback for quality-failed tasks
# Log visual feedback for quality-failed tasks
# with recovery strategy
recovery_action = (
quality_eval.recovery_strategy.value
if quality_eval.recovery_strategy
else ""
)
print(
f"{Fore.YELLOW}⚠️ Task {returned_task.id} "
f"failed quality check (score: "
f"{quality_eval.quality_score}). "
f"Issues: {', '.join(quality_eval.issues)}. "
f"Recovery: {recovery_action}{Fore.RESET}"
)
score = quality_eval.quality_score
issues = ', '.join(quality_eval.issues)
for cb in self._callbacks:
cb.log_message(
LogEvent(
message=(
f"⚠️ Task {returned_task.id} "
f"failed quality check "
f"(score: {score}). "
f"Issues: {issues}. "
f"Recovery: {recovery_action}"
),
level="warning",
color="yellow",
)
)

# Mark as failed for recovery
returned_task.failure_count += 1
Expand Down Expand Up @@ -5252,11 +5304,19 @@ async def _listen_to_channel(self) -> None:
)
continue
else:
print(
f"{Fore.CYAN}Task {returned_task.id} "
f"completed successfully (quality score: "
f"{quality_eval.quality_score}).{Fore.RESET}"
)
score = quality_eval.quality_score
for cb in self._callbacks:
cb.log_message(
LogEvent(
message=(
f"Task {returned_task.id} "
f"completed successfully "
f"(quality score: {score})."
),
level="info",
color="cyan",
)
)
await self._handle_completed_task(returned_task)
elif returned_task.state == TaskState.FAILED:
try:
Expand All @@ -5266,24 +5326,39 @@ async def _listen_to_channel(self) -> None:

# Do not halt if we have main tasks in queue
if len(self.get_main_task_queue()) > 0:
print(
f"{Fore.RED}Task {returned_task.id} has "
f"failed for "
f"{self.failure_handling_config.max_retries} "
f"times, skipping that task. Final error: "
f"{returned_task.result or 'Unknown error'}"
f"{Fore.RESET}"
)
max_r = self.failure_handling_config.max_retries
err = returned_task.result or 'Unknown error'
for cb in self._callbacks:
cb.log_message(
LogEvent(
message=(
f"Task {returned_task.id} has "
f"failed for {max_r} times, "
f"skipping that task. "
f"Final error: {err}"
),
level="error",
color="red",
)
)
self._skip_requested = True
continue

print(
f"{Fore.RED}Task {returned_task.id} has failed "
f"for {self.failure_handling_config.max_retries} "
f"times, halting the workforce. Final error: "
f"{returned_task.result or 'Unknown error'}"
f"{Fore.RESET}"
)
max_r = self.failure_handling_config.max_retries
err = returned_task.result or 'Unknown error'
for cb in self._callbacks:
cb.log_message(
LogEvent(
message=(
f"Task {returned_task.id} has "
f"failed for {max_r} times, "
f"halting the workforce. "
f"Final error: {err}"
),
level="error",
color="red",
)
)
# Graceful shutdown instead of immediate break
await self._graceful_shutdown(returned_task)
break
Expand Down
1 change: 1 addition & 0 deletions camel/societies/workforce/workforce_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class WorkforceCallback(ABC):
"yellow": Fore.YELLOW,
"red": Fore.RED,
"green": Fore.GREEN,
"blue": Fore.BLUE,
"cyan": Fore.CYAN,
"magenta": Fore.MAGENTA,
"gray": Fore.LIGHTBLACK_EX,
Expand Down
7 changes: 7 additions & 0 deletions test/workforce/test_workforce_callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from camel.models import ModelFactory
from camel.societies.workforce.events import (
AllTasksCompletedEvent,
LogEvent,
TaskAssignedEvent,
TaskCompletedEvent,
TaskCreatedEvent,
Expand All @@ -44,6 +45,9 @@ class _NonMetricsCallback(WorkforceCallback):
def __init__(self) -> None:
self.events: list[WorkforceEvent] = []

def log_message(self, event: LogEvent) -> None:
pass

# Task events
def log_task_created(self, event: TaskCreatedEvent) -> None:
self.events.append(event)
Expand Down Expand Up @@ -85,6 +89,9 @@ def __init__(self) -> None:
self.get_ascii_tree_called = False
self.get_kpis_called = False

def log_message(self, event: LogEvent) -> None:
pass

# WorkforceMetrics interface
def reset_task_data(self) -> None:
self.dump_to_json_called = False
Expand Down
Loading