Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

21.4. WebRTC/ORTC/VoIP

21.4.1. ICE Server

请参阅《Netkiller Linux 手札》的 WebRTC/ORTC 章节。

21.4.2. NVR(Network Video Recorder)网络视频录像机

		
import argparse
import asyncio
import json
import logging
import uuid
from datetime import datetime, timedelta
import numpy as np

import websockets
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack, RTCConfiguration, RTCIceCandidate, \
    RTCIceServer, MediaStreamTrack
import cv2
from aiortc.contrib.media import MediaRecorder, MediaPlayer, MediaRelay
from av import VideoFrame

# Logger used by every handler in this script.
logger = logging.getLogger("webrtc")

# Peer connections that are currently tracked; emptied by on_shutdown().
pcs = set()

# ICE configuration handed to every RTCPeerConnection: one TURN server.
# NOTE(review): the TURN username/credential are hard-coded below — consider
# reading them from the environment or a configuration file.
# iceServer = RTCIceServer(urls='stun:stun.l.google.com:19302')
iceServer = RTCIceServer(
    urls='turn:www.netkiller.cn:3478',
    username="neo",
    credential="netkiller",
)
config = RTCConfiguration(iceServers=[iceServer])
# prepare local media
# player = MediaPlayer(os.path.join(ROOT, "demo-instruct.mp4"))

# videoStreamTrack = VideoStreamTrack()
# pc.addTrack(videoStreamTrack)


class VideoTransformTrack(MediaStreamTrack):
    """Video track that hands on the frames of a wrapped track unchanged."""

    kind = "video"

    def __init__(self, track):
        """Remember ``track`` as the source that ``recv`` pulls frames from."""
        super().__init__()  # required: initialise the MediaStreamTrack base
        self.track = track

    async def recv(self):
        """Await the wrapped track and return whatever it produced.

        No transformation is applied; a ``None`` result is passed through
        as ``None``.
        """
        # logger.info(f"Frame: {frame}")
        return await self.track.recv()

class VideoReceiver:
    """Pull frames from a media track, stamp a time on them and display them.

    Frames are shown in an OpenCV window; pressing ``q`` in that window ends
    the receive loop.
    """

    # Seconds to wait for one frame before the track is treated as idle.
    RECV_TIMEOUT_SECONDS = 5.0
    # Pause after a receive timeout before asking for the next frame.  This
    # used to be ``asyncio.sleep(1000)``; ``sleep`` takes seconds, so one
    # timeout stalled the receiver for roughly 17 minutes.
    RETRY_DELAY_SECONDS = 1.0
    # Subtracted from the wall clock before it is drawn onto the frame.
    # NOTE(review): the reason for the 55 s offset is not visible here — confirm.
    TIMESTAMP_OFFSET = timedelta(seconds=55)
    # Title of the OpenCV preview window.
    WINDOW_NAME = "Frame"

    def __init__(self):
        # Track most recently passed to handle_track(); None until then.
        self.track = None

    async def handle_track(self, track):
        """Receive frames from ``track`` until ``q`` is pressed or an error occurs.

        ``VideoFrame`` objects are converted to BGR arrays, ``numpy`` arrays
        are used as they are, and any other frame type is skipped.  A receive
        timeout is logged and retried after ``RETRY_DELAY_SECONDS``; any other
        exception is logged and ends the loop.

        Args:
            track: Object with an awaitable ``recv()`` that yields frames.
        """
        self.track = track
        window_shown = False
        try:
            while True:
                try:
                    frame = await asyncio.wait_for(
                        track.recv(), timeout=self.RECV_TIMEOUT_SECONDS
                    )

                    if isinstance(frame, VideoFrame):
                        frame = frame.to_ndarray(format="bgr24")
                    elif not isinstance(frame, np.ndarray):
                        # Unexpected frame type: nothing that can be drawn.
                        continue

                    # Draw the (offset) wall-clock time, millisecond precision,
                    # near the bottom-left corner of the frame.
                    shown_time = datetime.now() - self.TIMESTAMP_OFFSET
                    timestamp = shown_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                    cv2.putText(
                        frame,
                        timestamp,
                        (10, frame.shape[0] - 30),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1,
                        (0, 255, 0),
                        2,
                        cv2.LINE_AA,
                    )

                    cv2.imshow(self.WINDOW_NAME, frame)
                    window_shown = True

                    # Press 'q' in the preview window to stop receiving.
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
                except asyncio.TimeoutError:
                    logger.info("Wait...")
                    await asyncio.sleep(self.RETRY_DELAY_SECONDS)
                except Exception as e:
                    logger.error("VideoReceiver Exception: %s", e)
                    break
        finally:
            # Close the preview window on every exit path, cancellation included.
            if window_shown:
                try:
                    cv2.destroyWindow(self.WINDOW_NAME)
                except cv2.error:
                    logger.debug("Preview window was already closed")
        logger.info("Quit VideoReceiver")


