Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[M2] app2 implementation #543

Merged
merged 44 commits into from
Sep 26, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
790571e
Add basic structures
Darkren Aug 30, 2019
5299283
Add methods for HSFrame
Darkren Aug 30, 2019
7deb42b
Fix PR queries
Darkren Aug 31, 2019
95a0d28
Add HSFrame tests
Darkren Aug 31, 2019
804b770
Add more work
Darkren Sep 1, 2019
5029589
Add yamux conn multiplexing
Darkren Sep 6, 2019
972fa99
Add some shape to the client
Darkren Sep 8, 2019
0a6506d
Add some work on server
Darkren Sep 9, 2019
67ab801
More work on app2
Darkren Sep 9, 2019
a07d8a0
Move `listen` to lm, add `Close` to lm
Darkren Sep 10, 2019
8dc1513
`acceptedConn` -> `clientConn`, `clientConn` -> `serverConn`
Darkren Sep 10, 2019
a282c5a
More work on app2
Darkren Sep 10, 2019
e2afc9e
Add RPC server for server
Darkren Sep 12, 2019
f374c39
Finish RPC communication
Darkren Sep 15, 2019
7bcafc9
Add comments
Darkren Sep 16, 2019
cf5e105
Impement `net` interfaces
Darkren Sep 16, 2019
f68b3e0
Add proper port handling
Darkren Sep 16, 2019
a161a2d
Remove `freeLocalPort` from `Conn`
Darkren Sep 16, 2019
a790f2c
Start to merge managers
Darkren Sep 16, 2019
3e295fd
Add more comments, finish manager
Darkren Sep 17, 2019
ca7fcae
Start implementing `manager` tests
Darkren Sep 17, 2019
1daf6c0
Almost finish `manager` tests
Darkren Sep 17, 2019
8b8ad33
Add client tests
Darkren Sep 18, 2019
0e4a24a
Add conn tests
Darkren Sep 18, 2019
f43c6bf
Add `Listener` tests
Darkren Sep 18, 2019
7ef6861
Refactor tests a bit
Darkren Sep 18, 2019
946a3dc
Merge branch 'mainnet-milestone2' of https://github.com/skycoin/skywi…
Darkren Sep 18, 2019
f5986d5
Add networker stuff
Darkren Sep 20, 2019
25489ef
Adjust code to the `Networker` usage
Darkren Sep 20, 2019
eb42bca
Refactor a bit
Darkren Sep 21, 2019
0714e55
Add more tests
Darkren Sep 21, 2019
3491994
Add even more tests
Darkren Sep 21, 2019
07d5b80
And more tests
Darkren Sep 22, 2019
b2dcbbb
Fix some queries
Darkren Sep 23, 2019
e243192
Finish RPC gateway tests
Darkren Sep 23, 2019
f9c933f
Add `WrappedConn`
Darkren Sep 24, 2019
34c6e29
Pass assigned local port from the server
Darkren Sep 24, 2019
677fb24
Add conn/listener tracking
Darkren Sep 24, 2019
3a9fdba
Fix client tests
Darkren Sep 24, 2019
8c02d73
Partially fix `idManager` tests
Darkren Sep 24, 2019
d7e140b
Get rid of porter
Darkren Sep 25, 2019
17ba146
Fix PR queries
Darkren Sep 25, 2019
d560351
Fix tests
Darkren Sep 25, 2019
662c229
Add rpcClient tests
Darkren Sep 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 178 additions & 0 deletions pkg/app2/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package app2

import (
"encoding/binary"
"net"
"sync/atomic"

"github.com/hashicorp/yamux"

"github.com/pkg/errors"

"github.com/skycoin/skycoin/src/util/logging"
"github.com/skycoin/skywire/pkg/routing"

"github.com/skycoin/dmsg/cipher"
)

var (
ErrWrongHSFrameTypeReceived = errors.New("received wrong HS frame type")
)

// Client is used by skywire apps.
type Client struct {
PK cipher.PubKey
pid ProcID
sockAddr string
conn net.Conn
session *yamux.Session
logger *logging.Logger
Darkren marked this conversation as resolved.
Show resolved Hide resolved
lm *listenersManager
isListening int32
}

