本文对 deepchat 开源项目的源码进行学习分析,研究其 MCP 和基础 Agent 的实现原理
MCP Client 和 Server 交互机制
1. 架构概览
在这个 Electron 项目中,MCP 的交互采用典型的客户端-服务器架构:
- Client 端:运行在 Electron 主进程中的
McpClient类
- Server 端:分为两种类型
- 内存服务器 (In-Memory):内置在应用中的 MCP 服务器实现
- 外部服务器:通过
stdio、SSE或HTTP与外部 MCP 服务器通信
2. 传输层 (Transport Layer)
// 支持四种传输方式 if (this.serverConfig.type === 'inmemory') { // 内存传输:直接在同一进程中通信 const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair() this.transport = clientTransport } else if (this.serverConfig.type === 'stdio') { // 标准输入输出:通过子进程通信 this.transport = new StdioClientTransport({ command, args, env }) } else if (this.serverConfig.type === 'sse') { // Server-Sent Events:HTTP长连接 this.transport = new SSEClientTransport(new URL(baseUrl)) } else if (this.serverConfig.type === 'http') { // HTTP:基于HTTP协议的流式传输 this.transport = new StreamableHTTPClientTransport(new URL(baseUrl)) }
2.1 SSE
SSE 是一种基于 HTTP 的单向通信协议,允许服务器主动向客户端推送数据。它的核心特点是:
- 单向通信:服务器 → 客户端
- 基于 HTTP:复用现有的 HTTP 连接
- 自动重连:客户端自动处理断线重连
- 事件驱动:支持命名事件和数据传输
sequenceDiagram participant Client as 前端客户端 participant Server as 后端服务器 Note over Client, Server: SSE 基本交互流程 Client->>Server: EventSource('/api/sse') Note right of Client: Accept: text/event-stream Server->>Client: HTTP 200 + SSE Headers Note left of Server: Content-Type: text/event-stream<br/>Connection: keep-alive Server->>Client: data: {"type":"connected"}\n\n Note over Client: onopen 事件 loop 实时数据推送 Server->>Client: data: {"content":"Hello"}\n\n Note over Client: onmessage 处理数据 Server->>Client: data: {"content":" World"}\n\n Note over Client: 累积显示内容 end Server->>Client: data: [DONE]\n\n Note over Client: 传输完成 alt 连接关闭 Client->>Server: eventSource.close() end
3. 通信协议流程
3.1 连接建立
async connect(): Promise<void> { // 1. 创建 MCP 客户端 this.client = new Client( { name: 'DeepChat', version: app.getVersion() }, { capabilities: { resources: {}, tools: {}, prompts: {} } } ) // 2. 连接到服务器 await this.client.connect(this.transport) // 3. 发出状态变更事件 eventBus.emit(MCP_EVENTS.SERVER_STATUS_CHANGED, { name: this.serverName, status: 'running' }) }
3.2 服务器端实现 (以 ArtifactsServer 为例)
export class ArtifactsServer { private server: Server constructor() { // 创建服务器实例 this.server = new Server( { name: 'artifacts-server', version: '0.1.0' }, { capabilities: { tools: {} } } ) this.setupRequestHandlers() } public startServer(transport: Transport): void { this.server.connect(transport) } private setupRequestHandlers(): void { // 处理工具列表请求 this.server.setRequestHandler(ListToolsRequestSchema, async () => ({ tools: [/* 工具定义 */] })) // 处理工具调用请求 this.server.setRequestHandler(CallToolRequestSchema, async (request) => { // 执行具体的工具逻辑 return { content: [/* 结果 */] } }) } }
- 初始化阶段:
McpPresenter.initialize() → ServerManager.testNpmRegistrySpeed() → 启动默认服务器 → 发出 MCP_EVENTS.INITIALIZED
- 工具调用流程:
LLM Provider 调用 → McpPresenter.callTool() → ToolManager.callTool() → McpClient.callTool() → MCP Server 处理 → 返回结果 → 发出 TOOL_CALL_RESULT 事件
- 配置管理:
用户修改配置 → McpConfHelper → 发出 CONFIG_CHANGED 事件 → McpPresenter 重启相关服务器
4. 内存服务器的特殊机制
对于内置的内存服务器(如 Artifacts、文件系统等),使用
InMemoryTransport 实现零开销的进程内通信:export function getInMemoryServer(serverName: string, args: string[], env?: Record<string, unknown>) { switch (serverName) { case 'Artifacts': return new ArtifactsServer() case 'buildInFileSystem': return new FileSystemServer(args) // ... 其他内置服务器 } }
这种架构设计实现了:
- 灵活的传输方式:支持多种通信协议
- 统一的接口:所有 MCP 服务器都遵循相同的协议
- 事件驱动:组件间松耦合,通过事件进行通信
- 生命周期管理:自动化的服务器启停和状态管理
- 权限控制:基于配置的工具调用权限管理
大语言模型调用关键函数流程解析
以 用户问"deepseek 公司的发展历程"为例子
1. 初始化阶段 (第272-310行)
async *startStreamCompletion( providerId: string, initialMessages: ChatMessage[], modelId: string, eventId: string, temperature: number = 0.6, maxTokens: number = 4096 ): AsyncGenerator<LLMAgentEvent, void, unknown>
示例场景:用户问"deepseek 公司的发展历程"
providerId: "deepseek"
initialMessages:[{ role: "user", content: "deepseek 公司的发展历程" }]
modelId: "deepseek-chat"
eventId: "chat_12345"
初始化操作:
- 检查并发流限制
- 获取 DeepSeek Provider 实例
- 创建 AbortController 用于取消控制
- 初始化对话历史记录
conversationMessages
- 设置工具调用计数器和使用统计
2. Agent Loop 主循环 (第319-855行)
这是核心的
while (needContinueConversation) 循环:2.1 第一轮循环 - LLM 调用
// 获取所有可用的 MCP 工具 const mcpTools = await presenter.mcpPresenter.getAllToolDefinitions() // 调用 Provider 的核心流式方法 const stream = provider.coreStream( conversationMessages, modelId, modelConfig, temperature, maxTokens, mcpTools )
对于"deepseek 公司的发展历程"查询:
- DeepSeek Provider 会调用 DeepSeek API
- 传入用户问题和可用工具列表
- LLM 可能决定是否需要使用工具(如搜索工具)
你具备调用外部工具的能力来协助解决用户的问题 ==== 可用的工具列表定义在 <tool_list> 标签中: <tool_list> <tool name="web_search" description="搜索网络内容,获取最新信息"> <parameter name="query" required="true" description="搜索查询词" type="string"></parameter> <parameter name="num_results" description="返回结果数量" type="number"></parameter> </tool> </tool_list> 当你判断调用工具是**解决用户问题的唯一或最佳方式**时,**必须**严格遵循以下流程进行回复。 ... 用户指令如下: deepseek 公司的发展历程
2.2 处理标准化流事件 (第362-470行)
Provider 返回标准化的
LLMCoreStreamEvent,主循环处理各种事件类型:文本内容事件:
case 'text': if (chunk.content) { currentContent += chunk.content yield { type: 'response', data: { eventId, content: chunk.content } } } break
工具调用事件:
case 'tool_call_start': // 工具调用开始 case 'tool_call_chunk': // 流式接收工具参数 case 'tool_call_end': // 工具调用完成,收集完整参数
可能的场景:
如果系统配置了搜索工具,LLM 可能会说:"我需要搜索一下 deepseek 公司的最新信息",然后调用
web_search 工具。2.3 工具执行阶段 (第550-820行)
如果检测到工具调用,进入工具执行逻辑:
if (needContinueConversation && currentToolCalls.length > 0) { for (const toolCall of currentToolCalls) { // 查找工具定义 const toolDef = (await presenter.mcpPresenter.getAllToolDefinitions()).find( (t) => t.function.name === toolCall.name ) // 执行工具 const toolResponse = await presenter.mcpPresenter.callTool(mcpToolInput) // 将工具结果添加到对话历史 } }
示例工具调用:
{ "id": "call_123", "name": "web_search", "arguments": "{\\"query\\": \\"deepseek 公司发展历程 融资 产品\\"}" }
3. 根据模型支持的函数调用方式处理对话历史
3.1 原生函数调用支持 (第640-670行)
if (supportsFunctionCall) { // 添加工具调用到 assistant 消息 lastAssistantMsg.tool_calls.push({ function: { arguments: toolCall.arguments, name: toolCall.name }, id: toolCall.id, type: 'function' }) // 添加工具响应消息 conversationMessages.push({ role: 'tool', content: toolResponse.content, tool_call_id: toolCall.id }) }
3.2 非原生函数调用 (第672-730行)
大模型会以下面格式返回
<function_call> { "function_call": { "name": "web_search", "arguments": { "query": "deepseek 公司发展历程 融资 产品", "num_results": 5 } } } </function_call>
然后调用返回结果,并将调用信息附加到 assistant 消息
else { // 将工具调用信息附加到 assistant 消息 const toolCallInfo = `\\n<function_call> { "function_call": ${JSON.stringify({ id: toolCall.id, name: toolCall.name, arguments: toolCall.arguments })} } </function_call>\\n` // 创建包含工具响应的用户消息 const toolResponseContent = '以下是刚刚执行的工具调用响应,请根据响应内容更新你的回答:\\n' + JSON.stringify(toolResponse) }
4. 第二轮循环 - 基于工具结果的回答
工具执行完成后,
needContinueConversation = true,进入下一轮循环:对话历史现在包含:
- 用户问题:"deepseek 公司的发展历程"
- Assistant 的工具调用记录
- 工具搜索结果(包含 deepseek 公司的实际信息)
LLM 现在基于搜索结果生成最终回答。
5. 完成和清理阶段 (第830-855行)
finally { const userStop = abortController.signal.aborted if (!userStop) { // 发送最终使用统计 yield { type: 'response', data: { eventId, totalUsage } } } // 发送结束事件 yield { type: 'end', data: { eventId, userStop } } // 清理活跃流 this.activeStreams.delete(eventId) }
完整流程示例
对于"deepseek 公司的发展历程"查询:
- 用户输入 →
conversationMessages = [{ role: "user", content: "deepseek 公司的发展历程" }]
- 第一轮 LLM 调用 → LLM 识别需要搜索最新信息,调用
web_search工具
- 工具执行 → 搜索引擎返回 deepseek 公司的发展历程信息
- 更新对话历史 → 添加工具调用和搜索结果到对话记录
- 第二轮 LLM 调用 → LLM 基于搜索结果生成完整的回答
- 流式输出 → 用户看到:"DeepSeek 公司成立于2023年,是一家专注于AI大模型的公司..."
- 完成清理 → 发送使用统计和结束事件
这个架构的优势在于:
- 模块化设计:Agent 循环与具体 Provider 解耦
- 标准化事件:所有 Provider 都输出相同格式的事件
- 工具集成:无缝支持多种 MCP 工具
- 错误处理:完善的异常和中断处理机制
- 实时反馈:通过流式输出提供实时用户体验
flowchart TD A[用户输入:<br/>deepseek 公司的发展历程] --> B[初始化 Agent Loop] B --> C[获取可用工具列表] C --> D[调用 LLM API<br/>传入消息 + 工具定义] D --> E{LLM 响应类型} E -->|纯文本回答| F[流式输出文本内容] E -->|工具调用| G[解析工具调用参数] F --> END[结束会话] G --> H[执行 MCP 工具<br/>例:web_search 搜索 deepseek] H --> I[获取工具执行结果] I --> J[将工具结果添加到对话历史] J --> K[继续 Agent Loop<br/>LLM 基于工具结果回答] K --> L[LLM 生成最终回答] L --> M[流式输出最终内容] M --> END style A fill:#e3f2fd style END fill:#c8e6c9 style H fill:#fff3e0 style D fill:#f3e5f5 style K fill:#f3e5f5