Featured image of post 用Fastapi和React实现http流式响应

用Fastapi和React实现http流式响应

背景

随着ChatGPT等AI大语言模型的普及,人们对于流式响应也越来越理解和认可。本质上讲,流式响应就是服务器处理一点就返回一点,由于LLM处理起来相对慢一些(即使有海量的服务器资源),为了避免用户等待太长时间,就把数据逐步返回。

流式响应也经常用于传输大文件。今天我使用流式响应的场景是这样的:

我在服务器上运行一个python进程,其运行结果会逐行返回到React前端

技术选型

像我描述的场景,除了http流式响应,更加容易想到的是方案是websocket,不过websocket有点大材小用了,因为websocket可以建立完整是输入和输出通道,而我只需要输出就可以。

所以,经过比较,我决定使用FASTAPI作为后端,REACT作为前端来实现这个场景。

后端

首先FASTAPI是支持完整的RESTful API的,并且自带SWAGGER OPENAPI接口文档。

要实现后端的一个api接口endpoint只需引入对应的注解即可


from fastapi.responses import StreamingResponse

@router.post("/{id}/run", response_model=CodeCardPublic)
async def run_me(
    *,
    session: SessionDep,
    current_user: CurrentUser,
    book_id: int,
    id: int,
    item_in: CodeCardPublic,
) -> Any:
    """
    run a code card
    """
    book, card = validate_book_and_card(
        session=session, current_user=current_user, book_id=book_id, card_id=id
    )
    update_dict = item_in.model_dump(exclude_unset=True)
    card.sqlmodel_update(update_dict)
    session.add(card)
    session.commit()
    session.refresh(card)

    return StreamingResponse(
        content=run_python_script(item_in.code, book_id, item_in.id, current_user.id),
    )

这里有两个要点

  1. 要使用"StreamingResponse"作为返回值,而不是普通的Response
  2. “StreamingResponse"的第一个参数必须是一个generator函数
# 参考
def fake_run_python_script():
  for i in range(100):
    yield f"this is {i}"

这样后端就完成了,现在来看看前端,前端有点小trick

前端

StreamingResponse到达前端后,需要对response的body进行流式读区。一开始我使用axios,但是无法实现流式读区,放狗搜了一圈,发现得用fetch,下面是fetch的实现方式:

fetch(url, {
	method: options.method,
	headers: { ...headers, 'Accept': 'text/plain' },
	body: JSON.stringify(body),
}).then(response => {
	resolve(response)
}).catch(err => onFailure(err))

这里整个response都会返回,然后就可以读区body

CodeCardService.executeCodeCardStream({
            "book_id": book_id,
            "card_id": card_id,
            "requestBody": { ...card, code: code }
        }).then(rsp => {
            setOutput("");
            if (rsp.body == null) {
                setOutput("System Error!")
                setIsRunning(false);
                // props.onUpdateCodeCard({ card: card, output: output })
                return
            }
            const reader = rsp.body?.getReader()
            const processStream = async () => {
                while (true && reader != null) {
                    const { done, value } = await reader.read();

                    if (done) {
                        console.log('Stream finished');
                        setOutput(output => output + "\n\n");
                        
                        setIsRunning(false);
                        // props.onUpdateCodeCard({ card: card, output: output })
                        break;
                    }

                    const decoded_value = decoder.decode(value)
                    // Update state to render byte
                    setOutput(output => output + decoded_value);
                }
            };
            processStream();
        }).catch(e => {
            console.debug(e); 
            setIsRunning(false);
        }
        )

重点是Line13打开response body,Line 14是一个异步函数不停地读取输入流里的数据,直到读完。

总结

上述方法就描述的了用restful api实现http流式请求和响应的处理

By 大可出奇迹