# Module-level receiver instance.
video_receiver = VideoReceiver()

async def signaling_server(websocket):
    """Handle one signaling WebSocket: answer an SDP offer and record its video.

    One JSON message is read from ``websocket``.  If its ``type`` is
    ``"offer"``, the SDP is applied to a new peer connection, a recorder
    writing to ``<uuid4>.mp4`` is started and the SDP answer is sent back.
    The coroutine then waits until the peer connection is connected, or has
    failed or closed, before returning.  Any other message type, a malformed
    message, or a dropped WebSocket closes the peer connection.

    Args:
        websocket: Connection object passed in by ``websockets.serve``; its
            ``recv()`` and ``send()`` coroutines are used.
    """
    pc = RTCPeerConnection(config)
    pc.addTransceiver('video', direction='recvonly')
    pc.addTransceiver('audio', direction='recvonly')
    pcs.add(pc)

    relay = MediaRelay()

    async def release_pc():
        # Close the peer connection and stop tracking it so `pcs` cannot grow
        # without bound.
        await pc.close()
        pcs.discard(pc)

    @pc.on("datachannel")
    def on_datachannel(channel):
        @channel.on("message")
        def on_message(message):
            # The logging calls need a %s placeholder for their argument;
            # without one the logging module reports a formatting error.
            if isinstance(message, str) and message.startswith("ping"):
                channel.send("pong" + message[4:])
                logger.info("ping: %s", message)
            else:
                logger.info("Received: %s", message)

    @pc.on("connectionstatechange")
    async def on_connectionstatechange():
        state = pc.connectionState
        logger.info("Connection state is %s", state)

        if state == "connected":
            logger.info("WebRTC Connection")
        elif state in ("closed", "failed"):
            await release_pc()
            if state == "closed":
                logger.info("WebRTC close")

    @pc.on("track")
    def on_track(track):
        logger.info("Track %s received", track.kind)

        # if track.kind == "audio":
        # # pc.addTrack(player.audio)
        #     recorder.addTrack(relay.subscribe(track))

        if track.kind == "video":
            # One relay subscription is added to this peer connection as a
            # pass-through track, a second one feeds the recorder.
            pc.addTrack(VideoTransformTrack(relay.subscribe(track)))
            recorder.addTrack(relay.subscribe(track))

        @track.on("ended")
        async def on_ended():
            logger.info("Track %s ended", track.kind)
            await recorder.stop()

    try:
        message = await websocket.recv()
        logger.info(">>> %s", message)
        payload = json.loads(message)
        if payload['type'] != 'offer':
            logger.warning("Ignoring signaling message of type %r", payload['type'])
            await release_pc()
            return

        # Create the recorder only once an offer has arrived, so that other
        # messages do not open a recording file.  It must exist before
        # setRemoteDescription() because on_track() uses it.
        filename = f"{uuid.uuid4()}.mp4"
        recorder = MediaRecorder(filename)
        # recorder = MediaBlackhole()
        logger.info("Recorder %s", filename)

        offer = RTCSessionDescription(sdp=payload["sdp"], type=payload["type"])
        await pc.setRemoteDescription(offer)

        logger.info("Recorder start")
        await recorder.start()

        answer = await pc.createAnswer()
        await pc.setLocalDescription(answer)

        # Send the answer back over the signaling channel.
        reply = json.dumps({
            "type": pc.localDescription.type,
            "sdp": pc.localDescription.sdp
        })
        await websocket.send(reply)
        logger.info("<<< %s", reply)

        # Keep the handler (and so the WebSocket) alive until the connection
        # is established.  Terminal states also end the wait; otherwise a
        # failed connection would leave this loop spinning forever.
        while pc.connectionState not in ("connected", "failed", "closed"):
            await asyncio.sleep(0.1)

    except websockets.ConnectionClosed as e:
        await release_pc()
        logger.error("ConnectionClosed: %s", e)
    except (ValueError, KeyError, TypeError):
        # Malformed JSON, a missing "type"/"sdp" key, or a non-object payload.
        await release_pc()
        logger.exception("Invalid signaling message")

