| 知乎专栏 |
import asyncore, socket
class AsyncoreServerUDP(asyncore.dispatcher):
def __init__(self):
asyncore.dispatcher.__init__(self)
# Bind to port 5005 on all interfaces
self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
self.bind(('', 5005))
# Even though UDP is connectionless this is called when it binds to a port
def handle_connect(self):
print "Server Started..."
# This is called everytime there is something to read
def handle_read(self):
data, addr = self.recvfrom(2048)
print str(addr)+" >> "+data
# This is called all the time and causes errors if you leave it out.
def handle_write(self):
pass
AsyncoreServerUDP()
asyncore.loop()
import socket, asyncore
class AsyncoreClientUDP(asyncore.dispatcher):
def __init__(self, server, port):
asyncore.dispatcher.__init__(self)
self.server = server
self.port = port
self.buffer = ""
# Network Connection Magic!
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
self.bind( ('', 0) ) # bind to all interfaces and a "random" free port.
print "Connecting..."
# Once a "connection" is made do this stuff.
def handle_connect(self):
print "Connected"
# If a "connection" is closed do this stuff.
def handle_close(self):
self.close()
# If a message has arrived, process it.
def handle_read(self):
data, addr = self.recv(2048)
print data
# Actually sends the message if there was something in the buffer.
def handle_write(self):
if self.buffer != "":
print self.buffer
sent = self.sendto(self.buffer, (self.server, self.port))
self.buffer = self.buffer[sent:]
connection = AsyncoreClientUDP("127.0.0.1",5005) # create the "connection"
while 1:
asyncore.loop(count = 10) # Check for upto 10 packets this call?
connection.buffer += raw_input(" Chat > ") # raw_input (this is a blocking call)
服务器端分成三块,启动程序,Websocket 服务,网络协议处理
protocol.py 是网络协议处理程序,主要定义自己私有网络协议,对用户请求做出响应。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################
# Home : https://www.netkiller.cn
# Author: Neo <netkiller@msn.com>
# Upgrade: 2025-01-07
##############################################
import os
class Protocol:
def __init__(self, request = None):
pass
def request(self,message):
self.message = message
def response(self):
if self.message == 'listdir':
return os.listdir("/tmp")
else:
return self.message
websoket.py 是 Websocket 核心服务,用于处理 Websocket 端口监听,客户端与服务器心跳检测,维护Online在线用户列表,等等
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################
# Home : https://www.netkiller.cn
# Author: Neo <netkiller@msn.com>
# Upgrade: 2025-01-07
##############################################
import asyncio
import websockets
from protocol import Protocol
class WebSocketServer:
def __init__(self, host="localhost", port=8765):
self.host = host
self.port = port
# 存储所有连接的客户端
self.clients = set()
self.protocol = Protocol()
async def handle_client(self, websocket):
# 新的客户端连接
self.clients.add(websocket)
try:
async for message in websocket:
print(f"收到消息: {message}")
self.protocol.request(message)
response = self.protocol.response()
# 回显消息给客户端
await websocket.send(f"已收到: {response}")
except websockets.ConnectionClosed as e:
print(f"客户端断开连接: {e}")
# finally:
# 移除断开的客户端
# self.clients.remove(websocket)
async def send(self, message):
"""向所有连接的客户端发送消息"""
if self.clients:
disconnected = set()
for client in self.clients:
try:
await client.send(message)
except websockets.ConnectionClosed:
disconnected.add(client)
# 清理已断开的客户端
self.clients -= disconnected
async def start(self):
print(f"启动 WebSocket 服务器: ws://{self.host}:{self.port}")
async with websockets.serve(self.handle_client, self.host, self.port):
await asyncio.Future() # 持续运行直到手动停止
最后 main.py 是主程序入口,负责启动 Websocket 转为守护进程在后台运行
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##############################################
# Home : https://www.netkiller.cn
# Author: Neo <netkiller@msn.com>
# Upgrade: 2025-01-07
##############################################
import asyncio,os,sys
from websocket import WebSocketServer
async def main():
# 创建 WebSocket 服务器实例
server = WebSocketServer()
# 启动 WebSocket 服务器
task = asyncio.create_task(server.start())
# 每 5 秒发送一次心跳数据
async def periodically():
while True:
await asyncio.sleep(5)
await server.send("ping")
periodic_task = asyncio.create_task(periodically())
# 并发运行所有任务
await asyncio.gather(task, periodic_task)
if __name__ == "__main__":
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError as e:
print("fork #1 failed: %d (%s)" % e.errno, e.strerror)
sys.exit(1)
try:
asyncio.run(main())
except KeyboardInterrupt as e:
print(e)
下面演示一下启动过程
(.venv) neo@Neo-Mac-mini-M4 netkiller % python main.py 启动 WebSocket 服务器: ws://localhost:8765 (.venv) neo@Neo-Mac-mini-M4 netkiller % ps ax | grep main.py 39326 s001 S 0:00.03 /opt/homebrew/Cellar/python@3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/Resources/Python.app/Contents/MacOS/Python main.py 39331 s001 R+ 0:00.00 grep main.py (.venv) neo@Neo-Mac-mini-M4 netkiller % kill 39326 (.venv) neo@Neo-Mac-mini-M4 netkiller % ps ax | grep main.py 39373 s001 S+ 0:00.00 grep main.py (.venv) neo@Neo-Mac-mini-M4 netkiller %
#!/usr/bin/env python
import asyncio
from websockets.asyncio.client import connect
async def hello():
uri = "ws://localhost:8765"
async with connect(uri) as websocket:
name = input("What's your name?\n> ")
await websocket.send(name)
print(f">>> {name}")
greeting = await websocket.recv()
print(f"<<< {greeting}")
if __name__ == "__main__":
asyncio.run(hello())
#!/usr/bin/env python
import asyncio
from http import HTTPStatus
from websockets.asyncio.server import serve
def health_check(connection, request):
if request.path == "/status":
return connection.respond(HTTPStatus.OK, "OK\n")
async def echo(websocket):
async for message in websocket:
await websocket.send(message)
async def main():
async with serve(echo, "localhost", 8765, process_request=health_check):
await asyncio.get_running_loop().create_future() # run forever
asyncio.run(main())
(.venv) neo@Neo-Mac-mini-M4 netkiller % curl http://localhost:8765/status OK