Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

21.4. Webrtc/Ortc

21.4.1. coturn - ICE Server

coturn 主要有两个包,分别是 coturn 和 coturn-utils,coturn-client-devel 我们用不到

			
[root@netkiller ~]# dnf search coturn
Last metadata expiration check: 1:40:06 ago on Fri 07 Feb 2025 10:27:51 PM CST.
========================================= Name Exactly Matched: coturn =========================================
coturn.x86_64 : TURN/STUN & ICE Server
======================================== Name & Summary Matched: coturn ========================================
coturn-client-devel.x86_64 : Coturn client development headers
coturn-utils.x86_64 : Coturn utils
============================================= Name Matched: coturn =============================================
coturn-client-libs.x86_64 : TURN client static library			
			
		

安装 coturn 服务器

			
[root@netkiller ~]# dnf install coturn coturn-utils -y			
			
		

查看一下包内有那些工具

			
[root@netkiller ~]# rpm -ql coturn | egrep "bin|etc"
/etc/coturn
/etc/coturn/turnserver.conf
/etc/logrotate.d/coturn
/etc/pki/coturn
/etc/pki/coturn/private
/etc/pki/coturn/public
/usr/bin/turnadmin
/usr/bin/turnserver
/usr/share/doc/coturn/etc
/usr/share/doc/coturn/etc/turnserver.conf

[root@netkiller ~]# rpm -ql coturn-utils | grep bin
/usr/bin/turnutils_natdiscovery
/usr/bin/turnutils_oauth
/usr/bin/turnutils_peer
/usr/bin/turnutils_stunclient
/usr/bin/turnutils_uclient	
			
		

备份配置文件

			
cp /etc/coturn/turnserver.conf{,.original}
			
		

生成证书

			
openssl req -x509 -newkey rsa:2048 -keyout /etc/pki/coturn/private/turn_server_pkey.pem -out /etc/pki/coturn/public/turn_server_cert.pem -days 365 -nodes			
			
		

创建用户和密码

			
turnadmin -a -u netkiller -p 123456 -r rtc.netkiller.cn
			
		

也可以通过配置文件 /etc/coturn/turnserver.conf 创建静态用户和密码

			
user=netkiller:123456
realm=rtc.netkiller.cn		
			
		

开放防火墙端口

			
firewall-cmd --zone=public --add-port=3478/udp --permanent
firewall-cmd --zone=public --add-port=5349/udp --permanent
firewall-cmd --reload			
			
		

/etc/coturn/turnserver.conf 配置文件

			
listening-ip=0.0.0.0 	# 配置为0.0.0.0即可,会监听所有ip请求
listening-port=3478 	# STUN服务端口为3478
tls-listening-port=5349	# STUN的 TLS 监听端口
relay-ip		配置为服务器的外网ip地址
external-ip 	配置为服务器的外网ip地址			
			
		

仅供参考

		
[root@netkiller ~]# grep -v ^# /etc/coturn/turnserver.conf | grep -v "^$"
relay-ip=192.168.0.71
external-ip=139.29.154.210
user=neo:netkiller
realm=netkiller.cn
cert=/etc/pki/coturn/public/turn_server_cert.pem
pkey=/etc/pki/coturn/private/turn_server_pkey.pem
log-file=/var/log/coturn/turnserver.log
simple-log
cli-ip=127.0.0.1
cli-port=5766
cli-password=qwerty
no-rfc5780
no-stun-backward-compatibility
response-origin-only-with-rfc5780		
		
		

启动服务

			
systemctl enable coturn
systemctl start coturn			
			
		

21.4.1.1. Javascript 连接 ICE Server 例子

			
var iceServers = {
    iceServers: [
        {
            urls: 'turn:your-external-ip-address:3478',
            username: 'netkiller',
            credential: '123456'
        }]
};

connection = new RTCPeerConnection(iceServers);
			
			
			

21.4.1.2. 测试

