Skip to main content

流式消息

SSE

SSE(Server-Sent Event) 是一种浏览器原生支持的、单向的流式通信协议。SSE 基于 HTTP 协议,使用 MIME 类型 text/event-stream,消息由服务器推送到客户端。

核心特性

  1. 单向通信:只能从服务端推送到客户端,不能反向。
  2. 基于 HTTP/1.x:无需额外协议,浏览器天然支持。
  3. 保持连接、实时推送:服务端主动发送数据,客户端自动接收。

使用场景

  • 流式大模型输出。
  • 实时日志、监控信息推送。
  • 股票/天气/订单状态实时更新。
  • 打字机式的问答机器人。

前后端示例

后端(Node.js + Express):

import express from 'express';
const app = express();

app.get('/api/sse', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream; charset=utf-8');
res.setHeader('Cache-Control', 'no-cache, no-transform');
res.setHeader('Connection', 'keep-alive');

// 心跳,防止中间层因空闲断开(可选)
const heartbeat = setInterval(() => {
res.write(': ping\n\n');
}, 15000);

let i = 0;
const timer = setInterval(() => {
i += 1;
const payload = { tick: i, time: new Date().toISOString() };
res.write(`data: ${JSON.stringify(payload)}\n\n`);

if (i >= 5) {
res.write('event: end\n');
res.write('data: bye\n\n');
clearInterval(timer);
clearInterval(heartbeat);
res.end();
}
}, 1000);

req.on('close', () => {
clearInterval(timer);
clearInterval(heartbeat);
res.end();
});
});

app.listen(3000, () => {
console.log('SSE: http://localhost:3000/api/sse');
});

前端(浏览器 EventSource):

const es = new EventSource('/api/sse');

es.onmessage = (evt) => {
const data = JSON.parse(evt.data);
console.log('message:', data);
};

es.addEventListener('end', () => {
console.log('server ends stream');
es.close();
});

es.onerror = (err) => {
console.error('sse error:', err);
es.close();
};

流式处理

在流式请求中(如 Ollama、OpenAI 的 stream: true 模式),响应体不是一次性完整返回,而是按块分批返回

流式模式下,大模型的响应,不会一次性响应所有内容:

"你"
"好"
","
"请问"
"需要"
"帮助吗?"

这些字符被拆成若干个 块(chunk),逐个返回。

假设开启流式模式:

const response = await fetch("/api/generate", {
method: "POST",
body: JSON.stringify({ prompt: "你是谁?", stream: true }),
});

response.body 是一个 ReadableStream 可读流,可读流有一个 getReader 的方法,可以拿到一个 Reader 对象。

const reader = response.body.getReader();

Reader 对象上面有一个 read 方法,可以读取每一个块。

while (true) {
const { done, value } = await reader.read();
if (done) break;
}
  • value: 一个 Uint8Array,代表当前的字节数据块(二进制)
  • done: 是否已经读完(true 表示所有数据已读取完毕)。

但读取到的是二进制数据,因此需要去解码,因此需要创建一个 UTF-8 解码器,将 reader.read() 拿到的二进制 Uint8Array 解码为字符串。

const decoder = new TextDecoder("utf-8");

TextDecoder 会自动处理中文、emoji 这类多字节字符。

const chunk = decoder.decode(value, { stream: true });

例如其中一个 value 解码后结果是:

chunk = '{"response":"你好"}\n{"response":","}\n'

这里设置 stream: true 的作用是:保留未完整字符到下一次解码继续拼接。

比如如果只读到 {"response":"你,不会报错,而是等待下次补上 好"} 再组合起来。

假设将 stream 设置为 false,遇到以下情况会出现问题:

Uint8Array.from([123, 34, 114, 101, 115, 112, 111, 110, 115, 101, 34, 58, 34, 228])

这是部分字符 "你"utf-8,但还没完整;stream: false 会报错(因为字符不完整);stream: true 会“记住”这个未完整字符,等下一次拼接回来。