Skip to content

Commit

Permalink
rabbitmq implements of middleware interface
Browse files Browse the repository at this point in the history
Co-authored-by: zhangyongxi <[email protected]>
Co-authored-by: baoyinghai <[email protected]>
Co-authored-by: wuyingjun <[email protected]>
Signed-off-by: zhouhaoA1 <[email protected]>
  • Loading branch information
4 people committed Dec 27, 2023
1 parent 189fc9d commit 9f38fac
Show file tree
Hide file tree
Showing 31 changed files with 9,099 additions and 19 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/prometheus/exporter-toolkit v0.10.0
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/streadway/amqp v1.1.0
github.com/stretchr/testify v1.8.3
go.uber.org/atomic v1.10.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stoewer/go-strcase v1.2.0 h1:Z2iHWqGXH00XYgqDmNgQbIBxf3wrNq0F3feEy0ainaU=
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM=
github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
Expand Down
66 changes: 66 additions & 0 deletions pkg/watcher/codec/event_codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package codec

import (
"bytes"
"fmt"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
)

const EVENTTYPE = "EventTypeForLabel"

func EventEncode(eventType watch.EventType, obj runtime.Object, codec runtime.Codec) ([]byte, error) {
accessor := meta.NewAccessor()
labels, err := accessor.Labels(obj)
if err != nil {
return nil, err
}
if labels == nil {
labels = make(map[string]string)
}
labels[EVENTTYPE] = string(eventType)
err = accessor.SetLabels(obj, labels)
if err != nil {
return nil, err
}

var buffer bytes.Buffer
if err := codec.Encode(obj, &buffer); err != nil {
return nil, err
}

return buffer.Bytes(), nil
}

func EventDecode(value []byte, codec runtime.Codec, newFunc func() runtime.Object) (*watch.Event, error) {
into := newFunc()
obj, _, err := codec.Decode(value, nil, into)
if err != nil {
return nil, err
}

accessor := meta.NewAccessor()
labels, err := accessor.Labels(obj)
if err != nil {
return nil, err
}
var eventType string
if labels != nil {
eventType = labels[EVENTTYPE]
delete(labels, EVENTTYPE)
if eventType == "" {
return nil, fmt.Errorf("event can not find eventtype")
}
}
err = accessor.SetLabels(obj, labels)
if err != nil {
return nil, err
}

return &watch.Event{
Type: watch.EventType(eventType),
Object: obj,
}, nil
}
315 changes: 315 additions & 0 deletions pkg/watcher/middleware/rabbitmq/rabbit_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,315 @@
package rabbitmq

import (
"context"
"fmt"
"math/rand"
"strings"
"time"

"github.com/streadway/amqp"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"

"github.com/clusterpedia-io/clusterpedia/pkg/watcher/codec"
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
)

const (
RoleConsumer = "consumer"
RoleProducer = "producer"
)

type QueueExchange struct {
QueueName string
RoutingKey string
ExchangeName string
ExchangeType string
}

type RabbitClient struct {
QueueExchange
conn *RabbitConnection
channel *amqp.Channel
codec runtime.Codec
started bool
cliStopCh chan bool
globalStopCh <-chan struct{}
expiresPerSend int
notifyConfirm chan amqp.Confirmation // msg send confirmed chan
notifyClose chan *amqp.Error // channel closed chan
role string
newFunc func() runtime.Object // event decode
queueExpires int64
}

func NewProducer(queueEx QueueExchange, conn *RabbitConnection, codec runtime.Codec, expiresPerSend int, gStopCh <-chan struct{}) *RabbitClient {
return &RabbitClient{
QueueExchange: queueEx,
conn: conn,
codec: codec,
cliStopCh: make(chan bool, 1),
globalStopCh: gStopCh,
expiresPerSend: expiresPerSend,
role: RoleProducer,
}
}

func NewConsumer(queueEx QueueExchange, conn *RabbitConnection, codec runtime.Codec, gStopCh <-chan struct{}, newFunc func() runtime.Object, queueExpires int64) *RabbitClient {
return &RabbitClient{
QueueExchange: queueEx,
conn: conn,
codec: codec,
cliStopCh: make(chan bool, 1),
globalStopCh: gStopCh,
role: RoleConsumer,
newFunc: newFunc,
queueExpires: queueExpires,
}
}