async def run(host, port):
    """Serve the signaling WebSocket on ``host``:``port`` until stopped.

    Previously both arguments were ignored and the server always bound
    ``0.0.0.0:5349``; the values passed in are now honoured.

    Args:
        host: Interface address to bind.
        port: TCP port to listen on.
    """
    try:
        async with websockets.serve(signaling_server, host, port):
            await asyncio.Future()  # never completes: run until stopped
    except KeyboardInterrupt:
        # A Ctrl-C that surfaces here is treated as a normal stop.
        pass
    finally:
        # This code already runs inside the event loop, so the cleanup
        # coroutine is awaited directly: asyncio.run() cannot be called from a
        # running loop, and on_shutdown() requires its `app` argument.
        await on_shutdown(None)

async def on_shutdown(app=None):
    """Close every tracked peer connection and empty ``pcs``.

    Args:
        app: Unused.  It defaults to ``None`` so the coroutine can also be
            called without arguments.
    """
    # close peer connections
    coros = [pc.close() for pc in pcs]
    await asyncio.gather(*coros)
    pcs.clear()

if __name__ == "__main__":
    # Command-line entry point: parse options, configure logging, start the
    # signaling server.
    parser = argparse.ArgumentParser(
        description="WebRTC audio / video / data-channels demo"
    )
    parser.add_argument("--cert-file", help="SSL certificate file (for HTTPS)")
    parser.add_argument("--key-file", help="SSL key file (for HTTPS)")
    parser.add_argument(
        "--host",
        default="0.0.0.0",
        help="Host for the signaling server (default: 0.0.0.0)",
    )
    # 5349 is the port that run() has been binding; the former default of
    # 8080 (and the "HTTP server" wording) did not describe this script.
    parser.add_argument(
        "--port",
        type=int,
        default=5349,
        help="Port for the signaling server (default: 5349)",
    )
    parser.add_argument("--record-to", help="Write received media to a file.")
    parser.add_argument("--verbose", "-v", action="count")
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    # if args.cert_file:
    #     ssl_context = ssl.SSLContext()
    #     ssl_context.load_cert_chain(args.cert_file, args.key_file)
    # else:
    #     ssl_context = None
    try:
        asyncio.run(run(args.host, args.port))
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server; exit quietly.
        pass
		
		
		

21.4.3. Webcam 网络直播

		
import argparse
import asyncio
import json
import logging
import os
import platform
import ssl

from aiohttp import web
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer, MediaRelay
from aiortc.rtcrtpsender import RTCRtpSender

# Directory that contains this script; index.html and client.js are read from it.
ROOT = os.path.split(__file__)[0]


# Both are created on the first webcam request by create_local_tracks().
relay, webcam = None, None


def create_local_tracks(play_from, decode):
    """Return the ``(audio, video)`` pair of tracks to send to a peer.

    With a truthy ``play_from`` a new ``MediaPlayer`` is opened on it (passing
    ``decode`` through) and its audio and video tracks are returned.
    Otherwise the webcam is used: on the first call the module-level
    ``webcam`` player and ``relay`` are created, and every call returns
    ``(None, <relay subscription to the webcam video>)``.
    """
    global relay, webcam

    if play_from:
        source = MediaPlayer(play_from, decode=decode)
        return source.audio, source.video

    if relay is None:
        capture_options = {"framerate": "30", "video_size": "640x480"}
        # Capture device and input format depend on the operating system.
        system_name = platform.system()
        if system_name == "Darwin":
            device, input_format = "default:none", "avfoundation"
        elif system_name == "Windows":
            device, input_format = "video=Integrated Camera", "dshow"
        else:
            device, input_format = "/dev/video0", "v4l2"
        webcam = MediaPlayer(device, format=input_format, options=capture_options)
        relay = MediaRelay()
    return None, relay.subscribe(webcam.video)


def force_codec(pc, sender, forced_codec):
    """Limit the transceiver that owns ``sender`` to the codec ``forced_codec``.

    ``forced_codec`` is a MIME type such as ``video/H264``; the part before
    the first ``/`` selects which capabilities are queried.  ``next`` raises
    ``StopIteration`` when no transceiver of ``pc`` uses ``sender``.
    """
    media_kind = forced_codec.partition("/")[0]
    available = RTCRtpSender.getCapabilities(media_kind).codecs
    owner = next(filter(lambda t: t.sender == sender, pc.getTransceivers()))
    owner.setCodecPreferences(
        [candidate for candidate in available if candidate.mimeType == forced_codec]
    )


async def index(request):
    """Serve ``index.html`` from the script directory as ``text/html``.

    Args:
        request: Incoming request; not inspected.

    Returns:
        A ``web.Response`` whose body is the file's text.
    """
    # The context manager closes the file as soon as it has been read instead
    # of leaving the handle open until garbage collection.
    with open(os.path.join(ROOT, "index.html"), "r") as page:
        content = page.read()
    return web.Response(content_type="text/html", text=content)


