知乎专栏 |
import asyncore
import socket


class AsyncoreServerUDP(asyncore.dispatcher):
    """UDP server that prints every datagram received on port 5005.

    NOTE: ``asyncore`` is deprecated and was removed from the standard
    library in Python 3.12; run this example on an interpreter that still
    ships the module.
    """

    def __init__(self):
        asyncore.dispatcher.__init__(self)
        # Bind to port 5005 on all interfaces.
        self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.bind(('', 5005))

    def handle_connect(self):
        """Announce start-up.

        NOTE(review): whether asyncore fires this callback for a UDP socket
        that never calls connect() depends on the asyncore version - confirm.
        """
        print("Server Started...")

    def writable(self):
        """Report that the server never has anything to send.

        The dispatcher default is "always writable", which makes the event
        loop wake up continuously just to call the no-op handle_write().
        """
        return False

    def handle_read(self):
        """Read one datagram (up to 2048 bytes) and print sender and payload."""
        # Use the underlying socket directly; going through the dispatcher's
        # attribute delegation is deprecated.
        data, addr = self.socket.recvfrom(2048)
        # The payload is bytes on Python 3; decode it before joining with str.
        print(str(addr) + " >> " + data.decode("utf-8", "replace"))

    def handle_write(self):
        """Nothing to write; kept so a stray write event is harmless."""
        pass


AsyncoreServerUDP()
asyncore.loop()
import asyncore
import socket


class AsyncoreClientUDP(asyncore.dispatcher):
    """Interactive UDP chat client built on asyncore.

    Text typed by the user is appended to ``buffer`` (as UTF-8 bytes) and
    sent to ``(server, port)`` the next time the socket is writable.

    NOTE: ``asyncore`` is deprecated and was removed from the standard
    library in Python 3.12; run this example on an interpreter that still
    ships the module.
    """

    def __init__(self, server, port):
        # Initialise the dispatcher exactly once.
        asyncore.dispatcher.__init__(self)
        self.server = server
        self.port = port
        # Outgoing data; bytes because sockets send bytes on Python 3.
        self.buffer = b""

        self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Bind to all interfaces and a "random" free port.
        self.bind(('', 0))
        print("Connecting...")

    def handle_connect(self):
        """Called by asyncore once a "connection" is made."""
        print("Connected")

    def handle_close(self):
        """Close the socket when asyncore reports the "connection" closed."""
        self.close()

    def handle_read(self):
        """Print the payload of one incoming datagram (up to 2048 bytes)."""
        # recvfrom() returns (data, address); plain recv() returns only the
        # data, so unpacking its result into two names was a bug.
        data, addr = self.socket.recvfrom(2048)
        print(data.decode("utf-8", "replace"))

    def handle_write(self):
        """Send whatever is pending in the buffer, if anything."""
        if self.buffer:
            print(self.buffer.decode("utf-8", "replace"))
            sent = self.socket.sendto(self.buffer, (self.server, self.port))
            # Drop only what was actually sent.
            self.buffer = self.buffer[sent:]


connection = AsyncoreClientUDP("127.0.0.1", 5005)  # create the "connection"
while 1:
    # Run at most 10 poll iterations, then go back to prompting the user.
    asyncore.loop(count=10)
    # input() is a blocking call; nothing is sent or received while it waits.
    connection.buffer += input(" Chat > ").encode("utf-8")
服务器端分成三块,启动程序,Websocket 服务,网络协议处理
protocol.py 是网络协议处理程序,主要定义自己私有网络协议,对用户请求做出响应。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################
# Home : https://www.netkiller.cn
# Author: Neo <netkiller@msn.com>
# Upgrade: 2025-01-07
##############################################
import os


class Protocol:
    """Private application protocol: stores a request and builds the reply."""

    def __init__(self, request=None):
        """Create a handler, optionally pre-loaded with ``request``.

        Storing the argument guarantees ``self.message`` always exists, so
        calling response() before request() no longer raises AttributeError.
        """
        self.message = request

    def request(self, message):
        """Remember ``message`` as the request to answer next."""
        self.message = message

    def response(self):
        """Return the reply for the stored request.

        The command ``'listdir'`` yields the entries of ``/tmp`` as a list;
        any other message is returned unchanged.
        """
        if self.message == 'listdir':
            return os.listdir("/tmp")
        return self.message
websocket.py 是 Websocket 核心服务,用于处理 Websocket 端口监听、客户端与服务器心跳检测、维护在线(Online)用户列表等。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################
# Home : https://www.netkiller.cn
# Author: Neo <netkiller@msn.com>
# Upgrade: 2025-01-07
##############################################
import asyncio

import websockets

from protocol import Protocol


