Skip to content

Commit

Permalink
消息队列
Browse files Browse the repository at this point in the history
  • Loading branch information
chenxi2015 committed Jul 11, 2022
1 parent 03a25a9 commit ddb140e
Show file tree
Hide file tree
Showing 5 changed files with 421 additions and 0 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,30 @@
# go-rabbitmq
rabbitmq的消费者和生产者

# 在项目中安装使用
```shell
go get -u github.com/MQEnergy/go-rabbitmq
```

# 测试
## 1、安装依赖
```shell
go mod tidy
```

## 运行example
配置 amqp
消费者
```shell
go run examples/consumer.go
```
生产者
```shell
go run examples/producer.go
```
查看效果

## 2、单元测试
```shell
go test
```
51 changes: 51 additions & 0 deletions examples/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"fmt"
go_rabbitmq "github.com/MQEnergy/go-rabbitmq"
"sync"
"time"
)

func main() {
config := &go_rabbitmq.Config{
User: "root",
Password: "",
Host: "",
Port: "5672",
Vhost: "",
}
mq := go_rabbitmq.New(config, "test", "", "", 0, 1)
time.Sleep(time.Second * 1)
amqphandler(mq, 3)
}

// amqphandler 消息队列处理
func amqphandler(mq *go_rabbitmq.RabbitMQ, consumerNum int) error {
var wg sync.WaitGroup
cherrors := make(chan error)
wg.Add(consumerNum)
for i := 0; i < consumerNum; i++ {
fmt.Printf("正在开启消费者:第 %d 个\n", i+1)
go func() {
defer wg.Done()
deliveries, err := mq.Consume()
if err != nil {
cherrors <- err
}
for d := range deliveries {
// 消费者逻辑 to do
fmt.Printf("got %dbyte delivery: [%v] %s %q\n", len(d.Body), d.DeliveryTag, d.Exchange, d.Body)
d.Ack(false)
}
}()
}
select {
case err := <-cherrors:
close(cherrors)
fmt.Printf("Consumer failed: %s\n", err)
return err
}
wg.Wait()
return nil
}
26 changes: 26 additions & 0 deletions examples/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package main

import (
"fmt"
go_rabbitmq "github.com/MQEnergy/go-rabbitmq"
"time"
)

func main() {
config := &go_rabbitmq.Config{
User: "root",
Password: "",
Host: "",
Port: "5672",
Vhost: "",
}
mq := go_rabbitmq.New(config, "test", "", "", 0, 1)
// 需要等待一秒钟
time.Sleep(time.Second * 1)

data := []byte("{\"hello\":\"world " + time.Now().Format("2006-01-02 15:04:05") + "\"}")
if err := mq.Push(data); err != nil {
panic(err)
}
fmt.Println("Push succeeded!", string(data))
}
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module github.com/MQEnergy/go-rabbitmq

go 1.18

require github.com/streadway/amqp v1.0.0
Loading

0 comments on commit ddb140e

Please sign in to comment.