Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
dcfc85c
agents framework demo
adilhafeez Nov 24, 2025
4140c1c
more changes
adilhafeez Nov 28, 2025
0ac1131
add more changes
adilhafeez Dec 4, 2025
afffa11
Merge branch 'main' into adil/agents_framework
adilhafeez Dec 4, 2025
358fa85
pending changes
adilhafeez Dec 16, 2025
5833e6d
fix tests
adilhafeez Dec 16, 2025
1630066
fix more
adilhafeez Dec 16, 2025
d83ffee
Merge branch 'main' into adil/agents_framework
adilhafeez Dec 16, 2025
8bb64f6
rebase with main and better handle error from mcp
adilhafeez Dec 16, 2025
eb65e94
add trace for filters
adilhafeez Dec 16, 2025
387d6d1
add test for client error, server error and for mcp error
adilhafeez Dec 16, 2025
974141b
update schema validate code and rename kind => type in agent_filter
adilhafeez Dec 16, 2025
07208f9
fix agent description and pre-commit
adilhafeez Dec 16, 2025
54a3d45
fix tests
adilhafeez Dec 16, 2025
3a2c182
add provider specific request parsing in agents chat
adilhafeez Dec 16, 2025
58cf6b2
fix precommit and tests
adilhafeez Dec 16, 2025
ab185b2
cleanup demo
adilhafeez Dec 16, 2025
2abfa18
update readme
adilhafeez Dec 16, 2025
26b791b
fix pre-commit
adilhafeez Dec 16, 2025
ac73806
refactor tracing
adilhafeez Dec 16, 2025
2f5fb3c
fix fmt
adilhafeez Dec 17, 2025
1fc39d5
Merge remote-tracking branch 'origin/main' into adil/agents_framework
adilhafeez Dec 17, 2025
fc52274
fix: handle MessageContent enum in responses API conversion
adilhafeez Dec 17, 2025
bb9503e
address pr feedback
adilhafeez Dec 17, 2025
4d52acf
fix span
adilhafeez Dec 18, 2025
660f8d4
Merge branch 'main' into adil/agents_framework
adilhafeez Dec 18, 2025
488744f
fix build
adilhafeez Dec 18, 2025
3398900
update openai version
adilhafeez Dec 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions arch/arch_config_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,38 @@ properties:
type: array
items:
type: object
properties:
id:
type: string
url:
type: string
additionalProperties: false
required:
- id
- url
filters:
type: array
items:
type: object
properties:
id:
type: string
url:
type: string
type:
type: string
enum:
- mcp
transport:
type: string
enum:
- streamable-http
tool:
type: string
additionalProperties: false
required:
- id
- url
listeners:
oneOf:
- type: array
Expand Down
32 changes: 16 additions & 16 deletions arch/envoy.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,21 +214,21 @@ static_resources:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
{% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %}
generate_request_id: true
tracing:
provider:
name: envoy.tracers.opentelemetry
typed_config:
"@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig
grpc_service:
envoy_grpc:
cluster_name: opentelemetry_collector
timeout: 0.250s
service_name: tools
random_sampling:
value: {{ arch_tracing.random_sampling }}
{% endif %}
# {% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %}
# generate_request_id: true
# tracing:
# provider:
# name: envoy.tracers.opentelemetry
# typed_config:
# "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig
# grpc_service:
# envoy_grpc:
# cluster_name: opentelemetry_collector
# timeout: 0.250s
# service_name: tools
# random_sampling:
# value: {{ arch_tracing.random_sampling }}
# {% endif %}
stat_prefix: outbound_api_traffic
codec_type: AUTO
scheme_header_transformation:
Expand Down Expand Up @@ -299,7 +299,7 @@ static_resources:
envoy_grpc:
cluster_name: opentelemetry_collector
timeout: 0.250s
service_name: arch_gateway
service_name: plano(inbound)
random_sampling:
value: {{ arch_tracing.random_sampling }}
{% endif %}
Expand Down
11 changes: 10 additions & 1 deletion arch/tools/cli/config_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,17 @@ def validate_and_render_schema():

# Process agents section and convert to endpoints
agents = config_yaml.get("agents", [])
for agent in agents:
filters = config_yaml.get("filters", [])
agents_combined = agents + filters
agent_id_keys = set()

for agent in agents_combined:
agent_id = agent.get("id")
if agent_id in agent_id_keys:
raise Exception(
f"Duplicate agent id {agent_id}, please provide unique id for each agent"
)
agent_id_keys.add(agent_id)
agent_endpoint = agent.get("url")