func NewQueue(quePrefix string, conn *RabbitConnection, queueExpires int64) string {
for {
ch := CreateChannel(conn)

randStr := fmt.Sprintf("%d%d%d%d", rand.Intn(10), rand.Intn(10), rand.Intn(10), rand.Intn(10))
timeStr := time.Now().Format("2006-01-02 15-04-05")
timeStr = strings.ReplaceAll(timeStr, "-", "")
timeStr = strings.ReplaceAll(timeStr, " ", "")
queue := fmt.Sprintf("%s_%s_%s", quePrefix, timeStr, randStr)
args := make(amqp.Table, 1)
args["x-expires"] = queueExpires
_, err := ch.QueueDeclarePassive(queue, true, false, false, false, args)
if err == nil { // queue already exist
_ = ch.Close()
continue
} else { // declare the queue
_ = ch.Close()
ch := CreateChannel(conn)
_, err = ch.QueueDeclare(queue, true, false, false, false, args)
if err != nil {
klog.Errorf("rabbitmq queueDeclare failed: %v", err)
_ = ch.Close()
continue
} else {
_ = ch.Close()
return queue
}
}
}
}

// CreateChannel open a channel until success
func CreateChannel(conn *RabbitConnection) *amqp.Channel {
for {
conn.tryConnect()
ch, err := conn.NewChannel()
if err != nil {
klog.Error("open channel failed. ", err, ". retry after 1 second")
time.Sleep(1 * time.Second)
continue
} else {
return ch
}
}
}

func (r *RabbitClient) Destroy() (err error) {
r.cliStopCh <- true
return nil
}

func (r *RabbitClient) DestroyGvr() {
klog.Info("consume stopped for client stop cmd. delete queue: ", r.QueueName)
_, err := r.channel.QueueDelete(r.QueueName, false, false, true)
if err != nil {
klog.Errorf("delete %s queue fail. %v", r.QueueName, err.Error())
} else {
klog.Info("deleted queue ", r.QueueName)
}
_ = r.closeChannel()
}

func (r *RabbitClient) initChannel() {
for {
r.channel = CreateChannel(r.conn)
err := r.initQuExchange()
if err != nil {
klog.Error("init channel failed. ", err.Error())
_ = r.closeChannel()
continue
} else {
return
}
}
}

func (r *RabbitClient) initQuExchange() error {
args := make(amqp.Table, 1)
args["x-expires"] = r.queueExpires
err := r.channel.ExchangeDeclare(r.ExchangeName, r.ExchangeType, true, false, false, false, args)
if err != nil {
return fmt.Errorf("rabbitmq exchangeDeclare failed: %v", err)
}

r.notifyClose = r.channel.NotifyClose(make(chan *amqp.Error, 1)) // listen channel close event

if r.role == RoleProducer {
err = r.channel.Confirm(false) // set msg confirm mode
if err != nil {
return fmt.Errorf("rabbitmq confirm error. %v", err)
}
r.notifyConfirm = r.channel.NotifyPublish(make(chan amqp.Confirmation, 1))
} else {
_, err = r.channel.QueueDeclare(r.QueueName, true, false, false, false, args)
if err != nil {
return fmt.Errorf("rabbitmq queueDeclare failed: %v", err)
}

err = r.channel.QueueBind(r.QueueName, r.RoutingKey, r.ExchangeName, false, nil)
if err != nil {
return fmt.Errorf("rabbitmq queueBind failed: %v", err)
}

err = r.channel.Qos(1, 0, false)
if err != nil {
return fmt.Errorf("rabbitmq Qos failed: %v", err)
}
}
return nil
}

func (r *RabbitClient) closeChannel() (err error) {
r.channel.Close()
if err != nil {
return fmt.Errorf("close rabbitmq channel failed: %v", err)
}
return
}

