admin管理员组文章数量:1130349
一、创建fastapi的项目
不会可以参考:快速创建一个FastAPI项目(精简)
二、一个简易循环发送
1、main.py代码如下
app=FastAPI()
@app.get("/",response_class=StreamingResponse)
async def start():
return await send_sse()
app.include_router(sse.router, prefix="/v1")
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8000)
2、sse.py
async def event_generator():
for i in range(5):
# 标准 SSE 格式:data: 内容\n\n
yield f"data: {{\"message\": \"Data chunk {i}\"}}\n\n".encode()
await asyncio.sleep(1) # 异步非阻塞延迟
# 返回体头
headers = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
router = APIRouter()
@router.post("/completions")
async def send_sse():
return StreamingResponse(event_generator(), headers=headers)
解释:整个代码很容易理解,调用send_see函数,其实工作中复杂的点在于,你是每一步都在发送。比如现在流行的fastgpt、dify这些基于工作流构建应用,每次返回都是节点信息,将每个节点的结果通过stream返回。
问题
1、如何动态实现发送?
2、return后我还有其他操作怎么办?
解决思路
题外话:解决问题的思路,才是你的成长,一味地依靠AI,永远在原地踏步。AI只是辅助,思路才是你的价值
问题一
因为只要return之后,只能通过event_generator函数进行操作,所以要让event_generator这个函数的循环是可控,因为外部无法直接调用event_generator。所以可以采用一个队列实现。
1、为什么是队列而不是集合?
因为要采用队列的先进先出的思想,保证数据的先后顺序。
2、是否队列为空,整个循环就结束呢?
不是,因为每个节点执行会有时间差,甚至说处理的比较慢从而导致,数据还没进队列,整个循环就已经结束了。
3、循环结束的节点怎么做?
根据业务来看,因为发送事件信息是有个event鉴别数据的类型,可以通过这个确定最后一个事件是什么从而结束整个循环。如果无法确定,可以设置具体的超时时间比如10s。
下面是具体的代码实现,以3s过期时间为例子。
async def event_generator(messags:deque):
timeout_seconds = 3 # 从props获取超时时间,默认3秒
last_data_time = asyncio.get_running_loop().time()
start = True
while start:
# 标准 SSE 格式:data: 内容\n\n
if messags:
message = messags.popleft()
yield f"data: {{\"message\": \"Data chunk {message}\"}}\n\n".encode()
last_data_time = asyncio.get_running_loop().time() # 重置计时器
else:
# 检查是否超时
current_time = asyncio.get_running_loop().time()
if current_time - last_data_time > timeout_seconds:
logging.info("消息列表已空 {} 秒,结束任务",timeout_seconds)
break
# 无数据时发送心跳,避免客户端断开连接
await asyncio.sleep(0.5) # 降低 CPU 使用率
# 返回体头
headers = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
router = APIRouter()
messags = deque()
message=0
@router.post("/completions/send")
async def send_sse():
return StreamingResponse(event_generator(messags), headers=headers)
@router.get("/completions/addDeque")
async def send_sse(): # 定义发送信息,每次对message+1操作
global message
message=message+1
messags.append(message)
return "ok"
效果图:
问题二
流式的想法,是每次调用把结果给到前端。那么问题是我们写这块代码是个很长的模块,在中间会进行流式输入,如果不return,所有的信息全部进了队列,最后return其实是一个一次性返回,跟流式的理念相违背。那么如何去做,这里可以采用携程去实现这个功能。
我们可以把自己的代码块逻辑丢到携程让携程去做。整个思想逻辑,是用队列的延展性实现流式的输出。所以我们只需要保证,在发送的时候把数据给到队列就行。代码如下:
async def event_generator(messages:deque):
timeout_seconds = 3 # 从props获取超时时间,默认3秒
last_data_time = asyncio.get_running_loop().time()
start = True
while start:
# 标准 SSE 格式:data: 内容\n\n
if messages:
message = messages.popleft()
yield f"data: {{\"message\": \"Data chunk {message}\"}}\n\n".encode()
last_data_time = asyncio.get_running_loop().time() # 重置计时器
else:
# 检查是否超时
current_time = asyncio.get_running_loop().time()
if current_time - last_data_time > timeout_seconds:
logging.info("消息列表已空 {} 秒,结束任务",timeout_seconds)
break
# 无数据时发送心跳,避免客户端断开连接
await asyncio.sleep(0.5) # 降低 CPU 使用率
# 返回体头
headers = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
router = APIRouter()
async def do_something(messages):
# 操作一
message1="节点1开始"
messages.append(message1)
#模拟每次操作耗时
await asyncio.sleep(0.5)
message2="节点1answer"
messages.append(message2)
#模拟每次操作耗时
await asyncio.sleep(0.5)
message2="节点1结束"
messages.append(message2)
#模拟每次操作耗时
await asyncio.sleep(0.5)
@router.post("/completions/send")
async def send_sse():
messages = deque()
asyncio.create_task(do_something(messages))
return StreamingResponse(event_generator(messages), headers=headers)
注:messages是局部变量,这样可以保证,线程安全。
结果如下:
一、创建fastapi的项目
不会可以参考:快速创建一个FastAPI项目(精简)
二、一个简易循环发送
1、main.py代码如下
app=FastAPI()
@app.get("/",response_class=StreamingResponse)
async def start():
return await send_sse()
app.include_router(sse.router, prefix="/v1")
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8000)
2、sse.py
async def event_generator():
for i in range(5):
# 标准 SSE 格式:data: 内容\n\n
yield f"data: {{\"message\": \"Data chunk {i}\"}}\n\n".encode()
await asyncio.sleep(1) # 异步非阻塞延迟
# 返回体头
headers = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
router = APIRouter()
@router.post("/completions")
async def send_sse():
return StreamingResponse(event_generator(), headers=headers)
解释:整个代码很容易理解,调用send_see函数,其实工作中复杂的点在于,你是每一步都在发送。比如现在流行的fastgpt、dify这些基于工作流构建应用,每次返回都是节点信息,将每个节点的结果通过stream返回。
问题
1、如何动态实现发送?
2、return后我还有其他操作怎么办?
解决思路
题外话:解决问题的思路,才是你的成长,一味地依靠AI,永远在原地踏步。AI只是辅助,思路才是你的价值
问题一
因为只要return之后,只能通过event_generator函数进行操作,所以要让event_generator这个函数的循环是可控,因为外部无法直接调用event_generator。所以可以采用一个队列实现。
1、为什么是队列而不是集合?
因为要采用队列的先进先出的思想,保证数据的先后顺序。
2、是否队列为空,整个循环就结束呢?
不是,因为每个节点执行会有时间差,甚至说处理的比较慢从而导致,数据还没进队列,整个循环就已经结束了。
3、循环结束的节点怎么做?
根据业务来看,因为发送事件信息是有个event鉴别数据的类型,可以通过这个确定最后一个事件是什么从而结束整个循环。如果无法确定,可以设置具体的超时时间比如10s。
下面是具体的代码实现,以3s过期时间为例子。
async def event_generator(messags:deque):
timeout_seconds = 3 # 从props获取超时时间,默认3秒
last_data_time = asyncio.get_running_loop().time()
start = True
while start:
# 标准 SSE 格式:data: 内容\n\n
if messags:
message = messags.popleft()
yield f"data: {{\"message\": \"Data chunk {message}\"}}\n\n".encode()
last_data_time = asyncio.get_running_loop().time() # 重置计时器
else:
# 检查是否超时
current_time = asyncio.get_running_loop().time()
if current_time - last_data_time > timeout_seconds:
logging.info("消息列表已空 {} 秒,结束任务",timeout_seconds)
break
# 无数据时发送心跳,避免客户端断开连接
await asyncio.sleep(0.5) # 降低 CPU 使用率
# 返回体头
headers = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
router = APIRouter()
messags = deque()
message=0
@router.post("/completions/send")
async def send_sse():
return StreamingResponse(event_generator(messags), headers=headers)
@router.get("/completions/addDeque")
async def send_sse(): # 定义发送信息,每次对message+1操作
global message
message=message+1
messags.append(message)
return "ok"
效果图:
问题二
流式的想法,是每次调用把结果给到前端。那么问题是我们写这块代码是个很长的模块,在中间会进行流式输入,如果不return,所有的信息全部进了队列,最后return其实是一个一次性返回,跟流式的理念相违背。那么如何去做,这里可以采用携程去实现这个功能。
我们可以把自己的代码块逻辑丢到携程让携程去做。整个思想逻辑,是用队列的延展性实现流式的输出。所以我们只需要保证,在发送的时候把数据给到队列就行。代码如下:
async def event_generator(messages:deque):
timeout_seconds = 3 # 从props获取超时时间,默认3秒
last_data_time = asyncio.get_running_loop().time()
start = True
while start:
# 标准 SSE 格式:data: 内容\n\n
if messages:
message = messages.popleft()
yield f"data: {{\"message\": \"Data chunk {message}\"}}\n\n".encode()
last_data_time = asyncio.get_running_loop().time() # 重置计时器
else:
# 检查是否超时
current_time = asyncio.get_running_loop().time()
if current_time - last_data_time > timeout_seconds:
logging.info("消息列表已空 {} 秒,结束任务",timeout_seconds)
break
# 无数据时发送心跳,避免客户端断开连接
await asyncio.sleep(0.5) # 降低 CPU 使用率
# 返回体头
headers = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
router = APIRouter()
async def do_something(messages):
# 操作一
message1="节点1开始"
messages.append(message1)
#模拟每次操作耗时
await asyncio.sleep(0.5)
message2="节点1answer"
messages.append(message2)
#模拟每次操作耗时
await asyncio.sleep(0.5)
message2="节点1结束"
messages.append(message2)
#模拟每次操作耗时
await asyncio.sleep(0.5)
@router.post("/completions/send")
async def send_sse():
messages = deque()
asyncio.create_task(do_something(messages))
return StreamingResponse(event_generator(messages), headers=headers)
注:messages是局部变量,这样可以保证,线程安全。
结果如下:
版权声明:本文标题:python如何实现流式输出 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://it.en369.cn/jiaocheng/1754772040a2726311.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。


发表评论