SSE protocol format
Fields within an event are separated by \r\n, and each event is terminated by two consecutive \r\n sequences.
event: message\r\ndata: \xe4\xb8\x83\r\nretry: 15000\r\n\r\n
event: message\r\ndata: \xe5\xa4\x95\r\nretry: 15000\r\n\r\n
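The wire format above can be reproduced with a few lines of Python. This is only a minimal sketch: the helper name format_sse_event is hypothetical and not part of any library, and it assumes the \r\n separator and the event/data/retry field order used in the samples above.

```python
from typing import Optional


def format_sse_event(data: str, event: Optional[str] = None,
                     retry: Optional[int] = None) -> bytes:
    """Serialize one event into SSE wire bytes (hypothetical helper)."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # A payload containing newlines is sent as one data: line per payload line.
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    if retry is not None:
        lines.append(f"retry: {retry}")
    # Fields are joined with \r\n; the event ends with an extra blank line.
    return ("\r\n".join(lines) + "\r\n\r\n").encode("utf-8")


if __name__ == "__main__":
    print(format_sse_event("七", event="message", retry=15000))
```

The bytes \xe4\xb8\x83 and \xe5\xa4\x95 in the samples are simply the UTF-8 encodings of the characters 七 and 夕.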
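event: the event type, for example message or error; on the frontend these trigger the onmessage or onerror handler respectively. If the field is omitted, the type defaults to message.
retry: the reconnection delay, in milliseconds. It tells the client how long to wait before trying to reconnect.
data: the actual payload.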
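To see how the three fields map back to values on the receiving side, here is a simplified parsing sketch. The function name parse_sse_events is hypothetical; it assumes the whole stream is already in memory and that lines end with \r\n as in the samples above (a full client would also accept bare \n or \r line endings).

```python
def parse_sse_events(raw: bytes) -> list[dict[str, str]]:
    """Split raw SSE bytes into events and return each as a field dict."""
    events = []
    for block in raw.decode("utf-8").split("\r\n\r\n"):
        if not block.strip():
            continue  # skip the empty tail after the last terminator
        fields: dict[str, str] = {}
        for line in block.split("\r\n"):
            if not line or line.startswith(":"):
                continue  # lines starting with ':' are comments
            name, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # drop the single space after the colon
            if name == "data" and "data" in fields:
                fields["data"] += "\n" + value  # multi-line payload
            else:
                fields[name] = value
        events.append(fields)
    return events


if __name__ == "__main__":
    raw = (b"event: message\r\ndata: \xe4\xb8\x83\r\nretry: 15000\r\n\r\n"
           b"event: message\r\ndata: \xe5\xa4\x95\r\nretry: 15000\r\n\r\n")
    for ev in parse_sse_events(raw):
        print(ev["event"], ev["data"], int(ev["retry"]))
```

All values come back as strings, so retry is converted with int() at the point of use.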
pip install sse_starlette
Server
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
import uvicorn

app = FastAPI()


@app.get("/")
async def home():
    return {"message": "Hello World"}


@app.get("/sse")
async def sse(request: Request):
    async def ServerSendEvents(request: Request):
        books = [
            "Netkiller Linux 手札",
            "Netkiller MySQL 手札",
            "Netkiller Python 手札",
            "Netkiller Spring 手札",
            "Netkiller Java 手札",
            "Netkiller FreeBSD 手札",
            "Netkiller Network 手札",
            "Netkiller Blockchain 手札",
        ]
        for book in books:
            # Stop streaming once the client has gone away.
            if await request.is_disconnected():
                print("Client disconnected")
                break
            # Each dict is sent as one event with event, retry and data fields.
            yield {"event": "message", "retry": 15000, "data": book}
            await asyncio.sleep(0.5)

    g = ServerSendEvents(request)
    return EventSourceResponse(g)


if __name__ == "__main__":
    try:
        uvicorn.run(app=app, host="0.0.0.0", port=8080, log_level="info")
    except KeyboardInterrupt:
        print("Ctrl+C pressed. Shutting down.")
Client
#!/usr/bin/python
# -*- coding: utf-8 -*-
import requests


def test():
    url = "http://127.0.0.1:8080/sse"
    # A GET request has no body, so the expected media type goes in Accept
    # rather than Content-Type.
    headers = {"Accept": "text/event-stream"}
    # stream=True avoids downloading the whole body before returning.
    response = requests.get(url, headers=headers, stream=True)
    for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
        print(chunk)


if __name__ == "__main__":
    test()
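The client above prints raw chunks, and a chunk boundary does not necessarily line up with an event boundary. One possible way to handle this, sketched below, is to buffer the chunks and emit a block only once its terminating blank line has arrived. The generator name iter_event_blocks is hypothetical, and the sketch assumes the server separates lines with \r\n as described earlier.

```python
from typing import Iterable, Iterator


def iter_event_blocks(chunks: Iterable[str],
                      sep: str = "\r\n\r\n") -> Iterator[str]:
    """Reassemble text chunks into complete SSE event blocks."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Emit every complete event currently in the buffer; keep the
        # unfinished remainder for the next chunk.
        while sep in buffer:
            block, buffer = buffer.split(sep, 1)
            if block:
                yield block


if __name__ == "__main__":
    # Simulated chunks that cut an event in the middle of a field name.
    chunks = [
        "event: message\r\nda",
        "ta: 七\r\nretry: 15000\r\n",
        "\r\nevent: message\r\ndata: 夕\r\nretry: 15000\r\n\r\n",
    ]
    for block in iter_event_blocks(chunks):
        print(repr(block))
```

With the client above, the chunks argument could be the iterator returned by response.iter_content(chunk_size=1024, decode_unicode=True).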