Skip to content

Commit

Permalink
added TCP list namespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
johnpatek committed Jun 24, 2024
1 parent f5f3d42 commit 5ae4d26
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 27 deletions.
29 changes: 25 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ import (
"context"
"errors"
"fmt"
"github.com/mailsac/dracula/client/serverpool"
"github.com/mailsac/dracula/client/waitingmessage"
"github.com/mailsac/dracula/protocol"
"github.com/mailsac/dracula/server/rawmessage"
"io/ioutil"
"log"
"math/rand"
Expand All @@ -19,6 +15,11 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/mailsac/dracula/client/serverpool"
"github.com/mailsac/dracula/client/waitingmessage"
"github.com/mailsac/dracula/protocol"
"github.com/mailsac/dracula/server/rawmessage"
)

var (
Expand Down Expand Up @@ -407,6 +408,26 @@ func (c *Client) CountServer() (int, error) {
return int(output), err
}

func (c *Client) ListNamespaces() ([]string, error) {
var err error
messageID := c.makeMessageID()
wg := new(sync.WaitGroup)
namespaces := ""
cb := func(b []byte, e error) {
defer wg.Done()
if e != nil {
err = e
return
}
namespaces = string(b)
}
wg.Add(1)
sendPacket := protocol.NewPacketFromParts(protocol.CmdTCPOnlyNamespaces, messageID, []byte{}, []byte{}, c.preSharedKey)
c.sendOrCallbackErr(sendPacket, cb)
wg.Wait()
return strings.Split(namespaces, "\n"), err
}

func (c *Client) Put(namespace, value string) error {
messageID := c.makeMessageID()
var wg sync.WaitGroup
Expand Down
37 changes: 34 additions & 3 deletions client/client_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package client

import (
"github.com/mailsac/dracula/protocol"
"github.com/mailsac/dracula/server"
"github.com/stretchr/testify/assert"
"math"
"sync"
"testing"
"time"

"github.com/mailsac/dracula/protocol"
"github.com/mailsac/dracula/server"
"github.com/stretchr/testify/assert"
)

func TestClient_Auth(t *testing.T) {
Expand Down Expand Up @@ -163,3 +164,33 @@ func TestClient_TcpKeyMatch(t *testing.T) {
assert.ElementsMatch(t, []string{}, matched)
})
}

func TestClient_TcpListNamespaces(t *testing.T) {
t.Run("returns a list of namespaces", func(t *testing.T) {
secret := "asdf-!!?!|asdf"
s := server.NewServer(60, secret)
s.DebugEnable("9011")
err := s.Listen(9011, 9011)
if err != nil {
t.Fatal(err)
}
defer s.Close()

cl := NewClient(Config{RemoteUDPIPPortList: "127.0.0.1:9011", RemoteTCPIPPortList: "127.0.0.1:9011", Timeout: time.Second * 2, PreSharedKey: secret})
assert.NoError(t, cl.Listen(9012))
defer cl.Close()

insertValues := map[string]string{
"namespace0": "key0",
"namespace1": "key1",
}

for namespace, value := range insertValues {
assert.NoError(t, cl.Put(namespace, value))
}

namespaces, err := cl.ListNamespaces()
assert.Len(t, namespaces, 2)
assert.NoError(t, err) // out of order
})
}
18 changes: 17 additions & 1 deletion cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package main
import (
"flag"
"fmt"
"github.com/mailsac/dracula/client"
"os"
"time"

"github.com/mailsac/dracula/client"
)

var (
Expand All @@ -15,6 +16,7 @@ var (
count = flag.Bool("count", false, "Mode: Count items at entry key")
put = flag.Bool("put", false, "Mode: Put item at entry key")
cmdKeys = flag.Bool("keys", false, "Mode: list keys matching this pattern (TCP)")
namespaces = flag.Bool("namespaces", false, "Mode: list namespaces")
secret = flag.String("s", "", "Optional pre-shared auth secret if not using env var DRACULA_SECRET")
localPort = flag.Int("p", 3510, "Local client port to receive responses on")
timeoutSecs = flag.Int64("t", 6, "Request timeout in seconds")
Expand Down Expand Up @@ -121,6 +123,20 @@ func main() {

os.Exit(0)
}
if *namespaces {
namespaceList, err := c.ListNamespaces()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if len(namespaceList) > 0 {
for index, namespace := range namespaceList {
fmt.Printf("%d) %s\n", index+1, namespace)
}
} else {
fmt.Println("(no namespaces)")
}
}

fmt.Println("no command matched")
os.Exit(1)
Expand Down
35 changes: 19 additions & 16 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,39 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/OneOfOne/xxhash"
"log"
"math"
"net"
"strings"

"github.com/OneOfOne/xxhash"
)

const (
PacketSize = 1500
NamespaceSize = 64
DataValueSize = 1419
PacketSize int = 1500
NamespaceSize int = 64
DataValueSize int = 1419

CmdCount byte = 'C'
CmdPut byte = 'P'
CmdPutReplicate byte = 'R'
CmdCountNamespace byte = 'N'
CmdCountServer byte = 'S'

CmdTCPOnlyKeys byte = 'K'
CmdTCPOnlyValues byte = 'V'
CmdTCPOnlyStore byte = 'T'
CmdTCPOnlyRetrieve byte = 'I'
CmdTCPOnlyKeys byte = 'K'
CmdTCPOnlyValues byte = 'V'
CmdTCPOnlyStore byte = 'T'
CmdTCPOnlyRetrieve byte = 'I'
CmdTCPOnlyNamespaces byte = 'L'

// ResError is a Cmd
ResError byte = 'E'

space byte = ' '
spaceIndex1 = 1
spaceIndex2 = 10
spaceIndex3 = 15
spaceIndex4 = 80
spaceIndex1 int = 1
spaceIndex2 int = 10
spaceIndex3 int = 15
spaceIndex4 int = 80
)

var (
Expand All @@ -56,7 +58,7 @@ func IsRequestCmd(c byte) bool {
}

func IsTcpOnlyCmd(c byte) bool {
return c == CmdTCPOnlyKeys || c == CmdTCPOnlyRetrieve || c == CmdTCPOnlyValues || c == CmdTCPOnlyStore
return c == CmdTCPOnlyKeys || c == CmdTCPOnlyRetrieve || c == CmdTCPOnlyValues || c == CmdTCPOnlyStore || c == CmdTCPOnlyNamespaces
}

// IsResponseCmd indicates if the client should accept this as a command
Expand Down Expand Up @@ -106,7 +108,8 @@ func (p *Packet) DataValueString() string {
}

// ParsePacket parses a packet like:
// [Command char][space][xxhash of pre shared key + id + ns + data][space][Message ID uint32][space][Namespace 64 bytes][space][data remaining bytes]
//
// [Command char][space][xxhash of pre shared key + id + ns + data][space][Message ID uint32][space][Namespace 64 bytes][space][data remaining bytes]
//
// The MTU of 1500 is the maximum allowed packet size. That means the data key can only be 1419
// bytes max.
Expand All @@ -125,7 +128,7 @@ func ParsePacket(buf []byte) (*Packet, error) {
idBytes := buf[spaceIndex2+1 : spaceIndex3]
nsBytes := buf[spaceIndex3+1 : spaceIndex4]
// allows shorter packet to be turned into 1500 byte total packet
endAt := int(math.Min(float64(len(buf)), PacketSize))
endAt := int(math.Min(float64(len(buf)), float64(PacketSize)))
messageIData := buf[spaceIndex4+1 : endAt]
rightSizeData := *PadRight(&messageIData, DataValueSize)
p := Packet{
Expand Down Expand Up @@ -173,7 +176,7 @@ func ParsePacket(buf []byte) (*Packet, error) {
}

// bytes formats the packet for transport. The first 8 bytes are a header.
//// The last byte should be a line break. The data is a UTF-8 string.
// // The last byte should be a line break. The data is a UTF-8 string.
func (p *Packet) bytes() []byte {
//fmt.Println("Bytes() message id", p.MessageID, "|")
//fmt.Println("Bytes() Namespace", p.Namespace, "|", len(p.Namespace))
Expand Down
13 changes: 10 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package server

import (
"errors"
"github.com/mailsac/dracula/protocol"
"github.com/mailsac/dracula/server/rawmessage"
"github.com/mailsac/dracula/store"
"io/ioutil"
"log"
"math"
Expand All @@ -14,6 +11,10 @@ import (
"runtime"
"strconv"
"strings"

"github.com/mailsac/dracula/protocol"
"github.com/mailsac/dracula/server/rawmessage"
"github.com/mailsac/dracula/store"
)

const MinimumExpirySecs = 2
Expand Down Expand Up @@ -298,6 +299,12 @@ func (s *Server) worker(messages <-chan *rawmessage.RawMessage) {
resPacket = protocol.NewPacketFromParts(protocol.CmdTCPOnlyKeys, packet.MessageIDBytes, packet.Namespace, []byte(strings.Join(matchedKeys, "\n")), s.preSharedKey)
respond()
break
case protocol.CmdTCPOnlyNamespaces:
namespaces := s.store.Namespaces()
s.log.Println("Namespaces", packet.NamespaceString(), packet.DataValueString(), namespaces)
resPacket = protocol.NewPacketFromParts(protocol.CmdTCPOnlyNamespaces, packet.MessageIDBytes, packet.Namespace, []byte(strings.Join(namespaces, "\n")), s.preSharedKey)
respond()
break
default:
resPacket = protocol.NewPacketFromParts(protocol.ResError, packet.MessageIDBytes, packet.Namespace, []byte("unknown_command_"+string(packet.Command)), s.preSharedKey)
respond()
Expand Down

0 comments on commit 5ae4d26

Please sign in to comment.