Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 104 additions & 69 deletions camel/toolkits/terminal_toolkit/terminal_toolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import sys
import threading
import time
from queue import Empty, Queue
from queue import Empty, Full, Queue
from typing import Any, Dict, List, Optional

from camel.logger import get_logger
Expand Down Expand Up @@ -118,6 +118,8 @@ def __init__(
# Thread-safe guard for concurrent access to
# shell_sessions and session state
self._session_lock = threading.RLock()
# Condition variable for efficient waiting on new output
self._output_condition = threading.Condition(self._session_lock)

# Initialize docker_workdir with proper type
self.docker_workdir: Optional[str] = None
Expand Down Expand Up @@ -207,7 +209,7 @@ def __init__(
)
except NotFound:
raise RuntimeError(
f"Container '{docker_container_name}' not found. "
f"Container '{docker_container_name}' not found."
)

# Ensure the working directory exists inside the container
Expand All @@ -224,10 +226,6 @@ def __init__(
f"[Docker] Failed to ensure workdir "
f"'{self.docker_workdir}': {e}"
)
except NotFound:
raise RuntimeError(
f"Docker container '{docker_container_name}' not found."
)
except APIError as e:
raise RuntimeError(f"Failed to connect to Docker daemon: {e}")

Expand Down Expand Up @@ -398,34 +396,13 @@ def update_callback(msg: str):
"using system Python"
)

def _adapt_command_for_environment(self, command: str) -> str:
r"""Adapt command to use virtual environment if available."""
# Only adapt for local backend
if self.use_docker_backend:
return command

# Check if we have any virtual environment (cloned or initial)
env_path = None
def _get_venv_path(self) -> Optional[str]:
r"""Get the virtual environment path if available."""
if self.cloned_env_path and os.path.exists(self.cloned_env_path):
env_path = self.cloned_env_path
return self.cloned_env_path
elif self.initial_env_path and os.path.exists(self.initial_env_path):
env_path = self.initial_env_path

if not env_path:
return command

# Check if command starts with python or pip
command_lower = command.strip().lower()
if command_lower.startswith('python'):
# Replace 'python' with the virtual environment python
return command.replace('python', f'"{self.python_executable}"', 1)
elif command_lower.startswith('pip'):
# Replace 'pip' with python -m pip from virtual environment
return command.replace(
'pip', f'"{self.python_executable}" -m pip', 1
)

return command
return self.initial_env_path
return None

