Skip to content

Commit da2c177

Browse files
committed
add rabbitmq library
1 parent 83b588e commit da2c177

File tree

12 files changed

+2067
-0
lines changed

12 files changed

+2067
-0
lines changed

pkg/rabbitmq/README.md

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
## rabbitmq
2+
3+
rabbitmq library wrapped in [github.com/rabbitmq/amqp091-go](github.com/rabbitmq/amqp091-go), supports automatic reconnection and customized setting of queue parameters.
4+
5+
### Example of use
6+
7+
#### Consumer code
8+
9+
This is a consumer code example common to the four types direct, topic, fanout, and headers.
10+
11+
```go
12+
package main
13+
14+
import (
15+
"context"
16+
"strings"
17+
18+
"github.com/zhufuyi/sponge/pkg/logger"
19+
"github.com/zhufuyi/sponge/pkg/rabbitmq"
20+
"github.com/zhufuyi/sponge/pkg/rabbitmq/consumer"
21+
)
22+
23+
var handler = func(ctx context.Context, data []byte, tag ...string) error {
24+
tagID := strings.Join(tag, ",")
25+
logger.Infof("tagID=%s, receive message: %s", tagID, string(data))
26+
return nil
27+
}
28+
29+
func main() {
30+
url := rabbitmq.DefaultURL
31+
c, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get())) // here you can set the connection parameters, such as tls, reconnect time interval
32+
if err != nil {
33+
logger.Error("NewConnection err",logger.Err(err))
34+
return
35+
}
36+
defer c.Close()
37+
38+
queue, err := consumer.NewQueue(context.Background(), "yourQueueName", c, consumer.WithConsumeAutoAck(false)) // here you can set the consume parameter
39+
if err != nil {
40+
logger.Error("NewQueue err",logger.Err(err))
41+
return
42+
}
43+
44+
queue.Consume(handler)
45+
46+
exit := make(chan struct{})
47+
<-exit
48+
}
49+
```
50+
51+
<br>
52+
53+
#### Direct Type Code
54+
55+
```go
56+
package main
57+
58+
import (
59+
"context"
60+
61+
"github.com/zhufuyi/sponge/pkg/logger"
62+
"github.com/zhufuyi/sponge/pkg/rabbitmq"
63+
"github.com/zhufuyi/sponge/pkg/rabbitmq/producer"
64+
)
65+
66+
func main() {
67+
url := rabbitmq.DefaultURL
68+
c, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get())) // here you can set the connection parameters, such as tls, reconnect time interval
69+
if err != nil {
70+
logger.Error("NewConnection err",logger.Err(err))
71+
return
72+
}
73+
defer c.Close()
74+
75+
exchangeName := "direct-exchange-demo"
76+
queueName := "direct-queue-1"
77+
routeKey := "direct-key-1"
78+
exchange := producer.NewDirectExchange(exchangeName, routeKey)
79+
q, err := producer.NewQueue(queueName, c.Conn, exchange) // here you can set the producer parameter
80+
if err != nil {
81+
logger.Error("NewQueue err",logger.Err(err))
82+
return
83+
}
84+
defer q.Close()
85+
86+
err = q.Publish(context.Background(), []byte(routeKey+" say hello"))
87+
if err != nil {
88+
logger.Error("Publish err",logger.Err(err))
89+
return
90+
}
91+
}
92+
```
93+
94+
<br>
95+
96+
#### Topic Type Code
97+
98+
```go
99+
package main
100+
101+
import (
102+
"context"
103+
104+
"github.com/zhufuyi/sponge/pkg/logger"
105+
"github.com/zhufuyi/sponge/pkg/rabbitmq"
106+
"github.com/zhufuyi/sponge/pkg/rabbitmq/producer"
107+
)
108+
109+
func main() {
110+
url := rabbitmq.DefaultURL
111+
c, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get())) // here you can set the connection parameters, such as tls, reconnect time interval
112+
if err != nil {
113+
logger.Error("NewConnection err",logger.Err(err))
114+
return
115+
}
116+
defer c.Close()
117+
118+
exchangeName := "topic-exchange-demo"
119+
queueName := "topic-queue-1"
120+
routingKey := "key1.key2.*"
121+
exchange := producer.NewTopicExchange(exchangeName, routingKey)
122+
q, err := producer.NewQueue(queueName, c.Conn, exchange) // here you can set the producer parameter
123+
if err != nil {
124+
logger.Error("NewQueue err",logger.Err(err))
125+
return
126+
}
127+
defer q.Close()
128+
129+
key:="key1.key2.key3"
130+
err = q.PublishTopic(context.Background(), key, []byte(key+" say hello "))
131+
if err != nil {
132+
logger.Error("PublishTopic err",logger.Err(err))
133+
return
134+
}
135+
}
136+
```
137+
138+
<br>
139+
140+
#### Fanout Type Code
141+
142+
```go
143+
package main
144+
145+
import (
146+
"context"
147+
148+
"github.com/zhufuyi/sponge/pkg/logger"
149+
"github.com/zhufuyi/sponge/pkg/rabbitmq"
150+
"github.com/zhufuyi/sponge/pkg/rabbitmq/producer"
151+
)
152+
153+
func main() {
154+
url := rabbitmq.DefaultURL
155+
c, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get())) // here you can set the connection parameters, such as tls, reconnect time interval
156+
if err != nil {
157+
logger.Error("NewConnection err",logger.Err(err))
158+
return
159+
}
160+
defer c.Close()
161+
162+
exchangeName := "fanout-exchange-demo"
163+
queueName := "fanout-queue-1"
164+
exchange := producer.NewFanOutExchange(exchangeName)
165+
q, err := producer.NewQueue(queueName, c.Conn, exchange) // here you can set the producer parameter
166+
if err != nil {
167+
logger.Error("NewQueue err",logger.Err(err))
168+
return
169+
}
170+
defer q.Close()
171+
172+
err = q.Publish(context.Background(), []byte("say hello"))
173+
if err != nil {
174+
logger.Error("Publish err",logger.Err(err))
175+
return
176+
}
177+
}
178+
```
179+
180+
<br>
181+
182+
#### Headers Type Code
183+
184+
```go
185+
package main
186+
187+
import (
188+
"context"
189+
190+
"github.com/zhufuyi/sponge/pkg/logger"
191+
"github.com/zhufuyi/sponge/pkg/rabbitmq"
192+
"github.com/zhufuyi/sponge/pkg/rabbitmq/producer"
193+
)
194+
195+
func main() {
196+
url := rabbitmq.DefaultURL
197+
c, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get())) // here you can set the connection parameters, such as tls, reconnect time interval
198+
if err != nil {
199+
logger.Error("NewConnection err",logger.Err(err))
200+
return
201+
}
202+
defer c.Close()
203+
204+
205+
exchangeName := "headers-exchange-demo"
206+
// the message is only received if there is an exact match for headers
207+
queueName := "headers-queue-1"
208+
kv1 := map[string]interface{}{"hello1": "world1", "foo1": "bar1"}
209+
exchange := producer.NewHeaderExchange(exchangeName, producer.HeadersTypeAll, kv1)
210+
q, err := producer.NewQueue(queueName, c.Conn, exchange) // here you can set the producer parameter
211+
if err != nil {
212+
logger.Error("NewQueue err",logger.Err(err))
213+
return
214+
}
215+
defer q.Close()
216+
headersKey1 := kv1 // exact match, consumer queue can receive messages
217+
err = q.PublishHeaders(context.Background(), headersKey1, []byte("say hello"))
218+
if err != nil {
219+
logger.Error("PublishHeaders err",logger.Err(err))
220+
return
221+
}
222+
}
223+
```
224+
225+
<br>
226+
227+
#### Publish Error Handling
228+
229+
If the error is caused by the network, you can check if the reconnection is successful and resend it again.
230+
231+
```go
232+
err := q.Publish(context.Background(), []byte(routeKey+" say hello"))
233+
if err != nil {
234+
if errors.Is(err, producer.ErrClosed) && c.CheckConnected() { // check connection
235+
q, err = producer.NewQueue(queueName, c.Conn, exchange)
236+
if err != nil {
237+
logger.Info("queue reconnect failed", logger.Err(err))
238+
}else{
239+
logger.Info("queue reconnect success")
240+
}
241+
}
242+
}
243+
```
244+

0 commit comments

Comments
 (0)