Skip to content

feat: add sse server implementation for mcp client #290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/two-bats-judge.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openai/agents-core': patch
---

feat: add sse server implementation for mcp
3 changes: 2 additions & 1 deletion examples/mcp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"start:hosted-mcp-on-approval": "tsx hosted-mcp-on-approval.ts",
"start:hosted-mcp-human-in-the-loop": "tsx hosted-mcp-human-in-the-loop.ts",
"start:hosted-mcp-simple": "tsx hosted-mcp-simple.ts",
"start:tool-filter": "tsx tool-filter-example.ts"
"start:tool-filter": "tsx tool-filter-example.ts",
"start:sse": "tsx sse-example.ts"
}
}
32 changes: 32 additions & 0 deletions examples/mcp/sse-example.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { Agent, run, MCPServerSSE, withTrace } from '@openai/agents';

async function main() {
const mcpServer = new MCPServerSSE({
url: 'https://gitmcp.io/openai/codex',
name: 'SSE MCP Server',
});

const agent = new Agent({
name: 'SSE Assistant',
instructions: 'Use the tools to respond to user requests.',
mcpServers: [mcpServer],
});

try {
await withTrace('SSE MCP Server Example', async () => {
await mcpServer.connect();
const result = await run(
agent,
'Please help me with the available tools.',
);
console.log(result.finalOutput);
});
} finally {
await mcpServer.close();
}
}

main().catch((err) => {
console.error(err);
process.exit(1);
});
1 change: 1 addition & 0 deletions packages/agents-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export {
MCPServer,
MCPServerStdio,
MCPServerStreamableHttp,
MCPServerSSE,
} from './mcp';
export {
MCPToolFilterCallable,
Expand Down
95 changes: 95 additions & 0 deletions packages/agents-core/src/mcp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { UserError } from './errors';
import {
MCPServerStdio as UnderlyingMCPServerStdio,
MCPServerStreamableHttp as UnderlyingMCPServerStreamableHttp,
MCPServerSSE as UnderlyingMCPServerSSE,
} from '@openai/agents-core/_shims';
import { getCurrentSpan, withMCPListToolsSpan } from './tracing';
import { logger as globalLogger, getLogger, Logger } from './logger';
Expand All @@ -24,6 +25,9 @@ export const DEFAULT_STDIO_MCP_CLIENT_LOGGER_NAME =
export const DEFAULT_STREAMABLE_HTTP_MCP_CLIENT_LOGGER_NAME =
'openai-agents:streamable-http-mcp-client';

export const DEFAULT_SSE_MCP_CLIENT_LOGGER_NAME =
'openai-agents:sse-mcp-client';

/**
* Interface for MCP server implementations.
* Provides methods for connecting, listing tools, calling tools, and cleanup.
Expand Down Expand Up @@ -113,6 +117,41 @@ export abstract class BaseMCPServerStreamableHttp implements MCPServer {
}
}

export abstract class BaseMCPServerSSE implements MCPServer {
public cacheToolsList: boolean;
protected _cachedTools: any[] | undefined = undefined;
public toolFilter?: MCPToolFilterCallable | MCPToolFilterStatic;

protected logger: Logger;
constructor(options: MCPServerSSEOptions) {
this.logger =
options.logger ?? getLogger(DEFAULT_SSE_MCP_CLIENT_LOGGER_NAME);
this.cacheToolsList = options.cacheToolsList ?? false;
this.toolFilter = options.toolFilter;
}

abstract get name(): string;
abstract connect(): Promise<void>;
abstract close(): Promise<void>;
abstract listTools(): Promise<any[]>;
abstract callTool(
_toolName: string,
_args: Record<string, unknown> | null,
): Promise<CallToolResultContent>;
abstract invalidateToolsCache(): Promise<void>;

/**
* Logs a debug message when debug logging is enabled.
* @param buildMessage A function that returns the message to log.
*/
protected debugLog(buildMessage: () => string): void {
if (debug.enabled(this.logger.namespace)) {
// only when this is true, the function to build the string is called
this.logger.debug(buildMessage());
}
}
}

