From 1fa41af918f7253d4ceb9d78ab25c1415614508f Mon Sep 17 00:00:00 2001 From: Shreya Keshive Date: Fri, 13 Jun 2025 20:18:06 +0000 Subject: [PATCH] Support MCP StreamableHTTPClientTransport (#1014) --- packages/core/src/config/config.ts | 2 + packages/core/src/tools/mcp-client.ts | 18 ++-- .../src/tools/websocket-client-transport.ts | 97 ------------------- 3 files changed, 12 insertions(+), 105 deletions(-) delete mode 100644 packages/core/src/tools/websocket-client-transport.ts diff --git a/packages/core/src/config/config.ts b/packages/core/src/config/config.ts index 48408201..4bc5d08b 100644 --- a/packages/core/src/config/config.ts +++ b/packages/core/src/config/config.ts @@ -45,6 +45,8 @@ export class MCPServerConfig { readonly cwd?: string, // For sse transport readonly url?: string, + // For streamable http transport + readonly httpUrl?: string, // For websocket transport readonly tcp?: string, // Common diff --git a/packages/core/src/tools/mcp-client.ts b/packages/core/src/tools/mcp-client.ts index 6f498730..0ea0bd0e 100644 --- a/packages/core/src/tools/mcp-client.ts +++ b/packages/core/src/tools/mcp-client.ts @@ -7,12 +7,12 @@ import { Client } from '@modelcontextprotocol/sdk/client/index.js'; import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js'; import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js'; +import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; import { parse } from 'shell-quote'; import { MCPServerConfig } from '../config/config.js'; import { DiscoveredMCPTool } from './mcp-tool.js'; import { CallableTool, FunctionDeclaration, mcpToTool } from '@google/genai'; import { ToolRegistry } from './tool-registry.js'; -import { WebSocketClientTransport } from './websocket-client-transport.js'; export const MCP_DEFAULT_TIMEOUT_MSEC = 10 * 60 * 1000; // default to 10 minutes @@ -163,10 +163,12 @@ async function connectAndDiscover( updateMCPServerStatus(mcpServerName, MCPServerStatus.CONNECTING); let transport; - if (mcpServerConfig.url) { + if (mcpServerConfig.httpUrl) { + transport = new StreamableHTTPClientTransport( + new URL(mcpServerConfig.httpUrl), + ); + } else if (mcpServerConfig.url) { transport = new SSEClientTransport(new URL(mcpServerConfig.url)); - } else if (mcpServerConfig.tcp) { - transport = new WebSocketClientTransport(new URL(mcpServerConfig.tcp)); } else if (mcpServerConfig.command) { transport = new StdioClientTransport({ command: mcpServerConfig.command, @@ -180,7 +182,7 @@ async function connectAndDiscover( }); } else { console.error( - `MCP server '${mcpServerName}' has invalid configuration: missing url (for SSE), tcp (for websocket), and command (for stdio). Skipping.`, + `MCP server '${mcpServerName}' has invalid configuration: missing httpUrl (for Streamable HTTP), url (for SSE), and command (for stdio). Skipping.`, ); // Update status to disconnected updateMCPServerStatus(mcpServerName, MCPServerStatus.DISCONNECTED); @@ -260,7 +262,7 @@ async function connectAndDiscover( if ( transport instanceof StdioClientTransport || transport instanceof SSEClientTransport || - transport instanceof WebSocketClientTransport + transport instanceof StreamableHTTPClientTransport ) { await transport.close(); } @@ -321,7 +323,7 @@ async function connectAndDiscover( if ( transport instanceof StdioClientTransport || transport instanceof SSEClientTransport || - transport instanceof WebSocketClientTransport + transport instanceof StreamableHTTPClientTransport ) { await transport.close(); } @@ -341,7 +343,7 @@ async function connectAndDiscover( if ( transport instanceof StdioClientTransport || transport instanceof SSEClientTransport || - transport instanceof WebSocketClientTransport + transport instanceof StreamableHTTPClientTransport ) { await transport.close(); // Update status to disconnected diff --git a/packages/core/src/tools/websocket-client-transport.ts b/packages/core/src/tools/websocket-client-transport.ts deleted file mode 100644 index ff754c0a..00000000 --- a/packages/core/src/tools/websocket-client-transport.ts +++ /dev/null @@ -1,97 +0,0 @@ -/** - * @license - * Copyright 2025 Google LLC - * SPDX-License-Identifier: Apache-2.0 - */ - -import WebSocket from 'ws'; -import { - Transport, - TransportSendOptions, -} from '@modelcontextprotocol/sdk/shared/transport.js'; -import { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js'; -import { AuthInfo } from '@modelcontextprotocol/sdk/server/auth/types.js'; - -export class WebSocketClientTransport implements Transport { - private socket: WebSocket | null = null; - onclose?: () => void; - onerror?: (error: Error) => void; - onmessage?: ( - message: JSONRPCMessage, - extra?: { authInfo?: AuthInfo }, - ) => void; - - constructor(private readonly url: URL) {} - - async start(): Promise { - return new Promise((resolve, reject) => { - const handshakeTimeoutDuration = 10000; - let connectionTimeout: NodeJS.Timeout | null = null; - - try { - this.socket = new WebSocket(this.url.toString(), { - handshakeTimeout: handshakeTimeoutDuration, - }); - - connectionTimeout = setTimeout(() => { - this.socket?.close(); - reject( - new Error( - `WebSocket connection timed out after ${handshakeTimeoutDuration}ms`, - ), - ); - }, handshakeTimeoutDuration); - - this.socket.on('open', () => { - clearTimeout(connectionTimeout!); - resolve(); - }); - - this.socket.on('message', (data) => { - try { - const parsedMessage: JSONRPCMessage = JSON.parse(data.toString()); - this.onmessage?.(parsedMessage, { authInfo: undefined }); // Auth unsupported currently - } catch (error: unknown) { - this.onerror?.( - error instanceof Error ? error : new Error(String(error)), - ); - } - }); - - this.socket.on('error', (error) => { - clearTimeout(connectionTimeout!); - this.onerror?.(error); - reject(error); - }); - - this.socket.on('close', () => { - clearTimeout(connectionTimeout!); - this.onclose?.(); - this.socket = null; - }); - } catch (error: unknown) { - clearTimeout(connectionTimeout!); - reject(error instanceof Error ? error : new Error(String(error))); - } - }); - } - - async close(): Promise { - if (this.socket) { - this.socket.close(); - this.socket = null; - } - } - - async send( - message: JSONRPCMessage, - _options?: TransportSendOptions, - ): Promise { - if (!this.socket || this.socket.readyState !== WebSocket.OPEN) { - throw new Error( - 'WebSocket is not connected or not open. Cannot send message.', - ); - } - this.socket.send(JSON.stringify(message)); - } -}