Skip to content

Commit

Permalink
Merge pull request #30 from devilcove/feature/publicEndpoints
Browse files Browse the repository at this point in the history
Feature/public endpoints
  • Loading branch information
mattkasun authored Apr 24, 2024
2 parents 6231693 + d91104b commit 55bb4d9
Show file tree
Hide file tree
Showing 14 changed files with 357 additions and 54 deletions.
84 changes: 84 additions & 0 deletions app/plexus-agent/cmd/set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright © 2024 Matthew R Kasun <[email protected]>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd

import (
"fmt"
"net"

"github.com/devilcove/plexus"
"github.com/devilcove/plexus/internal/agent"
"github.com/spf13/cobra"
"github.com/vishvananda/netlink"
)

// setCmd represents the set command
var setCmd = &cobra.Command{
Use: "set ip [network]",
Args: cobra.RangeArgs(1, 2),
Short: "set private endpoint for network",
Long: `set private endpoint ip for a or all networks.`,
Run: func(cmd *cobra.Command, args []string) {
network := ""
if len(args) > 1 {
network = args[1]
}
fmt.Println("set called")
ip := net.ParseIP(args[0])
if ip == nil {
fmt.Println("invalid ip")
return
}
addr, err := netlink.AddrList(nil, netlink.FAMILY_V4)
if err != nil {
fmt.Println("error getting addresses", err)
return
}
found := false
for _, add := range addr {
if ip.Equal(add.IP) {
found = true
}
}
if !found {
fmt.Println("invalid ip")
return
}
request := plexus.PrivateEndpoint{
IP: args[0],
Network: network,
}
resp := plexus.MessageResponse{}
ec, err := agent.ConnectToAgentBroker()
cobra.CheckErr(err)
cobra.CheckErr(ec.Request(agent.Agent+plexus.SetPrivateEndpoint, request, &resp, agent.NatsTimeout))
fmt.Println(resp.Message)
},
}

func init() {
rootCmd.AddCommand(setCmd)

// Here you will define your flags and configuration settings.

// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// setCmd.PersistentFlags().String("foo", "", "A help for foo")

// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// setCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
}
48 changes: 48 additions & 0 deletions internal/agent/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"log/slog"
"net"
"runtime/debug"
"strings"

Expand Down Expand Up @@ -186,6 +187,53 @@ func subcribe(ec *nats.EncodedConn) {
slog.Error("publish reply to version request", "error", err)
}
})
_, _ = ec.Subscribe(Agent+plexus.SetPrivateEndpoint, func(sub, reply string, request plexus.PrivateEndpoint) {
slog.Debug("set private endpoint", "endpoint", request.IP, "network", request.Network)
var err error
var networks []Network
self, err := boltdb.Get[Device]("self", deviceTable)
if err != nil {
_ = ec.Publish(reply, plexus.MessageResponse{
Message: "error getting device" + err.Error(),
})
}
if request.Network == "" {
networks, err = boltdb.GetAll[Network](networkTable)
if err != nil {
_ = ec.Publish(reply, plexus.MessageResponse{
Message: "error reading networks" + err.Error(),
})
}
} else {
network, err := boltdb.Get[Network](request.Network, networkTable)
if err != nil {
_ = ec.Publish(reply, plexus.MessageResponse{
Message: "error reading network " + err.Error(),
})
}
networks = append(networks, network)
}
for _, network := range networks {
for i, peer := range network.Peers {
if peer.WGPublicKey == self.WGPublicKey {
network.Peers[i].PrivateEndpoint = net.ParseIP(request.IP)
if err := publishNetworkPeerUpdate(self, &network.Peers[i]); err != nil {
_ = ec.Publish(reply, plexus.MessageResponse{
Message: "error publishing update to server " + err.Error(),
})
}
}
}
if err := boltdb.Save(network, network.Name, networkTable); err != nil {
_ = ec.Publish(reply, plexus.MessageResponse{
Message: "error saving network " + network.Name + err.Error(),
})
}
}
_ = ec.Publish(reply, plexus.MessageResponse{
Message: "private endpoint added",
})
})
}