// NewClient creates a new Client. The Client needs to be provided with:
// - localPK: The local public key of the parent skywire visor.
// - pid: The procID assigned for the process that Client is being used by.
// - sockAddr: The socket address to connect to Server.
func NewClient(localPK cipher.PubKey, pid ProcID, sockAddr string, l *logging.Logger) (*Client, error) {
Darkren marked this conversation as resolved.
Show resolved Hide resolved
conn, err := net.Dial("unix", sockAddr)
if err != nil {
return nil, errors.Wrap(err, "error connecting app server")
}

session, err := yamux.Client(conn, nil)
if err != nil {
return nil, errors.Wrap(err, "error opening yamux session")
}

lm := newListenersManager()

return &Client{
PK: localPK,
pid: pid,
sockAddr: sockAddr,
conn: conn,
session: session,
lm: lm,
}, nil
}

func (c *Client) Dial(addr routing.Addr) (net.Conn, error) {
Darkren marked this conversation as resolved.
Show resolved Hide resolved
stream, err := c.session.Open()
if err != nil {
return nil, errors.Wrap(err, "error opening stream")
}

hsFrame := NewHSFrameDSMGDial(c.pid, routing.Loop{
Local: routing.Addr{
PubKey: c.PK,
},
Remote: addr,
})

if _, err := stream.Write(hsFrame); err != nil {
return nil, errors.Wrap(err, "error writing HS frame")
}

hsFrame, err = readHSFrame(stream)
if err != nil {
return nil, errors.Wrap(err, "error reading HS frame")
}

if hsFrame.FrameType() != HSFrameTypeDMSGAccept {
return nil, ErrWrongHSFrameTypeReceived
}

return stream, nil
Darkren marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *Client) Listen(port routing.Port) (*Listener, error) {
if c.lm.portIsBound(port) {
return nil, ErrPortAlreadyBound
}

stream, err := c.session.Open()
if err != nil {
return nil, errors.Wrap(err, "error opening stream")
}

addr := routing.Addr{
PubKey: c.PK,
Port: port,
}

hsFrame := NewHSFrameDMSGListen(c.pid, addr)
if _, err := stream.Write(hsFrame); err != nil {
return nil, errors.Wrap(err, "error writing HS frame")
}

hsFrame, err = readHSFrame(stream)
if err != nil {
return nil, errors.Wrap(err, "error reading HS frame")
}

if hsFrame.FrameType() != HSFrameTypeDMSGListening {
return nil, ErrWrongHSFrameTypeReceived
}

if atomic.CompareAndSwapInt32(&c.isListening, 0, 1) {
go func() {
if err := c.listen(); err != nil {
c.logger.WithError(err).Error("error listening")
}
}()
}

return c.lm.add(addr, c.stopListening, c.logger)
}

func (c *Client) listen() error {
for {
stream, err := c.session.Accept()
if err != nil {
return errors.Wrap(err, "error accepting stream")
}

hsFrame, err := readHSFrame(stream)
if err != nil {
c.logger.WithError(err).Error("error reading HS frame")
continue
}

if hsFrame.FrameType() != HSFrameTypeDMSGDial {
c.logger.WithError(ErrWrongHSFrameTypeReceived).Error("on listening for Dial")
continue
}

// TODO: handle field get gracefully
port := routing.Port(binary.BigEndian.Uint16(hsFrame[HSFrameHeaderLen+HSFramePKLen:]))
if err := c.lm.addConn(port, stream); err != nil {
c.logger.WithError(err).Error("failed to accept")
continue
}
}
}

func (c *Client) stopListening(port routing.Port) error {
stream, err := c.session.Open()
if err != nil {
return errors.Wrap(err, "error opening stream")
}

addr := routing.Addr{
PubKey: c.PK,
Port: port,
}

hsFrame := NewHSFrameDMSGStopListening(c.pid, addr)
if _, err := stream.Write(hsFrame); err != nil {
return errors.Wrap(err, "error writing HS frame")
}

if err := stream.Close(); err != nil {
return errors.Wrap(err, "error closing stream")
}

return nil
}
4 changes: 4 additions & 0 deletions pkg/app2/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Package app2 provides facilities to establish communication
// between a visor node and a skywire application. Intended to
// replace the original `app` module
package app2
143 changes: 143 additions & 0 deletions pkg/app2/hsframe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package app2