// sendEventSynchro send message until success
func (r *RabbitClient) sendEventSynchro(event *watch.Event, expiresPerTry int) error {
msgBytes, err := codec.EventEncode(event.Type, event.Object, r.codec)
if err != nil {
return fmt.Errorf("event encode failed. error: %v", err.Error())
}
ticker := time.NewTicker(time.Duration(expiresPerTry) * time.Second)
defer ticker.Stop()
for {
_ = r.channel.Publish(
r.ExchangeName,
r.RoutingKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: msgBytes,
})

select {
case c := <-r.notifyConfirm:
if !c.Ack {
klog.Errorf("rabbit confirm ack false. retry init channel and send. exchange: %s", r.ExchangeName)
} else {
return nil
}
case <-ticker.C:
klog.Errorf("send event timeout. retry init channel and send. exchange: %s", r.ExchangeName)
}

_ = r.closeChannel()
r.initChannel()
}
}

func (r *RabbitClient) Produce(eventChan chan *watchcomponents.EventWithCluster, publishEvent func(context.Context, *watchcomponents.EventWithCluster),
ctx context.Context, genCrv2Event func(event *watch.Event)) {
for {
r.initChannel()

LOOP:
for {
select {
case e := <-r.notifyClose:
klog.Warningf("channel notifyClose: %v. exchange: %s. retry channel connecting", e.Error(), r.ExchangeName)
break LOOP
case event := <-eventChan:
genCrv2Event(event.Event)
err := r.sendEventSynchro(event.Event, r.expiresPerSend)
if err != nil {
klog.Errorf("send event error %v. exchange: %s. this should not happen normally", err.Error(), r.ExchangeName)
} else {
publishEvent(ctx, event)
}
case <-r.cliStopCh:
klog.Info("produce stopped for client stop cmd. exchange: ", r.ExchangeName)
_ = r.closeChannel()
close(r.cliStopCh)
return
case <-r.globalStopCh:
klog.Info("produce stopped for global publisher stopped. exchange: ", r.ExchangeName)
_ = r.closeChannel()
return
}
}
}
}

func (r *RabbitClient) Consume(enqueueFunc func(event *watch.Event), clearfunc func()) {
for {
r.initChannel()
msgList, err := r.channel.Consume(r.QueueName, "", false, false, false, false, nil)
if err != nil {
klog.Errorf("consume err: ", err.Error())
_ = r.closeChannel()
continue
}

LOOP:
for {
select {
case <-r.cliStopCh:
klog.Info("consume stopped for client stop cmd. delete queue: ", r.QueueName)
_, err = r.channel.QueueDelete(r.QueueName, false, false, true)
if err != nil {
klog.Errorf("delete %s queue fail. %v", r.QueueName, err.Error())
} else {
klog.Info("deleted queue ", r.QueueName)
}
_ = r.closeChannel()
close(r.cliStopCh)
return
case msg := <-msgList:
//处理数据
event, _ := codec.EventDecode(msg.Body, r.codec, r.newFunc)
klog.V(7).Infof("Event in to cache %v : %v \n", event.Type, event.Object.GetObjectKind().GroupVersionKind())
err = msg.Ack(true)
if err != nil {
klog.Errorf("msg ack error: %v. event: %v, queue: %s. retry init channel and consume...", err.Error(), event.Type, r.QueueName)
break LOOP
}
enqueueFunc(event)
case e := <-r.notifyClose:
klog.Warningf("channel notifyClose: %v. queue: %s. retry channel connecting", e.Error(), r.QueueName)
break LOOP
case <-r.globalStopCh:
klog.Info("consume stopped for global publisher stopped. delete queue: ", r.QueueName)
_, err = r.channel.QueueDelete(r.QueueName, false, false, true)
if err != nil {
klog.Errorf("delete %s queue fail. %v", r.QueueName, err.Error())
} else {
klog.Info("deleted queue ", r.QueueName)
}
_ = r.closeChannel()
return
}
}
}
}

func GvrString(gvr schema.GroupVersionResource) string {
group := strings.ReplaceAll(gvr.Group, ".", "_")
return fmt.Sprintf("%s_%s_%s", group, gvr.Version, gvr.Resource)
}
Loading

0 comments on commit 9f38fac

Please sign in to comment.