Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/virt subnet #27

Merged
merged 3 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 55 additions & 8 deletions internal/agent/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func subcribe(ec *nats.EncodedConn) {
}
deleteAllNetworks()
deleteAllInterfaces()
if err := saveServerNetworks(resp.Networks); err != nil {
if err := saveServerNetworks(self, resp.Networks); err != nil {
slog.Error("save networks", "error", err)
}
startAllInterfaces(self)
Expand Down Expand Up @@ -137,13 +137,19 @@ func subcribe(ec *nats.EncodedConn) {
}
return
}
if err := resetPeersOnNetworkInterface(self, network); err != nil {
slog.Error(err.Error())
if err := ec.Publish(reply, plexus.MessageResponse{Message: err.Error()}); err != nil {
slog.Error(err.Error())
}
return
if err := deleteInterface(network.Interface); err != nil {
slog.Error("delete interface", "iface", network.Interface, "error", err)
}
if err := startInterface(self, network); err != nil {
slog.Error("start interface", "iface", network.Interface, "error", err)
}
//if err := resetPeersOnNetworkInterface(self, network); err != nil {
// slog.Error(err.Error())
// if err := ec.Publish(reply, plexus.MessageResponse{Message: err.Error()}); err != nil {
// slog.Error(err.Error())
// }
// return
//}
if err := ec.Publish(reply, plexus.MessageResponse{Message: "interface reset"}); err != nil {
slog.Error(err.Error())
}
Expand Down Expand Up @@ -242,7 +248,6 @@ func subcribeToServerTopics(self Device) {
slog.Error("join network subscription", "error", err)
}
subscriptions = append(subscriptions, joinNet)

sendListenPorts, err := serverEC.Subscribe(plexus.Update+id+plexus.SendListenPorts,
func(subj, reply string, data plexus.ListenPortRequest) {
slog.Info("new listen ports", "network", data.Network)
Expand All @@ -260,6 +265,48 @@ func subcribeToServerTopics(self Device) {
slog.Error("send listen port subscription", "error", err)
}
subscriptions = append(subscriptions, sendListenPorts)
addRouter, err := serverEC.Subscribe(plexus.Update+id+plexus.AddRouter,
func(subj, reply string, data plexus.NetworkPeer) {
if data.WGPublicKey != id {
slog.Error("add router wrong id", "me", id, "router", data.WGPublicKey)
return
}
if !data.IsSubnetRouter {
return
}
slog.Debug("adding subnet router")
if data.UseNat {
if err := addNat(); err != nil {
slog.Error("add nat", "error", err)
}
}
if data.UseVirtSubnet {
if err := addVirtualSubnet(data.VirtSubnet, data.Subnet); err != nil {
slog.Error("add virtual subnet", "error", err)
}
}
})
if err != nil {
slog.Error("add router subscription", "error", err)
}
subscriptions = append(subscriptions, addRouter)
delRouter, err := serverEC.Subscribe(plexus.Update+id+plexus.DeleteRouter,
func(subj, reply string, data plexus.NetworkPeer) {
if data.WGPublicKey != id {
slog.Error("add router wrong id", "me", id, "router", data.WGPublicKey)
return
}
if err := delNat(); err != nil {
slog.Error("delete nat", "error", err)
}
if err := delVirtualSubnet(); err != nil {
slog.Error("delete virtual subnet", "error", err)
}
})
if err != nil {
slog.Error("delete router subscription", "error", err)
}
subscriptions = append(subscriptions, delRouter)
}

func createRegistationConnection(key plexus.KeyValue) (*nats.EncodedConn, error) {
Expand Down
15 changes: 12 additions & 3 deletions internal/agent/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func deleteAllInterfaces() {
if err = delNat(); err != nil {
slog.Error("delete NAT", "error", err)
}
if err = delVirtualSubnet(); err != nil {
slog.Error("delete virtual subnet", "error", err)
}
}

func startAllInterfaces(self Device) {
Expand Down Expand Up @@ -106,6 +109,7 @@ func startInterface(self Device, network Network) error {
}
if port != network.ListenPort {
portChanged = true
network.ListenPort = port
}
if addressChanged {
if err := boltdb.Save(self, "self", deviceTable); err != nil {
Expand All @@ -117,7 +121,7 @@ func startInterface(self Device, network Network) error {
if err := boltdb.Save(network, network.Name, networkTable); err != nil {
return err
}
go publishPeerUpdate(&self, &network)
go publishListenPortUpdate(&self, &network)
}
config := wgtypes.Config{
PrivateKey: &privKey,
Expand Down Expand Up @@ -216,8 +220,13 @@ func getAllowedIPs(node plexus.NetworkPeer, peers []plexus.NetworkPeer) []net.IP
IP: node.Address.IP,
Mask: net.CIDRMask(32, 32),
})
if node.IsSubNetRouter {
allowed = append(allowed, node.SubNet)
if node.IsSubnetRouter {
if node.UseVirtSubnet {
allowed = append(allowed, node.VirtSubnet)
} else {
allowed = append(allowed, node.Subnet)
}
slog.Debug("new allowed ips", "allowed", allowed, "virt", node.VirtSubnet, "subnet", node.Subnet)
}
if node.IsRelay {
for _, peer := range peers {
Expand Down
5 changes: 4 additions & 1 deletion internal/agent/networks.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func toAgentNetwork(in plexus.Network) Network {
return out
}

func saveServerNetworks(networks []plexus.Network) error {
func saveServerNetworks(self Device, networks []plexus.Network) error {
takenInterfaces := []int{}
var err error
for _, serverNet := range networks {
Expand All @@ -40,6 +40,9 @@ func saveServerNetworks(networks []plexus.Network) error {
if err != nil {
return fmt.Errorf("unable to get freeport %w", err)
}
if _, _, err := stunCheck(&self, &network, network.ListenPort); err != nil {
return fmt.Errorf("stun check %w", err)
}
interfaceFound := false
for i := range maxNetworks {
if !slices.Contains(takenInterfaces, i) {
Expand Down
108 changes: 106 additions & 2 deletions internal/agent/nftables.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package agent

import (
"log/slog"
"net"

"github.com/c-robinson/iplib"
"github.com/google/nftables"
"github.com/google/nftables/expr"
)

func addNat() error {
slog.Debug("adding NAT rule")
c := &nftables.Conn{}
table := c.AddTable(&nftables.Table{
Name: "plexus",
Expand Down Expand Up @@ -58,8 +61,8 @@ func checkForNat(self Device, network Network) error {
slog.Debug("checking if NAT required")
for _, peer := range network.Peers {
if peer.WGPublicKey == self.WGPublicKey {
slog.Debug("Nat check", "subnet-router", peer.IsSubNetRouter)
if !peer.IsSubNetRouter {
slog.Debug("Nat check", "subnet-router", peer.IsSubnetRouter)
if !peer.IsSubnetRouter {
slog.Debug("nat check -- not subnetrouter")
return nil
}
Expand All @@ -68,6 +71,107 @@ func checkForNat(self Device, network Network) error {
slog.Debug("adding NAT", "network", network.Name)
return addNat()
}
if peer.UseVirtSubnet {
slog.Debug("adding virtual subnet", "peer", peer.HostName, "virtual subnet", peer.VirtSubnet, "subnet", peer.Subnet)
return addVirtualSubnet(peer.VirtSubnet, peer.Subnet)
}
}
}
return nil
}

func addVirtualSubnet(virtual, subnet net.IPNet) error {
slog.Debug("add virtual subnet", "virtual", virtual, "subnet", subnet)
c := &nftables.Conn{}
table := c.AddTable(&nftables.Table{
Name: "plexus",
Family: nftables.TableFamilyIPv4,
})
if err := delVirtualSubnet(); err != nil {
slog.Debug("delete virtual subnet", "error", err)
return err
}
chain := c.AddChain(&nftables.Chain{
Name: "plexus-subnet",
Table: table,
Type: nftables.ChainTypeNAT,
Hooknum: nftables.ChainHookPrerouting,
Priority: nftables.ChainPriorityFilter,
})
ones, _ := virtual.Mask.Size()
virtNet := iplib.NewNet4(virtual.IP, ones)
virt := virtNet.FirstAddress()
subNet := iplib.NewNet4(subnet.IP, ones)
sub := subNet.FirstAddress()
rule := &nftables.Rule{
Table: table,
Chain: chain,
Exprs: []expr.Any{
&expr.Payload{
OperationType: expr.PayloadLoad,
SourceRegister: 0,
DestRegister: 1,
Base: expr.PayloadBaseNetworkHeader,
Offset: 0x10,
Len: 0x4,
},
&expr.Cmp{
Op: expr.CmpOpEq,
Register: 1,
Data: virt,
},
&expr.Immediate{
Register: 1,
Data: sub,
},
&expr.NAT{
Type: expr.NATTypeDestNAT,
Family: uint32(nftables.TableFamilyIPv4),
RegAddrMin: 1,
RegAddrMax: 1,
RegProtoMin: 0,
RegProtoMax: 0,
},
},
}
c.AddRule(rule)
if err := c.Flush(); err != nil {
slog.Debug("flush rules", "errror", err)
return err
}
for {
var err error
virt, err = virtNet.NextIP(virt)
if err != nil {
break
}
sub, err = subNet.NextIP(sub)
if err != nil {
break
}
rule.Exprs[1].(*expr.Cmp).Data = virt
rule.Exprs[2].(*expr.Immediate).Data = sub
c.AddRule(rule)
if err := c.Flush(); err != nil {
slog.Debug("flush rules", "errror", err)
return err
}
}
return nil
}

func delVirtualSubnet() error {
slog.Debug("deleting virtual subnet")
c := &nftables.Conn{}
chains, err := c.ListChains()
if err != nil {
return nil
}
for _, chain := range chains {
if chain.Name == "plexus-subnet" {
slog.Debug("deleting plexus-subnet chain")
c.DelChain(chain)
return c.Flush()
}
}
return nil
Expand Down
44 changes: 43 additions & 1 deletion internal/agent/nftables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,11 @@ func TestCheckForNat(t *testing.T) {
}
})
t.Run("subnetWithoutNat", func(t *testing.T) {
peer.IsSubNetRouter = true
peer.IsSubnetRouter = true
peer.Subnet = net.IPNet{
IP: net.ParseIP("192.168.0.0"),
Mask: net.CIDRMask(24, 32),
}
network.Peers = []plexus.NetworkPeer{peer}
err = checkForNat(self, network)
assert.Nil(t, err)
Expand Down Expand Up @@ -197,7 +201,45 @@ func TestCheckForNat(t *testing.T) {
RegProtoMax: 0,
}, rules[0].Exprs[0])
})
t.Run("virtual subnet", func(t *testing.T) {
table := &nftables.Table{}
chain := &nftables.Chain{}
peer.UseNat = false
peer.UseVirtSubnet = true
peer.VirtSubnet = net.IPNet{
IP: net.ParseIP("10.100.0.0").To4(),
Mask: net.CIDRMask(24, 32),
}
network.Peers = []plexus.NetworkPeer{peer}
t.Log(self, network)
err = checkForNat(self, network)
assert.Nil(t, err)
tables, err := c.ListTables()
assert.Nil(t, err)
tableFound := false
for _, t := range tables {
if t.Name == "plexus" {
tableFound = true
table = t
}
}
assert.True(t, tableFound)
chains, err := c.ListChains()
assert.Nil(t, err)
chainFound := false
for _, c := range chains {
if c.Name == "plexus-subnet" {
chainFound = true
chain = c
}
}
assert.True(t, chainFound)
rules, err := c.GetRules(table, chain)
assert.Nil(t, err)
assert.Equal(t, 254, len(rules))
})
cleanNat(t, c)

}

func cleanNat(t *testing.T, c *nftables.Conn) {
Expand Down
33 changes: 12 additions & 21 deletions internal/agent/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,37 +27,28 @@ func publishDeviceUpdate(self *Device) {
}
}

func publishPeerUpdate(self *Device, network *Network) {
slog.Info("publishing network peer update")
me := getSelfFromPeers(self, network.Peers)
// publish new listening ports to server
func publishListenPortUpdate(self *Device, network *Network) {
slog.Info("publishing listen port update")
serverEC := serverConn.Load()
if serverEC == nil {
slog.Error("not connected to server")
return
}
if err := serverEC.Publish(self.WGPublicKey+plexus.UpdateNetworkPeer, plexus.NetworkPeer{
WGPublicKey: self.WGPublicKey,
HostName: self.Name,
Address: me.Address,
if err := serverEC.Publish(self.WGPublicKey+plexus.UpdateListenPorts, plexus.ListenPortResponse{
ListenPort: network.ListenPort,
PublicListenPort: network.PublicListenPort,
Endpoint: self.Endpoint,
NatsConnected: true,
Connectivity: me.Connectivity,
IsRelay: me.IsRelay,
IsRelayed: me.IsRelayed,
RelayedPeers: me.RelayedPeers,
},
); err != nil {
slog.Error("publish network peer update", "error", err)
}
}

func getSelfFromPeers(self *Device, peers []plexus.NetworkPeer) *plexus.NetworkPeer {
for _, peer := range peers {
if peer.WGPublicKey == self.WGPublicKey {
return &peer
}
}
return nil
}
//func getSelfFromPeers(self *Device, peers []plexus.NetworkPeer) *plexus.NetworkPeer {
// for _, peer := range peers {
// if peer.WGPublicKey == self.WGPublicKey {
// return &peer
// }
// }
// return nil
//}
Loading
Loading