Skip to content

Commit

Permalink
split up and add publish command
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Jun 28, 2019
1 parent 9cb7c85 commit 2b28840
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 79 deletions.
92 changes: 92 additions & 0 deletions cmd/consume.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*******************************************************************************
* Copyright (c) 2019 Red Hat Inc
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/

package main

import (
"context"
"fmt"
"log"
"time"

"github.com/ctron/hot/pkg/utils"
"pack.ag/amqp"
)

func consume(messageType string, uri string, tenant string) error {

fmt.Printf("Consuming %s from %s ...", messageType, uri)
fmt.Println()

opts := make([]amqp.ConnOption, 0)
if insecure {
opts = append(opts, amqp.ConnTLSConfig(createTlsConfig()))
}

client, err := amqp.Dial(uri, opts...)
if err != nil {
return err
}

defer func() {
if err := client.Close(); err != nil {
log.Fatal("Failed to close client:", err)
}
}()

var ctx = context.Background()

session, err := client.NewSession()
if err != nil {
return err
}

defer func() {
if err := session.Close(ctx); err != nil {
log.Fatal("Failed to close session:", err)
}
}()

receiver, err := session.NewReceiver(
amqp.LinkSourceAddress(fmt.Sprintf("%s/%s", messageType, tenant)),
amqp.LinkCredit(10),
)
if err != nil {
return err
}
defer func() {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
if err := receiver.Close(ctx); err != nil {
log.Fatal("Failed to close receiver: ", err)
}
cancel()
}()

fmt.Printf("Consumer running, press Ctrl+C to stop...")
fmt.Println()

for {
// Receive next message
msg, err := receiver.Receive(ctx)
if err != nil {
return err
}

// Accept message
if err := msg.Accept(); err != nil {
return nil
}

utils.PrintMessage(msg)
}
}
109 changes: 30 additions & 79 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,109 +14,60 @@
package main

import (
"context"
"crypto/tls"
"fmt"
"log"
"time"

"github.com/ctron/hot/pkg/utils"

"github.com/spf13/cobra"
"pack.ag/amqp"
)

var insecure bool
var contentType string = "text/plain"

func consume(messageType string, uri string, tenant string) error {

fmt.Printf("Consuming %s from %s ...", messageType, uri)
fmt.Println()

opts := make([]amqp.ConnOption, 0)
if insecure {
var tlsConfig = &tls.Config{
InsecureSkipVerify: true,
}
opts = append(opts, amqp.ConnTLSConfig(tlsConfig))
}

client, err := amqp.Dial(uri, opts...)
if err != nil {
return err
}

defer func() {
if err := client.Close(); err != nil {
log.Fatal("Failed to close client:", err)
}
}()

var ctx = context.Background()

session, err := client.NewSession()
if err != nil {
return err
}

defer func() {
if err := session.Close(ctx); err != nil {
log.Fatal("Failed to close session:", err)
}
}()

receiver, err := session.NewReceiver(
amqp.LinkSourceAddress(fmt.Sprintf("%s/%s", messageType, tenant)),
amqp.LinkCredit(10),
)
if err != nil {
return err
}
defer func() {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
if err := receiver.Close(ctx); err != nil {
log.Fatal("Failed to close receiver: ", err)
}
cancel()
}()

fmt.Printf("Consumer running, press Ctrl+C to stop...")
fmt.Println()

for {
// Receive next message
msg, err := receiver.Receive(ctx)
if err != nil {
return err
}

// Accept message
if err := msg.Accept(); err != nil {
return nil
}

utils.PrintMessage(msg)
func createTlsConfig() *tls.Config {
return &tls.Config{
InsecureSkipVerify: insecure,
}
}

func main() {

var cmdConsume = &cobra.Command{
cmdConsume := &cobra.Command{
Use: "consume [telemetry|event] [message endpoint uri] [tenant]",
Short: "Consume and print messages",
Long: `Consume messages from the endpoint and print it on the console.`,
Args: cobra.MinimumNArgs(3),
Args: cobra.ExactArgs(3),
Run: func(cmd *cobra.Command, args []string) {
if err := consume(args[0], args[1], args[2]); err != nil {
log.Fatal("Failed to consume messages: ", err)
log.Fatal("Failed to consume messages:", err)
}
},
}

cmdConsume.Flags().BoolVar(&insecure, "insecure", false, "Skip TLS validation")
cmdPublish := &cobra.Command{
Use: "publish",
Short: "Publish messages",
}

cmdPublishHttp := &cobra.Command{
Use: "http [telemetry|event] [http endpoint uri] [tenant] [deviceId] [authId] [password] [payload]",
Short: "Publish via HTTP",
Args: cobra.ExactArgs(7),
Run: func(cmd *cobra.Command, args []string) {
if err := publishHttp(args[0], args[1], args[2], args[3], args[4], args[5], contentType, args[6]); err != nil {
log.Fatal("Failed to publish via HTTP:", err)
}
},
}

cmdPublish.AddCommand(cmdPublishHttp)
cmdPublish.Flags().StringVarP(&contentType, "content-type", "t", "text/plain", "content type")

// root command

var rootCmd = &cobra.Command{Use: "hot"}
rootCmd.AddCommand(cmdConsume)
rootCmd.AddCommand(cmdConsume, cmdPublish)

rootCmd.Flags().BoolVar(&insecure, "insecure", false, "Skip TLS validation")

if err := rootCmd.Execute(); err != nil {
panic(err)
Expand Down
69 changes: 69 additions & 0 deletions cmd/publish_http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*******************************************************************************
* Copyright (c) 2019 Red Hat Inc
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/

package main

import (
"bytes"
"fmt"
"net/http"
neturl "net/url"
)

func publishHttp(messageType string, uri string, tenant string, deviceId string, authId string, password string, contentType string, payload string) error {

url, err := neturl.Parse(uri)
if err != nil {
return err
}

url.Path = url.Path + neturl.PathEscape(messageType) + "/" + neturl.PathEscape(tenant) + "/" + deviceId
fmt.Println("URL:", url)

buf := bytes.NewBufferString(payload)

tr := &http.Transport{
TLSClientConfig: createTlsConfig(),
}

client := &http.Client{Transport: tr}
request, err := http.NewRequest("PUT", url.String(), buf)
if err != nil {
return err
}

request.SetBasicAuth(authId+"@"+tenant, password)

request.Header.Set("Content-Type", contentType)

response, err := client.Do(request)
if err != nil {
return err
}

fmt.Printf("Publish result: %s", response.Status)
fmt.Println()

body := new(bytes.Buffer)
if _, err := body.ReadFrom(response.Body); err != nil {
return err
}

fmt.Println(body.String())

if err := response.Body.Close(); err != nil {
fmt.Printf("Failed to close response: %v", err)
}

return nil
}

0 comments on commit 2b28840

Please sign in to comment.