-
Notifications
You must be signed in to change notification settings - Fork 0
/
rabbitmq-consumer.go
142 lines (125 loc) · 3.29 KB
/
rabbitmq-consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// Consume the queue called 'encryptonator' and run a goroutine to start
// the encryption process.
package main
import (
"fmt"
"log"
"strings"
"github.com/streadway/amqp"
)
// consumer holds the settings that Consume uses to connect to an AMQP broker
// and attach to a queue.
type consumer struct {
	uri          string // broker URI handed to amqp.Dial
	exchange     string // name of the exchange that is declared and bound to
	exchangeType string // exchange type handed to ExchangeDeclare
	queueName    string // name of the queue that is declared and consumed
	bindingKey   string // key used when binding the queue to the exchange
	consumerTag  string // consumer tag handed to the channel's Consume call
}
// Consume dials the broker at c.uri, declares the durable exchange and queue,
// binds the queue to the exchange with c.bindingKey and then processes
// deliveries until it is told to stop.
//
// Every delivery body is passed to handle. The delivery is acknowledged
// whether or not handle succeeds; a handle or Ack failure is only logged.
//
// Consume blocks until a value is received on quitchan (or quitchan is
// closed), the connection reports that it has been closed, or the deliveries
// channel is closed. In all of those cases it returns nil. A non-nil error is
// returned only when one of the setup steps (dial, open channel, declare,
// bind, start consuming) fails. The connection and channel are closed before
// Consume returns.
func (c *consumer) Consume(quitchan chan string) error {
	log.Printf("dialing %q", c.uri)
	conn, err := amqp.Dial(c.uri)
	if err != nil {
		return fmt.Errorf("Dial: %s", err)
	}
	defer conn.Close()

	// Register for the close notification exactly once, with a real channel.
	// A nil channel can never be received from, and registering a new
	// listener on every loop iteration would grow the connection's listener
	// list without bound. The buffer of one lets the library deliver the
	// close reason even while this goroutine is busy inside handle.
	closed := conn.NotifyClose(make(chan *amqp.Error, 1))

	log.Printf("got Connection, getting Channel")
	channel, err := conn.Channel()
	if err != nil {
		return fmt.Errorf("Channel: %s", err)
	}
	defer channel.Close()

	log.Printf("got Channel, declaring Exchange (%q)", c.exchange)
	err = channel.ExchangeDeclare(
		c.exchange,     // name of the exchange
		c.exchangeType, // type
		true,           // durable
		false,          // delete when complete
		false,          // internal
		false,          // noWait
		nil,            // arguments
	)
	if err != nil {
		return fmt.Errorf("Exchange Declare: %s", err)
	}

	log.Printf("declared Exchange, declaring Queue %q", c.queueName)
	queue, err := channel.QueueDeclare(
		c.queueName, // name of the queue
		true,        // durable
		false,       // delete when unused
		false,       // exclusive
		false,       // noWait
		nil,         // arguments
	)
	if err != nil {
		return fmt.Errorf("Queue Declare: %s", err)
	}

	log.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)",
		queue.Name, queue.Messages, queue.Consumers, c.bindingKey)
	err = channel.QueueBind(
		queue.Name,   // name of the queue
		c.bindingKey, // bindingKey
		c.exchange,   // sourceExchange
		false,        // noWait
		nil,          // arguments
	)
	if err != nil {
		return fmt.Errorf("Queue Bind: %s", err)
	}

	log.Printf("Queue bound to Exchange, starting Consume (consumer tag %q)", c.consumerTag)
	deliveries, err := channel.Consume(
		queue.Name,    // name
		c.consumerTag, // consumerTag
		false,         // noAck
		false,         // exclusive
		false,         // noLocal
		false,         // noWait
		nil,           // arguments
	)
	if err != nil {
		return fmt.Errorf("Queue Consume: %s", err)
	}

	for {
		// Check the termination conditions on their own first, so that a
		// pending quit or close wins over a backlog of ready deliveries.
		select {
		case <-quitchan: // handle ctrl-c
			log.Printf("Consumer quit")
			return nil
		case amqpErr := <-closed: // server died or connection was closed
			log.Printf("NotifyClose: %v", amqpErr)
			return nil
		default:
		}

		// Nothing to stop for yet: wait for a message or a termination
		// condition, whichever comes first.
		select {
		case <-quitchan: // handle ctrl-c
			log.Printf("Consumer quit")
			return nil
		case amqpErr := <-closed: // server died or connection was closed
			log.Printf("NotifyClose: %v", amqpErr)
			return nil
		case d, ok := <-deliveries:
			if !ok {
				// A closed deliveries channel yields zero-value deliveries
				// forever; stop instead of spinning on them.
				log.Printf("deliveries channel closed")
				return nil
			}
			if err := handle(string(d.Body)); err != nil {
				log.Printf("handle failed: %s", err)
			}
			// Acknowledge even after a handle failure, as before, so the
			// message is not redelivered.
			if err := d.Ack(false); err != nil {
				log.Printf("d.Ack failed: %s", err)
			}
		}
	}
}
// handle parses a message body of the form "platform,rsyncPID,path" and hands
// the three fields to FileMover, returning whatever FileMover returns.
//
// A body that does not split into exactly three comma-separated fields is
// rejected with an "invalid message" error and FileMover is not called.
func handle(body string) error {
	fields := strings.Split(body, ",")
	if len(fields) != 3 {
		return fmt.Errorf("invalid message: %s", body)
	}

	log.Printf("got %v and %v and %v", fields[0], fields[1], fields[2])
	return FileMover(fields[0], fields[1], fields[2])
}