Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 195 additions & 8 deletions camel/toolkits/terminal_toolkit/terminal_toolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
import sys
import threading
import time
import uuid
from queue import Empty, Queue
from typing import Any, Dict, List, Optional
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from camel.logger import get_logger
from camel.toolkits.base import BaseToolkit
from camel.toolkits.base import BaseToolkit, RegisteredAgentToolkit
from camel.toolkits.function_tool import FunctionTool

if TYPE_CHECKING:
from camel.agents import ChatAgent
from camel.toolkits.terminal_toolkit.utils import (
check_nodejs_availability,
clone_current_environment,
Expand Down Expand Up @@ -58,7 +62,7 @@ def _to_plain(text: str) -> str:


@MCPServer()
class TerminalToolkit(BaseToolkit):
class TerminalToolkit(BaseToolkit, RegisteredAgentToolkit):
r"""A toolkit for LLM agents to execute and interact with terminal commands
in either a local or a sandboxed Docker environment.

Expand Down Expand Up @@ -100,8 +104,11 @@ def __init__(
clone_current_env: bool = False,
install_dependencies: Optional[List[str]] = None,
):
# Initialize parent classes
BaseToolkit.__init__(self, timeout=timeout)
RegisteredAgentToolkit.__init__(self)

self.use_docker_backend = use_docker_backend
self.timeout = timeout
self.shell_sessions: Dict[str, Dict[str, Any]] = {}
# Thread-safe guard for concurrent access to
# shell_sessions and session state
Expand Down Expand Up @@ -236,6 +243,83 @@ def __init__(
if self.install_dependencies:
self._install_dependencies()

def register_agent(self, agent: "ChatAgent") -> None:
r"""Register a ChatAgent with this toolkit and update
shell_exec_use_toolkit_via_codetool schemas.

Args:
agent (ChatAgent): The ChatAgent instance to register.
"""
# Call parent's register_agent
super().register_agent(agent)

# Generate import guide from agent's tools
import_guide = self._generate_import_guide_from_agent(agent)

if import_guide:
# Find and update shell_exec_use_toolkit_via_code's schema
tool_name = "shell_exec_use_toolkit_via_code"
if tool_name in agent._internal_tools:
tool = agent._internal_tools[tool_name]
schema = tool.get_openai_tool_schema()

# Append import guide to description
original_desc = schema["function"].get("description", "")
schema["function"]["description"] = (
original_desc + import_guide
)

# Update the schema
tool.set_openai_tool_schema(schema)
logger.info(
f"Updated {tool_name} schema with toolkit import guide"
)

def _generate_import_guide_from_agent(self, agent: "ChatAgent") -> str:
r"""Generate toolkit import guide and method mapping from agent's
registered tools.

Args:
agent (ChatAgent): The ChatAgent instance containing the tools.

Returns:
str: A formatted string containing import statements for all
detected toolkits, or empty string if no toolkits found.
"""
# Collect toolkit info: {class_name: (module_name, [tool_names])}
toolkit_info: Dict[str, tuple] = {}

for tool_name, tool in agent._internal_tools.items():
# Get the underlying function
func = tool.func

# Check if it's a bound method (has __self__)
if hasattr(func, "__self__"):
toolkit_instance = func.__self__
toolkit_class = type(toolkit_instance)
class_name = toolkit_class.__name__
module_name = toolkit_class.__module__

# Add or update toolkit info
if class_name not in toolkit_info:
toolkit_info[class_name] = (module_name, [])
toolkit_info[class_name][1].append(tool_name)

if not toolkit_info:
return ""

# Generate import guide with method mapping
lines = ["\n\n=== Available Toolkit Imports ==="]
lines.append("Use these import statements and methods in your code:\n")

for class_name, (module_name, tool_names) in sorted(
toolkit_info.items()
):
lines.append(f"from {module_name} import {class_name}")
lines.append(f" # Methods: {', '.join(sorted(tool_names))}")

return "\n".join(lines)

def _setup_cloned_environment(self):
r"""Set up a cloned Python environment."""
self.cloned_env_path = os.path.join(self.working_dir, ".venv")
Expand Down Expand Up @@ -998,6 +1082,98 @@ def shell_ask_user_for_help(self, id: str, prompt: str) -> str:
except EOFError:
return f"User input interrupted for session '{id}'."

def shell_exec_use_toolkit_via_code(
self, id: str, code: str, block: bool = True
) -> str:
r"""Executes Python code that uses camel.toolkits to perform tasks.

This method enables executing Python code that leverages the camel
toolkit ecosystem to accomplish complex tasks while reducing token
consumption compared to multiple individual tool calls.

The code should follow this pattern:
1. Import required toolkit(s) from camel.toolkits
2. Initialize toolkit instance(s) with necessary parameters
3. Call appropriate methods and print/return results
4. Handle results and chain multiple toolkits if needed
5. F-string expressions ({...}) CANNOT contain backslashes (\)
wrong: f"{text.replace('\n', '<br>')}" correct:
text_br = text.replace('\n', '<br>') f"{text_br}"

Example Code:
```python
from camel.toolkits import ArxivToolkit, FileToolkit

# Initialize toolkits
arxiv_toolkit = ArxivToolkit()
file_toolkit = FileToolkit()

# Search for papers
papers = arxiv_toolkit.search_papers(
query="large language models",
max_results=5
)

# Save results to file
file_toolkit.write_file(
file_path="research_results.txt",
content=str(papers)
)
```

Args:
id (str): A unique identifier for the code execution session.
This ID is used to track and manage the execution.
code (str): The Python code to execute. The code should import
and use camel.toolkits to perform the desired tasks.
block (bool, optional): Determines the execution mode.
Defaults to True.
If True (blocking mode), waits for execution to complete.
If False (non-blocking mode), starts execution in background.

Returns:
str: The output of the code execution, which varies by mode.
In blocking mode, returns the complete stdout/stderr output.
In non-blocking mode, returns a session confirmation message.
Use shell_view(id) to check output of non-blocking execution.
"""

# Create a unique temporary file for the code
temp_filename = f"_camel_toolkit_code_{uuid.uuid4().hex[:8]}.py"
temp_filepath = os.path.join(self.working_dir, temp_filename)

try:
# Write code to temporary file
with open(temp_filepath, 'w', encoding='utf-8') as f:
f.write(code)

# Build command to execute the file with current Python interpreter
python_cmd = f'"{sys.executable}" "{temp_filepath}"'

# Execute the code
result = self.shell_exec(id=id, command=python_cmd, block=block)

# Non-blocking mode: record temp file path in session for cleanup
if not block:
with self._session_lock:
if id in self.shell_sessions:
self.shell_sessions[id]["temp_file"] = temp_filepath

return result

except Exception as e:
error_msg = f"Error executing code: {e}"
logger.error(error_msg)
return error_msg

finally:
# Clean up temporary file (only for blocking mode)
if block and os.path.exists(temp_filepath):
try:
os.remove(temp_filepath)
except Exception:
pass

def __enter__(self):
r"""Context manager entry."""
return self
Expand All @@ -1008,14 +1184,17 @@ def __exit__(self, exc_type, exc_val, exc_tb):
return False

def cleanup(self):
r"""Clean up all active sessions."""
r"""Clean up all active sessions and temporary files."""
with self._session_lock:
session_ids = list(self.shell_sessions.keys())

for session_id in session_ids:
with self._session_lock:
is_running = self.shell_sessions.get(session_id, {}).get(
"running", False
)
session = self.shell_sessions.get(session_id, {})
is_running = session.get("running", False)
temp_file = session.get("temp_file")

# Terminate running processes
if is_running:
try:
self.shell_kill_process(session_id)
Expand All @@ -1025,6 +1204,13 @@ def cleanup(self):
f"during cleanup: {e}"
)

# Clean up temporary file associated with this session
if temp_file and os.path.exists(temp_file):
try:
os.remove(temp_file)
except Exception:
pass

cleanup._manual_timeout = True # type: ignore[attr-defined]

def __del__(self):
Expand All @@ -1050,4 +1236,5 @@ def get_tools(self) -> List[FunctionTool]:
FunctionTool(self.shell_write_to_process),
FunctionTool(self.shell_kill_process),
FunctionTool(self.shell_ask_user_for_help),
FunctionTool(self.shell_exec_use_toolkit_via_code),
]
79 changes: 79 additions & 0 deletions examples/toolkits/terminal_toolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from camel.configs import ChatGPTConfig
from camel.models import ModelFactory
from camel.toolkits import TerminalToolkit
from camel.toolkits.resend_toolkit import ResendToolkit
from camel.toolkits.search_toolkit import SearchToolkit
from camel.types import ModelPlatformType, ModelType

# Get current script directory
Expand Down Expand Up @@ -342,3 +344,80 @@
images=None)]
===============================================================================
"""


# Create toolkit instances
terminal_toolkit = TerminalToolkit(
working_directory=workspace_dir, safe_mode=True
)
search_toolkit = SearchToolkit()
resend_toolkit = ResendToolkit()

# Combine all tools
tools = (
terminal_toolkit.get_tools()
+ search_toolkit.get_tools()
+ resend_toolkit.get_tools()
)

model_config_dict = ChatGPTConfig(
temperature=0.0,
).as_dict()

model = ModelFactory.create(
model_platform=ModelPlatformType.DEFAULT,
model_type=ModelType.DEFAULT,
model_config_dict=model_config_dict,
)

sys_msg = "You are a helpful assistant."

# Pass terminal_toolkit to toolkits_to_register_agent to enable
# automatic toolkit import guidance for shell_exec_use_toolkit_via_code
agent = ChatAgent(
system_message=sys_msg,
model=model,
tools=tools,
toolkits_to_register_agent=[terminal_toolkit],
)
agent.reset()

usr_msg = """
You should use shell_exec_use_toolkit_via_code to write code to search
wikipedia for 'Artificial Intelligence' and send email to
[email protected] with the search result attached."""
response = agent.step(usr_msg)
print(str(response.info['tool_calls']))

'''
=========================================================================
[ToolCallingRecord(tool_name='shell_exec_use_toolkit_via_code', args=
{'block': True, 'code': '\nfrom camel.toolkits import SearchToolkit\n
from camel.toolkits import ResendToolkit\n\n# Initialize the toolkits
\nsearch_toolkit = SearchToolkit()\nresend_toolkit = ResendToolkit()\n
\n# Search Wikipedia for \'Artificial Intelligence\'\nprint("Searching
Wikipedia for \'Artificial Intelligence\'...")\nwiki_result =
search_toolkit.search_wiki(entity="Artificial Intelligence")\n
print(f"Search completed. Result length: {len(str(wiki_result))}
characters")\n\n# Prepare email content\nemail_subject = "Wikipedia
Search Result: Artificial Intelligence"\nemail_html = f"""\n<html>\n
<body>\n<h2>Wikipedia Search Result for \'Artificial Intelligence\'
</h2>\n<hr>\n<div style="font-family: Arial, sans-serif; line-height:
1.6;">\n{wiki_result}\n</div>\n<hr>\n<p><i>This email was automatically
generated with Wikipedia search results.</i></p>\n</body>\n</html>
\n"""\n\nemail_text = f"""\nWikipedia Search Result for \'Artificial
Intelligence\'\n\n{wiki_result}\n\n---\nThis email was automatically
generated with Wikipedia search results.\n"""\n\n# Send the email\nprint
("Sending email to [email protected]...")\nresult = resend_toolkit.
send_email(\n to=["[email protected]"],\n subject=email_subject,
\n from_email="[email protected]",\n html=email_html,\n
text=email_text,\n cc=None,\n bcc=None,\n reply_to=None,\n
tags=None,\n headers=None\n)\n\nprint("Email sent successfully!")
\nprint(f"Result: {result}")\n', 'id': 'wiki_search_and_email'},
result="Searching Wikipedia for 'Artificial Intelligence'...\nSearch
completed. Result length: 1369 characters\nSending email to
[email protected]...\nEmail sent successfully!\nResult: Email
sent successfully. Email ID: d6f51fcd-8119-43b8-b691-6a1182b14986",
tool_call_id='toolu_01MZYpKWZoJTCSRSNUojLnKo', images=None)]
=========================================================================
'''
Loading