def _write_to_log(self, log_file: str, content: str) -> None:
r"""Write content to log file with optional ANSI stripping.
Expand Down Expand Up @@ -455,15 +432,33 @@ def _start_output_reader_thread(self, session_id: str):
session = self.shell_sessions[session_id]

def reader():
def safe_put(data: str) -> None:
"""Put data to queue, dropping if full to prevent blocking."""
try:
session["output_stream"].put_nowait(data)
except Full:
# Queue is full, log warning and continue
# Data is still written to log file, so not lost
logger.warning(
f"[SESSION {session_id}] Output queue full, "
f"dropping data (still logged to file)"
)
return
# Notify waiters that new output is available
# Done outside try-except to avoid catching unrelated
# exceptions
with self._output_condition:
self._output_condition.notify_all()

try:
if session["backend"] == "local":
# For local processes, read line by line from stdout
try:
for line in iter(
session["process"].stdout.readline, ''
):
session["output_stream"].put(line)
self._write_to_log(session["log_file"], line)
safe_put(line)
finally:
session["process"].stdout.close()
elif session["backend"] == "docker":
Expand All @@ -485,10 +480,10 @@ def reader():
decoded_data = data.decode(
'utf-8', errors='ignore'
)
session["output_stream"].put(decoded_data)
self._write_to_log(
session["log_file"], decoded_data
)
safe_put(decoded_data)
# Check if the process is still running
if not self.docker_api_client.exec_inspect(
session["exec_id"]
Expand All @@ -508,9 +503,11 @@ def reader():
)
finally:
try:
with self._session_lock:
with self._output_condition:
if session_id in self.shell_sessions:
self.shell_sessions[session_id]["running"] = False
# Notify waiters that session has terminated
self._output_condition.notify_all()
except Exception:
pass

Expand All @@ -521,7 +518,6 @@ def _collect_output_until_idle(
self,
id: str,
idle_duration: float = 0.5,
check_interval: float = 0.1,
max_wait: float = 5.0,
) -> str:
r"""Collects output from a session until it's idle or a max wait time
Expand All @@ -531,8 +527,6 @@ def _collect_output_until_idle(
id (str): The session ID.
idle_duration (float): How long the stream must be empty to be
considered idle.(default: 0.5)
check_interval (float): The time to sleep between checks.
(default: 0.1)
max_wait (float): The maximum total time to wait for the process
to go idle. (default: 5.0)

Expand All @@ -544,11 +538,15 @@ def _collect_output_until_idle(
if id not in self.shell_sessions:
return f"Error: No session found with ID '{id}'."

output_parts = []
idle_time = 0.0
start_time = time.time()
output_parts: List[str] = []
last_output_time = time.time()
start_time = last_output_time

while True:
elapsed = time.time() - start_time
if elapsed >= max_wait:
break

while time.time() - start_time < max_wait:
new_output = self.shell_view(id)

# Check for terminal state messages from shell_view
Expand All @@ -565,20 +563,34 @@ def _collect_output_until_idle(
if new_output.startswith("Error: No session found"):
return new_output

if new_output:
# Check if this is actual output or just the idle message
if new_output and not new_output.startswith("[No new output]"):
output_parts.append(new_output)
idle_time = 0.0 # Reset idle timer
last_output_time = time.time() # Reset idle timer
else:
idle_time += check_interval
# No new output, check if we've been idle long enough
idle_time = time.time() - last_output_time
if idle_time >= idle_duration:
# Process is idle, success
return "".join(output_parts)
time.sleep(check_interval)

# Calculate remaining time for idle and max_wait
time_until_idle = idle_duration - (time.time() - last_output_time)
time_until_max = max_wait - (time.time() - start_time)
# Wait for the shorter of: idle timeout, max timeout, or a
# reasonable check interval
wait_time = max(0.0, min(time_until_idle, time_until_max))

if wait_time > 0:
# Use condition variable to wait efficiently
# Wake up when new output arrives or timeout expires
with self._output_condition:
self._output_condition.wait(timeout=wait_time)

# If we exit the loop, it means max_wait was reached.
# Check one last time for any final output.
final_output = self.shell_view(id)
if final_output:
if final_output and not final_output.startswith("[No new output]"):
output_parts.append(final_output)

warning_message = (
Expand Down Expand Up @@ -620,11 +632,21 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:

if self.use_docker_backend:
# For Docker, we always run commands in a shell
# to support complex commands
command = f'bash -c "{command}"'
# to support complex commands.
# Use shlex.quote to properly escape the command string.
command = f'bash -c {shlex.quote(command)}'
else:
# For local execution, check if we need to use cloned environment
command = self._adapt_command_for_environment(command)
# For local execution, activate virtual environment if available
env_path = self._get_venv_path()
if env_path:
if self.os_type == 'Windows':
activate = os.path.join(
env_path, "Scripts", "activate.bat"
)
command = f'call "{activate}" && {command}'
else:
activate = os.path.join(env_path, "bin", "activate")
command = f'. "{activate}" && {command}'

session_id = id

Expand Down Expand Up @@ -697,12 +719,6 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:
self.log_dir, f"session_{session_id}.log"
)

self._write_to_log(
session_log_file,
f"--- Starting non-blocking session at {time.ctime()} ---\n"
f"> {command}\n",
)

# PYTHONUNBUFFERED=1 for real-time output
# Without this, Python subprocesses buffer output (4KB buffer)
# and shell_view() won't see output until buffer fills or process
Expand All @@ -711,11 +727,24 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:
env_vars["PYTHONUNBUFFERED"] = "1"
docker_env = {"PYTHONUNBUFFERED": "1"}

# Check and create session atomically to prevent race condition
with self._session_lock:
if session_id in self.shell_sessions:
existing_session = self.shell_sessions[session_id]
if existing_session.get("running", False):
return (
f"Error: Session '{session_id}' already exists "
f"and is running. Use a different ID or kill "
f"the existing session first."
)

# Create session entry while holding the lock
self.shell_sessions[session_id] = {
"id": session_id,
"process": None,
"output_stream": Queue(),
# Limit queue size to prevent memory exhaustion
# (~100MB with 10k items of ~10KB each)
"output_stream": Queue(maxsize=10000),
"command_history": [command],
"running": True,
"log_file": session_log_file,
Expand All @@ -724,6 +753,12 @@ def shell_exec(self, id: str, command: str, block: bool = True) -> str:
else "local",
}

self._write_to_log(
session_log_file,
f"--- Starting non-blocking session at {time.ctime()} ---\n"
f"> {command}\n",
)

process = None
exec_socket = None
try:
Expand Down Expand Up @@ -969,9 +1004,11 @@ def shell_ask_user_for_help(self, id: str, prompt: str) -> str:
str: The output from the shell session after the user's command has
been executed, or help information for general queries.
"""
logger.info("\n" + "=" * 60)
logger.info("LLM Agent needs your help!")
logger.info(f"PROMPT: {prompt}")
# Use print for user-facing messages since this is an interactive
# function that requires terminal input via input()
print("\n" + "=" * 60)
print("LLM Agent needs your help!")
print(f"PROMPT: {prompt}")

# Case 1: Session doesn't exist - offer to create one
if id not in self.shell_sessions:
Expand All @@ -980,9 +1017,7 @@ def shell_ask_user_for_help(self, id: str, prompt: str) -> str:
if not user_input:
return "No user response."
else:
logger.info(
f"Creating session '{id}' and executing command..."
)
print(f"Creating session '{id}' and executing command...")
result = self.shell_exec(id, user_input, block=True)
return (
f"Session '{id}' created and "
Expand All @@ -999,11 +1034,11 @@ def shell_ask_user_for_help(self, id: str, prompt: str) -> str:
last_output.strip() if last_output.strip() else "(no output)"
)

logger.info(f"SESSION: '{id}' (active)")
logger.info("=" * 60)
logger.info("--- LAST OUTPUT ---")
logger.info(last_output_display)
logger.info("-------------------")
print(f"SESSION: '{id}' (active)")
print("=" * 60)
print("--- LAST OUTPUT ---")
print(last_output_display)
print("-------------------")

try:
user_input = input("Your input: ").strip()
Expand Down
Loading
Loading