Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
raspi committed Jul 17, 2022
1 parent bad96c6 commit 402c7c3
Show file tree
Hide file tree
Showing 16 changed files with 641 additions and 1 deletion.
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions .idea/runConfigurations/run_client.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions .idea/runConfigurations/run_server.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
# jumiks
AF_UNIX socket

Local AF_UNIX unix socket domain server and client for sending
messages from one server to multiple clients.

See [example](example) directory for example.
53 changes: 53 additions & 0 deletions example/cmd/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"fmt"
"github.com/raspi/jumiks/pkg/client"
"os"
"time"
)

type ExampleClient struct {
c *client.Client
delay time.Duration
}

func New(name string, errors chan error) (exclient *ExampleClient, err error) {
exclient = &ExampleClient{
delay: time.Millisecond * 500,
}
// Bind to ExampleClient.on_msg
exclient.c, err = client.New(name, exclient.on_msg, errors)
if err != nil {
return nil, err
}

return exclient, nil
}

func (c *ExampleClient) Listen() {
c.c.Listen()
}

// on_msg gets called every time there's a new message from client
func (c *ExampleClient) on_msg(b []byte) {
fmt.Printf(`got %q`+"\n", string(b))
time.Sleep(c.delay)
c.delay += time.Millisecond * 50
}

func main() {
errors := make(chan error)

c, err := New("@test", errors)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, `error: %v`, err)
os.Exit(1)
}

go c.Listen()

for err := range errors {
fmt.Printf(`got error: %v`, err)
}
}
38 changes: 38 additions & 0 deletions example/cmd/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"fmt"
"github.com/raspi/jumiks/pkg/server"
error2 "github.com/raspi/jumiks/pkg/server/error"
"time"
)

func main() {
var errors chan error2.Error

l, err := server.New("@test", 1000, errors)
if err != nil {
panic(err)
}

go l.Listen()

counter := 0

for {
select {
case err := <-errors:
fmt.Printf(`got error %v`+"\n", err)
default:
// do nothing

msg := fmt.Sprintf("hello, world %d!", counter)
l.SendToAll([]byte(msg))
fmt.Printf(`sent %q`+"\n", msg)

time.Sleep(time.Millisecond * 500)
counter++
}
}

}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/raspi/jumiks

go 1.18
10 changes: 10 additions & 0 deletions jumiks.iml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" />
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
153 changes: 153 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package client

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"github.com/raspi/jumiks/pkg/server"
"github.com/raspi/jumiks/pkg/server/header"
"io"
"net"
)

type Client struct {
conn *net.UnixConn
hfn HandlerFunc
errorch chan error
}

type HandlerFunc func(b []byte)

// New creates a client which is connected to server.Server
func New(name string, handler HandlerFunc, errors chan error) (c *Client, e error) {
addr, err := net.ResolveUnixAddr(server.ConnType, name)
if err != nil {
return nil, err
}

conn, err := net.DialUnix(addr.Network(), nil, addr)
if err != nil {
return nil, err
}

if err = handshake(conn); err != nil {
return nil, err
}

c = &Client{
conn: conn,
hfn: handler,
errorch: errors,
}

return c, nil
}

func (c *Client) Listen() {
defer c.conn.Close()

buffer := make([]byte, 1048576)

for {
// Blocking on read
rb, err := c.conn.Read(buffer)
if err != nil {
if errors.Is(err, io.EOF) {
break
}

c.errorch <- err
continue
}

if rb == 0 {
continue
}

// Read header
buf := bytes.NewBuffer(buffer[:rb])
var sheader header.MessageHeaderFromServer

err = binary.Read(buf, binary.LittleEndian, &sheader)
if err != nil {
c.errorch <- err
break
}

// handle message
c.handleMsg(buf.Bytes())
buf.Reset()

// Send acknowledged header
var sndbuf bytes.Buffer

ackheader := header.MessageHeaderFromClient{
MessageHeader: header.MessageHeader{
PacketId: sheader.PacketId, // processed packet ID
},
}

err = binary.Write(&sndbuf, binary.LittleEndian, ackheader)
if err != nil {
c.errorch <- err
break
}

wb, err := c.conn.Write(sndbuf.Bytes())
if err != nil {
c.errorch <- err
break
}

if wb == 0 {
err = c.conn.Close()
if err != nil {
c.errorch <- err
return
}

return
}

if wb != sndbuf.Len() {
err = c.conn.Close()
if err != nil {
c.errorch <- err
}

return
}
}

}

// handleMsg sends received message to end-user
func (c *Client) handleMsg(b []byte) {
c.hfn(b)
}

// handshake determines if both server.Server and Client are speaking the same protocol
func handshake(conn *net.UnixConn) (err error) {
var serverHs header.Handshake
err = binary.Read(conn, binary.LittleEndian, &serverHs)
if err != nil {
return err
}

shake := header.Handshake{Version: header.DefaultVersion}
err = binary.Write(conn, binary.LittleEndian, shake)
if err != nil {
return err
}

if serverHs.Version.Major != shake.Version.Major {
return fmt.Errorf(`ver mismatch, major`)
}

if serverHs.Version.Minor != shake.Version.Minor {
return fmt.Errorf(`ver mismatch, minor`)
}

return nil
}
23 changes: 23 additions & 0 deletions pkg/server/error/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package error

import "fmt"

var _ error = Error{}

type Error struct {
wrapped error
}

func (e Error) Error() string {
return fmt.Sprintf(`%v`, e.wrapped)
}

func (e Error) String() string {
return fmt.Sprintf(`%v`, e.wrapped)
}

func New(wrapped error) (e Error) {
return Error{
wrapped: wrapped,
}
}
30 changes: 30 additions & 0 deletions pkg/server/header/header.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package header

type MessageHeader struct {
PacketId uint64
}

type MessageHeaderFromServer struct {
MessageHeader
}

type MessageHeaderFromClient struct {
MessageHeader
}

// Version for connection handshake
type Version struct {
Major uint16
Minor uint16
Patch uint16
}

type Handshake struct {
Version Version
}

var DefaultVersion = Version{
Major: 1,
Minor: 0,
Patch: 0,
}
Loading

0 comments on commit 402c7c3

Please sign in to comment.