if agent_id and agent_endpoint:
Expand Down
4 changes: 4 additions & 0 deletions arch/tools/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ def convert_legacy_listeners(
"timeout": "30s",
}

# Handle None case
if listeners is None:
return [llm_gateway_listener], llm_gateway_listener, prompt_gateway_listener

if isinstance(listeners, dict):
# legacy listeners
# check if type is array or object
Expand Down
27 changes: 11 additions & 16 deletions arch/tools/test/test_config_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,16 @@ def test_validate_and_render_happy_path_agent_config(monkeypatch):
version: v0.3.0

agents:
- name: query_rewriter
kind: openai
endpoint: http://localhost:10500
- name: context_builder
kind: openai
endpoint: http://localhost:10501
- name: response_generator
kind: openai
endpoint: http://localhost:10502
- name: research_agent
kind: openai
endpoint: http://localhost:10500
- name: input_guard_rails
kind: openai
endpoint: http://localhost:10503
- id: query_rewriter
url: http://localhost:10500
- id: context_builder
url: http://localhost:10501
- id: response_generator
url: http://localhost:10502
- id: research_agent
url: http://localhost:10500
- id: input_guard_rails
url: http://localhost:10503

listeners:
- name: tmobile
Expand Down Expand Up @@ -156,7 +151,7 @@ def test_validate_and_render_happy_path_agent_config(monkeypatch):
mock.mock_open().return_value, # ARCH_CONFIG_FILE_RENDERED (write)
]
with mock.patch("builtins.open", m_open):
with mock.patch("config_generator.Environment"):
with mock.patch("cli.config_generator.Environment"):
validate_and_render_schema()


Expand Down
153 changes: 129 additions & 24 deletions crates/brightstaff/src/handlers/agent_chat_completions.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
use std::sync::Arc;
use std::time::{Instant, SystemTime};

use bytes::Bytes;
use hermesllm::apis::openai::ChatCompletionsRequest;
use common::consts::TRACE_PARENT_HEADER;
use common::traces::{SpanBuilder, SpanKind, parse_traceparent, generate_random_span_id};
use hermesllm::apis::OpenAIMessage;
use hermesllm::clients::SupportedAPIsFromClient;
use hermesllm::providers::request::ProviderRequest;
use hermesllm::ProviderRequestType;
use http_body_util::combinators::BoxBody;
use http_body_util::BodyExt;
use hyper::{Request, Response};
use serde::ser::Error as SerError;
use tracing::{debug, info, warn};

use super::agent_selector::{AgentSelectionError, AgentSelector};
use super::pipeline_processor::{PipelineError, PipelineProcessor};
use super::response_handler::ResponseHandler;
use crate::router::llm_router::RouterService;
use crate::tracing::{OperationNameBuilder, operation_component, http};

