diff --git a/README.md b/README.md index c4219fa..be1ec52 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ A tiny Websocket-based event broker. - [x] Grouped subscriptions - [ ] Offline messages - [ ] Clustering -- [x] QoS (0, 1) +- [x] QoS (0, 1, 2) - [x] Topics & Patterns ### Subscriptions diff --git a/events/pubcomp.go b/events/pubcomp.go new file mode 100644 index 0000000..34c5881 --- /dev/null +++ b/events/pubcomp.go @@ -0,0 +1,8 @@ +package events + +const PubComp = "pubcomp" + +type PubCompEvent struct { + Kind string `json:"kind"` + PacketId string `json:"packet_id"` +} diff --git a/events/pubrec.go b/events/pubrec.go new file mode 100644 index 0000000..52bd6b9 --- /dev/null +++ b/events/pubrec.go @@ -0,0 +1,8 @@ +package events + +const PubRec = "pubrec" + +type PubRecEvent struct { + Kind string `json:"kind"` + PacketId string `json:"packet_id"` +} diff --git a/events/pubrel.go b/events/pubrel.go new file mode 100644 index 0000000..bac5810 --- /dev/null +++ b/events/pubrel.go @@ -0,0 +1,8 @@ +package events + +const PubRel = "pubrel" + +type PubRelEvent struct { + Kind string `json:"kind"` + PacketId string `json:"packet_id"` +} diff --git a/handlers/base.go b/handlers/base.go index 059f953..7e7273f 100644 --- a/handlers/base.go +++ b/handlers/base.go @@ -10,6 +10,8 @@ func HandleMessage(client *broker.ConnectedClient, broker *broker.Broker, messag switch kind { case events.Pub: return handlePublish(message, client, broker) + case events.PubRel: + return handlePubrel(message, client) case events.Sub: return handleSubscribe(message, client) case events.Unsub: diff --git a/handlers/pub.go b/handlers/pub.go index 5eba346..ef044cc 100644 --- a/handlers/pub.go +++ b/handlers/pub.go @@ -20,6 +20,13 @@ func handlePublish(message []byte, client *broker.ConnectedClient, broker *broke } client.WriteInterface(pubAckEvent) } + if event.QoS == 2 { + pubRecEvent := &events.PubRecEvent{ + Kind: events.PubRec, + PacketId: uuid.NewString(), + } + client.WriteInterface(pubRecEvent) + } go broker.Broadcast(event) return nil } diff --git a/handlers/pubrel.go b/handlers/pubrel.go new file mode 100644 index 0000000..1f1e715 --- /dev/null +++ b/handlers/pubrel.go @@ -0,0 +1,21 @@ +package handlers + +import ( + "encoding/json" + "github.com/c16a/microq/broker" + "github.com/c16a/microq/events" +) + +func handlePubrel(message []byte, client *broker.ConnectedClient) error { + var event events.PubRelEvent + err := json.Unmarshal(message, &event) + if err != nil { + return err + } + pubCompEvent := &events.PubCompEvent{ + Kind: events.PubComp, + PacketId: event.PacketId, + } + client.WriteInterface(pubCompEvent) + return nil +}