func ConnectToAgentBroker() (*nats.EncodedConn, error) {
Expand Down
4 changes: 2 additions & 2 deletions internal/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
serverCheckTime = time.Hour * 1
connectivityTimeout = time.Minute * 3
networkNotMapped = "network not mapped to server"
version = "v0.1.0"
version = "v0.2.1"
networkTable = "networks"
deviceTable = "devices"
path = "/var/lib/plexus/"
Expand All @@ -30,7 +30,7 @@ var (
subscriptions []*nats.Subscription
//errors
ErrNetNotMapped = errors.New("network not mapped to server")
ErrConnected = errors.New("network connected")
ErrNotConnected = errors.New("not connected to server")
)

type Configuration struct {
Expand Down
61 changes: 61 additions & 0 deletions internal/agent/daemon.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package agent

import (
"bufio"
"context"
"fmt"
"log"
"log/slog"
"net"
"os"
"os/signal"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -33,10 +38,15 @@ func Run() {
startAllInterfaces(self)
checkinTicker := time.NewTicker(checkinTime)
serverTicker := time.NewTicker(serverCheckTime)
wg := &sync.WaitGroup{}
wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())
go privateEndpointServer(ctx, wg)
for {
select {
case <-quit:
slog.Info("quit")
cancel()
slog.Info("deleting wg interfaces")
deleteAllInterfaces()
slog.Info("stopping tickers")
Expand All @@ -49,6 +59,7 @@ func Run() {
slog.Info("wait for nat server shutdown to complete")
ns.WaitForShutdown()
slog.Info("nats server has shutdown")
wg.Wait()
slog.Info("exiting ...")
return
case <-checkinTicker.C:
Expand Down Expand Up @@ -151,3 +162,53 @@ func closeServerConnections() {
ec.Close()
}
}

func privateEndpointServer(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
networks, err := boltdb.GetAll[Network](networkTable)
if err != nil {
return
}
self, err := boltdb.Get[Device]("self", deviceTable)
if err != nil {
return
}
for _, network := range networks {
me := getSelfFromPeers(&self, network.Peers)
if me.PrivateEndpoint == nil {
continue
}
slog.Info("tcp listener staating on prviate endpoint")
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", me.PrivateEndpoint, me.ListenPort))
if err != nil {
slog.Error("public endpoint server", "error", err)
return
}
go func() {
for {
select {
case <-ctx.Done():
listener.Close()
return
default:
c, err := listener.Accept()
if err != nil {
slog.Warn("connect error", "error", err)
continue
}
go handleConn(c, self.WGPublicKey)
}
}
}()
}
}

func handleConn(c net.Conn, reply string) {
defer c.Close()
reader := bufio.NewReader(c)
_, err := reader.ReadBytes(byte('.'))
if err != nil {
return
}
_, _ = c.Write([]byte(reply))
}
10 changes: 10 additions & 0 deletions internal/agent/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func networkUpdates(subject string, update plexus.NetworkUpdate) {
return
}
}
if update.Peer.PrivateEndpoint != nil {
if connectToPublicEndpoint(update.Peer) {
update.Peer.UsePrivateEndpoint = true
}
}
network.Peers = append(network.Peers, update.Peer)
if err := boltdb.Save(network, network.Name, networkTable); err != nil {
slog.Error("update network -- add peer", "error", err)
Expand Down Expand Up @@ -93,6 +98,11 @@ func networkUpdates(subject string, update plexus.NetworkUpdate) {
found := false
for i, oldpeer := range network.Peers {
if oldpeer.WGPublicKey == update.Peer.WGPublicKey {
if update.Peer.PrivateEndpoint != nil {
if connectToPublicEndpoint(update.Peer) {
update.Peer.UsePrivateEndpoint = true
}
}
network.Peers = slices.Replace(network.Peers, i, i+1, update.Peer)
found = true
break
Expand Down
36 changes: 36 additions & 0 deletions internal/agent/interface.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"bufio"
"errors"
"fmt"
"log"
Expand Down Expand Up @@ -287,6 +288,15 @@ func getWGPeers(self Device, network Network) []wgtypes.PeerConfig {
},
PersistentKeepaliveInterval: &keepalive,
}
if peer.PrivateEndpoint != nil {
if connectToPublicEndpoint(peer) {
peer.UsePrivateEndpoint = true
wgPeer.Endpoint = &net.UDPAddr{
IP: peer.PrivateEndpoint,
Port: peer.ListenPort,
}
}
}
peers = append(peers, wgPeer)
}
return peers
Expand Down Expand Up @@ -385,3 +395,29 @@ func convertPeerToWG(netPeer plexus.NetworkPeer, peers []plexus.NetworkPeer) (wg
AllowedIPs: getAllowedIPs(netPeer, peers),
}, nil
}

func connectToPublicEndpoint(peer plexus.NetworkPeer) bool {
slog.Debug("checking private endpoint", "peer", peer.HostName)
endpoint := fmt.Sprintf("%s:%d", peer.PrivateEndpoint, peer.ListenPort)
c, err := net.Dial("tcp", endpoint)
if err != nil {
slog.Debug("err dialing endpoint", "error", err)
return false
}
defer c.Close()
p := make([]byte, 1024)
if _, err := c.Write([]byte("olleh.")); err != nil {
slog.Debug("error writing", "error", err)
return false
}
if _, err := bufio.NewReader(c).Read(p); err != nil {
slog.Debug("error reading", "error", err)
return false
}
if string(p[:44]) != peer.WGPublicKey {
slog.Debug("bad response", "response", string(p))
return false
}
slog.Debug("use private endpoint for", "peer", peer.HostName)
return true
}
29 changes: 21 additions & 8 deletions internal/agent/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,24 @@ func publishListenPortUpdate(self *Device, network *Network) {
}
}

//func getSelfFromPeers(self *Device, peers []plexus.NetworkPeer) *plexus.NetworkPeer {
// for _, peer := range peers {
// if peer.WGPublicKey == self.WGPublicKey {
// return &peer
// }
// }
// return nil
//}
// publish network peer update to server
func publishNetworkPeerUpdate(self Device, peer *plexus.NetworkPeer) error {
slog.Info("publishing network peer update")
serverEC := serverConn.Load()
if serverEC == nil {
return ErrNotConnected
}
if err := serverEC.Publish(self.WGPublicKey+plexus.UpdateNetworkPeer, peer); err != nil {
return err
}
return nil
}

func getSelfFromPeers(self *Device, peers []plexus.NetworkPeer) *plexus.NetworkPeer {
for _, peer := range peers {
if peer.WGPublicKey == self.WGPublicKey {
return &peer
}
}
return nil
}
2 changes: 1 addition & 1 deletion internal/agent/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func createPeer() (*plexus.Peer, *wgtypes.Key, string, error) {
WGPublicKey: pubKey.String(),
PubNkey: nkey,
Name: name,
Version: "v0.1.0",
Version: version,
Endpoint: stunAddr.IP,
OS: runtime.GOOS,
Updated: time.Now(),
Expand Down
13 changes: 13 additions & 0 deletions internal/server/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,5 +363,18 @@ func serverSubcriptions() []*nats.Subscription {
}
subcriptions = append(subcriptions, deviceUpdate)

// network peer updates
peerUpdate, err := eConn.Subscribe("*"+plexus.UpdateNetworkPeer, func(subj string, request *plexus.NetworkPeer) {
if len(subj) != 44+len(plexus.UpdateNetworkPeer) {
slog.Error("invalid sub", "subj", subj)
return
}
processNetworkPeerUpdate(subj[:44], request)
})
if err != nil {
slog.Error("subscribe peer update", "error", err)
}
subcriptions = append(subcriptions, peerUpdate)

return subcriptions
}
Loading

0 comments on commit 55bb4d9

Please sign in to comment.