From 9f0c2a214e9648a86d10602998749e24f7c80947 Mon Sep 17 00:00:00 2001 From: cgojin Date: Fri, 27 Nov 2020 22:55:01 +0800 Subject: [PATCH] feat: updae to pub/sub pc (two-pc) --- configs/docker/sfu.toml | 20 +- examples/echotest/index.html | 518 +++++++++++++++++++++++++++++++++++ go.mod | 8 +- go.sum | 38 +-- pkg/node/avp/sfu.go | 46 +--- pkg/node/biz/peer.go | 293 +++++++++----------- pkg/node/biz/room.go | 4 +- pkg/node/biz/server.go | 19 +- pkg/node/sfu/internal.go | 39 ++- pkg/node/sfu/server.go | 17 +- pkg/proto/biz.go | 47 ++-- pkg/proto/proto.go | 1 + 12 files changed, 761 insertions(+), 289 deletions(-) create mode 100644 examples/echotest/index.html diff --git a/configs/docker/sfu.toml b/configs/docker/sfu.toml index 2cf76ebb9..fa432ad5b 100644 --- a/configs/docker/sfu.toml +++ b/configs/docker/sfu.toml @@ -3,9 +3,6 @@ pprof = ":6062" # data center id dc = "dc1" -[log] -level = "info" - [etcd] # ["ip:port", "ip:port"] addrs = ["etcd:2379"] @@ -13,6 +10,12 @@ addrs = ["etcd:2379"] [nats] url = "nats://nats:4222" +[sfu] +# Ballast size in MiB, will allocate memory to reduce the GC trigger upto 2x the +# size of ballast. Be aware that the ballast should be less than the half of memory +# available. +ballast = 0 + [router] # Limit the remb bandwidth in kbps # zero means no limits @@ -39,6 +42,13 @@ enabletemporallayer = false # username = "awsome" # credential = "awsome" +# sdp semantics: +# "unified-plan" +# "plan-b" +# "unified-plan-with-fallback" +sdpsemantics = "unified-plan" + +[webrtc.candidates] # In case you're deploying ion-sfu on a server which is configured with # a 1:1 NAT (e.g., Amazon EC2), you might want to also specify the public # address of the machine using the setting below. This will result in @@ -49,3 +59,7 @@ enabletemporallayer = false # if the sfu is deployed in a DMZ between two 1-1 NAT for internal and # external users. # nat1to1 = ["1.2.3.4"] +# icelite = true + +[log] +level = "info" diff --git a/examples/echotest/index.html b/examples/echotest/index.html new file mode 100644 index 000000000..345504b54 --- /dev/null +++ b/examples/echotest/index.html @@ -0,0 +1,518 @@ + + + + + + + + + + + + + Pion ion | Echotest + + + + +
+
+
+ + +
+
+ +
+
Media
+
+
+
+ Local + + +
+
+ Remote + + + + +
+
+
+
Data
+
+
+
+ + +
+
+

