Skip to content

Commit

Permalink
Merge pull request #43 from Jigsaw-Code/bemasc-replay-min
Browse files Browse the repository at this point in the history
Add a minimal replay defense
  • Loading branch information
Benjamin M. Schwartz authored Jan 8, 2020
2 parents 76edca1 + 681f7f9 commit 3c2b0f1
Show file tree
Hide file tree
Showing 9 changed files with 454 additions and 52 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The Outline Shadowsocks service allows for:
- Whitebox monitoring of the service using [prometheus.io](https://prometheus.io)
- Includes traffic measurements and other health indicators.
- Live updates via config change + SIGHUP
- Experimental: optional replay defense (--replay_history).

![Graphana Dashboard](https://user-images.githubusercontent.com/113565/44177062-419d7700-a0ba-11e8-9621-db519692ff6c.png "Graphana Dashboard")

Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ require (
github.com/prometheus/client_golang v0.9.2
github.com/shadowsocks/go-shadowsocks2 v0.0.11
github.com/stretchr/testify v1.2.2 // indirect
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9
golang.org/x/sys v0.0.0-20181210030007-2a47403f2ae5 // indirect
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2
gopkg.in/yaml.v2 v2.2.2
)

go 1.13
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ github.com/shadowsocks/go-shadowsocks2 v0.0.11 h1:dXloqEhYnZV40jblWTK8kWeC0Eb+dg
github.com/shadowsocks/go-shadowsocks2 v0.0.11/go.mod h1:R+KWaoIwRRhnpw6XV+dZil0XHi64Hc1D7hXUyXTjUzQ=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20181210030007-2a47403f2ae5 h1:SlFRMb9PEnqzqnBRCynVOhxv4vHjB2lnIoxK6p5nzFM=
golang.org/x/sys v0.0.0-20181210030007-2a47403f2ae5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
Expand Down
6 changes: 3 additions & 3 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type ShadowsocksMetrics interface {
// TCP metrics
AddOpenTCPConnection(clientLocation string)
AddClosedTCPConnection(clientLocation, accessKey, status string, data ProxyMetrics, timeToCipher, duration time.Duration)
AddTCPProbe(clientLocation, drainResult string, port int, data ProxyMetrics)
AddTCPProbe(clientLocation, status, drainResult string, port int, data ProxyMetrics)

// UDP metrics
AddUDPPacketFromClient(clientLocation, accessKey, status string, clientProxyBytes, proxyTargetBytes int, timeToCipher time.Duration)
Expand Down Expand Up @@ -193,8 +193,8 @@ func (m *shadowsocksMetrics) AddClosedTCPConnection(clientLocation, accessKey, s
m.dataBytes.WithLabelValues("c<p", "tcp", clientLocation, status, accessKey).Add(float64(data.ProxyClient))
}

func (m *shadowsocksMetrics) AddTCPProbe(clientLocation, drainResult string, port int, data ProxyMetrics) {
m.tcpProbes.WithLabelValues(clientLocation, strconv.Itoa(port), drainResult).Observe(float64(data.ClientProxy))
func (m *shadowsocksMetrics) AddTCPProbe(clientLocation, status, drainResult string, port int, data ProxyMetrics) {
m.tcpProbes.WithLabelValues(clientLocation, strconv.Itoa(port), status, drainResult).Observe(float64(data.ClientProxy))
}

func (m *shadowsocksMetrics) AddUDPPacketFromClient(clientLocation, accessKey, status string, clientProxyBytes, proxyTargetBytes int, timeToCipher time.Duration) {
Expand Down
33 changes: 21 additions & 12 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ type SSPort struct {
}

type SSServer struct {
natTimeout time.Duration
m metrics.ShadowsocksMetrics
ports map[int]*SSPort
natTimeout time.Duration
m metrics.ShadowsocksMetrics
replayCache shadowsocks.ReplayCache
ports map[int]*SSPort
}

func (s *SSServer) startPort(portNum int) error {
Expand All @@ -78,7 +79,7 @@ func (s *SSServer) startPort(portNum int) error {
logger.Infof("Listening TCP and UDP on port %v", portNum)
port := &SSPort{cipherList: shadowsocks.NewCipherList()}
// TODO: Register initial data metrics at zero.
port.tcpService = shadowsocks.NewTCPService(listener, &port.cipherList, s.m, tcpReadTimeout)
port.tcpService = shadowsocks.NewTCPService(listener, &port.cipherList, &s.replayCache, s.m, tcpReadTimeout)
port.udpService = shadowsocks.NewUDPService(packetConn, s.natTimeout, &port.cipherList, s.m)
s.ports[portNum] = port
go port.udpService.Start()
Expand Down Expand Up @@ -154,8 +155,13 @@ func (s *SSServer) loadConfig(filename string) error {
return nil
}

func runSSServer(filename string, natTimeout time.Duration, sm metrics.ShadowsocksMetrics) error {
server := &SSServer{natTimeout: natTimeout, m: sm, ports: make(map[int]*SSPort)}
func runSSServer(filename string, natTimeout time.Duration, sm metrics.ShadowsocksMetrics, replayHistory int) error {
server := &SSServer{
natTimeout: natTimeout,
m: sm,
replayCache: shadowsocks.NewReplayCache(replayHistory),
ports: make(map[int]*SSPort),
}
err := server.loadConfig(filename)
if err != nil {
return fmt.Errorf("Failed to load config file %v: %v", filename, err)
Expand Down Expand Up @@ -194,16 +200,18 @@ func readConfig(filename string) (*Config, error) {

func main() {
var flags struct {
ConfigFile string
MetricsAddr string
IPCountryDB string
Verbose bool
natTimeout time.Duration
ConfigFile string
MetricsAddr string
IPCountryDB string
natTimeout time.Duration
replayHistory int
Verbose bool
}
flag.StringVar(&flags.ConfigFile, "config", "", "Configuration filename")
flag.StringVar(&flags.MetricsAddr, "metrics", "", "Address for the Prometheus metrics")
flag.StringVar(&flags.IPCountryDB, "ip_country_db", "", "Path to the GeoLite2-Country.mmdb file")
flag.DurationVar(&flags.natTimeout, "udptimeout", 5*time.Minute, "UDP tunnel timeout")
flag.IntVar(&flags.replayHistory, "replay_history", 0, "Replay buffer size (# of handshakes)")
flag.BoolVar(&flags.Verbose, "verbose", false, "Enables verbose logging output")

flag.Parse()
Expand Down Expand Up @@ -237,7 +245,8 @@ func main() {
}
defer ipCountryDB.Close()
}
err = runSSServer(flags.ConfigFile, flags.natTimeout, metrics.NewShadowsocksMetrics(ipCountryDB))
m := metrics.NewShadowsocksMetrics(ipCountryDB)
err = runSSServer(flags.ConfigFile, flags.natTimeout, m, flags.replayHistory)
if err != nil {
logger.Fatal(err)
}
Expand Down
102 changes: 102 additions & 0 deletions shadowsocks/replay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2020 Jigsaw Operations LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package shadowsocks

import (
"encoding/binary"
"sync"
)

// MaxCapacity is the largest allowed size of ReplayCache.
//
// Capacities in excess of 20,000 are not recommended, due to the false
// positive rate of up to 2 * capacity / 2^32 = 1 / 100,000. If larger
// capacities are desired, the key type should be changed to uint64.
const MaxCapacity = 20_000

type empty struct{}

// ReplayCache allows us to check whether a handshake salt was used within
// the last `capacity` handshakes. It requires approximately 20*capacity
// bytes of memory (as measured by BenchmarkReplayCache_Creation).
//
// The nil and zero values represent a cache with capacity 0, i.e. no cache.
type ReplayCache struct {
mutex sync.Mutex
capacity int
active map[uint32]empty
archive map[uint32]empty
}

// NewReplayCache returns a fresh ReplayCache that promises to remember at least
// the most recent `capacity` handshakes.
func NewReplayCache(capacity int) ReplayCache {
if capacity > MaxCapacity {
panic("ReplayCache capacity would result in too many false positives")
}
return ReplayCache{
capacity: capacity,
active: make(map[uint32]empty, capacity),
// `archive` is read-only and initially empty.
}
}

// Trivially reduces the key and salt to a uint32, avoiding collisions
// in case of salts with a shared prefix or suffix. Salts are normally
// random, but in principle a client might use a counter instead, so
// using only the prefix or suffix is not sufficient. Including the key
// ID in the hash avoids accidental collisions when the same salt is used
// by different access keys, as might happen in the case of a counter.
//
// Secure hashing is not required, because only authenticated handshakes
// are added to the cache. A hostile client could produce colliding salts,
// but this would not impact other users. Each map uses a new random hash
// function, so it is not trivial for a hostile client to mount an
// algorithmic complexity attack with nearly-colliding hashes:
// https://dave.cheney.net/2018/05/29/how-the-go-runtime-implements-maps-efficiently-without-generics
func preHash(id string, salt []byte) uint32 {
buf := [4]byte{}
for i := 0; i < len(id); i++ {
buf[i&0x3] ^= id[i]
}
for i, v := range salt {
buf[i&0x3] ^= v
}
return binary.BigEndian.Uint32(buf[:])
}

// Add a handshake with this key ID and salt to the cache.
// Returns false if it is already present.
func (c *ReplayCache) Add(id string, salt []byte) bool {
if c == nil || c.capacity == 0 {
// Cache is disabled, so every salt is new.
return true
}
hash := preHash(id, salt)
c.mutex.Lock()
defer c.mutex.Unlock()
if _, ok := c.active[hash]; ok {
// Fast replay: `salt` is already in the active set.
return false
}
_, inArchive := c.archive[hash]
if len(c.active) == c.capacity {
// Discard the archive and move active to archive.
c.archive = c.active
c.active = make(map[uint32]empty, c.capacity)
}
c.active[hash] = empty{}
return !inArchive
}
137 changes: 137 additions & 0 deletions shadowsocks/replay_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2020 Jigsaw Operations LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package shadowsocks

import (
"encoding/binary"
"testing"
)

const keyID = "the key"

var counter uint32 = 0

func makeSalts(n int) [][]byte {
salts := make([][]byte, n)
for i := 0; i < n; i++ {
salts[i] = make([]byte, 4)
binary.BigEndian.PutUint32(salts[i], counter)
counter++
if counter == 0 {
panic("Salt counter overflow")
}
}
return salts
}

func TestReplayCache_Active(t *testing.T) {
salts := makeSalts(2)
cache := NewReplayCache(10)
if !cache.Add(keyID, salts[0]) {
t.Error("First addition to a clean cache should succeed")
}
if cache.Add(keyID, salts[0]) {
t.Error("Duplicate add should fail")
}
if !cache.Add(keyID, salts[1]) {
t.Error("Addition of a new vector should succeed")
}
if cache.Add(keyID, salts[1]) {
t.Error("Second duplicate add should fail")
}
}

func TestReplayCache_Archive(t *testing.T) {
salts0 := makeSalts(10)
salts1 := makeSalts(10)
cache := NewReplayCache(10)
// Add vectors to the active set until it hits the limit
// and spills into the archive.
for _, s := range salts0 {
if !cache.Add(keyID, s) {
t.Error("Addition of a new vector should succeed")
}
}

for _, s := range salts0 {
if cache.Add(keyID, s) {
t.Error("Duplicate add should fail")
}
}

// Repopulate the active set.
for _, s := range salts1 {
if !cache.Add(keyID, s) {
t.Error("Addition of a new vector should succeed")
}
}

// Both active and archive are full. Adding another vector
// should wipe the archive.
lastStraw := makeSalts(1)[0]
if !cache.Add(keyID, lastStraw) {
t.Error("Addition of a new vector should succeed")
}
for _, s := range salts0 {
if !cache.Add(keyID, s) {
t.Error("First 10 vectors should have been forgotten")
}
}
}

// Benchmark to determine the memory usage of ReplayCache.
// Note that NewReplayCache only allocates the active set,
// so the eventual memory usage will be roughly double.
func BenchmarkReplayCache_Creation(b *testing.B) {
for i := 0; i < b.N; i++ {
NewReplayCache(MaxCapacity)
}
}

func BenchmarkReplayCache_Max(b *testing.B) {
salts := makeSalts(b.N)
// Archive replacements will be infrequent.
cache := NewReplayCache(MaxCapacity)
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.Add(keyID, salts[i])
}
}

func BenchmarkReplayCache_Min(b *testing.B) {
salts := makeSalts(b.N)
// Every addition will archive the active set.
cache := NewReplayCache(1)
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.Add(keyID, salts[i])
}
}

func BenchmarkReplayCache_Parallel(b *testing.B) {
c := make(chan []byte, b.N)
for _, s := range makeSalts(b.N) {
c <- s
}
close(c)
// Exercise both expansion and archiving.
cache := NewReplayCache(100)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
cache.Add(keyID, <-c)
}
})
}
Loading

0 comments on commit 3c2b0f1

Please sign in to comment.