/**
* Minimum MCP tool data definition.
* This type definition does not intend to cover all possible properties.
Expand Down Expand Up @@ -206,6 +245,42 @@ export class MCPServerStreamableHttp extends BaseMCPServerStreamableHttp {
}
}

export class MCPServerSSE extends BaseMCPServerSSE {
private underlying: UnderlyingMCPServerSSE;
constructor(options: MCPServerSSEOptions) {
super(options);
this.underlying = new UnderlyingMCPServerSSE(options);
}
get name(): string {
return this.underlying.name;
}
connect(): Promise<void> {
return this.underlying.connect();
}
close(): Promise<void> {
return this.underlying.close();
}
async listTools(): Promise<MCPTool[]> {
if (this.cacheToolsList && this._cachedTools) {
return this._cachedTools;
}
const tools = await this.underlying.listTools();
if (this.cacheToolsList) {
this._cachedTools = tools;
}
return tools;
}
callTool(
toolName: string,
args: Record<string, unknown> | null,
): Promise<CallToolResultContent> {
return this.underlying.callTool(toolName, args);
}
invalidateToolsCache(): Promise<void> {
return this.underlying.invalidateToolsCache();
}
}

/**
* Fetches and flattens all tools from multiple MCP servers.
* Logs and skips any servers that fail to respond.
Expand Down Expand Up @@ -467,6 +542,26 @@ export interface MCPServerStreamableHttpOptions {
// ----------------------------------------------------
}

export interface MCPServerSSEOptions {
url: string;
cacheToolsList?: boolean;
clientSessionTimeoutSeconds?: number;
name?: string;
logger?: Logger;
toolFilter?: MCPToolFilterCallable | MCPToolFilterStatic;
timeout?: number;

// ----------------------------------------------------
// OAuth
// import { OAuthClientProvider } from '@modelcontextprotocol/sdk/client/auth.js';
authProvider?: any;
// RequestInit
requestInit?: any;
// import { SSEReconnectionOptions } from '@modelcontextprotocol/sdk/client/sse.js';
eventSourceInit?: any;
// ----------------------------------------------------
}

/**
* Represents a JSON-RPC request message.
*/
Expand Down
32 changes: 32 additions & 0 deletions packages/agents-core/src/shims/mcp-server/browser.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import {
BaseMCPServerSSE,
BaseMCPServerStdio,
BaseMCPServerStreamableHttp,
CallToolResultContent,
MCPServerSSEOptions,
MCPServerStdioOptions,
MCPServerStreamableHttpOptions,
MCPTool,
Expand Down Expand Up @@ -60,3 +62,33 @@ export class MCPServerStreamableHttp extends BaseMCPServerStreamableHttp {
throw new Error('Method not implemented.');
}
}

export class MCPServerSSE extends BaseMCPServerSSE {
constructor(params: MCPServerSSEOptions) {
super(params);
}

get name(): string {
return 'MCPServerSSE';
}
connect(): Promise<void> {
throw new Error('Method not implemented.');
}
close(): Promise<void> {
throw new Error('Method not implemented.');
}

listTools(): Promise<MCPTool[]> {
throw new Error('Method not implemented.');
}
callTool(
_toolName: string,
_args: Record<string, unknown> | null,
): Promise<CallToolResultContent> {
throw new Error('Method not implemented.');
}

invalidateToolsCache(): Promise<void> {
throw new Error('Method not implemented.');
}
}
123 changes: 123 additions & 0 deletions packages/agents-core/src/shims/mcp-server/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import { DEFAULT_REQUEST_TIMEOUT_MSEC } from '@modelcontextprotocol/sdk/shared/p
import {
BaseMCPServerStdio,
BaseMCPServerStreamableHttp,
BaseMCPServerSSE,
CallToolResultContent,
DefaultMCPServerStdioOptions,
InitializeResult,
MCPServerStdioOptions,
MCPServerStreamableHttpOptions,
MCPServerSSEOptions,
MCPTool,
invalidateServerToolsCache,
} from '../../mcp';
Expand Down Expand Up @@ -166,6 +168,127 @@ export class NodeMCPServerStdio extends BaseMCPServerStdio {
}
}