import (
"encoding/binary"
"io"

"github.com/pkg/errors"
"github.com/skycoin/skywire/pkg/routing"
)

const (
HSFrameHeaderLen = 3
HSFrameProcIDLen = 2
HSFrameTypeLen = 1
HSFramePKLen = 33
HSFramePortLen = 2
)

// HSFrameType identifies the type of a handshake frame.
type HSFrameType byte

const (
HSFrameTypeDMSGListen HSFrameType = 10 + iota
Darkren marked this conversation as resolved.
Show resolved Hide resolved
HSFrameTypeDMSGListening
HSFrameTypeDMSGDial
HSFrameTypeDMSGAccept
HSFrameTypeStopListening
)

// HSFrame is the data unit for socket connection handshakes between Server and Client.
// It consists of header and body.
//
// Header is a big-endian encoded 3 bytes and is constructed as follows:
// | ProcID (2 bytes) | HSFrameType (1 byte) |
type HSFrame []byte

func newHSFrame(procID ProcID, frameType HSFrameType, bodyLen int) HSFrame {
hsFrame := make(HSFrame, HSFrameHeaderLen+bodyLen)

hsFrame.SetProcID(procID)
hsFrame.SetFrameType(frameType)

return hsFrame
}

func NewHSFrameDMSGListen(procID ProcID, local routing.Addr) HSFrame {
Darkren marked this conversation as resolved.
Show resolved Hide resolved
hsFrame := newHSFrame(procID, HSFrameTypeDMSGListen, HSFramePKLen+HSFramePortLen)

copy(hsFrame[HSFrameHeaderLen:], local.PubKey[:])
binary.BigEndian.PutUint16(hsFrame[HSFrameHeaderLen+HSFramePKLen:], uint16(local.Port))

return hsFrame
}

func NewHSFrameDMSGListening(procID ProcID, local routing.Addr) HSFrame {
hsFrame := newHSFrame(procID, HSFrameTypeDMSGListening, HSFramePKLen+HSFramePortLen)

copy(hsFrame[HSFrameHeaderLen:], local.PubKey[:])
binary.BigEndian.PutUint16(hsFrame[HSFrameHeaderLen+HSFramePKLen:], uint16(local.Port))

return hsFrame
}

func NewHSFrameDSMGDial(procID ProcID, loop routing.Loop) HSFrame {
hsFrame := newHSFrame(procID, HSFrameTypeDMSGDial, 2*HSFramePKLen+2*HSFramePortLen)

copy(hsFrame[HSFrameHeaderLen:], loop.Local.PubKey[:])
binary.BigEndian.PutUint16(hsFrame[HSFrameHeaderLen+HSFramePKLen:], uint16(loop.Local.Port))

copy(hsFrame[HSFrameHeaderLen+HSFramePKLen+HSFramePortLen:], loop.Remote.PubKey[:])
binary.BigEndian.PutUint16(hsFrame[HSFrameHeaderLen+2*HSFramePKLen+HSFramePortLen:], uint16(loop.Remote.Port))

return hsFrame
}

func NewHSFrameDMSGAccept(procID ProcID, loop routing.Loop) HSFrame {
hsFrame := newHSFrame(procID, HSFrameTypeDMSGAccept, 2*HSFramePKLen+2*HSFramePortLen)

copy(hsFrame[HSFrameHeaderLen:], loop.Local.PubKey[:])
binary.BigEndian.PutUint16(hsFrame[HSFrameHeaderLen+HSFramePKLen:], uint16(loop.Local.Port))

copy(hsFrame[HSFrameHeaderLen+HSFramePKLen+HSFramePortLen:], loop.Remote.PubKey[:])
binary.BigEndian.PutUint16(hsFrame[HSFrameHeaderLen+2*HSFramePKLen+HSFramePortLen:], uint16(loop.Remote.Port))

return hsFrame
}

func NewHSFrameDMSGStopListening(procID ProcID, local routing.Addr) HSFrame {
hsFrame := newHSFrame(procID, HSFrameTypeDMSGListen, HSFramePKLen+HSFramePortLen)

copy(hsFrame[HSFrameHeaderLen:], local.PubKey[:])
binary.BigEndian.PutUint16(hsFrame[HSFrameHeaderLen+HSFramePKLen:], uint16(local.Port))

return hsFrame
}

