背景
随着ChatGPT等AI大语言模型的普及,人们对于流式响应也越来越理解和认可。本质上讲,流式响应就是服务器处理一点就返回一点,由于LLM处理起来相对慢一些(即使有海量的服务器资源),为了避免用户等待太长时间,就把数据逐步返回。
流式响应也经常用于传输大文件。今天我使用流式响应的场景是这样的:
我在服务器上运行一个python进程,其运行结果会逐行返回到React前端
技术选型
像我描述的场景,除了http流式响应,更加容易想到的是方案是websocket,不过websocket有点大材小用了,因为websocket可以建立完整是输入和输出通道,而我只需要输出就可以。
所以,经过比较,我决定使用FASTAPI作为后端,REACT作为前端来实现这个场景。
后端
首先FASTAPI是支持完整的RESTful API的,并且自带SWAGGER OPENAPI接口文档。
要实现后端的一个api接口endpoint只需引入对应的注解即可
from fastapi.responses import StreamingResponse
@router.post("/{id}/run", response_model=CodeCardPublic)
async def run_me(
*,
session: SessionDep,
current_user: CurrentUser,
book_id: int,
id: int,
item_in: CodeCardPublic,
) -> Any:
"""
run a code card
"""
book, card = validate_book_and_card(
session=session, current_user=current_user, book_id=book_id, card_id=id
)
update_dict = item_in.model_dump(exclude_unset=True)
card.sqlmodel_update(update_dict)
session.add(card)
session.commit()
session.refresh(card)
return StreamingResponse(
content=run_python_script(item_in.code, book_id, item_in.id, current_user.id),
)
这里有两个要点
- 要使用"StreamingResponse"作为返回值,而不是普通的Response
- “StreamingResponse"的第一个参数必须是一个generator函数
# 参考
def fake_run_python_script():
for i in range(100):
yield f"this is {i}"
这样后端就完成了,现在来看看前端,前端有点小trick
前端
StreamingResponse到达前端后,需要对response的body进行流式读区。一开始我使用axios,但是无法实现流式读区,放狗搜了一圈,发现得用fetch,下面是fetch的实现方式:
fetch(url, {
method: options.method,
headers: { ...headers, 'Accept': 'text/plain' },
body: JSON.stringify(body),
}).then(response => {
resolve(response)
}).catch(err => onFailure(err))
这里整个response都会返回,然后就可以读区body
CodeCardService.executeCodeCardStream({
"book_id": book_id,
"card_id": card_id,
"requestBody": { ...card, code: code }
}).then(rsp => {
setOutput("");
if (rsp.body == null) {
setOutput("System Error!")
setIsRunning(false);
// props.onUpdateCodeCard({ card: card, output: output })
return
}
const reader = rsp.body?.getReader()
const processStream = async () => {
while (true && reader != null) {
const { done, value } = await reader.read();
if (done) {
console.log('Stream finished');
setOutput(output => output + "\n\n");
setIsRunning(false);
// props.onUpdateCodeCard({ card: card, output: output })
break;
}
const decoded_value = decoder.decode(value)
// Update state to render byte
setOutput(output => output + decoded_value);
}
};
processStream();
}).catch(e => {
console.debug(e);
setIsRunning(false);
}
)
重点是Line13打开response body,Line 14是一个异步函数不停地读取输入流里的数据,直到读完。
总结
上述方法就描述的了用restful api实现http流式请求和响应的处理