确认端口状态

			
[root@netkiller ~]# lsof -i :3478
COMMAND       PID   USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
turnserve 2003533 coturn   24u  IPv4 25225693      0t0  TCP localhost:stun (LISTEN)
turnserve 2003533 coturn   25u  IPv4 25231980      0t0  TCP localhost:stun (LISTEN)
turnserve 2003533 coturn   26u  IPv4 25225694      0t0  TCP netkiller:stun (LISTEN)
turnserve 2003533 coturn   27u  IPv4 25225695      0t0  TCP netkiller:stun (LISTEN)			
			
			

测试程序 turnutils_uclient

			
接着对使用coturn搭建的STUN/TURN服务使用turnutils_uclient程序测试其TURN服务是否正常。
直接连接服务测试服务是否正常。为保证测试使用的服务是TURN服务,在TURN服务启动时,关掉STUN服务。
在TURN服务启动时,如果是命令行,加入"--no-stun"配置;如果使用配置文件的话,加入"no-stun"选项。
使用coTurn服务启动TURN服务后,执行以下命令即可:

turnutils_uclient -v -t -T -u <user> -w <password> xxx.xxx.xxx.xxx

参数说明:

-v 表示给出详细提示
-t 使用TCP协议(默认使用UDP)
-T TCP协议中继传输(默认是UDP)
-u TURN的用户名
-w TURN服务对应用户的密码
xxx.xxx.xxx.xxx TURN服务的IP地址
			
			
			
[root@netkiller ~]# turnutils_uclient -v -t -T -u neo -w netkiller 127.0.0.1
0: (2003881): INFO: IPv4. Connected from: 127.0.0.1:58632
0: (2003881): INFO: IPv4. Connected to: 127.0.0.1:3478
0: (2003881): INFO: allocate sent
0: (2003881): INFO: allocate response received: 
0: (2003881): INFO: allocate sent
0: (2003881): INFO: allocate response received: 
0: (2003881): INFO: success
0: (2003881): INFO: IPv4. Received relay addr: 139.9.54.21:64324
0: (2003881): INFO: clnet_allocate: rtv=0
0: (2003881): INFO: refresh sent
0: (2003881): INFO: refresh response received: 
0: (2003881): INFO: success
0: (2003881): INFO: IPv4. Connected from: 127.0.0.1:58642
0: (2003881): INFO: IPv4. Connected to: 127.0.0.1:3478
0: (2003881): INFO: IPv4. Connected from: 127.0.0.1:58644
0: (2003881): INFO: IPv4. Connected to: 127.0.0.1:3478
0: (2003881): INFO: allocate sent
0: (2003881): INFO: allocate response received: 
0: (2003881): INFO: allocate sent
0: (2003881): INFO: allocate response received: 
0: (2003881): INFO: success
0: (2003881): INFO: IPv4. Received relay addr: 139.9.54.21:60677
0: (2003881): INFO: clnet_allocate: rtv=0
0: (2003881): INFO: refresh sent
0: (2003881): INFO: refresh response received: 
0: (2003881): INFO: success
0: (2003881): INFO: allocate sent
0: (2003881): INFO: allocate response received: 
0: (2003881): INFO: allocate sent
0: (2003881): INFO: allocate response received: 
0: (2003881): INFO: success
0: (2003881): INFO: IPv4. Received relay addr: 139.9.54.21:57197
0: (2003881): INFO: clnet_allocate: rtv=0
0: (2003881): INFO: refresh sent
0: (2003881): INFO: refresh response received: 
0: (2003881): INFO: success
0: (2003881): INFO: create perm sent: 139.9.54.21:57197
0: (2003881): INFO: cp response received: 
0: (2003881): INFO: success
0: (2003881): INFO: create perm sent: 139.9.54.21:60677
0: (2003881): INFO: cp response received: 
0: (2003881): INFO: success
0: (2003881): INFO: tcp connect sent
0: (2003881): INFO: connection bind sent
0: (2003881): INFO: connect bind response received: 
0: (2003881): INFO: success
0: (2003881): INFO: IPv4. TCP data network connected to: 127.0.0.1:3478
0: (2003881): INFO: connection bind sent
0: (2003881): INFO: connect bind response received: 
0: (2003881): INFO: success
0: (2003881): INFO: IPv4. TCP data network connected to: 127.0.0.1:3478
0: (2003881): INFO: Total connect time is 0
0: (2003881): INFO: 2 connections are completed
1: (2003881): INFO: start_mclient: msz=2, tot_send_msgs=0, tot_recv_msgs=0, tot_send_bytes ~ 0, tot_recv_bytes ~ 0
2: (2003881): INFO: start_mclient: msz=2, tot_send_msgs=5, tot_recv_msgs=5, tot_send_bytes ~ 500, tot_recv_bytes ~ 500
3: (2003881): INFO: start_mclient: msz=2, tot_send_msgs=5, tot_recv_msgs=5, tot_send_bytes ~ 500, tot_recv_bytes ~ 500
3: (2003881): INFO: done, connection 0x7f009c80e010 closed.
3: (2003881): INFO: done, connection 0x7f009c82f010 closed.
3: (2003881): INFO: start_mclient: tot_send_msgs=10, tot_recv_msgs=10
3: (2003881): INFO: start_mclient: tot_send_bytes ~ 1000, tot_recv_bytes ~ 1000
3: (2003881): INFO: Total transmit time is 3
3: (2003881): INFO: Total lost packets 0 (0.000000%), total send dropped 0 (0.000000%)
3: (2003881): INFO: Average round trip delay 4.300000 ms; min = 0 ms, max = 21 ms
3: (2003881): INFO: Average jitter 8.400000 ms; min = 0 ms, max = 21 ms			
			
			
			
