Skip to content

Commit

Permalink
Move bbolt to separate package
Browse files Browse the repository at this point in the history
Signed-off-by: Chaitanya Munukutla <[email protected]>
  • Loading branch information
c16a committed Aug 24, 2024
1 parent b3fbe07 commit a9ff3ee
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 25 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ A really tiny KV store.
## Building from source

The repository build system outputs binaries for both the server and the CLI. Additionally, OCI containers are also
build for multiple architectures.
built for multiple architectures.

```shell
bazelisk build //...
Expand Down
16 changes: 16 additions & 0 deletions server/bbolt/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "bbolt",
srcs = [
"bbolt.go",
"utils.go",
],
importpath = "github.com/c16a/pouch/server/bbolt",
visibility = ["//visibility:public"],
deps = [
"@com_github_hashicorp_go_msgpack_v2//codec",
"@com_github_hashicorp_raft//:raft",
"@io_etcd_go_bbolt//:bbolt",
],
)
4 changes: 2 additions & 2 deletions server/store/bbolt.go → server/bbolt/bbolt.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package store
package bbolt

import (
"errors"
Expand All @@ -17,7 +17,7 @@ var (
dbLogs = []byte("logs")
dbConf = []byte("conf")

// An error indicating a given key does not exist
// ErrKeyNotFound is an error indicating a given key does not exist
ErrKeyNotFound = errors.New("not found")
)

Expand Down
2 changes: 1 addition & 1 deletion server/store/utils.go → server/bbolt/utils.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package store
package bbolt

import (
"bytes"
Expand Down
1 change: 1 addition & 0 deletions server/handlers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ go_library(
"//server/store",
"@com_github_gorilla_websocket//:websocket",
"@com_github_quic_go_quic_go//:quic-go",
"@org_uber_go_zap//:zap",
],
)
16 changes: 13 additions & 3 deletions server/handlers/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,39 @@ import (
"github.com/c16a/pouch/sdk/auth"
"github.com/c16a/pouch/sdk/commands"
"github.com/c16a/pouch/server/store"
"go.uber.org/zap"
"io"
"log"
"net"
"strings"
)

func StartTcpListener(node *store.RaftNode) {
logger := node.GetLogger()
if node.Config.Tcp != nil && node.Config.Tcp.Enabled {
startNetListener(node, "tcp", node.Config.Tcp.Addr)
} else {
logger.Warn("skipping TCP listener")
}
}

func StartUnixListener(node *store.RaftNode) {
logger := node.GetLogger()
if node.Config.Unix != nil && node.Config.Unix.Enabled {
startNetListener(node, "unix", node.Config.Unix.Path)
} else {
logger.Warn("skipping Unix listener")
}
}

func startNetListener(node *store.RaftNode, protocol string, addr string) {
logger := node.GetLogger()

var listener net.Listener
var err error
tlsConfig, err := GetTlsConfig(node.Config)
if err != nil {
log.Fatal(err)
logger.Error("failed to load TLS config", zap.Error(err))
return
} else {
if tlsConfig != nil {
listener, err = tls.Listen(protocol, addr, tlsConfig)
Expand All @@ -38,7 +47,8 @@ func startNetListener(node *store.RaftNode, protocol string, addr string) {
}
}
if err != nil {
log.Fatal(err)
logger.Error("failed to start net listener", zap.String("protocol", protocol), zap.Error(err))
return
}

for {
Expand Down
11 changes: 7 additions & 4 deletions server/handlers/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ import (
"github.com/c16a/pouch/sdk/commands"
"github.com/c16a/pouch/server/store"
"github.com/quic-go/quic-go"
"go.uber.org/zap"
"io"
"log"
"strings"
)

func StartQuicListener(node *store.RaftNode) {
logger := node.GetLogger()

if node.Config.Quic == nil || !node.Config.Quic.Enabled {
logger.Warn("skipping quic listener")
return
}

Expand All @@ -22,17 +25,17 @@ func StartQuicListener(node *store.RaftNode) {
var err error
tlsConfig, err := GetTlsConfig(node.Config)
if err != nil {
log.Fatal(err)
logger.Fatal("failed to load TLS config", zap.Error(err))
} else {
if tlsConfig != nil {
listener, err = quic.ListenAddr(quicAddr, tlsConfig, nil)
} else {
log.Fatal("cannot start QUIC listener without TLSConfig")
logger.Fatal("cannot start QUIC listener without TLSConfig")
}
}

if err != nil {
log.Fatal(err)
logger.Error("failed to start net listener", zap.String("protocol", "quic"), zap.Error(err))
}

for {
Expand Down
7 changes: 5 additions & 2 deletions server/handlers/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"github.com/c16a/pouch/sdk/commands"
"github.com/c16a/pouch/server/store"
"github.com/gorilla/websocket"
"go.uber.org/zap"
"log"
"net/http"
"strings"
)

func StartWsListener(node *store.RaftNode) {
logger := node.GetLogger()

if node.Config.Ws == nil || !node.Config.Ws.Enabled {
return
}
Expand All @@ -24,7 +27,7 @@ func StartWsListener(node *store.RaftNode) {

tlsConfig, err := GetTlsConfig(node.Config)
if err != nil {
log.Fatal(err)
logger.Error("failed to load tls config", zap.Error(err))
} else {
if tlsConfig != nil {
server.TLSConfig = tlsConfig
Expand All @@ -34,7 +37,7 @@ func StartWsListener(node *store.RaftNode) {
go func() {
err := server.ListenAndServe()
if err != nil {
log.Fatal(err)
logger.Error("failed to start http server", zap.Error(err))
}
}()
}
Expand Down
5 changes: 1 addition & 4 deletions server/store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,22 @@ load("@rules_go//go:def.bzl", "go_library")
go_library(
name = "store",
srcs = [
"bbolt.go",
"config.go",
"lists.go",
"node.go",
"peer_join.go",
"sets.go",
"snap_shot.go",
"store.go",
"utils.go",
],
importpath = "github.com/c16a/pouch/server/store",
visibility = ["//visibility:public"],
deps = [
"//sdk/commands",
"//server/bbolt",
"//server/datatypes",
"@com_github_google_uuid//:uuid",
"@com_github_hashicorp_go_msgpack_v2//codec",
"@com_github_hashicorp_raft//:raft",
"@io_etcd_go_bbolt//:bbolt",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapio",
],
Expand Down
20 changes: 12 additions & 8 deletions server/store/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"errors"
"fmt"
"github.com/c16a/pouch/sdk/commands"
"github.com/c16a/pouch/server/bbolt"
"github.com/c16a/pouch/server/datatypes"
"github.com/google/uuid"
"github.com/hashicorp/raft"
"go.uber.org/zap"
"go.uber.org/zap/zapio"
"io"
"log"
"net"
"os"
"path/filepath"
Expand All @@ -37,25 +37,25 @@ type RaftNode struct {
func NewRaftNode(config *NodeConfig, logger *zap.Logger) *RaftNode {
raftPath := config.Cluster.RaftDir
if raftPath == "" {
log.Fatalf("No raft dir specified")
logger.Fatal("no raft dir specified")
}

raftAddr := config.Cluster.Addr
if raftAddr == "" {
log.Fatalf("No raft addr specified")
logger.Fatal("no raft addr specified")
}

nodeId := config.Cluster.NodeID
if nodeId == "" {
id, err := uuid.NewV7()
if err != nil {
log.Fatalf("failed to generate node id: %s", err.Error())
logger.Fatal("failed to generate node id", zap.Error(err))
}
config.Cluster.NodeID = id.String()
}

if err := os.MkdirAll(raftPath, 0700); err != nil {
log.Fatalf("failed to create path for Raft storage: %s", err.Error())
logger.Fatal("failed to create path for Raft storage", zap.Error(err))
}

return &RaftNode{
Expand All @@ -67,6 +67,10 @@ func NewRaftNode(config *NodeConfig, logger *zap.Logger) *RaftNode {
}
}

func (node *RaftNode) GetLogger() *zap.Logger {
return node.logger
}

// Start opens the store. If enableSingle is set, and there are no existing peers,
// then this node becomes the first node, and therefore leader, of the cluster.
// localID should be the server identifier for this node.
Expand Down Expand Up @@ -94,7 +98,7 @@ func (node *RaftNode) Start() error {
}

// Create the log store and stable store.
boltDB, err := NewBoltStore(filepath.Join(node.RaftDir, "raft.db"))
boltDB, err := bbolt.NewBoltStore(filepath.Join(node.RaftDir, "raft.db"))
if err != nil {
return fmt.Errorf("new bbolt store: %s", err)
}
Expand Down Expand Up @@ -162,7 +166,7 @@ func (node *RaftNode) ApplyCmd(cmd commands.Command) string {
case commands.SUnion:
return node.SUnion(cmd.(*commands.SUnionCommand))
default:
return (&commands.ErrorResponse{Err: errors.New("unknown command")}).String()
return (&commands.ErrorResponse{Err: commands.ErrInvalidCommand}).String()
}
}

Expand Down Expand Up @@ -409,7 +413,7 @@ func (node *RaftNode) dialPeer(peerAddr string) error {
func (node *RaftNode) startPeeringServer() error {
peeringPort := node.Config.Cluster.Addr
if peeringPort == "" {
log.Fatalf("Raft addr not specified")
node.logger.Fatal("raft addr not specified")
}

udpAddr, err := net.ResolveUDPAddr("udp", peeringPort)
Expand Down

0 comments on commit a9ff3ee

Please sign in to comment.