2.4 MCP 传输机制:stdio / SSE / Streamable HTTP
MCP 传输机制(Transport)是 MCP 客户端(以下简称客户端)与 MCP 服务器(以下简称服务器)通信的一个桥梁,定义了客户端与服务器通信的细节,帮助客户端和服务器交换消息。
MCP 目前定义了三种传输机制用于客户端-服务器通信:
- stdio:通过标准输入和标准输出进行通信
- SSE:通过 HTTP 进行通信,支持流式传输。(协议版本 2024-11-05 开始支持,即将废弃)
- Streamable HTTP:通过 HTTP 进行通信,支持流式传输。(协议版本 2025-03-26 开始支持,用于替代 SSE)
MCP 要求客户端应尽可能支持 stdio。
MCP 的传输机制是可插拔的,也就是说,客户端和服务器不局限于 MCP 定义的这几种传输机制,也可以通过自定义的传输机制来实现通信。
2.4.1 stdio 传输
stdio 即 standard input & output(标准输入 / 输出)。是 MCP 推荐使用的一种传输机制,主要用于本地进程通信。
一、stdio 传输通信流程
基于 stdio 传输的通信流程如图所示:
图:stdio 传输通信流程
通信步骤如下:
- 客户端以子进程的方式启动服务器
- 客户端往服务器的 stdin 写入消息
- 服务器从自身的 stdin 读取消息
- 服务端往自身的 stdout 写入消息
- 客户端从服务器的 stdout 读取消息
- 客户端终止子进程,关闭服务器的 stdin
- 服务器关闭自身的 stdout
二、stdio 传输实现
参考 MCP 官方的 typescript-sdk 来看 stdio 传输机制是如何实现的:
- 启动服务器
以命令行的方式,在本地启动服务器:
npx -y mcp-server-time- 创建 stdio 通信管道
服务器启动时,会创建 stdio 通信管道(pipeline),用于跟客户端进行双向通信。在客户端发送关闭信号,或者服务器异常退出之前,这个通信管道会一直保持,常驻进程。
stdio 传输类实现了 MCP 的 Transport 接口,实现逻辑如下:
export class StdioServerTransport implements Transport { private _readBuffer: ReadBuffer = new ReadBuffer(); private _started = false;
constructor( private _stdin: Readable = process.stdin, private _stdout: Writable = process.stdout ) {}
onclose?: () => void; onerror?: (error: Error) => void; onmessage?: (message: JSONRPCMessage) => void;}- 从 stdin 读取请求消息
客户端把消息发到通信管道。服务器通过标准输入 stdin 读取客户端发送的消息,以换行符:\n 作为读取完成标识。
服务器读取消息的实现逻辑如下,读取到的消息最终以 JSON-RPC 编码的结构体形式返回:
readMessage(): JSONRPCMessage | null { if (!this._buffer) { return null; }
const index = this._buffer.indexOf("\n"); if (index === -1) { return null; }
const line = this._buffer.toString("utf8", 0, index).replace(/\r$/, ''); this._buffer = this._buffer.subarray(index + 1); return deserializeMessage(line);}- 往 stdout 写入响应消息
服务器运行完内部逻辑,需要给客户端响应消息。
服务器先用 JSON-RPC 编码消息,再把消息写入标准输出 stdout,实现逻辑如下:
send(message: JSONRPCMessage): Promise<void> { return new Promise((resolve) => { const json = serializeMessage(message); if (this._stdout.write(json)) { resolve(); } else { this._stdout.once("drain", resolve); } }); }}上面的第 3,4 两个步骤,演示了客户端请求服务器单向通信的过程:客户端往 stdin 写入消息,服务器从 stdin 读取消息;服务器往 stdout 写入消息,客户端从 stdout 读取消息。同理,如果是服务器给客户端发送通知的单向通信场景,这个步骤应该反过来,变成:服务器往 stdin 写入消息,客户端从 stdin 读取消息;客户端往 stdout 写入消息,服务器从 stdout 读取消息。
- 关闭 stdio 通信管道
客户端退出,给服务器发送关闭信号。服务器通过 stdio 通信管道读到客户端发送的终止信号,或者内部运行错误,主动关闭 stdio 通信管道。
stdio 通信管道关闭之后,客户端与服务器之间不能再相互发送消息,直到再次建立 stdio 通信管道。以下是关闭通信管道的实现逻辑:
async close(): Promise<void> { // Remove our event listeners first this._stdin.off("data", this._ondata); this._stdin.off("error", this._onerror);
// Check if we were the only data listener const remainingDataListeners = this._stdin.listenerCount('data'); if (remainingDataListeners === 0) { // Only pause stdin if we were the only listener // This prevents interfering with other parts of the application that might be using stdin this._stdin.pause(); }
// Clear the buffer and notify closure this._readBuffer.clear(); this.onclose?.();}三、stdio 传输的利弊与适用场景
stdio 传输机制依靠本地进程通信实现,主要优势是:
- 无外部依赖,实现简单
- 无网络传输,通信速度快
- 本地通信,安全性高,无网络攻击风险
也有一些局限性,主要体现在:
- 单进程通信,无法并行处理多个客户端请求
- 进程通信的资源开销大,很难在本地运行非常多的服务
stdio 传输适用于要操作的数据资源位于本地环境,不希望暴露外部访问的场景,比如:通过一个聊天客户端,来总结你的微信消息,微信消息文件存储在你的本地电脑,外部访问不了,也不应该访问。
如果你要访问的是一个远程服务器上的文件,也可以使用 stdio 传输,流程会复杂一些:
- 先写一个 API 服务,部署在远程服务器,操作远程服务器上的资源,暴露公网访问
- 写一个 MCP 服务器,对接远程 API,再通过 stdio 传输与客户端本地通信
既然 stdio 传输访问远程资源这么麻烦,是不是应该有一种更适合远程资源访问的传输机制?
当然有。可以使用 SSE 传输。
2.4.2 SSE 传输
MCP 使用 SSE(Server-Sent Events,服务器发送事件) 传输来解决远程资源访问的问题。底层是基于 HTTP 通信,通过类似 API 的方式,让客户端直接访问远程资源,而不用通过 stdio 传输做中转。
在 SSE 传输中,服务器作为一个独立进程运行,可以处理多个客户端连接。服务器必须提供两个端点,分别是:
- SSE 端点,供客户端与服务器建立双向通信连接(GET 请求)
- 消息端点,供客户端向服务器发送消息(POST 请求)
一、SSE 传输通信流程
基于 SSE 传输的通信流程如图所示:
图:SSE 传输通信流程
通信步骤如下:
- 客户端向服务器的 /sse 端点发送请求(一般是 GET 请求),建立 SSE 连接
- 服务器给客户端发送包含消息端点地址的事件消息
- 客户端给消息端点发送消息
- 服务器给客户端响应消息已接收状态码
- 服务器给双方建立的 SSE 连接推送事件消息
- 客户端从 SSE 连接读取服务器发送的事件消息
- 客户端关闭 SSE 连接
二、SSE 传输实现
参考 MCP 官方的 typescript-sdk 来看 SSE 传输机制是如何实现的:
- 启动服务器
以命令行的方式,启动服务器(一般是在远程服务器上运行),实际启动的是一个 HTTP 服务,通过监听端口的方式,对外提供 HTTP 接口。服务器定义与启动的主要实现逻辑如下:
const server = new McpServer({ name: "example-server", version: "1.0.0",});
const app = express();
app.get("/sse", async (_: Request, res: Response) => { const transport = new SSEServerTransport("/messages", res);
await server.connect(transport);});
app.post("/messages", async (req: Request, res: Response) => { await transport.handlePostMessage(req, res);});
app.listen(3001);在这个示例中,使用 express 框架,启动了一个 HTTP 服务,监听在 3001 端口,对外暴露了两个端点:
- /sse:GET 请求,用于建立 SSE 连接
- /messages:POST 请求,用于接收客户端发送的消息
服务器启动后,需要通过 DNS 解析,绑定一个可公开访问的域名,比如:example.com。
- 建立 SSE 连接
客户端请求服务器的 SSE 端点地址:https://example.com:3001/sse 与服务器建立双向通信连接,服务器在建立连接之后,给客户端发送一条事件消息,包含消息端点地址,实现逻辑如下:
res.writeHead(200, { "Content-Type": "text/event-stream", "Cache-Control": "no-cache, no-transform", Connection: "keep-alive",});
const messagesUrl = "https://example.com:3001/messages?sessionId=xxx";
res.write(`event: endpoint\ndata: ${messagesUrl}\n\n`);客户端从服务器返回的 endpoint 事件中读取消息端点地址,与服务器建立 SSE 连接成功。
- 消息交互
客户端与服务器建立 SSE 连接之后,开始给消息端点地址发送消息。
MCP 中的 SSE 传输是双通道响应机制。也就是说,服务器在接收到客户端的请求消息之后,既要给当前的请求回复一条响应消息,也要给之前建立的 SSE 连接发送一条事件消息。(通知类型消息除外)
举个例子,客户端与服务器建立 SSE 连接之后,给服务器发送的第一条消息,用于初始化阶段做能力协商。
客户端给消息端点发送的请求消息示例:
curl -X POST https://example.com/messages?sessionId=xxx \-H "Content-Type: application/json" \-d '{ "jsonrpc": "2.0", "id": "1", "method": "initialize", "params": { "protocolVersion": "1.0", "capabilities": {}, "clientInfo": { "name": "mcp-client", "version": "1.0.0" } }}'服务器从 HTTP 请求体里面读取客户端发送的消息,在执行完内部逻辑之后,给客户端响应 202 状态码(无响应体),表示请求已收到。实现逻辑如下:
async handlePostMessage( req: IncomingMessage, res: ServerResponse, parsedBody?: unknown,): Promise<void> { if (!this._sseResponse) { const message = "SSE connection not established"; res.writeHead(500).end(message); throw new Error(message); }
let body: string | unknown; try { const ct = contentType.parse(req.headers["content-type"] ?? ""); if (ct.type !== "application/json") { throw new Error(`Unsupported content-type: ${ct}`); }
body = parsedBody ?? await getRawBody(req, { limit: MAXIMUM_MESSAGE_SIZE, encoding: ct.parameters.charset ?? "utf-8", }); } catch (error) { res.writeHead(400).end(String(error)); this.onerror?.(error as Error); return; }
try { await this.handleMessage(typeof body === 'string' ? JSON.parse(body) : body); } catch { res.writeHead(400).end(`Invalid message: ${body}`); return; }
res.writeHead(202).end("Accepted");}然后,服务器把响应给客户端的消息内容,通过之前建立的 SSE 连接,以事件消息的形式发送,消息内容使用 JSON-RPC 编码,实现逻辑如下:
async send(message: JSONRPCMessage): Promise<void> { if (!this._sseResponse) { throw new Error("Not connected"); }
this._sseResponse.write( `event: message\ndata: ${JSON.stringify(message)}\n\n`, );}客户端根据服务器同步响应的 202 状态码,判断服务器已经接到请求,开始从之前建立的 SSE 连接中读取服务器发送的响应内容。
客户端与服务器建立的 SSE 连接,应该是 1:1 的。为了防止串数据的问题,在建立 SSE 连接阶段,服务器发送的消息端点地址,应该为当前连接分配一个唯一标识,叫做
sessionId,并在消息端点地址带上这个标识,比如/messages?sessionId=xxx。在消息交互阶段,服务器根据客户端请求地址参数里面的 sessionId,找到之前建立的 SSE 连接,并只给这个 SSE 连接发送事件消息。
- 断开 SSE 连接
服务器与客户端双方都可能会主动断开 SSE 连接。
还保持连接的一方,应该加上必要的连接检测和超时关闭机制。比如通过 SSE 连接,给对方定时发送一条心跳检测消息,如果多次无响应,可以认作对方已断开连接,此时可以主动关闭 SSE 连接,避免资源泄露。
一个用 go 实现的心跳检测和超时关闭示例:
// Setup heartbeat tickerheartbeatInterval := 30 * time.SecondheartbeatTicker := time.NewTicker(heartbeatInterval)defer heartbeatTicker.Stop()
// Setup idle timeoutidleTimeout := 5 * time.MinuteidleTimer := time.NewTimer(idleTimeout)defer idleTimer.Stop()
go func() { for { select { case <-session.Done(): return case <-heartbeatTicker.C: // Send heartbeat if err := writer.SendHeartbeat(); err != nil { session.Close() return } case <-idleTimer.C: // Close connection due to inactivity session.Close() return } }}()三、SSE 传输的利弊与适用场景
SSE 传输适用于客户端与服务器不在同一个网络下的通信场景。比如,你想要通过对话客户端查询你云服务器上的数据库。可以在云服务器上部署一个 MCP 服务器,去读取数据库,再以 SSE URL 的方式在对话客户端配置使用。
当然,所有用 SSE 传输实现的 MCP 服务器,理论上都可以通过 stdio 传输 + API 的方式实现。区别在于:
- 用 SSE 传输,客户端直接与服务器通信,而不用通过本地的 stdio 传输调用 API 进行中转。
- 用 SSE 传输,只需要一个 URL 即可接入,对本地环境无要求,也无需在本地运行服务器,用户侧的使用门槛更低。
SSE 传输的主要优势:
- 支持远程资源访问,解决了 stdio 传输仅适用于本地资源的局限性
- 基于 HTTP 协议实现,兼容性好,便于与现有 Web 基础设施集成
- 服务器可作为独立进程运行,支持处理多个客户端连接
- 相比 WebSocket 实现简单,是 HTTP 的扩展,不需要协议升级
SSE 传输的主要劣势与问题:
- 复杂的双通道响应机制:SSE 传输要求服务器在接收客户端消息后,既要给当前请求响应,也要给之前建立的 SSE 连接发送事件消息,实现复杂
- 连接不稳定:在无服务器(serverless)环境中,SSE 连接会随机、频繁断开,不能提供可靠连接
- 不方便持久连接:无服务器架构通常自动扩缩容,不适合长时间连接,而 SSE 需要维持持久连接
- 需要大量会话管理:需要为每个 SSE 连接分配唯一标识(sessionId)来防止数据混淆,实现方需要实现会话管理,增加了实现复杂度
- 额外实现成本:需要实现心跳检测和超时机制来避免资源泄露
正因为这些问题,MCP 在新的协议(2025-03-26 版本)中引入了 Streamable HTTP 传输机制来替代 SSE,并计划废弃 SSE 传输。新的传输机制保留了 HTTP 的基础,但支持更灵活的连接方式,更适合现代云原生架构和无服务器环境。
2.4.3 Streamable HTTP 传输
在 Streamable HTTP 传输中,服务器必须提供一个同时支持 POST 和 GET 请求的 HTTP 端点。
一、Streamable HTTP 通信流程
基于 Streamable HTTP 传输的通信流程如图所示:
图:Streamable HTTP 传输通信流程
主要步骤如下:
- 客户端给服务器的通信端点发消息
- 服务器给客户端响应消息
- 客户端继续给服务器发消息
- 服务器继续响应客户端消息
跟 SSE 传输的双通道响应机制不同,Streamable HTTP 传输中,客户端与服务器的消息交互,基本上是“一来一回”的(单通道响应)。Streamable HTTP 传输中,客户端与服务器之间的消息交互可以是同步响应,也可以是异步响应(基于 SSE 连接);可以是有状态交互(需要通过 Mcp-Session-Id 保持会话),也可以是无状态交互(每次请求都是独立的)。
二、Streamable HTTP 传输实现
参考 MCP 官方的 typescript-sdk 来看 Streamable HTTP 传输机制是如何实现的:
- 启动服务器
跟 SSE 传输机制一样,Streamable HTTP 传输本质上也是基于 HTTP 通信,需要先通过命令行运行服务器,启动 HTTP 服务。服务器定义与启动的主要实现逻辑如下:
const server = new McpServer({ name: "example-server", version: "1.0.0",});
const app = express();
app.all("/mcp", async (req: Request, res: Response) => { const transport = new StreamableHTTPServerTransport();
await server.connect(transport);
await transport.handleMessage(req, res);});
app.listen(3002);在这个例子,使用 express 框架,启动了一个 HTTP 服务,监听在 3002 端口,对外暴露了一个端点:
- /mcp:接收客户端建立连接、交换消息的请求
解析一个可公开访问的域名到服务器。比如:example.com
跟 SSE 传输需要暴露两个端点(SSE 端点与消息端点)不同,Streamable HTTP 传输只需要暴露一个端点,来接收各种类型的客户端请求(GET / POST / DELETE)。
- 消息交互
在服务器启动成功之后,客户端可以直接给服务器的端点地址发消息,而无需先建立连接。
客户端可以通过 GET 或者 POST 方式给服务器发消息,每个请求必须设置请求头 Accept,传递以下两个值:
- application/json 接收服务器响应的 JSON-RPC 编码消息
- text/event-stream 由服务器开启流式传输通道,客户端从这个流里面读取事件消息
MCP 客户端请求示例:
curl -X POST https://example.com/mcp \-H "Content-Type: application/json" \-H "Accept: application/json, text/event-stream" \-d '{ "jsonrpc": "2.0", "id": "1", "method": "initialize", "params": { "protocolVersion": "1.0", "capabilities": {}, "clientInfo": { "name": "mcp-client", "version": "1.0.0" } }}'Streamable HTTP 传输机制下,客户端与服务器通信的几个要点:
- 客户端可以给服务器发送不包含请求体的 GET 请求,用于建立 SSE 连接;服务器可以主动给客户端先发消息
- 客户端给服务器发送 JSON-RPC 消息的情况,必须使用 POST 请求,并设置请求头
Accept为application/json, text/event-stream - 服务器接到客户端的 GET 请求时,要么返回
Contet-Type: text/event-stream开启 SSE 连接,要么返回 HTTP 405 状态码,表示不支持 SSE 连接。 - 服务器接到客户端的 POST 请求时,从请求体读取 JSON-RPC 消息,如果是通知消息,就响应 HTTP 202 状态码,表示消息已收到。如果是非通知消息,服务器可以选择返回
Content-Type: text/event-stream开启 SSE 传输,或者返回Content-Type: application/json同步响应一条 JSON-RPC 消息。
- 会话保持
Streamable HTTP 传输既支持无状态交互:每一次请求都是独立的,无需记录状态。也支持有状态交互:一次新的请求,可能需要同步之前的请求 / 响应信息作为参考。这种情况叫做:会话保持。
会话保持情况下,服务器与客户端之间的交互应该遵守以下原则:
- 使用 Streamable HTTP 传输的服务器可以在初始化时,通过
Mcp-Session-Id响应头返回会话 ID。 - 如果服务器在初始化时返回了
Mcp-Session-Id,客户端必须在所有后续请求中通过Mcp-Session-Id请求头带上会话 ID。 - 服务器可以随时终止会话,之后它必须使用 HTTP 404 Not Found 响应包含该会话 ID 的请求。
- 当客户端收到对包含
Mcp-Session-Id请求的 HTTP 404 响应时,它必须通过发送一个不带会话 ID 的新InitializeRequest来启动新会话。(可选) - 不再需要特定会话的客户端应该发送一个带有
Mcp-Session-Id请求头的 HTTP DELETE 请求到服务器,以显式终止会话。(可选,但推荐)
服务器验证会话的实现逻辑如下:
/** * Validates session ID for non-initialization requests * Returns true if the session is valid, false otherwise */private validateSession(req: IncomingMessage, res: ServerResponse): boolean { if (!this._initialized) { // If the server has not been initialized yet, reject all requests res.writeHead(400).end(JSON.stringify({ jsonrpc: "2.0", error: { code: -32000, message: "Bad Request: Server not initialized" }, id: null })); return false; } if (this.sessionId === undefined) { // If the session ID is not set, the session management is disabled // and we don't need to validate the session ID return true; } const sessionId = req.headers["mcp-session-id"];
if (!sessionId) { // Non-initialization requests without a session ID should return 400 Bad Request res.writeHead(400).end(JSON.stringify({ jsonrpc: "2.0", error: { code: -32000, message: "Bad Request: Mcp-Session-Id header is required" }, id: null })); return false; } else if (Array.isArray(sessionId)) { res.writeHead(400).end(JSON.stringify({ jsonrpc: "2.0", error: { code: -32000, message: "Bad Request: Mcp-Session-Id header must be a single value" }, id: null })); return false; } else if (sessionId !== this.sessionId) { // Reject requests with invalid session ID with 404 Not Found res.writeHead(404).end(JSON.stringify({ jsonrpc: "2.0", error: { code: -32001, message: "Session not found" }, id: null })); return false; }
return true;}- 连接断开与重连
Streamable HTTP 传输中,如果客户端与服务器使用 SSE 连接通信,断开连接的方式跟 SSE 传输断开连接的方式一致:可以由连接的任意一方主动断开连接,还保持着连接的一方,需要实现心跳检测和超时机制,以便能及时关闭连接,避免资源泄露。
Streamable HTTP 传输比起 SSE 传输,做了一些改进,支持恢复已中断的连接,重新发送可能丢失的消息。
- 服务器可以在其 SSE 事件中附加一个 ID 字段。如果存在,ID 必须在所有会话所有事件流中全局唯一。
- 如果客户端希望在断开连接后恢复,它应该向服务器发出 HTTP GET 请求,并包含 Last-Event-ID 请求头,告知服务器它接收到的最后一个事件 ID。服务器可以重放在最后一个事件 ID 之后将发送的消息,并从该点恢复事件流。
三、Streamable HTTP 传输的利弊与适用场景
Streamable HTTP 传输机制结合了 SSE 传输的远程访问能力和无状态 HTTP 的灵活性,同时解决了 SSE 传输中的许多问题。
主要优势:
- 兼容无服务器环境,可以在短连接模式下工作
- 灵活的连接模式,支持简单的请求-响应和流式传输
- 会话管理更加标准化和清晰
- 支持断开连接恢复和消息重传,比 SSE 传输更加可靠
- 保留了 SSE 的流式传输能力,同时解决了其稳定性问题
主要劣势:
- 状态判断过多,实现复杂度很高
- 处理连接断开和恢复的逻辑复杂,实现成本高
- 会话管理需要服务器引入额外的组件(比如用 Redis 来存储 Session)
Streamable HTTP 传输适用于:
- 需要远程访问服务的场景,特别是云原生和无服务器环境
- 需要支持流式输出的 AI 服务
- 需要服务器主动推送消息给客户端的场景
- 大规模部署需要高可靠性和可扩展性的服务
- 需要在不稳定网络环境中保持可靠通信的场景
与 SSE 传输相比,Streamable HTTP 传输是一个更全面、更灵活的解决方案,也是 MCP 在访问远程资源场景主推的传输机制。