class WebSocketServer:
    """WebSocket server that answers requests and can broadcast to clients."""

    def __init__(self, host="localhost", port=8765):
        self.host = host
        self.port = port
        # Every currently connected client.
        self.clients = set()
        # One Protocol instance is shared by all connections.
        self.protocol = Protocol()

    async def handle_client(self, websocket):
        """Serve one client connection until it closes.

        Each incoming message is passed to the protocol handler and the
        result is sent back to the same client.
        """
        # Register the new client.
        self.clients.add(websocket)
        try:
            async for message in websocket:
                print(f"收到消息: {message}")
                self.protocol.request(message)
                response = self.protocol.response()
                # Send the protocol's answer back to the client.
                await websocket.send(f"已收到: {response}")
        except websockets.ConnectionClosed as e:
            print(f"客户端断开连接: {e}")
        finally:
            # Always forget the client, however the connection ended;
            # otherwise closed connections pile up in the set. discard()
            # is used because send() may already have removed it.
            self.clients.discard(websocket)

    async def send(self, message):
        """Send ``message`` to every connected client."""
        disconnected = set()
        # Iterate over a snapshot: clients may connect or disconnect while
        # we are suspended in ``await``, and mutating a set during
        # iteration raises RuntimeError.
        for client in tuple(self.clients):
            try:
                await client.send(message)
            except websockets.ConnectionClosed:
                disconnected.add(client)
        # Drop the clients that turned out to be gone.
        self.clients.difference_update(disconnected)

    async def start(self):
        """Listen on ``host:port`` and serve until the task is cancelled."""
        print(f"启动 WebSocket 服务器: ws://{self.host}:{self.port}")
        async with websockets.serve(self.handle_client, self.host, self.port):
            await asyncio.Future()  # never completes: run until stopped
最后,main.py 是主程序入口,负责启动 Websocket 服务,并将其转为守护进程在后台运行。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################
# Home : https://www.netkiller.cn
# Author: Neo <netkiller@msn.com>
# Upgrade: 2025-01-07
##############################################
import asyncio
import os
import sys

from websocket import WebSocketServer


async def main():
    """Run the WebSocket server together with a 5-second heartbeat."""
    # Create the WebSocket server instance.
    server = WebSocketServer()

    # Start the WebSocket server.
    task = asyncio.create_task(server.start())

    async def periodically():
        """Broadcast a "ping" heartbeat to all clients every 5 seconds."""
        while True:
            await asyncio.sleep(5)
            await server.send("ping")

    periodic_task = asyncio.create_task(periodically())

    # Run both tasks concurrently.
    await asyncio.gather(task, periodic_task)


if __name__ == "__main__":
    # Fork once and let the parent exit so the server keeps running in the
    # background. No setsid() or second fork is performed here.
    try:
        pid = os.fork()
        if pid > 0:
            # Exit the parent process.
            sys.exit(0)
    except OSError as e:
        # The format arguments must be a tuple; without the parentheses the
        # "%" operator received only e.errno and raised TypeError.
        print("fork #1 failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    try:
        asyncio.run(main())
    except KeyboardInterrupt as e:
        print(e)
下面演示一下启动过程
(.venv) neo@Neo-Mac-mini-M4 netkiller % python main.py 启动 WebSocket 服务器: ws://localhost:8765 (.venv) neo@Neo-Mac-mini-M4 netkiller % ps ax | grep main.py 39326 s001 S 0:00.03 /opt/homebrew/Cellar/python@3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/Resources/Python.app/Contents/MacOS/Python main.py 39331 s001 R+ 0:00.00 grep main.py (.venv) neo@Neo-Mac-mini-M4 netkiller % kill 39326 (.venv) neo@Neo-Mac-mini-M4 netkiller % ps ax | grep main.py 39373 s001 S+ 0:00.00 grep main.py (.venv) neo@Neo-Mac-mini-M4 netkiller %
#!/usr/bin/env python
import asyncio

from websockets.asyncio.client import connect


async def hello():
    """Connect to the local server, send the user's name, print the reply."""
    async with connect("ws://localhost:8765") as websocket:
        # input() blocks until the user presses Enter.
        name = input("What's your name?\n> ")

        await websocket.send(name)
        print(f">>> {name}")

        # Wait for a single message from the server and show it.
        print(f"<<< {await websocket.recv()}")


if __name__ == "__main__":
    asyncio.run(hello())
#!/usr/bin/env python
import asyncio
from http import HTTPStatus

from websockets.asyncio.server import serve


def health_check(connection, request):
    """Answer requests for ``/status`` with ``200 OK``.

    Returns None for every other path.
    """
    if request.path != "/status":
        return None
    return connection.respond(HTTPStatus.OK, "OK\n")


async def echo(websocket):
    """Send every received message straight back to its sender."""
    async for message in websocket:
        await websocket.send(message)


async def main():
    """Serve ``echo`` on localhost:8765 with the health-check hook installed."""
    server = serve(echo, "localhost", 8765, process_request=health_check)
    async with server:
        # A future that is never resolved keeps the server running.
        loop = asyncio.get_running_loop()
        await loop.create_future()


asyncio.run(main())
(.venv) neo@Neo-Mac-mini-M4 netkiller % curl http://localhost:8765/status OK