/// Main errors for agent chat completions
#[derive(Debug, thiserror::Error)]
Expand All @@ -33,8 +41,17 @@ pub async fn agent_chat(
_: String,
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
trace_collector: Arc<common::traces::TraceCollector>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
match handle_agent_chat(request, router_service, agents_list, listeners).await {
match handle_agent_chat(
request,
router_service,
agents_list,
listeners,
trace_collector,
)
.await
{
Ok(response) => Ok(response),
Err(err) => {
// Check if this is a client error from the pipeline that should be cascaded
Expand Down Expand Up @@ -109,10 +126,11 @@ async fn handle_agent_chat(
router_service: Arc<RouterService>,
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
trace_collector: Arc<common::traces::TraceCollector>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
// Initialize services
let agent_selector = AgentSelector::new(router_service);
let pipeline_processor = PipelineProcessor::default();
let mut pipeline_processor = PipelineProcessor::default();
let response_handler = ResponseHandler::new();

// Extract listener name from headers
Expand All @@ -132,6 +150,13 @@ async fn handle_agent_chat(
info!("Handling request for listener: {}", listener.name);

// Parse request body
let request_path = request
.uri()
.path()
.to_string()
.strip_prefix("/agents")
.unwrap()
.to_string();
let request_headers = request.headers().clone();
let chat_request_bytes = request.collect().await?.to_bytes();

Expand All @@ -140,61 +165,141 @@ async fn handle_agent_chat(
String::from_utf8_lossy(&chat_request_bytes)
);

let chat_completions_request: ChatCompletionsRequest =
serde_json::from_slice(&chat_request_bytes).map_err(|err| {
warn!(
"Failed to parse request body as ChatCompletionsRequest: {}",
err
);
AgentFilterChainError::RequestParsing(err)
// Determine the API type from the endpoint
let api_type =
SupportedAPIsFromClient::from_endpoint(request_path.as_str()).ok_or_else(|| {
let err_msg = format!("Unsupported endpoint: {}", request_path);
warn!("{}", err_msg);
AgentFilterChainError::RequestParsing(serde_json::Error::custom(err_msg))
})?;

let client_request = match ProviderRequestType::try_from((&chat_request_bytes[..], &api_type)) {
Ok(request) => request,
Err(err) => {
warn!("Failed to parse request as ProviderRequestType: {}", err);
let err_msg = format!("Failed to parse request: {}", err);
return Err(AgentFilterChainError::RequestParsing(
serde_json::Error::custom(err_msg),
));
}
};

let message: Vec<OpenAIMessage> = client_request.get_messages();

// let chat_completions_request: ChatCompletionsRequest =
// serde_json::from_slice(&chat_request_bytes).map_err(|err| {
// warn!(
// "Failed to parse request body as ChatCompletionsRequest: {}",
// err
// );
// AgentFilterChainError::RequestParsing(err)
// })?;

// Extract trace parent for routing
let trace_parent = request_headers
.iter()
.find(|(key, _)| key.as_str() == "traceparent")
.find(|(key, _)| key.as_str() == TRACE_PARENT_HEADER)
.map(|(_, value)| value.to_str().unwrap_or_default().to_string());

// Create agent map for pipeline processing and agent selection
let agent_map = {
let agents = agents_list.read().await;
let agents = agents.as_ref().unwrap();
agent_selector.create_agent_map(agents)
};

// Parse trace parent to get trace_id and parent_span_id
let (trace_id, parent_span_id) = if let Some(ref tp) = trace_parent {
parse_traceparent(tp)
} else {
(String::new(), None)
};

// Select appropriate agent using arch router llm model
let selected_agent = agent_selector
.select_agent(&chat_completions_request.messages, &listener, trace_parent)
.select_agent(&message, &listener, trace_parent.clone())
.await?;

debug!("Processing agent pipeline: {}", selected_agent.id);

// Create agent map for pipeline processing
let agent_map = {
let agents = agents_list.read().await;
let agents = agents.as_ref().unwrap();
agent_selector.create_agent_map(agents)
};
// Record the start time for agent span
let agent_start_time = SystemTime::now();
let agent_start_instant = Instant::now();
// let (span_id, trace_id) = trace_collector.start_span(
// trace_parent.clone(),
// operation_component::AGENT,
// &format!("/agents{}", request_path),
// &selected_agent.id,
// );

let span_id = generate_random_span_id();

// Process the filter chain
let processed_messages = pipeline_processor
let chat_history = pipeline_processor
.process_filter_chain(
&chat_completions_request,
&message,
&selected_agent,
&agent_map,
&request_headers,
Some(&trace_collector),
trace_id.clone(),
span_id.clone(),
)
.await?;

// Get terminal agent and send final response
let terminal_agent_name = selected_agent.id;
let terminal_agent_name = selected_agent.id.clone();
let terminal_agent = agent_map.get(&terminal_agent_name).unwrap();

debug!("Processing terminal agent: {}", terminal_agent_name);
debug!("Terminal agent details: {:?}", terminal_agent);

let llm_response = pipeline_processor
.invoke_upstream_agent(
&processed_messages,
&chat_completions_request,
.invoke_agent(
&chat_history,
client_request,
terminal_agent,
&request_headers,
trace_id.clone(),
span_id.clone(),
)
.await?;

// Record agent span after processing is complete
let agent_end_time = SystemTime::now();
let agent_elapsed = agent_start_instant.elapsed();

// Build full path with /agents prefix
let full_path = format!("/agents{}", request_path);

// Build operation name: POST {full_path} {agent_name}
let operation_name = OperationNameBuilder::new()
.with_method("POST")
.with_path(&full_path)
.with_target(&terminal_agent_name)
.build();

let mut span_builder = SpanBuilder::new(&operation_name)
.with_span_id(span_id)
.with_kind(SpanKind::Internal)
.with_start_time(agent_start_time)
.with_end_time(agent_end_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, full_path)
.with_attribute("agent.name", terminal_agent_name.clone())
.with_attribute("duration_ms", format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0));

if !trace_id.is_empty() {
span_builder = span_builder.with_trace_id(trace_id);
}
if let Some(parent_id) = parent_span_id {
span_builder = span_builder.with_parent_span_id(parent_id);
}

let span = span_builder.build();
// Use plano(agent) as service name for the agent processing span
trace_collector.record_span(operation_component::AGENT, span);

// Create streaming response
response_handler
.create_streaming_response(llm_response)
Expand Down
Loading
Loading