Skip to content

Commit

Permalink
improve command handling, refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Jul 10, 2019
1 parent 2ef365b commit e76adfd
Show file tree
Hide file tree
Showing 12 changed files with 519 additions and 96 deletions.
71 changes: 0 additions & 71 deletions cmd/command.go

This file was deleted.

64 changes: 54 additions & 10 deletions cmd/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,37 @@ import (
"context"
"fmt"
"log"
"os"
"strings"
"time"

"github.com/ctron/hot/pkg/command"

"github.com/google/uuid"

"github.com/ctron/hot/pkg/utils"
"pack.ag/amqp"
)

func createCommandReader() command.Reader {
switch strings.ToLower(commandReader) {
case "prefill":
return &command.PreFillReader{
Prompt: os.Stdout,
Stream: os.Stdin,
Encoder: getEncoder(),
}
case "ondemand":
return &command.OnDemandReader{
Prompt: os.Stdout,
Stream: os.Stdin,
Encoder: getEncoder(),
}
default:
panic(fmt.Errorf("unknown command reader: %s", commandReader))
}
}

func consume(messageType string, uri string, tenant string) error {

fmt.Printf("Consuming %s from %s ...", messageType, uri)
Expand Down Expand Up @@ -77,6 +100,19 @@ func consume(messageType string, uri string, tenant string) error {
fmt.Printf("Consumer running, press Ctrl+C to stop...")
fmt.Println()

// set up command reader

reader := createCommandReader()
if err := reader.Start(); err != nil {
return err
}
defer func() {
if err := reader.Stop(); err != nil {
}
}()

// run loop

for {
// Receive next message
msg, err := receiver.Receive(ctx)
Expand All @@ -91,14 +127,14 @@ func consume(messageType string, uri string, tenant string) error {

utils.PrintMessage(msg)
if processCommands {
if err := processCommand(session, tenant, msg); err != nil {
if err := processCommand(session, reader, tenant, msg); err != nil {
log.Print("Failed to send command: ", err)
}
}
}
}

func processCommand(session *amqp.Session, tenant string, msg *amqp.Message) error {
func processCommand(session *amqp.Session, reader command.Reader, tenant string, msg *amqp.Message) error {
ttd, ok := msg.ApplicationProperties["ttd"].(int32)

if !ok {
Expand All @@ -114,11 +150,10 @@ func processCommand(session *amqp.Session, tenant string, msg *amqp.Message) err
return nil
}

reader := &StdinCommandReader{}

fmt.Printf("Enter command response (%v s): ", ttd)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ttd)*time.Second)
defer cancel()

cmd := reader.ReadCommand(time.Duration(ttd) * time.Second)
cmd := reader.Read(ctx, deviceId)

if cmd == nil {
fmt.Print("Timeout!")
Expand All @@ -144,12 +179,21 @@ func processCommand(session *amqp.Session, tenant string, msg *amqp.Message) err
}
}()

// prepare payload
var payload []byte
if cmd.Payload == nil {
payload = make([]byte, 0)
} else {
payload = cmd.Payload.Bytes()
}

// prepare message

send := amqp.NewMessage([]byte(*cmd))
send := amqp.NewMessage(payload)
send.Properties = &amqp.MessageProperties{
Subject: "CMD",
To: "control/" + tenant + "/" + deviceId,
Subject: cmd.Command,
ContentType: cmd.ContentType,
To: "control/" + tenant + "/" + deviceId,
}

// set message id
Expand All @@ -162,7 +206,7 @@ func processCommand(session *amqp.Session, tenant string, msg *amqp.Message) err

// send message

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := sender.Send(ctx, send); err != nil {
Expand Down
32 changes: 23 additions & 9 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ import (
"crypto/tls"
"log"

"github.com/ctron/hot/pkg/encoding"

"github.com/spf13/cobra"
)

var insecure bool
var contentType string = "text/plain"
var contentTypeFlag string = "text/plain"
var commandReader string = ""
var processCommands bool = false
var ttd uint32 = 0
var qos uint8 = 0
Expand All @@ -32,6 +35,10 @@ func createTlsConfig() *tls.Config {
}
}

func getEncoder() encoding.PayloadEncoder {
return encoding.CreateEncoder(contentTypeFlag)
}

func main() {

cmdConsume := &cobra.Command{
Expand All @@ -56,7 +63,14 @@ func main() {
Short: "Publish via HTTP",
Args: cobra.ExactArgs(7),
Run: func(cmd *cobra.Command, args []string) {
if err := publishHttp(args[0], args[1], args[2], args[3], args[4], args[5], contentType, args[6]); err != nil {
if err := publishHttp(HttpPublishInformation{
MessageType: args[0],
URI: args[1],
Tenant: args[2],
DeviceId: args[3],
AuthenticationId: args[4],
Password: args[5],
}, getEncoder(), args[6]); err != nil {
log.Fatal("Failed to publish via HTTP:", err)
}
},
Expand All @@ -66,25 +80,25 @@ func main() {

// publish flags

cmdPublish.PersistentFlags().StringVar(&contentType, "content-type", "text/plain", "content type")

// publish http flags

cmdPublishHttp.Flags().Uint32VarP(&ttd, "ttd", "t", 0, "Wait for command")
cmdPublishHttp.Flags().Uint32VarP(&ttd, "ttd", "c", 0, "Wait for command")
cmdPublishHttp.Flags().Uint8VarP(&qos, "qos", "q", 0, "Quality of service")

// consume flags

cmdConsume.Flags().BoolVarP(&processCommands, "command", "c", false, "Enable commands")
cmdConsume.Flags().StringVarP(&commandReader, "reader", "r", "prefill", "Command reader type (possible values: [ondemand, prefill]")

// root command

var rootCmd = &cobra.Command{Use: "hot"}
rootCmd.AddCommand(cmdConsume, cmdPublish)
var cmdRoot = &cobra.Command{Use: "hot"}
cmdRoot.AddCommand(cmdConsume, cmdPublish)

rootCmd.PersistentFlags().BoolVar(&insecure, "insecure", false, "Skip TLS validation")
cmdRoot.PersistentFlags().StringVarP(&contentTypeFlag, "content-type", "t", "text/plain", "Content type of the payload, may be a MIME type or 'hex'")
cmdRoot.PersistentFlags().BoolVar(&insecure, "insecure", false, "Skip TLS validation")

if err := rootCmd.Execute(); err != nil {
if err := cmdRoot.Execute(); err != nil {
println(err.Error())
}
}
26 changes: 20 additions & 6 deletions cmd/publish_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,34 @@ import (
neturl "net/url"
"strconv"

"github.com/ctron/hot/pkg/encoding"
"github.com/ctron/hot/pkg/utils"
)

func publishHttp(messageType string, uri string, tenant string, deviceId string, authId string, password string, contentType string, payload string) error {
type HttpPublishInformation struct {
MessageType string
URI string
Tenant string
DeviceId string

url, err := neturl.Parse(uri)
AuthenticationId string
Password string
}

func publishHttp(info HttpPublishInformation, encoder encoding.PayloadEncoder, payload string) error {

url, err := neturl.Parse(info.URI)
if err != nil {
return err
}

url.Path = url.Path + neturl.PathEscape(messageType) + "/" + neturl.PathEscape(tenant) + "/" + deviceId
url.Path = url.Path + neturl.PathEscape(info.MessageType) + "/" + neturl.PathEscape(info.Tenant) + "/" + info.DeviceId
fmt.Println("URL:", url)

buf := bytes.NewBufferString(payload)
buf, err := encoder.Encode(payload)
if err != nil {
return err
}

tr := &http.Transport{
TLSClientConfig: createTlsConfig(),
Expand All @@ -45,15 +59,15 @@ func publishHttp(messageType string, uri string, tenant string, deviceId string,
return err
}

request.SetBasicAuth(authId+"@"+tenant, password)
request.SetBasicAuth(info.AuthenticationId+"@"+info.Tenant, info.Password)

if qos > 0 {
request.Header.Set("QoS-Level", strconv.Itoa(int(qos)))
}
if ttd > 0 {
request.Header.Set("hono-ttd", strconv.FormatUint(uint64(ttd), 10))
}
request.Header.Set("Content-Type", contentType)
request.Header.Set("Content-Type", encoder.GetMimeType())

response, err := client.Do(request)
if err != nil {
Expand Down
41 changes: 41 additions & 0 deletions pkg/async/core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package async

import (
"bufio"
"io"
)

func CallbackReader(reader io.Reader, consumer func(*string, error) bool) {

r := bufio.NewReader(reader)

go func() {
for {
result, _, err := r.ReadLine()
var rc bool
if err != nil {
rc = consumer(nil, err)
} else {
s := string(result)
rc = consumer(&s, nil)
}
if !rc {
return
}
}
}()
}

func ChannelReader(reader io.Reader, data chan string) {

CallbackReader(reader, func(s *string, e error) bool {
if e != nil {
close(data)
return false
} else {
data <- *s
return true
}
})

}
Loading

0 comments on commit e76adfd

Please sign in to comment.