Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Feature: TCPFactory/TCPTransport instead of dmsg #503

Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1857842
Compiling. Passes tests
ayuryshev Jul 25, 2019
b81e03f
configurable pubkeys file, TCPFactory initialized with Listener
ayuryshev Jul 25, 2019
0a2297d
Deduplicated code
ayuryshev Jul 25, 2019
6d96017
Commented out tcp-transport for setup-node
ayuryshev Jul 29, 2019
56c74c5
Changes: TCPFactory working correctly with predefined in/out ports
ayuryshev Jul 31, 2019
15226f0
Now works with dynamic ports from diallers
ayuryshev Jul 31, 2019
afbdd85
Integration environment ready.
ayuryshev Jul 31, 2019
cb5bf31
Proceeded to thorn-letter problem
ayuryshev Aug 1, 2019
4f56b61
Transport is accepted by nodeA. Still not working
ayuryshev Aug 2, 2019
45f805c
Remerged with mainnet. Setup node changes rolled back
ayuryshev Aug 8, 2019
fca2d56
Start of multihead test
ayuryshev Aug 12, 2019
4d4b980
Managed to run 128 nodes in Example_runMultihead
ayuryshev Aug 13, 2019
8dec53d
Logrus output accumulated in memory. Routing Tables are in memory too
ayuryshev Aug 13, 2019
5ae74d5
multihead environment enveloped in type MultiHead. Send message as Mu…
ayuryshev Aug 14, 2019
43d2611
Fixed logging everywhere. Multhead simplified
ayuryshev Aug 15, 2019
a8142bc
Restructured tests for router
ayuryshev Aug 16, 2019
70b1779
Changes in PacketRouter implementations: callbacks streamlined into r…
ayuryshev Aug 17, 2019
4a951c6
a lot of debugging logs
ayuryshev Aug 17, 2019
3da5dbe
still working
ayuryshev Aug 19, 2019
1d31171
yet another step forward
ayuryshev Aug 19, 2019
598e17a
Yet another step
ayuryshev Aug 21, 2019
7572b29
tcp-transport finally working
ayuryshev Aug 22, 2019
7635a5a
some cleanups
ayuryshev Aug 23, 2019
d9e6356
encore un efforti
ayuryshev Aug 26, 2019
fef6c40
restored logging
ayuryshev Aug 26, 2019
3ac70d0
routing.Loop renamed to routing.AddressPair
ayuryshev Aug 26, 2019
1e53cfe
LoopDescriptor, LoopData -> AddressPairDescriptor, AddressPairData
ayuryshev Aug 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/skywire-cli/commands/node/gen-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,12 @@ func defaultConfig() *visor.Config {
conf.Routing.RouteFinder = "https://routefinder.skywire.skycoin.net/"

const defaultSetupNodePK = "0324579f003e6b4048bae2def4365e634d8e0e3054a20fc7af49daf2a179658557"

sPK := cipher.PubKey{}
if err := sPK.UnmarshalText([]byte(defaultSetupNodePK)); err != nil {
log.WithError(err).Warnf("Failed to unmarshal default setup node public key %s", defaultSetupNodePK)
}

conf.Routing.SetupNodes = []cipher.PubKey{sPK}
conf.Routing.Table.Type = "boltdb"
conf.Routing.Table.Location = "./skywire/routing.db"
Expand All @@ -120,5 +122,8 @@ func defaultConfig() *visor.Config {

conf.Interfaces.RPCAddress = "localhost:3435"

conf.TransportType = "dmsg"
conf.PubKeysFile = "./local/pubkeys"

return conf
}
27 changes: 27 additions & 0 deletions integration/check-delays.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env bash

MSGD=messaging.discovery.skywire.skycoin.net
MSGD_GET="https://"$MSGD"/messaging-discovery/available_servers"

echo -e "\nTCP delays. Measuring by ping:"
ping $MSGD -c 10 -q

if type mtr > /dev/null; then
echo -e "\nTCP delays. Measuring by mtr:"
mtr -y 2 --report --report-cycles=5 $MSGD > /tmp/msgd-out.txt

cat /tmp/msgd-out.txt
else
echo -e "\nTCP delays. mtr not found. Install for detailed stats"
fi

if type vegeta > /dev/null; then
echo -e "\nHTTP delays. Measuring by vegeta:"
echo "GET "$MSGD_GET \
| vegeta attack -duration=10s |tee results.bin |vegeta report
else
echo -e "\nHTTP delays.vegeta not found\n. Install with \ngo get -u github.com/tsenart/vegeta\n for detailed stats"
fi

echo -e "\nHTTP delays. Measuring by curl:"
curl $MSGD_GET >/dev/null
13 changes: 13 additions & 0 deletions integration/tcp-tr/env-vars.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# intended to be sourced `source ./integration/tcp-tr/env-vars.sh`

export RPC_A=192.168.1.2:3435
export RPC_C=192.168.1.3:3435

alias CLI_A='./skywire-cli --rpc $RPC_A'
alias CLI_C='./skywire-cli --rpc $RPC_C'

export PK_A=$(./skywire-cli --rpc $RPC_A node pk)
export PK_C=$(./skywire-cli --rpc $RPC_C node pk)

export CHAT_A=http://192.168.1.2:8001/message
export CHAT_C=http://192.168.1.3:8001/message
3 changes: 3 additions & 0 deletions integration/tcp-tr/hosts.pubkeys
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
02fffa3ffd07630bf5565493a990f182ea7de56a9c14abbe041959a8e4667d3447 192.168.1.2:9119
0315852b6dba67d16c5376b40e082a1036bd17b66c67573b242625f485f605756e 192.168.1.3:9119

53 changes: 53 additions & 0 deletions integration/tcp-tr/nodeA.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{
"version": "1.0",
"node": {
"static_public_key": "02fffa3ffd07630bf5565493a990f182ea7de56a9c14abbe041959a8e4667d3447",
"static_secret_key": "4be3f22d89d0bade8c5bf0084919f891e5e2dd0d4db0bc139318dfa576ca0237"
},
"messaging": {
"discovery": "https://messaging.discovery.skywire.skycoin.net",
"server_count": 1
},
"transport": {
"discovery": "https://transport.discovery.skywire.skycoin.net",
"log_store": {
"type": "file",
"location": "./local/tcp-tr/nodeA/transport-logs"
}
},
"routing": {
"setup_nodes": [
"0324579f003e6b4048bae2def4365e634d8e0e3054a20fc7af49daf2a179658557"
],
"route_finder": "https://routefinder.skywire.skycoin.net/",
"route_finder_timeout": "10s",
"table": {
"type": "boltdb",
"location": "./local/tcp-tr/nodeA/routing.db"
}
},
"apps": [
{
"app": "skychat",
"version": "1.0",
"auto_start": true,
"port": 1,
"args": [
"-addr",
"192.168.1.2:8001"
]
}
],
"trusted_nodes": [],
"hypervisors": [],
"apps_path": "./apps",
"local_path": "./local",
"transport_type": "tcp-transport",
"pubkeys_file": "./integration/tcp-tr/hosts.pubkeys",
"tcptransport_addr": "192.168.1.2:9119",
"shutdown_timeout": "10s",
"interfaces": {
"rpc": "192.168.1.2:3435"
},
"log_level": "info"
}
53 changes: 53 additions & 0 deletions integration/tcp-tr/nodeC.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{
"version": "1.0",
"node": {
"static_public_key": "0315852b6dba67d16c5376b40e082a1036bd17b66c67573b242625f485f605756e",
"static_secret_key": "55553e1fb464b557e47392caad59f94ca14dff872f43c4ee786a84df761e69e0"
},
"messaging": {
"discovery": "https://messaging.discovery.skywire.skycoin.net",
"server_count": 1
},
"transport": {
"discovery": "https://transport.discovery.skywire.skycoin.net",
"log_store": {
"type": "file",
"location": "./local/tcp-tr/nodeB/transport-logs"
}
},
"routing": {
"setup_nodes": [
"0324579f003e6b4048bae2def4365e634d8e0e3054a20fc7af49daf2a179658557"
],
"route_finder": "https://routefinder.skywire.skycoin.net/",
"route_finder_timeout": "10s",
"table": {
"type": "boltdb",
"location": "./local/tcp-tr/nodeB/routing.db"
}
},
"apps": [
{
"app": "skychat",
"version": "1.0",
"auto_start": true,
"port": 1,
"args": [
"-addr",
"192.168.1.3:8001"
]
}
],
"trusted_nodes": [],
"hypervisors": [],
"apps_path": "./apps",
"local_path": "./local",
"transport_type": "tcp-transport",
"pubkeys_file": "./integration/tcp-tr/hosts.pubkeys",
"tcptransport_addr": "192.168.1.3:9119",
"shutdown_timeout": "10s",
"interfaces": {
"rpc": "192.168.1.3:3435"
},
"log_level": "info"
}
2 changes: 1 addition & 1 deletion integration/test-messaging.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/usr/bin/env bash
curl --data {'"recipient":"'$PK_A'", "message":"Hello Joe!"}' -X POST $CHAT_C
curl --data {'"recipient":"'$PK_C'", "message":"Hello Mike!"}' -X POST $CHAT_A
# curl --data {'"recipient":"'$PK_C'", "message":"Hello Mike!"}' -X POST $CHAT_A
52 changes: 37 additions & 15 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sync"
"time"

"github.com/skycoin/dmsg"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"

Expand Down Expand Up @@ -39,6 +38,7 @@ type Config struct {
RoutingTable routing.Table
RouteFinder routeFinder.Client
SetupNodes []cipher.PubKey
TransportType string
}

// Router implements node.PacketRouter. It manages routing table by
Expand Down Expand Up @@ -256,19 +256,23 @@ func (r *Router) consumePacket(payload []byte, rule routing.Rule) error {
}

func (r *Router) forwardAppPacket(appConn *app.Protocol, packet *app.Packet) error {

r.Logger.Info("Entering r.forwardAppPacket")

if packet.Loop.Remote.PubKey == r.config.PubKey {
return r.forwardLocalAppPacket(packet)
}

r.Logger.Info("Entering r.forwardAppPacket GetLoop")
l, err := r.pm.GetLoop(packet.Loop.Local.Port, packet.Loop.Remote)
if err != nil {
return err
}

tr := r.tm.Transport(l.trID)
if tr == nil {
return errors.New("unknown transport")
return fmt.Errorf("unknown transport id %v", l.trID)
}
r.Logger.Info("r.forwardAppPacket enter routing.MakePacket")

p := routing.MakePacket(l.routeID, packet.Payload)
r.Logger.Infof("Forwarded App packet from LocalPort %d using route ID %d", packet.Loop.Local.Port, l.routeID)
Expand All @@ -293,6 +297,7 @@ func (r *Router) forwardLocalAppPacket(packet *app.Packet) error {
}

func (r *Router) requestLoop(appConn *app.Protocol, raddr routing.Addr) (routing.Addr, error) {

lport := r.pm.Alloc(appConn)
if err := r.pm.SetLoop(lport, raddr, &loop{}); err != nil {
return routing.Addr{}, err
Expand Down Expand Up @@ -322,18 +327,33 @@ func (r *Router) requestLoop(appConn *app.Protocol, raddr routing.Addr) (routing
Reverse: reverseRoute,
}

proto, tr, err := r.setupProto(context.Background())
if err != nil {
return routing.Addr{}, err
}
defer func() {
if err := tr.Close(); err != nil {
r.Logger.Warnf("Failed to close transport: %s", err)
r.Logger.Infof("Router.requestLoop\n")
r.Logger.Infof("laddr: %v\n, raddr: %v\n", laddr, raddr)

r.Logger.Info("Attempt to r.setupProto from r.requestLoop")
switch r.config.TransportType {
case "dmsg":
proto, tr, err := r.setupProto(context.Background())
if err != nil {
return routing.Addr{}, err
}
defer func() {
if err := tr.Close(); err != nil {
r.Logger.Warnf("Failed to close transport: %s", err)
}
}()

r.Logger.Infof("Router.requestLoop 6\n")
if err := setup.CreateLoop(proto, ld); err != nil {
return routing.Addr{}, fmt.Errorf("route setup: %s", err)
}
case "tcp-transport":
r.Logger.Info("Skipping setup for tcp-transport")
_, err := r.tm.CreateSetupTransport(context.Background(), raddr.PubKey, "tcp-transport")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dmsg is to be used with SetupNode always.

if err != nil {
r.Logger.Warnf("error creating transport %s", err)
}
}()

if err := setup.CreateLoop(proto, ld); err != nil {
return routing.Addr{}, fmt.Errorf("route setup: %s", err)
}

r.Logger.Infof("Created new loop to %s on port %d", raddr, laddr.Port)
Expand Down Expand Up @@ -376,7 +396,7 @@ func (r *Router) closeLoop(appConn *app.Protocol, loop routing.Loop) error {
if err := r.destroyLoop(loop); err != nil {
r.Logger.Warnf("Failed to remove loop: %s", err)
}

r.Logger.Info("Attempt to r.setupProto from r.closeLoop")
proto, tr, err := r.setupProto(context.Background())
if err != nil {
return err
Expand Down Expand Up @@ -435,7 +455,9 @@ func (r *Router) setupProto(ctx context.Context) (*setup.Protocol, transport.Tra
return nil, nil, errors.New("route setup: no nodes")
}

tr, err := r.tm.CreateSetupTransport(ctx, r.config.SetupNodes[0], dmsg.Type)
trType := r.config.TransportType
// TODO(evanlinjin): need string constant for tp type.
tr, err := r.tm.CreateSetupTransport(ctx, r.config.SetupNodes[0], trType)
if err != nil {
return nil, nil, fmt.Errorf("setup transport: %s", err)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@ type Config struct {

TransportDiscovery string `json:"transport_discovery"`

TransportType string `json:"transport_type"`
PubKeysFile string `json:"pubkeys_file"`
TCPTransportAddr string `json:"tcptransport_addr"`

LogLevel string `json:"log_level"`
}
6 changes: 5 additions & 1 deletion pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package transport
import (
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
Expand Down Expand Up @@ -47,8 +48,10 @@ type Manager struct {
func NewManager(config *ManagerConfig, factories ...Factory) (*Manager, error) {
entries, err := config.DiscoveryClient.GetTransportsByEdge(context.Background(), config.PubKey)
if err != nil {

entries = make([]*EntryWithStatus, 0)
}
log.Infof("transport.NewManager. entries: v%\n", entries)

mEntries := make(map[Entry]struct{})
for _, entry := range entries {
Expand Down Expand Up @@ -316,7 +319,8 @@ func (tm *Manager) dialTransport(ctx context.Context, factory Factory, remote ci

tr, err := factory.Dial(ctx, remote)
if err != nil {
return nil, nil, err

return nil, nil, fmt.Errorf("error %v on factory.Dial to remote %v", err, remote)
}

entry, err := settlementInitiatorHandshake(public).Do(tm, tr, time.Minute)
Expand Down
Loading