[root@netkiller ~]# turnutils_stunclient -p 3478 127.0.0.1
0: (2004030): INFO: IPv4. UDP reflexive addr: 127.0.0.1:45826				
			
			
			
[root@netkiller ~]# turnutils_natdiscovery -m 127.0.0.1

-= Mapping Behavior Discovery =-
0: (2003693): INFO: IPv4. UDP reflexive addr: 127.0.0.1:39570
0: (2003693): INFO: IPv4. Local addr: : 0.0.0.0:39570

[root@netkiller ~]# turnutils_natdiscovery -f 127.0.0.1

-= Filtering Behavior Discovery =-
0: (2003696): INFO: IPv4. UDP reflexive addr: 127.0.0.1:57186
0: (2003696): INFO: IPv4. Local addr: : 0.0.0.0:57186			
			
			
			
[root@netkiller ~]# turnutils_peer -v
0: (2003645): INFO: Start
0: (2003645): INFO: End
0: (2003645): INFO: Start
0: (2003645): INFO: End
0: (2003645): INFO: Start
0: (2003645): INFO: End
0: (2003645): INFO: Start
0: (2003645): INFO: End			
			
			

			
[root@netkiller ~]# journalctl -f -u coturn.service
Feb 09 12:17:58 netkiller systemd[1]: Stopping coturn...
Feb 09 12:17:58 netkiller systemd[1]: coturn.service: Deactivated successfully.
Feb 09 12:17:58 netkiller systemd[1]: Stopped coturn.
Feb 09 12:17:58 netkiller systemd[1]: Starting coturn...
Feb 09 12:17:58 netkiller systemd[1]: Started coturn.
Feb 09 12:29:50 netkiller systemd[1]: Stopping coturn...
Feb 09 12:29:53 netkiller systemd[1]: coturn.service: Deactivated successfully.
Feb 09 12:29:53 netkiller systemd[1]: Stopped coturn.
Feb 09 12:29:53 netkiller systemd[1]: Starting coturn...
Feb 09 12:29:53 netkiller systemd[1]: Started coturn.			
			
			