+        
+
+
+ + + + + + + + + diff --git a/go.mod b/go.mod index 8e5551a6a..49709a812 100644 --- a/go.mod +++ b/go.mod @@ -9,15 +9,15 @@ require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/go-redis/redis/v7 v7.4.0 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect - github.com/google/go-cmp v0.5.2 // indirect + github.com/google/go-cmp v0.5.4 // indirect github.com/google/uuid v1.1.2 github.com/gorilla/websocket v1.4.2 github.com/nats-io/nats.go v1.10.0 github.com/notedit/sdp v0.0.4 - github.com/pion/ion-avp v1.1.1 + github.com/pion/ion-avp v1.5.0 github.com/pion/ion-log v1.0.0 - github.com/pion/ion-sfu v1.2.0 - github.com/pion/webrtc/v3 v3.0.0-beta.12 + github.com/pion/ion-sfu v1.5.0 + github.com/pion/webrtc/v3 v3.0.0-beta.12.0.20201115002753-64bbf7eea97d github.com/sourcegraph/jsonrpc2 v0.0.0-20200429184054-15c2290dcb37 github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.7.1 diff --git a/go.sum b/go.sum index 578f11a04..3edefe4b6 100644 --- a/go.sum +++ b/go.sum @@ -126,8 +126,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= -github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= @@ -196,8 +196,8 @@ github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lucas-clemente/quic-go v0.18.0/go.mod h1:yXttHsSNxQi8AWijC/vLP+OJczXqzHSOcJrM5ITUlCg= -github.com/lucas-clemente/quic-go v0.18.1 h1:DMR7guC0NtVS8zNZR3IO7NARZvZygkSC56GGtC6cyys= -github.com/lucas-clemente/quic-go v0.18.1/go.mod h1:yXttHsSNxQi8AWijC/vLP+OJczXqzHSOcJrM5ITUlCg= +github.com/lucas-clemente/quic-go v0.19.1 h1:J9TkQJGJVOR3UmGhd4zdVYwKSA0EoXbLRf15uQJ6gT4= +github.com/lucas-clemente/quic-go v0.19.1/go.mod h1:ZUygOqIoai0ASXXLJ92LTnKdbqh9MHCLTX6Nr1jUrK0= github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ= github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= @@ -205,12 +205,13 @@ github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzR github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/marten-seemann/qpack v0.2.0/go.mod h1:F7Gl5L1jIgN1D11ucXefiuJS9UMVP2opoCp2jDKb7wc= +github.com/marten-seemann/qpack v0.2.1/go.mod h1:F7Gl5L1jIgN1D11ucXefiuJS9UMVP2opoCp2jDKb7wc= github.com/marten-seemann/qtls v0.10.0 h1:ECsuYUKalRL240rRD4Ri33ISb7kAQ3qGDlrrl55b2pc= github.com/marten-seemann/qtls v0.10.0/go.mod h1:UvMd1oaYDACI99/oZUYLzMCkBXQVT0aGm99sJhbT8hs= github.com/marten-seemann/qtls-go1-15 v0.1.0/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I= github.com/marten-seemann/qtls-go1-15 v0.1.1 h1:LIH6K34bPVttyXnUWixk0bzH6/N07VxbSabxn5A5gZQ= github.com/marten-seemann/qtls-go1-15 v0.1.1/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I= -github.com/matryer/moq v0.1.3/go.mod h1:9RtPYjTnH1bSBIkpvtHkFN7nbWAnO7oRpdJkEIn6UtE= +github.com/matryer/moq v0.1.4/go.mod h1:9RtPYjTnH1bSBIkpvtHkFN7nbWAnO7oRpdJkEIn6UtE= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= @@ -269,12 +270,14 @@ github.com/pion/dtls/v2 v2.0.3 h1:3qQ0s4+TXD00rsllL8g8KQcxAs+Y/Z6oz618RXX6p14= github.com/pion/dtls/v2 v2.0.3/go.mod h1:TUjyL8bf8LH95h81Xj7kATmzMRt29F/4lxpIPj2Xe4Y= github.com/pion/ice/v2 v2.0.9 h1:oHbiN6Q9tgb8Gfu3I4cbr5mHRE1uqiuFABQ8CbWjIyk= github.com/pion/ice/v2 v2.0.9/go.mod h1:NK+o39ynb+N1YSj9fPgWs3vjVcrsWw0KCr/311MqVq8= -github.com/pion/ion-avp v1.1.1 h1:bpKfN8QsPYBXtiHhLfMJosAoANqBHgjeJE5hHafuWIs= -github.com/pion/ion-avp v1.1.1/go.mod h1:/NT1pE6m7y5eZDGXOn4clpkfz4RkuNKHt3GjyU4VUJs= +github.com/pion/ice/v2 v2.0.10 h1:MrT9JfH41YwB6kLm5ZJLaPillBM4MEjPZa3hWDBLGxo= +github.com/pion/ice/v2 v2.0.10/go.mod h1:Sqdo0oy3ZkaOCsK7Ai9ksLpJkREG03R3fHMJ0PXmHO8= +github.com/pion/ion-avp v1.5.0 h1:eLTh7/g9PBXRA0JaARP0aCExah9tjpndVIJZWpuqjSY= +github.com/pion/ion-avp v1.5.0/go.mod h1:ujZMh8f73e9oyLtc0qVLqL/R5XY5Orlhk16LxtzMKOk= github.com/pion/ion-log v1.0.0 h1:2lJLImCmfCWCR38hLWsjQfBWe6NFz/htbqiYHwvOP/Q= github.com/pion/ion-log v1.0.0/go.mod h1:jwcla9KoB9bB/4FxYDSRJPcPYSLp5XiUUMnOLaqwl4E= -github.com/pion/ion-sfu v1.2.0 h1:3DYO0out8EKUU2ceKcSQy2D31QYBJZyulJYse7jBOHI= -github.com/pion/ion-sfu v1.2.0/go.mod h1:fG9wIDXGRWoVCM3M1XM9thm4qGj5YCLRWLeGyJ+t0M0= +github.com/pion/ion-sfu v1.5.0 h1:97yQxc/V8kBGGw5rN5/0YGxFSy2ILKSB5JrAZ4vNw9A= +github.com/pion/ion-sfu v1.5.0/go.mod h1:AHnKv6zXtnBZqTXmARKtUg27lhH5dQHlfjf1TZoWSAY= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/mdns v0.0.4 h1:O4vvVqr4DGX63vzmO6Fw9vpy3lfztVWHGCQfyw0ZLSY= @@ -290,8 +293,6 @@ github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko github.com/pion/sctp v1.7.10/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0= github.com/pion/sctp v1.7.11 h1:UCnj7MsobLKLuP/Hh+JMiI/6W5Bs/VF45lWKgHFjSIE= github.com/pion/sctp v1.7.11/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0= -github.com/pion/sdp/v2 v2.4.0 h1:luUtaETR5x2KNNpvEMv/r4Y+/kzImzbz4Lm1z8eQNQI= -github.com/pion/sdp/v2 v2.4.0/go.mod h1:L2LxrOpSTJbAns244vfPChbciR/ReU1KWfG04OpkR7E= github.com/pion/sdp/v3 v3.0.2 h1:UNnSPVaMM+Pdu/mR9UvAyyo6zkdYbKeuOooCwZvTl/g= github.com/pion/sdp/v3 v3.0.2/go.mod h1:bNiSknmJE0HYBprTHXKPQ3+JjacTv5uap92ueJZKsRk= github.com/pion/srtp v1.5.2 h1:25DmvH+fqKZDqvX64vTwnycVwL9ooJxHF/gkX16bDBY= @@ -308,9 +309,8 @@ github.com/pion/turn/v2 v2.0.5 h1:iwMHqDfPEDEOFzwWKT56eFmh6DYC6o/+xnLAEzgISbA= github.com/pion/turn/v2 v2.0.5/go.mod h1:APg43CFyt/14Uy7heYUOGWdkem/Wu4PhCO/bjyrTqMw= github.com/pion/udp v0.1.0 h1:uGxQsNyrqG3GLINv36Ff60covYmfrLoxzwnCsIYspXI= github.com/pion/udp v0.1.0/go.mod h1:BPELIjbwE9PRbd/zxI/KYBnbo7B6+oA6YuEaNE8lths= -github.com/pion/webrtc/v3 v3.0.0-beta.11/go.mod h1:UbmDN5G82nXLXAiSIo0HYU68GN6z09jeKSNEaDUzFvY= -github.com/pion/webrtc/v3 v3.0.0-beta.12 h1:Civb1OA2ACJ3jXrqU1qrRY9stecLI3pZPnILAX3IWJ4= -github.com/pion/webrtc/v3 v3.0.0-beta.12/go.mod h1:UbmDN5G82nXLXAiSIo0HYU68GN6z09jeKSNEaDUzFvY= +github.com/pion/webrtc/v3 v3.0.0-beta.12.0.20201115002753-64bbf7eea97d h1:31CZJrqVx36tw/O31uQtcPYzrlHXNjLYghCrxhU+1Tg= +github.com/pion/webrtc/v3 v3.0.0-beta.12.0.20201115002753-64bbf7eea97d/go.mod h1:UbmDN5G82nXLXAiSIo0HYU68GN6z09jeKSNEaDUzFvY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -442,8 +442,8 @@ golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E= -golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9 h1:umElSU9WZirRdgu2yFHY0ayQkEnKiOC1TtM3fWXFnoU= +golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -495,6 +495,8 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c/go.mod h1:iQL9McJNjoIa5mjH6nYTCTZXUN6RP+XW3eib7Ya3XcI= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -533,8 +535,8 @@ golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5 h1:iCaAy5bMeEvwANu3YnJfWwI0kWAGkEa2RXPdweI/ysk= -golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201113233024-12cec1faf1ba h1:xmhUJGQGbxlod18iJGqVEp9cHIPLl7QiX2aA3to708s= +golang.org/x/sys v0.0.0-20201113233024-12cec1faf1ba/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/pkg/node/avp/sfu.go b/pkg/node/avp/sfu.go index 6a9f2b3b2..1a52264ec 100644 --- a/pkg/node/avp/sfu.go +++ b/pkg/node/avp/sfu.go @@ -88,48 +88,36 @@ func (s *sfu) join(rid proto.RID, mid proto.MID) (*iavp.WebRTCTransport, *nats.S t := iavp.NewWebRTCTransport(string(rid), s.config) - var pendingDescriptions []webrtc.ICECandidateInit + var pendingDescriptions []*proto.SfuTrickleMsg var hasRemoteDescription util.AtomicBool // handle sfu message - rpcID := nid + "-" + string(rid) - sub, err := nrpc.Subscribe(rpcID, func(msg interface{}) (interface{}, error) { + sub, err := nrpc.Subscribe(string(mid), func(msg interface{}) (interface{}, error) { log.Infof("handle sfu message: %T, %+v", msg, msg) switch v := msg.(type) { case *proto.SfuTrickleMsg: log.Infof("got remote candidate: %v", v.Candidate) if hasRemoteDescription.Get() { - if err := t.AddICECandidate(v.Candidate); err != nil { + if err := t.AddICECandidate(v.Candidate, v.Target); err != nil { log.Errorf("add ice candidate error: %s", err) return nil, err } } else { log.Infof("pending remote candidate: %v", v.Candidate) - pendingDescriptions = append(pendingDescriptions, v.Candidate) + pendingDescriptions = append(pendingDescriptions, v) } case *proto.SfuOfferMsg: - log.Infof("got remote description: %v", v.Jsep) - if err := t.SetRemoteDescription(v.Jsep); err != nil { - log.Errorf("set remote description error: ", err) - return nil, err - } - - answer, err := t.CreateAnswer() + log.Infof("got remote description: %v", v.Desc) + answer, err := t.Answer(v.Desc) if err != nil { log.Errorf("create answer error: ", err) return nil, err } - - if err = t.SetLocalDescription(answer); err != nil { - log.Errorf("set local description error: ", err) - return nil, err - } - log.Infof("send description to [%s]: %v", s.client, answer) if err := nrpc.Publish(s.client, proto.SfuAnswerMsg{ - MID: mid, - RTCInfo: proto.RTCInfo{Jsep: answer}, + MID: mid, + Desc: answer, }); err != nil { log.Errorf("send description to [%s] error: %v", s.client, err) return nil, err @@ -150,15 +138,10 @@ func (s *sfu) join(rid proto.RID, mid proto.MID) (*iavp.WebRTCTransport, *nats.S log.Errorf("creating offer error: %v", err) return nil, nil, err } - if err = t.SetLocalDescription(offer); err != nil { - log.Errorf("set local description error: %v", err) - return nil, nil, err - } req := proto.ToSfuJoinMsg{ - RPCID: rpcID, - MID: mid, - RID: rid, - RTCInfo: proto.RTCInfo{Jsep: offer}, + MID: mid, + RID: rid, + Offer: offer, } log.Infof("join to [%s]: %v", s.client, req) resp, err := nrpc.Request(s.client, req) @@ -172,20 +155,20 @@ func (s *sfu) join(rid proto.RID, mid proto.MID) (*iavp.WebRTCTransport, *nats.S return nil, nil, errors.New("join reply msg parses failed") } log.Infof("join reply: %v", msg) - if err := t.SetRemoteDescription(msg.Jsep); err != nil { + if err := t.SetRemoteDescription(msg.Answer); err != nil { log.Errorf("Error set remote description: %s", err) return nil, nil, err } hasRemoteDescription.Set(true) for _, c := range pendingDescriptions { - if err := t.AddICECandidate(c); err != nil { + if err := t.AddICECandidate(c.Candidate, c.Target); err != nil { log.Errorf("add ice candidate error: %s", err) } } // send candidates to sfu - t.OnICECandidate(func(c *webrtc.ICECandidate) { + t.OnICECandidate(func(c *webrtc.ICECandidate, target int) { if c == nil { log.Infof("candidates gathering done") return @@ -193,6 +176,7 @@ func (s *sfu) join(rid proto.RID, mid proto.MID) (*iavp.WebRTCTransport, *nats.S data := proto.SfuTrickleMsg{ MID: mid, Candidate: c.ToJSON(), + Target: target, } log.Infof("send trickle to [%s]: %v", s.client, data) if err := nrpc.Publish(s.client, data); err != nil { diff --git a/pkg/node/biz/peer.go b/pkg/node/biz/peer.go index b3a394fb8..911fb2b2f 100644 --- a/pkg/node/biz/peer.go +++ b/pkg/node/biz/peer.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "sync" "github.com/google/uuid" "github.com/gorilla/websocket" @@ -19,23 +18,25 @@ import ( // peer represents a peer for client type peer struct { - id proto.UID + uid proto.UID + mid proto.MID + rid proto.RID + info []byte conn *jsonrpc2.Conn ctx context.Context closed util.AtomicBool onCloseFun func() - rooms map[proto.RID]proto.MID - roomLook sync.Mutex auth func(proto.Authenticatable) error } // newPeer create peer instance for client -func newPeer(ctx context.Context, c *websocket.Conn, id proto.UID, auth func(proto.Authenticatable) error) *peer { +func newPeer(ctx context.Context, c *websocket.Conn, auth func(proto.Authenticatable) error) *peer { + id := uuid.New().String() p := &peer{ - ctx: ctx, - id: id, - rooms: make(map[proto.RID]proto.MID), - auth: auth, + ctx: ctx, + uid: proto.UID(id), // TODO: may be improve + mid: proto.MID(id), + auth: auth, } p.conn = jsonrpc2.NewConn(ctx, websocketjsonrpc2.NewObjectStream(c), p) return p @@ -127,75 +128,69 @@ func (p *peer) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Re } } -// join client join the room -func (p *peer) join(msg *proto.FromClientJoinMsg) (interface{}, error) { - log.Infof("peer join: uid=%s, msg=%v", p.id, msg) - - rid := proto.RID(msg.RID) - uid := p.id - - // validate - if rid == "" { - return nil, errors.New("room not found") - } - sdpInfo, err := sdp.Parse(msg.Jsep.SDP) - if err != nil { - return nil, errors.New("sdp not found") - } - - // join room - p.roomLook.Lock() - mid, joined := p.rooms[rid] - if !joined { - mid = proto.MID(uuid.New().String()) - p.rooms[rid] = mid - } - p.roomLook.Unlock() - if joined { - return nil, errors.New("peer already exists") - } - addPeer(rid, p) - - // get islb and sfu node - islb := getIslb() - if islb == "" { - return nil, errors.New("islb-node not found") - } - sfu, err := getNode("sfu", islb, uid, rid, mid) - if err != nil { - log.Errorf("getting sfu-node: %v", err) - return nil, errors.New("sfu-node not found") - } - - // join to islb - resp, err := nrpc.Request(islb, proto.ToIslbPeerJoinMsg{ - UID: uid, RID: rid, MID: mid, Info: msg.Info, - }) - if err != nil { - log.Errorf("IslbClientOnJoin failed %v", err) - } - fromIslbPeerJoinMsg := resp.(*proto.FromIslbPeerJoinMsg) - - // handle sfu message - rpcID := string(uid) - sub, err := nrpc.Subscribe(rpcID, func(msg interface{}) (interface{}, error) { - log.Infof("peer(%s) handle sfu message: %+v", uid, msg) +// handleSFURequest handle sfu request +func (p *peer) handleSFURequest(islb, sfu string) { + sub, err := nrpc.Subscribe(string(p.mid), func(msg interface{}) (interface{}, error) { + log.Infof("peer(%s) handle sfu message: %T, %+v", p.uid, msg, msg) switch v := msg.(type) { case *proto.SfuOfferMsg: - log.Infof("peer(%s) got remote description: %s", uid, v.Jsep) - if err := p.notify(proto.ClientOffer, proto.ClientOfferMsg{ - RID: rid, - MID: v.MID, - RTCInfo: v.RTCInfo, + log.Infof("peer(%s) got remote description: %s", p.uid, v.Desc) + + // join to islb + resp, err := nrpc.Request(islb, proto.ToIslbPeerJoinMsg{ + UID: p.uid, RID: p.rid, MID: p.mid, Info: p.info, + }) + if err != nil { + log.Errorf("IslbClientOnJoin failed %v", err) + } + fromIslbPeerJoinMsg := resp.(*proto.FromIslbPeerJoinMsg) + if err := p.notify(proto.ClientPeers, proto.ToClientPeersMsg{ + Peers: fromIslbPeerJoinMsg.Peers, + Streams: fromIslbPeerJoinMsg.Streams, }); err != nil { log.Errorf("error sending offer %s", err) } + + // join to avp + var avp string + if len(avpElements) > 0 { + if avp, err = getNode("avp", islb, p.uid, p.rid, p.mid); err != nil { + log.Errorf("get avp-node error: %v", err) + } + } + if avp != "" { + sdpInfo, err := sdp.Parse(v.Desc.SDP) + if err != nil { + log.Errorf("parse sdp error: %v", err) + } + for _, eid := range avpElements { + for _, stream := range sdpInfo.GetStreams() { + tracks := stream.GetTracks() + for _, track := range tracks { + err = nrpc.Publish(avp, proto.ToAvpProcessMsg{ + Addr: sfu, + PID: stream.GetID(), + RID: string(p.rid), + TID: track.GetID(), + EID: eid, + Config: []byte{}, + }) + if err != nil { + log.Errorf("avp process failed %v", err) + } + } + } + } + } + + if err := p.notify(proto.ClientOffer, v.Desc); err != nil { + log.Errorf("error sending offer %s", err) + } case *proto.SfuTrickleMsg: - log.Infof("peer(%s) got a remote candidate: %s", uid, v.Candidate) + log.Infof("peer(%s) got a remote candidate: %s", p.uid, v.Candidate) if err := p.notify(proto.ClientTrickle, proto.ClientTrickleMsg{ - RID: rid, - MID: v.MID, Candidate: proto.CandidateForJSON(v.Candidate), + Target: v.Target, }); err != nil { log.Errorf("error sending ice candidate %s", err) } @@ -206,106 +201,107 @@ func (p *peer) join(msg *proto.FromClientJoinMsg) (interface{}, error) { }) if err != nil { log.Errorf("subscribe sfu failed: %v", err) - return nil, errors.New("subscribe sfu failed") } + p.setCloseFun(func() { sub.Unsubscribe() }) +} + +// join client join the room +func (p *peer) join(msg *proto.FromClientJoinMsg) (interface{}, error) { + log.Infof("peer join: uid=%s, msg=%v", p.uid, msg) + + p.rid = msg.RID + p.info = msg.Info + + // validate + if p.rid == "" { + return nil, errors.New("room not found") + } + + // join room + addPeer(p.rid, p) + + // get islb and sfu node + islb := getIslb() + if islb == "" { + return nil, errors.New("islb-node not found") + } + sfu, err := getNode("sfu", islb, p.uid, p.rid, p.mid) + if err != nil { + log.Errorf("getting sfu-node: %v", err) + return nil, errors.New("sfu-node not found") + } + + // handle sfu message + p.handleSFURequest(islb, sfu) // join to sfu - resp, err = nrpc.Request(sfu, proto.ToSfuJoinMsg{ - RPCID: rpcID, - MID: mid, - RID: rid, - RTCInfo: msg.RTCInfo, + resp, err := nrpc.Request(sfu, proto.ToSfuJoinMsg{ + MID: p.mid, + RID: p.rid, + Offer: msg.Offer, }) if err != nil { return nil, err } fromSfuJoinMsg := resp.(*proto.FromSfuJoinMsg) + return fromSfuJoinMsg.Answer, nil +} + +// offer client send offer to biz +func (p *peer) offer(msg *proto.ClientOfferMsg) (interface{}, error) { + log.Infof("peer offer: uid=%s, msg=%v", p.uid, msg) + + islb := getIslb() + if islb == "" { + return nil, errors.New("islb-node not found") + } + // associate the stream in the SDP with the UID/RID/MID. + sdpInfo, err := sdp.Parse(msg.Desc.SDP) + if err != nil { + log.Errorf("parse sdp error: %v", err) + } for key := range sdpInfo.GetStreams() { nrpc.Publish(islb, proto.ToIslbStreamAddMsg{ - UID: uid, RID: rid, MID: mid, StreamID: proto.StreamID(key), + UID: p.uid, RID: p.rid, MID: p.mid, StreamID: proto.StreamID(key), }) } - // join to avp - var avp string - if len(avpElements) > 0 { - if avp, err = getNode("avp", islb, uid, rid, mid); err != nil { - log.Errorf("get avp-node error: %v", err) - } - } - if avp != "" { - for _, eid := range avpElements { - for _, stream := range sdpInfo.GetStreams() { - tracks := stream.GetTracks() - for _, track := range tracks { - err = nrpc.Publish(avp, proto.ToAvpProcessMsg{ - Addr: sfu, - PID: stream.GetID(), - RID: string(rid), - TID: track.GetID(), - EID: eid, - Config: []byte{}, - }) - if err != nil { - log.Errorf("avp process failed %v", err) - } - } - } - } - } - - return proto.ToClientJoinMsg{ - Peers: fromIslbPeerJoinMsg.Peers, - Streams: fromIslbPeerJoinMsg.Streams, - MID: mid, - RTCInfo: fromSfuJoinMsg.RTCInfo, - }, nil -} - -// offer client send offer to biz -func (p *peer) offer(msg *proto.ClientOfferMsg) (interface{}, error) { - log.Infof("peer offer: uid=%s, msg=%v", p.id, msg) - - sfu, err := getNode("sfu", "", p.id, msg.RID, msg.MID) + sfu, err := getNode("sfu", islb, p.uid, p.rid, p.mid) if err != nil { log.Warnf("sfu-node not found, %s", err.Error()) return nil, err } resp, err := nrpc.Request(sfu, proto.SfuOfferMsg{ - MID: msg.MID, - RTCInfo: proto.RTCInfo{Jsep: msg.Jsep}, + MID: p.mid, + Desc: msg.Desc, }) if err != nil { log.Errorf("offer %s failed %v", sfu, err.Error()) return nil, err } - return proto.ClientAnswerMsg{ - RID: msg.RID, - MID: msg.MID, - RTCInfo: resp.(*proto.SfuAnswerMsg).RTCInfo, - }, nil + return resp.(*proto.SfuAnswerMsg).Desc, nil } // answer received answer of client func (p *peer) answer(msg *proto.ClientAnswerMsg) error { - log.Infof("peer answer: uid=%s, msg=%v", p.id, msg) + log.Infof("peer answer: uid=%s, msg=%v", p.uid, msg) - sfu, err := getNode("sfu", "", p.id, msg.RID, msg.MID) + sfu, err := getNode("sfu", "", p.uid, p.rid, p.mid) if err != nil { log.Warnf("sfu-node not found, %s", err.Error()) return err } if _, err := nrpc.Request(sfu, proto.SfuAnswerMsg{ - MID: msg.MID, - RTCInfo: msg.RTCInfo, + MID: p.mid, + Desc: msg.Desc, }); err != nil { log.Errorf("answer %s error: %v", sfu, err.Error()) return err @@ -316,21 +312,18 @@ func (p *peer) answer(msg *proto.ClientAnswerMsg) error { // trickle received candidate of client func (p *peer) trickle(msg *proto.ClientTrickleMsg) error { - log.Infof("peer trickle: uid=%s, msg=%v", p.id, msg) + log.Infof("peer trickle: uid=%s, msg=%v", p.uid, msg) - if msg.RID == "" { - return errors.New("room not found") - } - - sfu, err := getNode("sfu", "", p.id, msg.RID, msg.MID) + sfu, err := getNode("sfu", "", p.uid, p.rid, p.mid) if err != nil { log.Warnf("sfu-node not found, %s", err.Error()) return err } _, err = nrpc.Request(sfu, proto.SfuTrickleMsg{ - MID: msg.MID, + MID: p.mid, Candidate: msg.Candidate, + Target: msg.Target, }) if err != nil { log.Errorf("trickle %s error: %s", sfu, err.Error()) @@ -342,18 +335,15 @@ func (p *peer) trickle(msg *proto.ClientTrickleMsg) error { // leave client leave the room func (p *peer) leave(msg *proto.FromClientLeaveMsg) error { - log.Infof("peer leave: uid=%s, msg=%v", p.id, msg) + log.Infof("peer leave: uid=%s, msg=%v", p.uid, msg) // leave room - p.roomLook.Lock() - delete(p.rooms, msg.RID) - p.roomLook.Unlock() room := getRoom(msg.RID) if room == nil { log.Warnf("room not exits, rid=", msg.RID) return errors.New("room not found") } - room.delPeer(msg.UID) + room.delPeer(p.uid) islb := getIslb() if islb == "" { @@ -362,17 +352,17 @@ func (p *peer) leave(msg *proto.FromClientLeaveMsg) error { } if _, err := nrpc.Request(islb, proto.IslbPeerLeaveMsg{ - RoomInfo: proto.RoomInfo{UID: msg.UID, RID: msg.RID}, + RoomInfo: proto.RoomInfo{UID: p.uid, RID: msg.RID}, }); err != nil { log.Errorf("leave %s error: %v", islb, err.Error()) } - sfu, err := getNode("sfu", islb, msg.UID, msg.RID, msg.MID) + sfu, err := getNode("sfu", islb, p.uid, msg.RID, p.mid) if err != nil { log.Errorf("sfu-node not found: %s", err) } if _, err := nrpc.Request(sfu, proto.ToSfuLeaveMsg{ - MID: msg.MID, + MID: p.mid, }); err != nil { log.Errorf("leave %s error: %v", sfu, err.Error()) } @@ -382,7 +372,7 @@ func (p *peer) leave(msg *proto.FromClientLeaveMsg) error { // Broadcast peer send message to peers of room func (p *peer) broadcast(msg *proto.FromClientBroadcastMsg) error { - log.Infof("peer broadcast: uid=%s, msg=%v", p.id, msg) + log.Infof("peer broadcast: uid=%s, msg=%v", p.uid, msg) // Validate if msg.RID == "" { @@ -396,7 +386,7 @@ func (p *peer) broadcast(msg *proto.FromClientBroadcastMsg) error { // TODO: nrpc.Publish(roomID, ... err := nrpc.Publish(islb, proto.IslbBroadcastMsg{ - RoomInfo: proto.RoomInfo{UID: p.id, RID: msg.RID}, + RoomInfo: proto.RoomInfo{UID: p.uid, RID: msg.RID}, Info: msg.Info, }) if err != nil { @@ -444,16 +434,7 @@ func (p *peer) close() { p.closed.Set(true) // leave all rooms - p.roomLook.Lock() - rooms := p.rooms - p.roomLook.Unlock() - for rid, mid := range rooms { - p.leave(&proto.FromClientLeaveMsg{ - UID: p.id, - RID: rid, - MID: mid, - }) - } + p.leave(&proto.FromClientLeaveMsg{RID: p.rid}) if p.onCloseFun != nil { p.onCloseFun() diff --git a/pkg/node/biz/room.go b/pkg/node/biz/room.go index 9e32fdaa2..d8a449279 100644 --- a/pkg/node/biz/room.go +++ b/pkg/node/biz/room.go @@ -42,7 +42,7 @@ func (r *room) ID() proto.RID { func (r *room) addPeer(p *peer) { r.Lock() defer r.Unlock() - r.peers[p.id] = p + r.peers[p.uid] = p } // getPeer get a peer by peer id @@ -116,7 +116,7 @@ func delPeer(rid proto.RID, uid proto.UID) { // addPeer add a peer to room func addPeer(rid proto.RID, peer *peer) { - log.Infof("AddPeer rid=%s uid=%s", rid, peer.id) + log.Infof("AddPeer rid=%s uid=%s", rid, peer.uid) room := getRoom(rid) if room == nil { room = newRoom(rid) diff --git a/pkg/node/biz/server.go b/pkg/node/biz/server.go index fada7c27e..cf6523eff 100644 --- a/pkg/node/biz/server.go +++ b/pkg/node/biz/server.go @@ -114,21 +114,10 @@ func (s *server) start(conf signalConf) { } http.Handle(conf.WebSocketPath, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // get user id - parms := r.URL.Query() - id := parms["id"] - if id == nil || len(id) < 1 { - log.Errorf("invalid id") - http.Error(w, "invalid id", http.StatusForbidden) - return - } - uid := proto.UID(id[0]) - log.Infof("peer connected, uid=%s", uid) - // authenticate connection var cc *claims if conf.AuthConnection.Enabled { - if token := parms["token"]; token != nil && len(token) > 0 { + if token := r.URL.Query()["token"]; token != nil && len(token) > 0 { // passing nil for keyFunc, since token is expected to be already verified (by a proxy) t, err := jwt.ParseWithClaims(token[0], &claims{}, conf.AuthConnection.KeyFunc) if err != nil { @@ -158,16 +147,16 @@ func (s *server) start(conf signalConf) { defer ws.Close() // create a peer - p := newPeer(r.Context(), ws, uid, auth) + p := newPeer(r.Context(), ws, auth) defer p.close() // wait the peer disconnecting select { case <-p.disconnectNotify(): - log.Infof("peer disconnected, uid=%s", p.id) + log.Infof("peer disconnected, uid=%s", p.uid) break case <-s.closed: - log.Infof("server closed, disconnect peer, uid=%s", p.id) + log.Infof("server closed, disconnect peer, uid=%s", p.uid) break } })) diff --git a/pkg/node/sfu/internal.go b/pkg/node/sfu/internal.go index c0b327059..57a02b190 100644 --- a/pkg/node/sfu/internal.go +++ b/pkg/node/sfu/internal.go @@ -32,11 +32,9 @@ func handleRequest(rpcID string) (*nats.Subscription, error) { } func join(msg *proto.ToSfuJoinMsg) (interface{}, error) { - log.Infof("join msg=%v", msg) - - peer := s.addPeer(msg.MID) + peer := s.getPeer(msg.MID) - answer, err := peer.Join(string(msg.RID), msg.Jsep) + answer, err := peer.Join(string(msg.RID), msg.Offer) if err != nil { log.Errorf("join error: %v", err) return nil, err @@ -45,50 +43,50 @@ func join(msg *proto.ToSfuJoinMsg) (interface{}, error) { // Notify user of new ice candidate peer.OnOffer = func(offer *webrtc.SessionDescription) { data := proto.SfuOfferMsg{ - MID: msg.MID, - RTCInfo: proto.RTCInfo{Jsep: *offer}, + MID: msg.MID, + Desc: *offer, } - log.Infof("send offer to [%s]: %v", msg.RPCID, data) - if err := nrpc.Publish(msg.RPCID, data); err != nil { + log.Infof("send offer to [%s]: %v", msg.MID, data) + if err := nrpc.Publish(string(msg.MID), data); err != nil { log.Errorf("send offer: %v", err) } } // Notify user of new offer - peer.OnIceCandidate = func(candidate *webrtc.ICECandidateInit) { + peer.OnIceCandidate = func(candidate *webrtc.ICECandidateInit, target int) { data := proto.SfuTrickleMsg{ MID: msg.MID, Candidate: *candidate, + Target: target, } - log.Infof("send candidate to [%s]: %v", msg.RPCID, data) - if err := nrpc.Publish(msg.RPCID, data); err != nil { - log.Errorf("send candidate to [%s] error: %v", msg.RPCID, err) + log.Infof("send candidate to [%s]: %v", msg.MID, data) + if err := nrpc.Publish(string(msg.MID), data); err != nil { + log.Errorf("send candidate to [%s] error: %v", msg.MID, err) } } // return answer - resp := proto.FromSfuJoinMsg{RTCInfo: proto.RTCInfo{Jsep: *answer}} + resp := proto.FromSfuJoinMsg{Answer: *answer} log.Infof("reply join: %v", resp) return resp, nil } func offer(msg *proto.SfuOfferMsg) (interface{}, error) { - log.Infof("offer msg=%v", msg) peer := s.getPeer(msg.MID) if peer == nil { log.Warnf("peer not found, mid=%s", msg.MID) return nil, errors.New("peer not found") } - answer, err := peer.Answer(msg.Jsep) + answer, err := peer.Answer(msg.Desc) if err != nil { log.Errorf("peer.Answer: %v", err) return nil, errors.New("peer.Answer error") } resp := proto.SfuAnswerMsg{ - MID: msg.MID, - RTCInfo: proto.RTCInfo{Jsep: *answer}, + MID: msg.MID, + Desc: *answer, } log.Infof("reply answer: %v", resp) @@ -97,7 +95,6 @@ func offer(msg *proto.SfuOfferMsg) (interface{}, error) { } func leave(msg *proto.ToSfuLeaveMsg) (interface{}, error) { - log.Infof("leave msg=%v", msg) peer := s.getPeer(msg.MID) if peer == nil { log.Warnf("peer not found, mid=%s", msg.MID) @@ -113,14 +110,13 @@ func leave(msg *proto.ToSfuLeaveMsg) (interface{}, error) { } func answer(msg *proto.SfuAnswerMsg) (interface{}, error) { - log.Infof("answer msg=%v", msg) peer := s.getPeer(msg.MID) if peer == nil { log.Warnf("peer not found, mid=%s", msg.MID) return nil, errors.New("peer not found") } - if err := peer.SetRemoteDescription(msg.Jsep); err != nil { + if err := peer.SetRemoteDescription(msg.Desc); err != nil { log.Errorf("set remote description error: %v", err) return nil, errors.New("set remote description error") } @@ -128,14 +124,13 @@ func answer(msg *proto.SfuAnswerMsg) (interface{}, error) { } func trickle(msg *proto.SfuTrickleMsg) (interface{}, error) { - log.Infof("trickle msg=%v", msg) peer := s.getPeer(msg.MID) if peer == nil { log.Warnf("peer not found, mid=%s", msg.MID) return nil, errors.New("peer not found") } - if err := peer.Trickle(msg.Candidate); err != nil { + if err := peer.Trickle(msg.Candidate, msg.Target); err != nil { return nil, errors.New("error adding ice candidate") } diff --git a/pkg/node/sfu/server.go b/pkg/node/sfu/server.go index e7e1c273d..5548f98b9 100644 --- a/pkg/node/sfu/server.go +++ b/pkg/node/sfu/server.go @@ -20,18 +20,15 @@ func newServer(config sfu.Config) *server { } } -func (s *server) addPeer(mid proto.MID) *sfu.Peer { - p := sfu.NewPeer(s.sfu) +func (s *server) getPeer(mid proto.MID) *sfu.Peer { s.mu.Lock() defer s.mu.Unlock() - s.peers[mid] = &p - return &p -} - -func (s *server) getPeer(mid proto.MID) *sfu.Peer { - s.mu.RLock() - defer s.mu.RUnlock() - return s.peers[mid] + p := s.peers[mid] + if p == nil { + p = sfu.NewPeer(s.sfu) + s.peers[mid] = p + } + return p } func (s *server) delPeer(mid proto.MID) { diff --git a/pkg/proto/biz.go b/pkg/proto/biz.go index 5dcc83485..5fc0ba202 100644 --- a/pkg/proto/biz.go +++ b/pkg/proto/biz.go @@ -10,6 +10,7 @@ import ( func init() { gob.Register(&FromClientJoinMsg{}) gob.Register(&ToClientJoinMsg{}) + gob.Register(&ToClientPeersMsg{}) gob.Register(&ToClientPeerJoinMsg{}) gob.Register(&ClientOfferMsg{}) gob.Register(&ClientAnswerMsg{}) @@ -61,18 +62,14 @@ type Stream struct { UID UID `json:"uid"` } -type RTCInfo struct { - Jsep webrtc.SessionDescription `json:"jsep"` -} - type TrackMap map[string][]TrackInfo // Client <-> Biz messages. type FromClientJoinMsg struct { - RID RID `json:"rid"` + RID RID `json:"sid"` + Offer webrtc.SessionDescription `json:"offer"` RoomToken - RTCInfo Info json.RawMessage `json:"info"` } @@ -84,10 +81,12 @@ func (j *FromClientJoinMsg) Room() RID { } type ToClientJoinMsg struct { + Answer webrtc.SessionDescription `json:"answer"` +} + +type ToClientPeersMsg struct { Peers []Peer `json:"peers"` Streams []Stream `json:"streams"` - MID MID `json:"mid"` - RTCInfo } type ToClientPeerJoinMsg struct { @@ -97,27 +96,20 @@ type ToClientPeerJoinMsg struct { } type ClientOfferMsg struct { - RID RID `json:"rid"` - MID MID `json:"mid"` - RTCInfo + Desc webrtc.SessionDescription `json:"desc"` } type ClientAnswerMsg struct { - RID RID `json:"rid"` - MID MID `json:"mid"` - RTCInfo + Desc webrtc.SessionDescription `json:"desc"` } type ClientTrickleMsg struct { - RID RID `json:"rid"` - MID MID `json:"mid"` Candidate webrtc.ICECandidateInit `json:"candidate"` + Target int `json:"target"` } type FromClientLeaveMsg struct { - UID UID `json:"uid"` RID RID `json:"rid"` - MID MID `json:"mid"` } type FromClientBroadcastMsg struct { @@ -133,33 +125,32 @@ type ToClientBroadcastMsg struct { // Biz to SFU type ToSfuJoinMsg struct { - RPCID string `json:"rpc"` - RID RID `json:"rid"` - MID MID `json:"mid"` - RTCInfo + RID RID `json:"rid"` + MID MID `json:"mid"` + Offer webrtc.SessionDescription `json:"offer"` } type FromSfuJoinMsg struct { - RTCInfo + Answer webrtc.SessionDescription `json:"answer"` } type ToSfuLeaveMsg struct { MID MID `json:"mid"` } - type SfuTrickleMsg struct { MID MID `json:"mid"` Candidate webrtc.ICECandidateInit `json:"candidate"` + Target int `json:"target"` } type SfuOfferMsg struct { - MID MID `json:"mid"` - RTCInfo + MID MID `json:"mid"` + Desc webrtc.SessionDescription `json:"offer"` } type SfuAnswerMsg struct { - MID MID `json:"mid"` - RTCInfo + MID MID `json:"mid"` + Desc webrtc.SessionDescription `json:"answer"` } // Biz to AVP diff --git a/pkg/proto/proto.go b/pkg/proto/proto.go index 6102b6c4e..315552c4d 100644 --- a/pkg/proto/proto.go +++ b/pkg/proto/proto.go @@ -18,6 +18,7 @@ const ( ClientTrickle = "trickle" ClientOffer = "offer" ClientAnswer = "answer" + ClientPeers = "peers" // ion to client ClientOnJoin = "peer-join"