Skip to content

Commit 2f91214

Browse files
authored
Use mcp tools for filter chain (#621)
* agents framework demo * more changes * add more changes * pending changes * fix tests * fix more * rebase with main and better handle error from mcp * add trace for filters * add test for client error, server error and for mcp error * update schema validate code and rename kind => type in agent_filter * fix agent description and pre-commit * fix tests * add provider specific request parsing in agents chat * fix precommit and tests * cleanup demo * update readme * fix pre-commit * refactor tracing * fix fmt * fix: handle MessageContent enum in responses API conversion - Update request.rs to handle new MessageContent enum structure from main - MessageContent can now be Text(String) or Items(Vec<InputContent>) - Handle new InputItem variants (ItemReference, FunctionCallOutput) - Fixes compilation error after merging latest main (#632) * address pr feedback * fix span * fix build * update openai version
1 parent cb82a83 commit 2f91214

40 files changed

+4882
-186
lines changed

arch/arch_config_schema.yaml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,38 @@ properties:
1414
type: array
1515
items:
1616
type: object
17+
properties:
18+
id:
19+
type: string
20+
url:
21+
type: string
22+
additionalProperties: false
23+
required:
24+
- id
25+
- url
26+
filters:
27+
type: array
28+
items:
29+
type: object
30+
properties:
31+
id:
32+
type: string
33+
url:
34+
type: string
35+
type:
36+
type: string
37+
enum:
38+
- mcp
39+
transport:
40+
type: string
41+
enum:
42+
- streamable-http
43+
tool:
44+
type: string
45+
additionalProperties: false
46+
required:
47+
- id
48+
- url
1749
listeners:
1850
oneOf:
1951
- type: array

arch/envoy.template.yaml

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -214,21 +214,21 @@ static_resources:
214214
- name: envoy.filters.network.http_connection_manager
215215
typed_config:
216216
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
217-
{% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %}
218-
generate_request_id: true
219-
tracing:
220-
provider:
221-
name: envoy.tracers.opentelemetry
222-
typed_config:
223-
"@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig
224-
grpc_service:
225-
envoy_grpc:
226-
cluster_name: opentelemetry_collector
227-
timeout: 0.250s
228-
service_name: tools
229-
random_sampling:
230-
value: {{ arch_tracing.random_sampling }}
231-
{% endif %}
217+
# {% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %}
218+
# generate_request_id: true
219+
# tracing:
220+
# provider:
221+
# name: envoy.tracers.opentelemetry
222+
# typed_config:
223+
# "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig
224+
# grpc_service:
225+
# envoy_grpc:
226+
# cluster_name: opentelemetry_collector
227+
# timeout: 0.250s
228+
# service_name: tools
229+
# random_sampling:
230+
# value: {{ arch_tracing.random_sampling }}
231+
# {% endif %}
232232
stat_prefix: outbound_api_traffic
233233
codec_type: AUTO
234234
scheme_header_transformation:
@@ -299,7 +299,7 @@ static_resources:
299299
envoy_grpc:
300300
cluster_name: opentelemetry_collector
301301
timeout: 0.250s
302-
service_name: arch_gateway
302+
service_name: plano(inbound)
303303
random_sampling:
304304
value: {{ arch_tracing.random_sampling }}
305305
{% endif %}

arch/tools/cli/config_generator.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,17 @@ def validate_and_render_schema():
101101

102102
# Process agents section and convert to endpoints
103103
agents = config_yaml.get("agents", [])
104-
for agent in agents:
104+
filters = config_yaml.get("filters", [])
105+
agents_combined = agents + filters
106+
agent_id_keys = set()
107+
108+
for agent in agents_combined:
105109
agent_id = agent.get("id")
110+
if agent_id in agent_id_keys:
111+
raise Exception(
112+
f"Duplicate agent id {agent_id}, please provide unique id for each agent"
113+
)
114+
agent_id_keys.add(agent_id)
106115
agent_endpoint = agent.get("url")
107116

108117
if agent_id and agent_endpoint:

arch/tools/cli/utils.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ def convert_legacy_listeners(
5757
"timeout": "30s",
5858
}
5959

60+
# Handle None case
61+
if listeners is None:
62+
return [llm_gateway_listener], llm_gateway_listener, prompt_gateway_listener
63+
6064
if isinstance(listeners, dict):
6165
# legacy listeners
6266
# check if type is array or object

arch/tools/test/test_config_generator.py

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -94,21 +94,16 @@ def test_validate_and_render_happy_path_agent_config(monkeypatch):
9494
version: v0.3.0
9595
9696
agents:
97-
- name: query_rewriter
98-
kind: openai
99-
endpoint: http://localhost:10500
100-
- name: context_builder
101-
kind: openai
102-
endpoint: http://localhost:10501
103-
- name: response_generator
104-
kind: openai
105-
endpoint: http://localhost:10502
106-
- name: research_agent
107-
kind: openai
108-
endpoint: http://localhost:10500
109-
- name: input_guard_rails
110-
kind: openai
111-
endpoint: http://localhost:10503
97+
- id: query_rewriter
98+
url: http://localhost:10500
99+
- id: context_builder
100+
url: http://localhost:10501
101+
- id: response_generator
102+
url: http://localhost:10502
103+
- id: research_agent
104+
url: http://localhost:10500
105+
- id: input_guard_rails
106+
url: http://localhost:10503
112107
113108
listeners:
114109
- name: tmobile
@@ -156,7 +151,7 @@ def test_validate_and_render_happy_path_agent_config(monkeypatch):
156151
mock.mock_open().return_value, # ARCH_CONFIG_FILE_RENDERED (write)
157152
]
158153
with mock.patch("builtins.open", m_open):
159-
with mock.patch("config_generator.Environment"):
154+
with mock.patch("cli.config_generator.Environment"):
160155
validate_and_render_schema()
161156

162157

crates/brightstaff/src/handlers/agent_chat_completions.rs

Lines changed: 129 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,24 @@
11
use std::sync::Arc;
2+
use std::time::{Instant, SystemTime};
23

34
use bytes::Bytes;
4-
use hermesllm::apis::openai::ChatCompletionsRequest;
5+
use common::consts::TRACE_PARENT_HEADER;
6+
use common::traces::{SpanBuilder, SpanKind, parse_traceparent, generate_random_span_id};
7+
use hermesllm::apis::OpenAIMessage;
8+
use hermesllm::clients::SupportedAPIsFromClient;
9+
use hermesllm::providers::request::ProviderRequest;
10+
use hermesllm::ProviderRequestType;
511
use http_body_util::combinators::BoxBody;
612
use http_body_util::BodyExt;
713
use hyper::{Request, Response};
14+
use serde::ser::Error as SerError;
815
use tracing::{debug, info, warn};
916

1017
use super::agent_selector::{AgentSelectionError, AgentSelector};
1118
use super::pipeline_processor::{PipelineError, PipelineProcessor};
1219
use super::response_handler::ResponseHandler;
1320
use crate::router::llm_router::RouterService;
21+
use crate::tracing::{OperationNameBuilder, operation_component, http};
1422

1523
/// Main errors for agent chat completions
1624
#[derive(Debug, thiserror::Error)]
@@ -33,8 +41,17 @@ pub async fn agent_chat(
3341
_: String,
3442
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
3543
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
44+
trace_collector: Arc<common::traces::TraceCollector>,
3645
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
37-
match handle_agent_chat(request, router_service, agents_list, listeners).await {
46+
match handle_agent_chat(
47+
request,
48+
router_service,
49+
agents_list,
50+
listeners,
51+
trace_collector,
52+
)
53+
.await
54+
{
3855
Ok(response) => Ok(response),
3956
Err(err) => {
4057
// Check if this is a client error from the pipeline that should be cascaded
@@ -109,10 +126,11 @@ async fn handle_agent_chat(
109126
router_service: Arc<RouterService>,
110127
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
111128
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
129+
trace_collector: Arc<common::traces::TraceCollector>,
112130
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
113131
// Initialize services
114132
let agent_selector = AgentSelector::new(router_service);
115-
let pipeline_processor = PipelineProcessor::default();
133+
let mut pipeline_processor = PipelineProcessor::default();
116134
let response_handler = ResponseHandler::new();
117135

118136
// Extract listener name from headers
@@ -132,6 +150,13 @@ async fn handle_agent_chat(
132150
info!("Handling request for listener: {}", listener.name);
133151

134152
// Parse request body
153+
let request_path = request
154+
.uri()
155+
.path()
156+
.to_string()
157+
.strip_prefix("/agents")
158+
.unwrap()
159+
.to_string();
135160
let request_headers = request.headers().clone();
136161
let chat_request_bytes = request.collect().await?.to_bytes();
137162

@@ -140,61 +165,141 @@ async fn handle_agent_chat(
140165
String::from_utf8_lossy(&chat_request_bytes)
141166
);
142167

143-
let chat_completions_request: ChatCompletionsRequest =
144-
serde_json::from_slice(&chat_request_bytes).map_err(|err| {
145-
warn!(
146-
"Failed to parse request body as ChatCompletionsRequest: {}",
147-
err
148-
);
149-
AgentFilterChainError::RequestParsing(err)
168+
// Determine the API type from the endpoint
169+
let api_type =
170+
SupportedAPIsFromClient::from_endpoint(request_path.as_str()).ok_or_else(|| {
171+
let err_msg = format!("Unsupported endpoint: {}", request_path);
172+
warn!("{}", err_msg);
173+
AgentFilterChainError::RequestParsing(serde_json::Error::custom(err_msg))
150174
})?;
151175

176+
let client_request = match ProviderRequestType::try_from((&chat_request_bytes[..], &api_type)) {
177+
Ok(request) => request,
178+
Err(err) => {
179+
warn!("Failed to parse request as ProviderRequestType: {}", err);
180+
let err_msg = format!("Failed to parse request: {}", err);
181+
return Err(AgentFilterChainError::RequestParsing(
182+
serde_json::Error::custom(err_msg),
183+
));
184+
}
185+
};
186+
187+
let message: Vec<OpenAIMessage> = client_request.get_messages();
188+
189+
// let chat_completions_request: ChatCompletionsRequest =
190+
// serde_json::from_slice(&chat_request_bytes).map_err(|err| {
191+
// warn!(
192+
// "Failed to parse request body as ChatCompletionsRequest: {}",
193+
// err
194+
// );
195+
// AgentFilterChainError::RequestParsing(err)
196+
// })?;
197+
152198
// Extract trace parent for routing
153199
let trace_parent = request_headers
154200
.iter()
155-
.find(|(key, _)| key.as_str() == "traceparent")
201+
.find(|(key, _)| key.as_str() == TRACE_PARENT_HEADER)
156202
.map(|(_, value)| value.to_str().unwrap_or_default().to_string());
157203

204+
// Create agent map for pipeline processing and agent selection
205+
let agent_map = {
206+
let agents = agents_list.read().await;
207+
let agents = agents.as_ref().unwrap();
208+
agent_selector.create_agent_map(agents)
209+
};
210+
211+
// Parse trace parent to get trace_id and parent_span_id
212+
let (trace_id, parent_span_id) = if let Some(ref tp) = trace_parent {
213+
parse_traceparent(tp)
214+
} else {
215+
(String::new(), None)
216+
};
217+
158218
// Select appropriate agent using arch router llm model
159219
let selected_agent = agent_selector
160-
.select_agent(&chat_completions_request.messages, &listener, trace_parent)
220+
.select_agent(&message, &listener, trace_parent.clone())
161221
.await?;
162222

163223
debug!("Processing agent pipeline: {}", selected_agent.id);
164224

165-
// Create agent map for pipeline processing
166-
let agent_map = {
167-
let agents = agents_list.read().await;
168-
let agents = agents.as_ref().unwrap();
169-
agent_selector.create_agent_map(agents)
170-
};
225+
// Record the start time for agent span
226+
let agent_start_time = SystemTime::now();
227+
let agent_start_instant = Instant::now();
228+
// let (span_id, trace_id) = trace_collector.start_span(
229+
// trace_parent.clone(),
230+
// operation_component::AGENT,
231+
// &format!("/agents{}", request_path),
232+
// &selected_agent.id,
233+
// );
234+
235+
let span_id = generate_random_span_id();
171236

172237
// Process the filter chain
173-
let processed_messages = pipeline_processor
238+
let chat_history = pipeline_processor
174239
.process_filter_chain(
175-
&chat_completions_request,
240+
&message,
176241
&selected_agent,
177242
&agent_map,
178243
&request_headers,
244+
Some(&trace_collector),
245+
trace_id.clone(),
246+
span_id.clone(),
179247
)
180248
.await?;
181249

182250
// Get terminal agent and send final response
183-
let terminal_agent_name = selected_agent.id;
251+
let terminal_agent_name = selected_agent.id.clone();
184252
let terminal_agent = agent_map.get(&terminal_agent_name).unwrap();
185253

186254
debug!("Processing terminal agent: {}", terminal_agent_name);
187255
debug!("Terminal agent details: {:?}", terminal_agent);
188256

189257
let llm_response = pipeline_processor
190-
.invoke_upstream_agent(
191-
&processed_messages,
192-
&chat_completions_request,
258+
.invoke_agent(
259+
&chat_history,
260+
client_request,
193261
terminal_agent,
194262
&request_headers,
263+
trace_id.clone(),
264+
span_id.clone(),
195265
)
196266
.await?;
197267

268+
// Record agent span after processing is complete
269+
let agent_end_time = SystemTime::now();
270+
let agent_elapsed = agent_start_instant.elapsed();
271+
272+
// Build full path with /agents prefix
273+
let full_path = format!("/agents{}", request_path);
274+
275+
// Build operation name: POST {full_path} {agent_name}
276+
let operation_name = OperationNameBuilder::new()
277+
.with_method("POST")
278+
.with_path(&full_path)
279+
.with_target(&terminal_agent_name)
280+
.build();
281+
282+
let mut span_builder = SpanBuilder::new(&operation_name)
283+
.with_span_id(span_id)
284+
.with_kind(SpanKind::Internal)
285+
.with_start_time(agent_start_time)
286+
.with_end_time(agent_end_time)
287+
.with_attribute(http::METHOD, "POST")
288+
.with_attribute(http::TARGET, full_path)
289+
.with_attribute("agent.name", terminal_agent_name.clone())
290+
.with_attribute("duration_ms", format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0));
291+
292+
if !trace_id.is_empty() {
293+
span_builder = span_builder.with_trace_id(trace_id);
294+
}
295+
if let Some(parent_id) = parent_span_id {
296+
span_builder = span_builder.with_parent_span_id(parent_id);
297+
}
298+
299+
let span = span_builder.build();
300+
// Use plano(agent) as service name for the agent processing span
301+
trace_collector.record_span(operation_component::AGENT, span);
302+
198303
// Create streaming response
199304
response_handler
200305
.create_streaming_response(llm_response)

0 commit comments

Comments
 (0)