// ProcID gets ProcID from the HSFrame.
func (f HSFrame) ProcID() ProcID {
return ProcID(binary.BigEndian.Uint16(f))
}

// SetProcID sets ProcID for the HSFrame.
func (f HSFrame) SetProcID(procID ProcID) {
Darkren marked this conversation as resolved.
Show resolved Hide resolved
binary.BigEndian.PutUint16(f, uint16(procID))
}

// FrameType gets FrameType from the HSFrame.
func (f HSFrame) FrameType() HSFrameType {
_ = f[HSFrameProcIDLen] // bounds check hint to compiler; see golang.org/issue/14808
return HSFrameType(f[HSFrameProcIDLen])
}

// SetFrameType sets FrameType for the HSFrame.
func (f HSFrame) SetFrameType(frameType HSFrameType) {
_ = f[HSFrameProcIDLen] // bounds check hint to compiler; see golang.org/issue/14808
f[HSFrameProcIDLen] = byte(frameType)
}

func readHSFrame(r io.Reader) (HSFrame, error) {
hsFrame := make(HSFrame, HSFrameHeaderLen)
if _, err := io.ReadFull(r, hsFrame); err != nil {
return nil, errors.Wrap(err, "error reading HS frame header")
}

hsFrame, err := readHSFrameBody(hsFrame, r)
if err != nil {
return nil, errors.Wrap(err, "error reading HS frame body")
}

return hsFrame, nil
}

func readHSFrameBody(hsFrame HSFrame, r io.Reader) (HSFrame, error) {
switch hsFrame.FrameType() {
case HSFrameTypeDMSGListen, HSFrameTypeDMSGListening:
hsFrame = append(hsFrame, make([]byte, HSFramePKLen+HSFramePortLen)...)
case HSFrameTypeDMSGDial, HSFrameTypeDMSGAccept:
hsFrame = append(hsFrame, make([]byte, 2*HSFramePKLen+2*HSFramePortLen)...)
}

_, err := io.ReadFull(r, hsFrame[HSFrameHeaderLen:])
return hsFrame, err
}
69 changes: 69 additions & 0 deletions pkg/app2/hsframe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package app2

import (
"encoding/binary"
"encoding/json"
"testing"

"github.com/stretchr/testify/require"
)

func TestHSFrame(t *testing.T) {
t.Run("ok", func(t *testing.T) {
body := struct {
Test string `json:"test"`
}{
Test: "some string",
}

bodyBytes, err := json.Marshal(body)
require.NoError(t, err)

procID := ProcID(1)
frameType := HSFrameTypeDMSGListen
bodyLen := len(bodyBytes)

hsFrame, err := NewHSFrame(procID, frameType, body)
require.NoError(t, err)

require.Equal(t, len(hsFrame), HSFrameHeaderLen+len(bodyBytes))

gotProcID := ProcID(binary.BigEndian.Uint16(hsFrame))
require.Equal(t, gotProcID, procID)

gotFrameType := HSFrameType(hsFrame[HSFrameProcIDLen])
require.Equal(t, gotFrameType, frameType)

gotBodyLen := int(binary.BigEndian.Uint16(hsFrame[HSFrameProcIDLen+HSFrameTypeLen:]))
require.Equal(t, gotBodyLen, bodyLen)

require.Equal(t, bodyBytes, []byte(hsFrame[HSFrameProcIDLen+HSFrameTypeLen+HSFrameBodyLenLen:]))

gotProcID = hsFrame.ProcID()
require.Equal(t, gotProcID, procID)

gotFrameType = hsFrame.FrameType()
require.Equal(t, gotFrameType, frameType)

gotBodyLen = hsFrame.BodyLen()
require.Equal(t, gotBodyLen, bodyLen)
})

t.Run("fail - too large body", func(t *testing.T) {
body := struct {
Test string `json:"test"`
}{
Test: "some string",
}

for len(body.Test) <= HSFrameMaxBodyLen {
body.Test += body.Test
}

procID := ProcID(1)
frameType := HSFrameTypeDMSGListen

_, err := NewHSFrame(procID, frameType, body)
require.Equal(t, err, ErrHSFrameBodyTooLarge)
})
}
Loading