Skip to content

Commit

Permalink
Merge pull request #721 from rod-hynes/webrtc-media-streams
Browse files Browse the repository at this point in the history
Add in-proxy WebRTC media stream mode
  • Loading branch information
rod-hynes authored Jan 31, 2025
2 parents c34e0c6 + a5f3343 commit fa267dc
Show file tree
Hide file tree
Showing 35 changed files with 2,429 additions and 786 deletions.
18 changes: 18 additions & 0 deletions psiphon/common/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package errors

import (
"fmt"
"io"
"runtime"

"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/stacktrace"
Expand Down Expand Up @@ -72,6 +73,23 @@ func Trace(err error) error {
return fmt.Errorf("%s#%d: %w", stacktrace.GetFunctionName(pc), line, err)
}

// TraceReader wraps the given error with the caller stack frame information,
// except in the case of io.EOF, which is returned unwrapped. This is used to
// preserve io.Reader.Read io.EOF error returns.
func TraceReader(err error) error {
if err == nil {
return nil
}
if err == io.EOF {
return io.EOF
}
pc, _, line, ok := runtime.Caller(1)
if !ok {
return fmt.Errorf("[unknown]: %w", err)
}
return fmt.Errorf("%s#%d: %w", stacktrace.GetFunctionName(pc), line, err)
}

// TraceMsg wraps the given error with the caller stack frame information
// and the given message.
func TraceMsg(err error, message string) error {
Expand Down
226 changes: 149 additions & 77 deletions psiphon/common/inproxy/api.go

Large diffs are not rendered by default.

71 changes: 49 additions & 22 deletions psiphon/common/inproxy/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ type BrokerConfig struct {
AllowProxy func(common.GeoIPData) bool

// PrioritizeProxy is a callback which can indicate whether proxy
// announcements from proxies with the specified GeoIPData and
// APIParameters should be prioritized in the matcher queue. Priority
// proxy announcements match ahead of other proxy announcements,
// regardless of announcement age/deadline. Priority status takes
// precedence over preferred NAT matching. Prioritization applies only to
// common compartment IDs and not personal pairing mode.
PrioritizeProxy func(common.GeoIPData, common.APIParameters) bool
// announcements from proxies with the specified in-proxy protocol
// version, GeoIPData, and APIParameters should be prioritized in the
// matcher queue. Priority proxy announcements match ahead of other proxy
// announcements, regardless of announcement age/deadline. Priority
// status takes precedence over preferred NAT matching. Prioritization
// applies only to common compartment IDs and not personal pairing mode.
PrioritizeProxy func(int, common.GeoIPData, common.APIParameters) bool

// AllowClient is a callback which can indicate whether a client with the
// given GeoIP data is allowed to match with common compartment ID
Expand Down Expand Up @@ -483,6 +483,7 @@ func (b *Broker) handleProxyAnnounce(
startTime := time.Now()

var logFields common.LogFields
var isPriority bool
var newTacticsTag string
var clientOffer *MatchOffer
var matchMetrics *MatchMetrics
Expand Down Expand Up @@ -529,6 +530,7 @@ func (b *Broker) handleProxyAnnounce(
logFields["broker_event"] = "proxy-announce"
logFields["broker_id"] = b.brokerID
logFields["proxy_id"] = proxyID
logFields["is_priority"] = isPriority
logFields["elapsed_time"] = time.Since(startTime) / time.Millisecond
logFields["connection_id"] = connectionID
if newTacticsTag != "" {
Expand All @@ -538,6 +540,7 @@ func (b *Broker) handleProxyAnnounce(
// Log the target Psiphon server ID (diagnostic ID). The presence
// of this field indicates that a match was made.
logFields["destination_server_id"] = clientOffer.DestinationServerID
logFields["use_media_streams"] = clientOffer.UseMediaStreams
}
if timedOut {
logFields["timed_out"] = true
Expand Down Expand Up @@ -571,7 +574,7 @@ func (b *Broker) handleProxyAnnounce(

// Return MustUpgrade when the proxy's protocol version is less than the
// minimum required.
if announceRequest.Metrics.ProxyProtocolVersion < MinimumProxyProtocolVersion {
if announceRequest.Metrics.ProtocolVersion < minimumProxyProtocolVersion {
responsePayload, err := MarshalProxyAnnounceResponse(
&ProxyAnnounceResponse{
NoMatch: true,
Expand Down Expand Up @@ -643,7 +646,6 @@ func (b *Broker) handleProxyAnnounce(
// In the common compartment ID case, invoke the callback to check if the
// announcement should be prioritized.

isPriority := false
if b.config.PrioritizeProxy != nil && !hasPersonalCompartmentIDs {

// Limitation: Of the two return values from
Expand All @@ -659,7 +661,8 @@ func (b *Broker) handleProxyAnnounce(
// calls for range filtering, which is not yet supported in the
// psiphon/server.MeekServer PrioritizeProxy provider.

isPriority = b.config.PrioritizeProxy(geoIPData, apiParams)
isPriority = b.config.PrioritizeProxy(
int(announceRequest.Metrics.ProtocolVersion), geoIPData, apiParams)
}

// Await client offer.
Expand All @@ -685,6 +688,7 @@ func (b *Broker) handleProxyAnnounce(
&MatchAnnouncement{
Properties: MatchProperties{
IsPriority: isPriority,
ProtocolVersion: announceRequest.Metrics.ProtocolVersion,
CommonCompartmentIDs: commonCompartmentIDs,
PersonalCompartmentIDs: announceRequest.PersonalCompartmentIDs,
GeoIPData: geoIPData,
Expand Down Expand Up @@ -746,6 +750,17 @@ func (b *Broker) handleProxyAnnounce(
return responsePayload, nil
}

// Select the protocol version. The matcher has already checked
// negotiateProtocolVersion, so failure is not expected.

negotiatedProtocolVersion, ok := negotiateProtocolVersion(
announceRequest.Metrics.ProtocolVersion,
clientOffer.Properties.ProtocolVersion,
clientOffer.UseMediaStreams)
if !ok {
return nil, errors.TraceNew("unexpected negotiateProtocolVersion failure")
}

// Respond with the client offer. The proxy will follow up with an answer
// request, which is relayed to the client, and then the WebRTC dial begins.

Expand All @@ -760,10 +775,11 @@ func (b *Broker) handleProxyAnnounce(
&ProxyAnnounceResponse{
TacticsPayload: tacticsPayload,
ConnectionID: connectionID,
ClientProxyProtocolVersion: clientOffer.ClientProxyProtocolVersion,
SelectedProtocolVersion: negotiatedProtocolVersion,
ClientOfferSDP: clientOffer.ClientOfferSDP,
ClientRootObfuscationSecret: clientOffer.ClientRootObfuscationSecret,
DoDTLSRandomization: clientOffer.DoDTLSRandomization,
UseMediaStreams: clientOffer.UseMediaStreams,
TrafficShapingParameters: clientOffer.TrafficShapingParameters,
NetworkProtocol: clientOffer.NetworkProtocol,
DestinationAddress: clientOffer.DestinationAddress,
Expand Down Expand Up @@ -907,7 +923,7 @@ func (b *Broker) handleClientOffer(

// Return MustUpgrade when the client's protocol version is less than the
// minimum required.
if offerRequest.Metrics.ProxyProtocolVersion < MinimumProxyProtocolVersion {
if offerRequest.Metrics.ProtocolVersion < minimumClientProtocolVersion {
responsePayload, err := MarshalClientOfferResponse(
&ClientOfferResponse{
NoMatch: true,
Expand Down Expand Up @@ -944,17 +960,18 @@ func (b *Broker) handleClientOffer(

clientMatchOffer = &MatchOffer{
Properties: MatchProperties{
ProtocolVersion: offerRequest.Metrics.ProtocolVersion,
CommonCompartmentIDs: offerRequest.CommonCompartmentIDs,
PersonalCompartmentIDs: offerRequest.PersonalCompartmentIDs,
GeoIPData: geoIPData,
NetworkType: GetNetworkType(offerRequest.Metrics.BaseAPIParameters),
NATType: offerRequest.Metrics.NATType,
PortMappingTypes: offerRequest.Metrics.PortMappingTypes,
},
ClientProxyProtocolVersion: offerRequest.Metrics.ProxyProtocolVersion,
ClientOfferSDP: offerSDP,
ClientRootObfuscationSecret: offerRequest.ClientRootObfuscationSecret,
DoDTLSRandomization: offerRequest.DoDTLSRandomization,
UseMediaStreams: offerRequest.UseMediaStreams,
TrafficShapingParameters: offerRequest.TrafficShapingParameters,
NetworkProtocol: offerRequest.NetworkProtocol,
DestinationAddress: offerRequest.DestinationAddress,
Expand Down Expand Up @@ -1069,14 +1086,25 @@ func (b *Broker) handleClientOffer(
return nil, errors.Trace(err)
}

// Select the protocol version. The matcher has already checked
// negotiateProtocolVersion, so failure is not expected.

negotiatedProtocolVersion, ok := negotiateProtocolVersion(
proxyMatchAnnouncement.Properties.ProtocolVersion,
offerRequest.Metrics.ProtocolVersion,
offerRequest.UseMediaStreams)
if !ok {
return nil, errors.TraceNew("unexpected negotiateProtocolVersion failure")
}

// Respond with the proxy answer and initial broker/server session packet.

responsePayload, err := MarshalClientOfferResponse(
&ClientOfferResponse{
ConnectionID: proxyAnswer.ConnectionID,
SelectedProxyProtocolVersion: proxyAnswer.SelectedProxyProtocolVersion,
ProxyAnswerSDP: proxyAnswer.ProxyAnswerSDP,
RelayPacketToServer: relayPacket,
ConnectionID: proxyAnswer.ConnectionID,
SelectedProtocolVersion: negotiatedProtocolVersion,
ProxyAnswerSDP: proxyAnswer.ProxyAnswerSDP,
RelayPacketToServer: relayPacket,
})
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -1180,11 +1208,10 @@ func (b *Broker) handleProxyAnswer(
// These fields are used internally in the matcher.

proxyAnswer = &MatchAnswer{
ProxyIP: proxyIP,
ProxyID: initiatorID,
ConnectionID: answerRequest.ConnectionID,
SelectedProxyProtocolVersion: answerRequest.SelectedProxyProtocolVersion,
ProxyAnswerSDP: answerSDP,
ProxyIP: proxyIP,
ProxyID: initiatorID,
ConnectionID: answerRequest.ConnectionID,
ProxyAnswerSDP: answerSDP,
}

err = b.matcher.Answer(proxyAnswer)
Expand Down
31 changes: 18 additions & 13 deletions psiphon/common/inproxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@ func dialClientWebRTCConn(
// Initialize the WebRTC offer

doTLSRandomization := config.WebRTCDialCoordinator.DoDTLSRandomization()
trafficShapingParameters := config.WebRTCDialCoordinator.DataChannelTrafficShapingParameters()
useMediaStreams := config.WebRTCDialCoordinator.UseMediaStreams()
trafficShapingParameters := config.WebRTCDialCoordinator.TrafficShapingParameters()
clientRootObfuscationSecret := config.WebRTCDialCoordinator.ClientRootObfuscationSecret()

webRTCConn, SDP, SDPMetrics, err := newWebRTCConnForOffer(
Expand All @@ -369,6 +370,7 @@ func dialClientWebRTCConn(
WebRTCDialCoordinator: config.WebRTCDialCoordinator,
ClientRootObfuscationSecret: clientRootObfuscationSecret,
DoDTLSRandomization: doTLSRandomization,
UseMediaStreams: useMediaStreams,
TrafficShapingParameters: trafficShapingParameters,
ReliableTransport: config.ReliableTransport,
},
Expand Down Expand Up @@ -411,17 +413,18 @@ func dialClientWebRTCConn(
ctx,
&ClientOfferRequest{
Metrics: &ClientMetrics{
BaseAPIParameters: packedParams,
ProxyProtocolVersion: proxyProtocolVersion,
NATType: config.WebRTCDialCoordinator.NATType(),
PortMappingTypes: config.WebRTCDialCoordinator.PortMappingTypes(),
BaseAPIParameters: packedParams,
ProtocolVersion: LatestProtocolVersion,
NATType: config.WebRTCDialCoordinator.NATType(),
PortMappingTypes: config.WebRTCDialCoordinator.PortMappingTypes(),
},
CommonCompartmentIDs: brokerCoordinator.CommonCompartmentIDs(),
PersonalCompartmentIDs: personalCompartmentIDs,
ClientOfferSDP: SDP,
ICECandidateTypes: SDPMetrics.iceCandidateTypes,
ClientRootObfuscationSecret: clientRootObfuscationSecret,
DoDTLSRandomization: doTLSRandomization,
UseMediaStreams: useMediaStreams,
TrafficShapingParameters: trafficShapingParameters,
PackedDestinationServerEntry: config.PackedDestinationServerEntry,
NetworkProtocol: config.DialNetworkProtocol,
Expand Down Expand Up @@ -458,11 +461,13 @@ func dialClientWebRTCConn(
return nil, true, errors.TraceNew("no match")
}

if offerResponse.SelectedProxyProtocolVersion < MinimumProxyProtocolVersion ||
offerResponse.SelectedProxyProtocolVersion > proxyProtocolVersion {
if offerResponse.SelectedProtocolVersion < ProtocolVersion1 ||
(useMediaStreams &&
offerResponse.SelectedProtocolVersion < ProtocolVersion2) ||
offerResponse.SelectedProtocolVersion > LatestProtocolVersion {
return nil, false, errors.Tracef(
"Unsupported proxy protocol version: %d",
offerResponse.SelectedProxyProtocolVersion)
"Unsupported protocol version: %d",
offerResponse.SelectedProtocolVersion)
}

// Establish the WebRTC DataChannel connection
Expand All @@ -473,13 +478,13 @@ func dialClientWebRTCConn(
return nil, true, errors.Trace(err)
}

awaitDataChannelCtx, awaitDataChannelCancelFunc := context.WithTimeout(
awaitReadyToProxyCtx, awaitReadyToProxyCancelFunc := context.WithTimeout(
ctx,
common.ValueOrDefault(
config.WebRTCDialCoordinator.WebRTCAwaitDataChannelTimeout(), dataChannelAwaitTimeout))
defer awaitDataChannelCancelFunc()
config.WebRTCDialCoordinator.WebRTCAwaitReadyToProxyTimeout(), readyToProxyAwaitTimeout))
defer awaitReadyToProxyCancelFunc()

err = webRTCConn.AwaitInitialDataChannel(awaitDataChannelCtx)
err = webRTCConn.AwaitReadyToProxy(awaitReadyToProxyCtx, offerResponse.ConnectionID)
if err != nil {
return nil, true, errors.Trace(err)
}
Expand Down
14 changes: 9 additions & 5 deletions psiphon/common/inproxy/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,14 @@ type WebRTCDialCoordinator interface {
// the value.
DoDTLSRandomization() bool

// DataChannelTrafficShapingParameters returns parameters specifying how
// to perform data channel traffic shapping -- random padding and decoy
// message. Returns nil when no traffic shaping is to be performed.
DataChannelTrafficShapingParameters() *DataChannelTrafficShapingParameters
// UseMediaStreams indicates whether to use WebRTC media streams to tunnel
// traffic. When false, a WebRTC data channel is used to tunnel traffic.
UseMediaStreams() bool

// TrafficShapingParameters returns parameters specifying how to perform
// data channel or media stream traffic shapping -- random padding and
// decoy messages. Returns nil when no traffic shaping is to be performed.
TrafficShapingParameters() *TrafficShapingParameters

// STUNServerAddress selects a STUN server to use for this dial. When
// RFC5780 is true, the STUN server must support RFC5780 NAT discovery;
Expand Down Expand Up @@ -363,7 +367,7 @@ type WebRTCDialCoordinator interface {
DiscoverNATTimeout() time.Duration
WebRTCAnswerTimeout() time.Duration
WebRTCAwaitPortMappingTimeout() time.Duration
WebRTCAwaitDataChannelTimeout() time.Duration
WebRTCAwaitReadyToProxyTimeout() time.Duration
ProxyDestinationDialTimeout() time.Duration
ProxyRelayInactivityTimeout() time.Duration
}
Loading

0 comments on commit fa267dc

Please sign in to comment.