async def javascript(request):
    """Serve ``client.js`` from the script directory as ``application/javascript``.

    Args:
        request: Incoming request; not inspected.

    Returns:
        A ``web.Response`` whose body is the file's text.
    """
    # The context manager closes the file as soon as it has been read instead
    # of leaving the handle open until garbage collection.
    with open(os.path.join(ROOT, "client.js"), "r") as script:
        content = script.read()
    return web.Response(content_type="application/javascript", text=content)


async def offer(request):
    """Answer a posted SDP offer with the local media tracks attached.

    The JSON request body must carry ``sdp`` and ``type``.  A new peer
    connection is created and tracked in ``pcs``, the tracks returned by
    ``create_local_tracks`` are added to it (with a forced codec when the
    matching command-line option is set), and the SDP answer is returned as
    JSON.

    Raises:
        Exception: ``--play-without-decoding`` is set but the codec option
            for a track that is present was not given.
    """
    payload = await request.json()
    remote_description = RTCSessionDescription(
        sdp=payload["sdp"], type=payload["type"]
    )

    pc = RTCPeerConnection()
    pcs.add(pc)

    @pc.on("connectionstatechange")
    async def on_connectionstatechange():
        print("Connection state is %s" % pc.connectionState)
        if pc.connectionState != "failed":
            return
        # A failed connection is closed and dropped from the tracked set.
        await pc.close()
        pcs.discard(pc)

    # open media source
    audio, video = create_local_tracks(
        args.play_from, decode=not args.play_without_decoding
    )

    # (track, name of the codec option on `args`, error when it is required
    # but missing) — audio is handled before video.
    outgoing = (
        (audio, "audio_codec", "You must specify the audio codec using --audio-codec"),
        (video, "video_codec", "You must specify the video codec using --video-codec"),
    )
    for track, codec_option, missing_codec_error in outgoing:
        if not track:
            continue
        sender = pc.addTrack(track)
        chosen_codec = getattr(args, codec_option)
        if chosen_codec:
            force_codec(pc, sender, chosen_codec)
        elif args.play_without_decoding:
            raise Exception(missing_codec_error)

    await pc.setRemoteDescription(remote_description)

    answer = await pc.createAnswer()
    await pc.setLocalDescription(answer)

    local = pc.localDescription
    body = json.dumps({"sdp": local.sdp, "type": local.type})
    return web.Response(content_type="application/json", text=body)


# Peer connections created by offer() that have not been discarded yet.
pcs = set()


async def on_shutdown(app):
    """Close all tracked peer connections concurrently, then empty ``pcs``.

    ``app`` is accepted because the coroutine is registered as a shutdown
    callback; it is not used.
    """
    await asyncio.gather(*(peer.close() for peer in pcs))
    pcs.clear()


if __name__ == "__main__":
    # Command-line entry point: parse options, configure logging and TLS,
    # then start the aiohttp application.
    parser = argparse.ArgumentParser(description="WebRTC webcam demo")
    parser.add_argument("--cert-file", help="SSL certificate file (for HTTPS)")
    parser.add_argument("--key-file", help="SSL key file (for HTTPS)")
    parser.add_argument("--play-from", help="Read the media from a file and send it.")
    parser.add_argument(
        "--play-without-decoding",
        help=(
            "Read the media without decoding it (experimental). "
            "For now it only works with an MPEGTS container with only H.264 video."
        ),
        action="store_true",
    )
    parser.add_argument(
        "--host", default="0.0.0.0", help="Host for HTTP server (default: 0.0.0.0)"
    )
    parser.add_argument(
        "--port", type=int, default=8080, help="Port for HTTP server (default: 8080)"
    )
    parser.add_argument("--verbose", "-v", action="count")
    parser.add_argument(
        "--audio-codec", help="Force a specific audio codec (e.g. audio/opus)"
    )
    parser.add_argument(
        "--video-codec", help="Force a specific video codec (e.g. video/H264)"
    )

    # `args` stays a module-level name: offer() reads it.
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    if args.cert_file:
        # Name the server-side protocol explicitly: constructing SSLContext
        # without a protocol argument is deprecated.
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ssl_context.load_cert_chain(args.cert_file, args.key_file)
    else:
        ssl_context = None

    app = web.Application()
    app.on_shutdown.append(on_shutdown)
    app.router.add_get("/", index)
    app.router.add_get("/client.js", javascript)
    app.router.add_post("/offer", offer)
    web.run_app(app, host=args.host, port=args.port, ssl_context=ssl_context)