Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

4.11. SSE

SSE 协议格式



协议字段之间使用\r\n 分割,数据结尾处使用两个\r\n。

event: message\r\ndata: \xe4\xb8\x83\r\nretry: 15000\r\n\r\n
event: message\r\ndata: \xe5\xa4\x95\r\nretry: 15000\r\n\r\n

event: 表示事件,message和error,对应前端会分别触发onmessage或onerror事件。
retry: 重试时间,让客户端在retry时间后进行重试,单位是毫秒。
data: 具体的数据。

		
pip install sse_starlette		
		
		

服务器端

		
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
import uvicorn

app = FastAPI()


@app.get("/")
async def home():
    return {"message": "Hello World"}


@app.get("/sse")
async def sse(request: Request):
    async def ServerSendEvents(request: Request):
        books = ["Netkiller Linux 手札", "Netkiller MySQL 手札", "Netkiller Python 手札", "Netkiller Spring 手札", "Netkiller Java 手札", "Netkiller FreeBSD 手札", "Netkiller Network 手札", "Netkiller Blockchain 手札"]
        for book in books:
            if await request.is_disconnected():
                print("连接已中断")
                break
            yield {"event": "message", "retry": 15000, "data": book}

            await asyncio.sleep(0.5)

    g = ServerSendEvents(request)
    return EventSourceResponse(g)


if __name__ == "__main__":
    try:
        uvicorn.run(app=app, host="0.0.0.0", port=8080, log_level="info")
    except KeyboardInterrupt:
        print("Crtl+C Pressed. Shutting down.")		
		
		

客户端

		
#!/usr/bin/python
# -*-coding:utf-8-*-
import requests


def test():
    url = r"http://127.0.0.1:8080/sse"
    headers = {"Content-Type": "text/event-stream"}
    response = requests.get(url, headers=headers, stream=True)
    for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
        print(chunk)


if __name__ == "__main__":
    test()