Skip to content

Commit

Permalink
Retries, restarts in mesh connector
Browse files Browse the repository at this point in the history
  • Loading branch information
gazillion101 authored and kamyshdm committed Jan 19, 2023
1 parent 5c832e5 commit 29efbc4
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Tunnels/go/client/build.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

go build -a -tags netgo -ldflags '-X 'main.MajorVersion=0' -X 'main.MinorVersion=1' -X 'main.ProtocolVersion=3' -w -extldflags "-static"' -o dymium
go build -a -tags netgo -ldflags '-X 'main.MajorVersion=0' -X 'main.MinorVersion=1' -X 'main.ProtocolVersion=5' -w -extldflags "-static"' -o dymium
cp ./dymium ../../../web/go/assets/customer/update/darwin/amd64/


2 changes: 1 addition & 1 deletion Tunnels/go/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func runProxy(listener *net.TCPListener, back chan string, port int, token strin
for i := 0; i < len(ca.RootCApem); i++ {

ok := caCertPool.AppendCertsFromPEM([]byte(ca.RootCApem[i]))
log.Infof("add ca #%d, status %t", i, ok)
log.Debugf("add ca #%d, status %t", i, ok)
}

config := &tls.Config{
Expand Down
95 changes: 76 additions & 19 deletions Tunnels/go/meshconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type Virtcon struct {
accumUpstream int
totalUpstream int
}
var pingCounter = 0
var ackCounter = 0
var pingLock sync.RWMutex

func health() {
p := mux.NewRouter()
Expand All @@ -84,16 +87,6 @@ func health() {
http.ListenAndServe(":"+port, p)
}

func displayBuff(what string, buff []byte) {
if len(buff) > 10 {
head := buff[:6]
tail := buff[len(buff)-6:]
log.Debugf("%s head: %v, tail: %v", what, head, tail)
} else {
log.Debugf("%s buffer: %v", what, buff)
}
}

// convert DER to PEM format
func pemCSR(derBytes []byte) []byte {
pemBlock := &pem.Block{
Expand Down Expand Up @@ -176,13 +169,11 @@ func pipe(conmap map[int]*Virtcon, egress net.Conn,
// no op
} else {
if ok {
es := err.Error()
if !strings.Contains(es, "EOF") {
if err != io.EOF {
log.ErrorTenantf(conn.tenant, "Db read failed '%s', id:%d", err.Error(), id)
}
} else {
es := err.Error()
if !strings.Contains(es, "EOF") {
if err != io.EOF {
log.Errorf("Db read failed '%s', id:%d", err.Error(), id)
}
}
Expand Down Expand Up @@ -217,26 +208,70 @@ func MultiplexWriter(messages chan protocol.TransmissionUnit,
}
}
}
func Pinger(enc *gob.Encoder, ingress net.Conn, wake chan int ) {
var ping protocol.TransmissionUnit
// log.Infof("In Pinger")
ping.Action = protocol.Ping

for {
select {
case <- wake:
return
case <-time.After(20 * time.Second):
}

if pingCounter - ackCounter > 2 {
ingress.Close()
log.Errorf("Ping ack missing: %d %d", pingCounter, ackCounter)
return
}
pingLock.Lock()
ping.Id = pingCounter
curr := pingCounter
pingCounter++
pingLock.Unlock()

log.Debugf("Ping %d", curr)

err := enc.Encode(ping)
if err != nil {
ingress.Close()
if !strings.Contains(err.Error(), "closed network connection ") {
log.Errorf("Ping failed: %s", err.Error())
}
return
}
}
}
func PassTraffic(ingress *tls.Conn, customer string) {
// log.Info("in PassTraffic")
var conmap = make(map[int]*Virtcon)
var mu sync.RWMutex
defer ingress.Close()

dec := gob.NewDecoder(ingress)
enc := gob.NewEncoder(ingress)

messages := make(chan protocol.TransmissionUnit, 50)
go MultiplexWriter(messages, enc, ingress)

wake := make(chan int)
go Pinger(enc, ingress, wake )

for {
var buff protocol.TransmissionUnit
// log.Info("Wait in Decode")
err := dec.Decode(&buff)

if err != nil {
log.Errorf("Customer %s, read from client failed '%s', cleanup the proxy connection!",
customer, err.Error())
if err == io.EOF {
log.Errorf("Customer %s, read from client failed '%s', cleanup the proxy connection!",
customer, err.Error())
} else {
log.Errorf("Server closed connection, cleanup the proxy")
}
// close all outgoing connections
wake <- 1
mu.Lock()
for key := range conmap {
conmap[key].sock.Close()
Expand All @@ -246,8 +281,25 @@ func PassTraffic(ingress *tls.Conn, customer string) {
ingress.Close()
return
}

switch buff.Action {
case protocol.Error:
log.Errorf("Server returned error: %s", string(buff.Data))
mu.Lock()
for key := range conmap {
conmap[key].sock.Close()
delete(conmap, key)
}
mu.Unlock()
ingress.Close()
wake <- 0
return
case protocol.Ping:
pingLock.Lock()
ackCounter = buff.Id
log.Debugf("Ack: %d", ackCounter)
pingLock.Unlock()

case protocol.Open:

conn := &Virtcon{}
Expand Down Expand Up @@ -305,7 +357,9 @@ func CreateTunnel(tunnelserver string, clientCert *tls.Certificate) (*tls.Conn,
for i := 0; i < len(ca.RootCApem); i++ {

ok := caCertPool.AppendCertsFromPEM([]byte(ca.RootCApem[i]))
log.Infof("add ca #%d, status %t", i, ok)
if !ok {
log.Errorf("add ca #%d, status %t", i, ok)
}
}

config := &tls.Config{
Expand Down Expand Up @@ -347,7 +401,10 @@ func CreateTunnel(tunnelserver string, clientCert *tls.Certificate) (*tls.Conn,
func DoConnect() {
// -------------------
customer := os.Getenv("CUSTOMER")

pingLock.Lock()
ackCounter = 0
pingCounter = 0
pingLock.Unlock()
csr, err := generateCSR(customer)
if err != nil {
log.Errorf("Error generating CSR %s", err.Error())
Expand Down
12 changes: 12 additions & 0 deletions Tunnels/go/meshconnector/runprod.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash

export LOG_LEVEL=Debug
export PORTAL=https://portal.dymium.io/
export CONNECTOR=a1626eb5-4fa4-4db5-b46b-e7cee14e1f1a
export KEY=RlMOV0QrCtIe
export SECRET=MT2L1VPOlGoMSgyRodkYO6PwMw7JB4VK2MDFXL3Iw-e64mLYy5hjdLgKrnlXeizG0bm3RmVaohX2DfdLgDJy2ZAsjJzRPNqC3KMlEmmMyBSGskQbKfim6iuX2FCwouei
export CUSTOMER=spoofcorp
export TUNNELSERVER=spoofcorp.dymium.io:3009
export LOCAL_ENVIRONMENT=true

./meshconnector
60 changes: 54 additions & 6 deletions Tunnels/go/meshserver/tunnel/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"net"
"os"
"io"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -43,6 +44,17 @@ type TunnelUpdate struct {
Tid string
}

func displayBuff(what string, buff []byte) {
if len(buff) > 32 {
head := buff[:16]
tail := buff[len(buff)-16:]
log.Debugf("%s head: %v, tail: %v", what, head, tail)
} else {
log.Debugf("%s buffer: %v", what, buff)
}
log.Debugf("%s ", string(buff) )
}

func initDB(host, nport, user, password, dbname, usetls string) error {
psqlInfo = fmt.Sprintf("host=%s port=%s user=%s "+
"dbname=%s sslmode=%s",
Expand Down Expand Up @@ -74,8 +86,7 @@ func createListener(customer, target, sport string) (*net.TCPListener, error) {
}
listener, err := net.ListenTCP("tcp", addr)
if err != nil {
log.Errorf("Error in ListenTCP(%s): %s, can't continue", addr, err.Error())
os.Exit(1)
return listener, err
}
return listener, err
}
Expand All @@ -88,12 +99,15 @@ func remotepipe(customer string, messages chan protocol.TransmissionUnit, enc *g
go func(messages chan protocol.TransmissionUnit, enc *gob.Encoder) {
for {
tosend := <-messages
//displayBuff("From database: ", tosend.Data)
err := enc.Encode(tosend)
if err != nil {
log.ErrorTenantf(customer, "Error in Encode: %s", err.Error())
return
} else {
log.Debugf("sent %d bytes to connector", len(tosend.Data))
if len(tosend.Data) > 0 {
log.Debugf("sent %d bytes to connector", len(tosend.Data))
}
}
}

Expand All @@ -103,7 +117,12 @@ func remotepipe(customer string, messages chan protocol.TransmissionUnit, enc *g
err := dec.Decode(&buff)

if err != nil {
log.ErrorTenantf(customer, "Read from client failed '%s', cleanup the proxy connection!", err.Error())
if err == io.EOF {
log.ErrorTenantf(customer, "Connection closed, cleanup the proxy connection!", err.Error())

} else {
log.ErrorTenantf(customer, "Read from client failed '%s', cleanup the proxy connection!", err.Error())
}
// close all outgoing connections
mu.Lock()
for key := range conmap {
Expand All @@ -119,6 +138,11 @@ func remotepipe(customer string, messages chan protocol.TransmissionUnit, enc *g
}

switch buff.Action {
case protocol.Ping:
out := protocol.TransmissionUnit{protocol.Ping, buff.Id, []byte{} }
log.Debugf("Got ping, return ack %d", buff.Id)
messages <- out

case protocol.Open:
log.Debugf("protocol.Open. Should not happen")
case protocol.Close:
Expand All @@ -139,6 +163,7 @@ func remotepipe(customer string, messages chan protocol.TransmissionUnit, enc *g
mu.RLock()
conn, ok := conmap[buff.Id]
mu.RUnlock()
displayBuff("To database: ", buff.Data)
if ok {
_, err = conn.sock.Write(buff.Data)

Expand Down Expand Up @@ -260,6 +285,13 @@ func requestConnections(egress net.Conn, customer string, connections []string)
listen, err := createListener(customer, tg[0], tg[1])
if err != nil {
log.ErrorTenantf(customer, "Error creating listener %s", err.Error())
e := protocol.TransmissionUnit{protocol.Error, 0, []byte("Could not create a listener, probably another connector running")}
enc.Encode(e)
time.Sleep(2 * time.Second)
for _, l := range listeners {
l.Close()
}
return
} else {
log.Infof("Created listener for %s %s, connector %s, tunnel %s", tg[0], tg[1], tg[2], tg[3])
}
Expand Down Expand Up @@ -478,7 +510,22 @@ func UpdateTunnels(customer string, tunnels []TunnelUpdate) {
return
}
}

func setTCPKeepAlive(clientHello *tls.ClientHelloInfo) (*tls.Config, error) {
// Check that the underlying connection really is TCP.
if tcpConn, ok := clientHello.Conn.(*net.TCPConn); ok {
// we want to protect against NLB timeouts
if err := tcpConn.SetKeepAlivePeriod(60 * time.Second); err != nil {
log.Infof("Could not set keep alive period %s", err.Error())
} else {
log.Info("update keep alive period")
}
} else {
log.Error("TLS over non-TCP connection")
}

// Make sure to return nil, nil to let the caller fall back on the default behavior.
return nil, nil
}
func Server(address string, port int, customer string,
certPEMBlock, keyPEMBlock []byte, caCert []byte,
dbDomain, dbPort, dbUsername, dbPassword, dbName, usetls string) {
Expand All @@ -504,6 +551,7 @@ func Server(address string, port int, customer string,
Certificates: []tls.Certificate{cer},
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
GetConfigForClient: setTCPKeepAlive,
}
connect := fmt.Sprintf("%s:%d", address, port)
log.Debugf("TLS listen on %s", connect)
Expand All @@ -522,7 +570,7 @@ func Server(address string, port int, customer string,
log.Errorf("Error in tls.Accept: %s", err.Error())
continue
}
log.Debugf("Accepted!")
log.Debugf("Accepted call from a connector!")
// get the underlying tls connection
tlsConn, ok := egress.(*tls.Conn)
if !ok {
Expand Down
4 changes: 2 additions & 2 deletions Tunnels/go/protocol/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package protocol

const (
Close = iota
Open
Open
Send
Ping
Error
)

type TransmissionUnit struct {
Action int
Id int
Data []byte
}

0 comments on commit 29efbc4

Please sign in to comment.