21.4.2. NVR(Network Video Recorder)网络视频录像机

		
import argparse
import asyncio
import json
import logging
import uuid
from datetime import datetime, timedelta
import numpy as np

import websockets
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack, RTCConfiguration, RTCIceCandidate, \
    RTCIceServer, MediaStreamTrack
import cv2
from aiortc.contrib.media import MediaRecorder, MediaPlayer, MediaRelay
from av import VideoFrame

logger = logging.getLogger("webrtc")
pcs = set()
config = RTCConfiguration()
# iceServer = RTCIceServer(urls='stun:stun.l.google.com:19302')
iceServer = RTCIceServer(urls='turn:www.netkiller.cn:3478',username= "neo",credential= "netkiller")
config.iceServers = []
config.iceServers.append(iceServer)
# prepare local media
# player = MediaPlayer(os.path.join(ROOT, "demo-instruct.mp4"))

# videoStreamTrack = VideoStreamTrack()
# pc.addTrack(videoStreamTrack)


class VideoTransformTrack(MediaStreamTrack):
    kind = "video"

    def __init__(self, track):
        super().__init__()  # don't forget this!
        self.track = track

    async def recv(self):
        frame = await self.track.recv()
        if frame is None:
            return None
        # logger.info(f"Frame: {frame}")
        return frame