export class NodeMCPServerSSE extends BaseMCPServerSSE {
protected session: Client | null = null;
protected _cacheDirty = true;
protected _toolsList: any[] = [];
protected serverInitializeResult: InitializeResult | null = null;
protected clientSessionTimeoutSeconds?: number;
protected timeout: number;

params: MCPServerSSEOptions;
private _name: string;
private transport: any = null;

constructor(params: MCPServerSSEOptions) {
super(params);
this.clientSessionTimeoutSeconds = params.clientSessionTimeoutSeconds ?? 5;
this.params = params;
this._name = params.name || `sse: ${this.params.url}`;
this.timeout = params.timeout ?? DEFAULT_REQUEST_TIMEOUT_MSEC;
}

async connect(): Promise<void> {
try {
const { SSEClientTransport } = await import(
'@modelcontextprotocol/sdk/client/sse.js'
).catch(failedToImport);
const { Client } = await import(
'@modelcontextprotocol/sdk/client/index.js'
).catch(failedToImport);
this.transport = new SSEClientTransport(new URL(this.params.url), {
authProvider: this.params.authProvider,
requestInit: this.params.requestInit,
eventSourceInit: this.params.eventSourceInit,
});
this.session = new Client({
name: this._name,
version: '1.0.0', // You may want to make this configurable
});
await this.session.connect(this.transport);
this.serverInitializeResult = {
serverInfo: { name: this._name, version: '1.0.0' },
} as InitializeResult;
} catch (e) {
this.logger.error('Error initializing MCP server:', e);
await this.close();
throw e;
}
this.debugLog(() => `Connected to MCP server: ${this._name}`);
}

async invalidateToolsCache(): Promise<void> {
await invalidateServerToolsCache(this.name);
this._cacheDirty = true;
}

async listTools(): Promise<MCPTool[]> {
const { ListToolsResultSchema } = await import(
'@modelcontextprotocol/sdk/types.js'
).catch(failedToImport);
if (!this.session) {
throw new Error(
'Server not initialized. Make sure you call connect() first.',
);
}
if (this.cacheToolsList && !this._cacheDirty && this._toolsList) {
return this._toolsList;
}

this._cacheDirty = false;
const response = await this.session.listTools();
this.debugLog(() => `Listed tools: ${JSON.stringify(response)}`);
this._toolsList = ListToolsResultSchema.parse(response).tools;
return this._toolsList;
}

async callTool(
toolName: string,
args: Record<string, unknown> | null,
): Promise<CallToolResultContent> {
const { CallToolResultSchema } = await import(
'@modelcontextprotocol/sdk/types.js'
).catch(failedToImport);
if (!this.session) {
throw new Error(
'Server not initialized. Make sure you call connect() first.',
);
}
const response = await this.session.callTool(
{
name: toolName,
arguments: args ?? {},
},
undefined,
{
timeout: this.timeout,
},
);
const parsed = CallToolResultSchema.parse(response);
const result = parsed.content;
this.debugLog(
() =>
`Called tool ${toolName} (args: ${JSON.stringify(args)}, result: ${JSON.stringify(result)})`,
);
return result as CallToolResultContent;
}

get name() {
return this._name;
}

async close(): Promise<void> {
if (this.transport) {
await this.transport.close();
this.transport = null;
}
if (this.session) {
await this.session.close();
this.session = null;
}
}
}

export class NodeMCPServerStreamableHttp extends BaseMCPServerStreamableHttp {
protected session: Client | null = null;
protected _cacheDirty = true;
Expand Down
6 changes: 5 additions & 1 deletion packages/agents-core/src/shims/shims-browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ export function isTracingLoopRunningByDefault(): boolean {
return false;
}

export { MCPServerStdio, MCPServerStreamableHttp } from './mcp-server/browser';
export {
MCPServerStdio,
MCPServerStreamableHttp,
MCPServerSSE,
} from './mcp-server/browser';

class BrowserTimer implements Timer {
constructor() {}
Expand Down
1 change: 1 addition & 0 deletions packages/agents-core/src/shims/shims-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export function isBrowserEnvironment(): boolean {
export {
NodeMCPServerStdio as MCPServerStdio,
NodeMCPServerStreamableHttp as MCPServerStreamableHttp,
NodeMCPServerSSE as MCPServerSSE,
} from './mcp-server/node';

export { clearTimeout } from 'node:timers';
Expand Down