21.4.2. NVR(Network Video Recorder)网络视频录像机
import argparse
import asyncio
import json
import logging
import uuid
from datetime import datetime, timedelta
import numpy as np
import websockets
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack, RTCConfiguration, RTCIceCandidate, \
RTCIceServer, MediaStreamTrack
import cv2
from aiortc.contrib.media import MediaRecorder, MediaPlayer, MediaRelay
from av import VideoFrame
logger = logging.getLogger("webrtc")
pcs = set()
config = RTCConfiguration()
# iceServer = RTCIceServer(urls='stun:stun.l.google.com:19302')
iceServer = RTCIceServer(urls='turn:www.netkiller.cn:3478',username= "neo",credential= "netkiller")
config.iceServers = []
config.iceServers.append(iceServer)
# prepare local media
# player = MediaPlayer(os.path.join(ROOT, "demo-instruct.mp4"))
# videoStreamTrack = VideoStreamTrack()
# pc.addTrack(videoStreamTrack)
class VideoTransformTrack(MediaStreamTrack):
kind = "video"
def __init__(self, track):
super().__init__() # don't forget this!
self.track = track
async def recv(self):
frame = await self.track.recv()
if frame is None:
return None
# logger.info(f"Frame: {frame}")
return frame
class VideoReceiver:
def __init__(self):
self.track = None
async def handle_track(self, track):
# print("处理帧")
self.track = track
frame_count = 0
while True:
try:
# print("等待帧...")
frame = await asyncio.wait_for(track.recv(), timeout=5.0)
frame_count += 1
# print(f"接收第 {frame_count} 帧")
if isinstance(frame, VideoFrame):
# print(f"帧类型: VideoFrame, pts: {frame.pts}, time_base: {frame.time_base}")
frame = frame.to_ndarray(format="bgr24")
elif isinstance(frame, np.ndarray):
# print(f"帧类型: numpy 数组")
pass
else:
# print(f"意外的帧类型: {type(frame)}")
continue
# 在帧上添加时间戳
current_time = datetime.now()
new_time = current_time - timedelta(seconds=55)
timestamp = new_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
cv2.putText(frame, timestamp, (10, frame.shape[0] - 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
# cv2.imwrite(f"received_frame_{frame_count}.jpg", frame)
# print(f"将帧 {frame_count} 保存至文件")
cv2.imshow("Frame", frame)
# 按 'q' 键退出程序
if cv2.waitKey(1) & 0xFF == ord('q'):
break
except asyncio.TimeoutError:
logger.info("Wait...")
await asyncio.sleep(1000) # 等待接收帧 35 秒
except Exception as e:
logger.error(f"VideoReceiver Exception: {str(e)}")
# if "Connection" in str(e):
break
logger.info("Quit VideoReceiver")
video_receiver = VideoReceiver()
async def signaling_server(websocket):
# pc = RTCPeerConnection()
pc = RTCPeerConnection(config)
pc.addTransceiver('video', direction='recvonly')
pc.addTransceiver('audio', direction='recvonly')
pcs.add(pc)
relay = MediaRelay()
@pc.on("datachannel")
def on_datachannel(channel):
@channel.on("message")
def on_message(message):
if isinstance(message, str) and message.startswith("ping"):
channel.send("pong" + message[4:])
logger.info("ping:", message)
else:
logger.info("Received:", message)
@pc.on("connectionstatechange")
async def on_connectionstatechange():
logger.info("Connection state is %s", pc.connectionState)
if pc.connectionState == "connected":
logger.info("WebRTC Connection")
if pc.connectionState == "closed":
await pc.close()
logger.info("WebRTC close")
if pc.connectionState == "failed":
await pc.close()
pcs.discard(pc)
@pc.on("track")
def on_track(track):
logger.info("Track %s received", track.kind)
# if track.kind == "audio":
# # pc.addTrack(player.audio)
# recorder.addTrack(relay.subscribe(track))
if track.kind == "video":
pc.addTrack(
VideoTransformTrack(relay.subscribe(track))
)
recorder.addTrack(relay.subscribe(track))
# recorder.addTrack(track)
# if isinstance(track, MediaStreamTrack):
# print(f"接收视频 {track.kind}")
# asyncio.ensure_future(video_receiver.handle_track(track))
@track.on("ended")
async def on_ended():
logger.info("Track %s ended", track.kind)
await recorder.stop()
try:
# async for message in websocket:
# logger.info(f"收到消息: {message}")
# 回显消息给客户端
# await websocket.send(f"服务器已收到: {message}")
filename = f"{uuid.uuid4()}.mp4"
recorder = MediaRecorder(filename)
# recorder = MediaBlackhole()
logger.info(f"Recorder {filename}")
# while True:
message = await websocket.recv()
logger.info(f">>> {message}")
jsonObject = json.loads(message)
if jsonObject['type'] == 'offer':
# await recorder.stop()
# Create offer
# offer = await pc.createOffer()
offer = RTCSessionDescription(sdp=jsonObject["sdp"], type=jsonObject["type"])
await pc.setRemoteDescription(offer)
logger.info(f"Recorder start")
await recorder.start()
answer = await pc.createAnswer()
await pc.setLocalDescription(answer)
# Send offer to the signaling server
jsonString = json.dumps({
"type": pc.localDescription.type,
"sdp": pc.localDescription.sdp
})
await websocket.send(jsonString)
logger.info(f"<<< {jsonString}")
# print("等待连接建立...")
while pc.connectionState != "connected":
await asyncio.sleep(0.1)
except websockets.ConnectionClosed as e:
await pc.close()
logger.error(f"ConnectionClosed: {e}")
# finally:
# await websockets.close()
async def run(host,port):
try:
# start_server = await websockets.serve(signaling_server, "localhost", 8765)
async with websockets.serve(signaling_server, "0.0.0.0", 5349):
await asyncio.Future() # 持续运行直到手动停止
except KeyboardInterrupt:
pass
finally:
asyncio.run(on_shutdown());
async def on_shutdown(app):
# close peer connections
coros = [pc.close() for pc in pcs]
await asyncio.gather(*coros)
pcs.clear()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="WebRTC audio / video / data-channels demo"
)
parser.add_argument("--cert-file", help="SSL certificate file (for HTTPS)")
parser.add_argument("--key-file", help="SSL key file (for HTTPS)")
parser.add_argument(
"--host", default="0.0.0.0", help="Host for HTTP server (default: 0.0.0.0)"
)
parser.add_argument(
"--port", type=int, default=8080, help="Port for HTTP server (default: 8080)"
)
parser.add_argument("--record-to", help="Write received media to a file.")
parser.add_argument("--verbose", "-v", action="count")
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
# if args.cert_file:
# ssl_context = ssl.SSLContext()
# ssl_context.load_cert_chain(args.cert_file, args.key_file)
# else:
# ssl_context = None
asyncio.run(run(args.host,args.port))
import argparse
import asyncio
import json
import logging
import os
import platform
import ssl
from aiohttp import web
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer, MediaRelay
from aiortc.rtcrtpsender import RTCRtpSender
ROOT = os.path.dirname(__file__)
relay = None
webcam = None
def create_local_tracks(play_from, decode):
global relay, webcam
if play_from:
player = MediaPlayer(play_from, decode=decode)
return player.audio, player.video
else:
options = {"framerate": "30", "video_size": "640x480"}
if relay is None:
if platform.system() == "Darwin":
webcam = MediaPlayer(
"default:none", format="avfoundation", options=options
)
elif platform.system() == "Windows":
webcam = MediaPlayer(
"video=Integrated Camera", format="dshow", options=options
)
else:
webcam = MediaPlayer("/dev/video0", format="v4l2", options=options)
relay = MediaRelay()
return None, relay.subscribe(webcam.video)
def force_codec(pc, sender, forced_codec):
kind = forced_codec.split("/")[0]
codecs = RTCRtpSender.getCapabilities(kind).codecs
transceiver = next(t for t in pc.getTransceivers() if t.sender == sender)
transceiver.setCodecPreferences(
[codec for codec in codecs if codec.mimeType == forced_codec]
)
async def index(request):
content = open(os.path.join(ROOT, "index.html"), "r").read()
return web.Response(content_type="text/html", text=content)
async def javascript(request):
content = open(os.path.join(ROOT, "client.js"), "r").read()
return web.Response(content_type="application/javascript", text=content)
async def offer(request):
params = await request.json()
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
pc = RTCPeerConnection()
pcs.add(pc)
@pc.on("connectionstatechange")
async def on_connectionstatechange():
print("Connection state is %s" % pc.connectionState)
if pc.connectionState == "failed":
await pc.close()
pcs.discard(pc)
# open media source
audio, video = create_local_tracks(
args.play_from, decode=not args.play_without_decoding
)
if audio:
audio_sender = pc.addTrack(audio)
if args.audio_codec:
force_codec(pc, audio_sender, args.audio_codec)
elif args.play_without_decoding:
raise Exception("You must specify the audio codec using --audio-codec")
if video:
video_sender = pc.addTrack(video)
if args.video_codec:
force_codec(pc, video_sender, args.video_codec)
elif args.play_without_decoding:
raise Exception("You must specify the video codec using --video-codec")
await pc.setRemoteDescription(offer)
answer = await pc.createAnswer()
await pc.setLocalDescription(answer)
return web.Response(
content_type="application/json",
text=json.dumps(
{"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
),
)
pcs = set()
async def on_shutdown(app):
# close peer connections
coros = [pc.close() for pc in pcs]
await asyncio.gather(*coros)
pcs.clear()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="WebRTC webcam demo")
parser.add_argument("--cert-file", help="SSL certificate file (for HTTPS)")
parser.add_argument("--key-file", help="SSL key file (for HTTPS)")
parser.add_argument("--play-from", help="Read the media from a file and sent it.")
parser.add_argument(
"--play-without-decoding",
help=(
"Read the media without decoding it (experimental). "
"For now it only works with an MPEGTS container with only H.264 video."
),
action="store_true",
)
parser.add_argument(
"--host", default="0.0.0.0", help="Host for HTTP server (default: 0.0.0.0)"
)
parser.add_argument(
"--port", type=int, default=8080, help="Port for HTTP server (default: 8080)"
)
parser.add_argument("--verbose", "-v", action="count")
parser.add_argument(
"--audio-codec", help="Force a specific audio codec (e.g. audio/opus)"
)
parser.add_argument(
"--video-codec", help="Force a specific video codec (e.g. video/H264)"
)
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
if args.cert_file:
ssl_context = ssl.SSLContext()
ssl_context.load_cert_chain(args.cert_file, args.key_file)
else:
ssl_context = None
app = web.Application()
app.on_shutdown.append(on_shutdown)
app.router.add_get("/", index)
app.router.add_get("/client.js", javascript)
app.router.add_post("/offer", offer)
web.run_app(app, host=args.host, port=args.port, ssl_context=ssl_context)