Skip to content

Commit

Permalink
Implement data connection on main TCP port.
Browse files Browse the repository at this point in the history
Known bugs:

- The example app will find itself and will try to connect to the zeroed out port.
  • Loading branch information
icedream committed Jan 6, 2021
1 parent c1336b1 commit 6282fa4
Show file tree
Hide file tree
Showing 15 changed files with 807 additions and 239 deletions.
22 changes: 21 additions & 1 deletion cmd/stagelinq-discover/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func main() {
panic(err)
}

listener.Announce()

deadline := time.After(timeout)
foundDevices := []*stagelinq.Device{}

Expand All @@ -35,7 +37,7 @@ discoveryLoop:
case <-deadline:
break discoveryLoop
default:
device, deviceState, err := listener.Discover()
device, deviceState, err := listener.Discover(timeout)
if err != nil {
log.Printf("WARNING: %s", err.Error())
continue discoveryLoop
Expand All @@ -52,6 +54,24 @@ discoveryLoop:
}
foundDevices = append(foundDevices, device)
log.Printf("%s %q %q %q", device.IP.String(), device.Name, device.SoftwareName, device.SoftwareVersion)

// discover provided services
log.Println("\tattempting to connect to this device…")
deviceConn, err := device.Connect(listener.Token(), []*stagelinq.Service{})
if err != nil {
log.Printf("WARNING: %s", err.Error())
} else {
defer deviceConn.Close()
log.Println("\trequesting device data services…")
services, err := deviceConn.RequestServices()
if err != nil {
log.Printf("WARNING: %s", err.Error())
} else {
for _, service := range services {
log.Printf("\toffers %s at port %d", service.Name, service.Port)
}
}
}
}
}