class VideoReceiver:
    def __init__(self):
        self.track = None

    async def handle_track(self, track):
        # print("处理帧")
        self.track = track
        frame_count = 0
        while True:
            try:
                # print("等待帧...")
                frame = await asyncio.wait_for(track.recv(), timeout=5.0)
                frame_count += 1
                # print(f"接收第 {frame_count} 帧")

                if isinstance(frame, VideoFrame):
                    # print(f"帧类型: VideoFrame, pts: {frame.pts}, time_base: {frame.time_base}")
                    frame = frame.to_ndarray(format="bgr24")
                elif isinstance(frame, np.ndarray):
                    # print(f"帧类型: numpy 数组")
                    pass
                else:
                    # print(f"意外的帧类型: {type(frame)}")
                    continue

                 # 在帧上添加时间戳
                current_time = datetime.now()
                new_time = current_time - timedelta(seconds=55)
                timestamp = new_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                cv2.putText(frame, timestamp, (10, frame.shape[0] - 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

                # cv2.imwrite(f"received_frame_{frame_count}.jpg", frame)
                # print(f"将帧 {frame_count} 保存至文件")

                cv2.imshow("Frame", frame)

                # 按 'q' 键退出程序
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            except asyncio.TimeoutError:
                logger.info("Wait...")
                await asyncio.sleep(1000)  # 等待接收帧 35 秒
            except Exception as e:
                logger.error(f"VideoReceiver Exception: {str(e)}")
                # if "Connection" in str(e):
                break
        logger.info("Quit VideoReceiver")

video_receiver = VideoReceiver()

async def signaling_server(websocket):
    # pc = RTCPeerConnection()
    pc = RTCPeerConnection(config)
    pc.addTransceiver('video', direction='recvonly')
    pc.addTransceiver('audio', direction='recvonly')
    pcs.add(pc)

    relay = MediaRelay()

    @pc.on("datachannel")
    def on_datachannel(channel):
        @channel.on("message")
        def on_message(message):
            if isinstance(message, str) and message.startswith("ping"):
                channel.send("pong" + message[4:])
                logger.info("ping:", message)
            else:
                logger.info("Received:", message)

    @pc.on("connectionstatechange")
    async def on_connectionstatechange():
        logger.info("Connection state is %s", pc.connectionState)

        if pc.connectionState == "connected":
            logger.info("WebRTC Connection")

        if pc.connectionState == "closed":
            await pc.close()
            logger.info("WebRTC close")

        if pc.connectionState == "failed":
            await pc.close()
            pcs.discard(pc)


    @pc.on("track")
    def on_track(track):
        logger.info("Track %s received", track.kind)

        # if track.kind == "audio":
        # # pc.addTrack(player.audio)
        #     recorder.addTrack(relay.subscribe(track))

        if track.kind == "video":
            pc.addTrack(
                            VideoTransformTrack(relay.subscribe(track))
                        )
            recorder.addTrack(relay.subscribe(track))
            # recorder.addTrack(track)
            # if isinstance(track, MediaStreamTrack):
            #     print(f"接收视频 {track.kind}")
            #     asyncio.ensure_future(video_receiver.handle_track(track))

        @track.on("ended")
        async def on_ended():
            logger.info("Track %s ended", track.kind)
            await recorder.stop()

    try:
        # async for message in websocket:
        #     logger.info(f"收到消息: {message}")
            # 回显消息给客户端
            # await websocket.send(f"服务器已收到: {message}")

        filename = f"{uuid.uuid4()}.mp4"
        recorder =  MediaRecorder(filename)
        # recorder = MediaBlackhole()
        logger.info(f"Recorder {filename}")

        # while True:
        message = await websocket.recv()
        logger.info(f">>> {message}")
        jsonObject =  json.loads(message)
        if jsonObject['type'] == 'offer':
            # await recorder.stop()
            # Create offer
            # offer = await pc.createOffer()
            offer = RTCSessionDescription(sdp=jsonObject["sdp"], type=jsonObject["type"])
            await pc.setRemoteDescription(offer)

            logger.info(f"Recorder start")
            await recorder.start()

            answer = await pc.createAnswer()
            await pc.setLocalDescription(answer)

            # Send offer to the signaling server
            jsonString = json.dumps({
                "type": pc.localDescription.type,
                "sdp": pc.localDescription.sdp
            })
            await websocket.send(jsonString)
            logger.info(f"<<< {jsonString}")

            # print("等待连接建立...")
            while pc.connectionState != "connected":
                await asyncio.sleep(0.1)

    except websockets.ConnectionClosed as e:
        await pc.close()
        logger.error(f"ConnectionClosed: {e}")
    # finally:
    #     await websockets.close()

async def run(host,port):
    try:
        # start_server = await websockets.serve(signaling_server, "localhost", 8765)
        async with websockets.serve(signaling_server, "0.0.0.0", 5349):
            await asyncio.Future()  # 持续运行直到手动停止
    except KeyboardInterrupt:
        pass
    finally:
        asyncio.run(on_shutdown());

async def on_shutdown(app):
    # close peer connections
    coros = [pc.close() for pc in pcs]
    await asyncio.gather(*coros)
    pcs.clear()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="WebRTC audio / video / data-channels demo"
    )
    parser.add_argument("--cert-file", help="SSL certificate file (for HTTPS)")
    parser.add_argument("--key-file", help="SSL key file (for HTTPS)")
    parser.add_argument(
        "--host", default="0.0.0.0", help="Host for HTTP server (default: 0.0.0.0)"
    )
    parser.add_argument(
        "--port", type=int, default=8080, help="Port for HTTP server (default: 8080)"
    )
    parser.add_argument("--record-to", help="Write received media to a file.")
    parser.add_argument("--verbose", "-v", action="count")
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    # if args.cert_file:
    #     ssl_context = ssl.SSLContext()
    #     ssl_context.load_cert_chain(args.cert_file, args.key_file)
    # else:
    #     ssl_context = None
    asyncio.run(run(args.host,args.port))
		
		
		

21.4.3. Webcam 网络直播

		
import argparse
import asyncio
import json
import logging
import os
import platform
import ssl

from aiohttp import web
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer, MediaRelay
from aiortc.rtcrtpsender import RTCRtpSender

ROOT = os.path.dirname(__file__)


relay = None
webcam = None


def create_local_tracks(play_from, decode):
    global relay, webcam

    if play_from:
        player = MediaPlayer(play_from, decode=decode)
        return player.audio, player.video
    else:
        options = {"framerate": "30", "video_size": "640x480"}
        if relay is None:
            if platform.system() == "Darwin":
                webcam = MediaPlayer(
                    "default:none", format="avfoundation", options=options
                )
            elif platform.system() == "Windows":
                webcam = MediaPlayer(
                    "video=Integrated Camera", format="dshow", options=options
                )
            else:
                webcam = MediaPlayer("/dev/video0", format="v4l2", options=options)
            relay = MediaRelay()
        return None, relay.subscribe(webcam.video)


def force_codec(pc, sender, forced_codec):
    kind = forced_codec.split("/")[0]
    codecs = RTCRtpSender.getCapabilities(kind).codecs
    transceiver = next(t for t in pc.getTransceivers() if t.sender == sender)
    transceiver.setCodecPreferences(
        [codec for codec in codecs if codec.mimeType == forced_codec]
    )


async def index(request):
    content = open(os.path.join(ROOT, "index.html"), "r").read()
    return web.Response(content_type="text/html", text=content)


async def javascript(request):
    content = open(os.path.join(ROOT, "client.js"), "r").read()
    return web.Response(content_type="application/javascript", text=content)


async def offer(request):
    params = await request.json()
    offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

    pc = RTCPeerConnection()
    pcs.add(pc)

    @pc.on("connectionstatechange")
    async def on_connectionstatechange():
        print("Connection state is %s" % pc.connectionState)
        if pc.connectionState == "failed":
            await pc.close()
            pcs.discard(pc)

    # open media source
    audio, video = create_local_tracks(
        args.play_from, decode=not args.play_without_decoding
    )

    if audio:
        audio_sender = pc.addTrack(audio)
        if args.audio_codec:
            force_codec(pc, audio_sender, args.audio_codec)
        elif args.play_without_decoding:
            raise Exception("You must specify the audio codec using --audio-codec")

    if video:
        video_sender = pc.addTrack(video)
        if args.video_codec:
            force_codec(pc, video_sender, args.video_codec)
        elif args.play_without_decoding:
            raise Exception("You must specify the video codec using --video-codec")

    await pc.setRemoteDescription(offer)

    answer = await pc.createAnswer()
    await pc.setLocalDescription(answer)

    return web.Response(
        content_type="application/json",
        text=json.dumps(
            {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
        ),
    )


pcs = set()


async def on_shutdown(app):
    # close peer connections
    coros = [pc.close() for pc in pcs]
    await asyncio.gather(*coros)
    pcs.clear()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="WebRTC webcam demo")
    parser.add_argument("--cert-file", help="SSL certificate file (for HTTPS)")
    parser.add_argument("--key-file", help="SSL key file (for HTTPS)")
    parser.add_argument("--play-from", help="Read the media from a file and sent it.")
    parser.add_argument(
        "--play-without-decoding",
        help=(
            "Read the media without decoding it (experimental). "
            "For now it only works with an MPEGTS container with only H.264 video."
        ),
        action="store_true",
    )
    parser.add_argument(
        "--host", default="0.0.0.0", help="Host for HTTP server (default: 0.0.0.0)"
    )
    parser.add_argument(
        "--port", type=int, default=8080, help="Port for HTTP server (default: 8080)"
    )
    parser.add_argument("--verbose", "-v", action="count")
    parser.add_argument(
        "--audio-codec", help="Force a specific audio codec (e.g. audio/opus)"
    )
    parser.add_argument(
        "--video-codec", help="Force a specific video codec (e.g. video/H264)"
    )

    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    if args.cert_file:
        ssl_context = ssl.SSLContext()
        ssl_context.load_cert_chain(args.cert_file, args.key_file)
    else:
        ssl_context = None

    app = web.Application()
    app.on_shutdown.append(on_shutdown)
    app.router.add_get("/", index)
    app.router.add_get("/client.js", javascript)
    app.router.add_post("/offer", offer)
    web.run_app(app, host=args.host, port=args.port, ssl_context=ssl_context)