| 知乎专栏 |
SSE 协议格式
协议字段之间使用\r\n 分割,数据结尾处使用两个\r\n。
event: message\r\ndata: \xe4\xb8\x83\r\nretry: 15000\r\n\r\n
event: message\r\ndata: \xe5\xa4\x95\r\nretry: 15000\r\n\r\n
event: 表示事件,message和error,对应前端会分别触发onmessage或onerror事件。
retry: 重试时间,让客户端在retry时间后进行重试,单位是毫秒。
data: 具体的数据。
pip install sse_starlette
服务器端
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
import uvicorn
app = FastAPI()
@app.get("/")
async def home():
return {"message": "Hello World"}
@app.get("/sse")
async def sse(request: Request):
async def ServerSendEvents(request: Request):
books = ["Netkiller Linux 手札", "Netkiller MySQL 手札", "Netkiller Python 手札", "Netkiller Spring 手札", "Netkiller Java 手札", "Netkiller FreeBSD 手札", "Netkiller Network 手札", "Netkiller Blockchain 手札"]
for book in books:
if await request.is_disconnected():
print("连接已中断")
break
yield {"event": "message", "retry": 15000, "data": book}
await asyncio.sleep(0.5)
g = ServerSendEvents(request)
return EventSourceResponse(g)
if __name__ == "__main__":
try:
uvicorn.run(app=app, host="0.0.0.0", port=8080, log_level="info")
except KeyboardInterrupt:
print("Crtl+C Pressed. Shutting down.")
客户端
#!/usr/bin/python
# -*-coding:utf-8-*-
import requests
def test():
url = r"http://127.0.0.1:8080/sse"
headers = {"Content-Type": "text/event-stream"}
response = requests.get(url, headers=headers, stream=True)
for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
print(chunk)
if __name__ == "__main__":
test()