Expand Down
29 changes: 27 additions & 2 deletions device.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,39 @@ const (
// Device presents information about a discovered StagelinQ device on the network.
type Device struct {
port uint16
token [16]byte
token Token

IP net.IP
Name string
SoftwareName string
SoftwareVersion string
}

// dial starts a TCP connection with the device.
func (device *Device) dial() (conn net.Conn, err error) {
ip := device.IP
port := device.port

conn, err = net.DialTCP("tcp", nil, &net.TCPAddr{
IP: ip,
Port: int(port),
})

return
}

// Connect starts a new main connection with the device.
// You need to pass the StagelinQ token announced for your own device.
// You also need to pass services you want to provide; if you don't have any, pass an empty array.
func (device *Device) Connect(token Token, offeredServices []*Service) (conn *MainConnection, err error) {
tcpConn, err := device.dial()
if err != nil {
return
}
conn, err = newMainConnection(tcpConn, token, device.token, offeredServices)
return
}

// IsEqual checks if this device has the same address and values as the other given device.
func (device *Device) IsEqual(anotherDevice *Device) bool {
return device.token == anotherDevice.token &&
Expand All @@ -33,7 +58,7 @@ func (device *Device) IsEqual(anotherDevice *Device) bool {
device.SoftwareVersion == anotherDevice.SoftwareVersion
}

func newDeviceFromDiscovery(addr *net.UDPAddr, msg *DiscoveryMessage) *Device {
func newDeviceFromDiscovery(addr *net.UDPAddr, msg *discoveryMessage) *Device {
return &Device{
port: msg.Port,
token: msg.Token,
Expand Down
66 changes: 0 additions & 66 deletions device_conn.go

This file was deleted.

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module git.icedream.tech/icedream/stagelinq-receiver
go 1.15

require (
github.com/hashicorp/go-multierror v1.1.0
github.com/stretchr/testify v1.6.1
golang.org/x/text v0.3.4
)
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
Expand Down
12 changes: 12 additions & 0 deletions go_hacks_go1.16.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//+build go1.16

package stagelinq

import (
"errors"
"net"
)

func checkErrIsNetClosed(err error) bool {
return errors.Is(err, net.ErrClosed)
}
7 changes: 7 additions & 0 deletions go_hacks_notgo1.16.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//+build !go1.16

package stagelinq

func checkErrIsNetClosed(err error) bool {
return err.Error() == "use of closed network connection"
}
43 changes: 22 additions & 21 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,33 @@ type Listener struct {
softwareVersion string
name string
packetConn net.PacketConn
token [16]byte
timeout time.Duration
token Token
port uint16
}

// Token returns our token that is being announced to the StagelinQ network.
// Use this token for further communication with services on other devices.
func (l *Listener) Token() Token {
return l.token
}

// Close shuts down the listener.
func (l *Listener) Close() error {
// Send two exit messages just to make sure (apps I tested seem to do that)
l.announce(discovererExit)
l.announce(discovererExit)
return l.packetConn.Close()
}

// Announce announces this StagelinQ listener to the network.
// This function should be called before actually listening in for devices to allow them to pick up our token for communication immediately.
func (l *Listener) Announce() error {
return l.announce(DiscovererHowdy)
return l.announce(discovererHowdy)
}

func (l *Listener) announce(action DiscovererMessageAction) (err error) {
func (l *Listener) announce(action discovererMessageAction) (err error) {
// TODO - optimization: cache the built message because it will be sent repeatedly?
m := &DiscoveryMessage{
m := &discoveryMessage{
Source: l.name,
SoftwareName: l.softwareName,
SoftwareVersion: l.softwareVersion,
Expand All @@ -85,14 +93,14 @@ func (l *Listener) announce(action DiscovererMessageAction) (err error) {
}

// Discover listens for any StagelinQ devices announcing to the network.
// If no device is found within the configured timeout or any non-StagelinQ message has been received, nil is returned for the device.
// If no device is found within the given timeout or any non-StagelinQ message has been received, nil is returned for the device.
// If a device has been discovered before, the returned device object is not going to be the same as when the device was previously discovered.
// Use device.IsEqual for such comparison.
func (l *Listener) Discover() (device *Device, deviceState DeviceState, err error) {
func (l *Listener) Discover(timeout time.Duration) (device *Device, deviceState DeviceState, err error) {
b := make([]byte, 8*1024)

if l.timeout != 0 {
l.packetConn.SetReadDeadline(time.Now().Add(l.timeout))
if timeout != 0 {
l.packetConn.SetReadDeadline(time.Now().Add(timeout))
}

n, src, err := l.packetConn.ReadFrom(b)
Expand All @@ -106,25 +114,19 @@ func (l *Listener) Discover() (device *Device, deviceState DeviceState, err erro
return
}

// do first bytes match expected magic bytes?
if !bytes.Equal(b[0:4], magicBytes) {
err = ErrInvalidMessageReceived
return
}

// decode message
r := bytes.NewReader(b[4:n])
m := new(DiscoveryMessage)
r := bytes.NewReader(b)
m := new(discoveryMessage)
if err = m.readFrom(r); err != nil {
return
}

device = newDeviceFromDiscovery(src.(*net.UDPAddr), m)

switch m.Action {
case DiscovererExit:
case discovererExit:
deviceState = DeviceLeaving
case DiscovererHowdy:
case discovererHowdy:
deviceState = DevicePresent
default:
err = ErrInvalidDiscovererActionReceived
Expand All @@ -139,7 +141,7 @@ func Listen() (listener *Listener, err error) {
return ListenWithConfiguration(nil)
}

var zeroToken = [16]byte{}
var zeroToken = Token{}

// ListenWithConfiguration sets up a StagelinQ listener with the given configuration.
func ListenWithConfiguration(listenerConfig *ListenerConfiguration) (listener *Listener, err error) {
Expand Down Expand Up @@ -176,7 +178,6 @@ func ListenWithConfiguration(listenerConfig *ListenerConfiguration) (listener *L
packetConn: packetConn,
softwareName: listenerConfig.SoftwareName,
softwareVersion: listenerConfig.SoftwareVersion,
timeout: listenerConfig.DiscoveryTimeout,
token: token,
}

Expand Down
2 changes: 1 addition & 1 deletion listener_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ type ListenerConfiguration struct {
SoftwareVersion string

// Token is used as part of announcements and main data communication. It is currently recommended to leave this empty.
Token [16]byte
Token Token
}
66 changes: 0 additions & 66 deletions main_conn.go

This file was deleted.

Loading

0 comments on commit 6282fa4

Please sign in to comment.