From 8da12407dcf8858726c6f19d584eaa22ed3b2713 Mon Sep 17 00:00:00 2001 From: David McWhorter Date: Fri, 17 Feb 2023 12:33:47 -0500 Subject: [PATCH 1/5] wagslane client and start of unit tests --- executor/api/rabbitmq/connection.go | 137 --------- executor/api/rabbitmq/connection_test.go | 153 ---------- executor/api/rabbitmq/consumer.go | 138 --------- executor/api/rabbitmq/consumer_test.go | 343 ++++++++++++----------- executor/api/rabbitmq/publisher.go | 68 ----- executor/api/rabbitmq/publisher_test.go | 313 ++++++++++----------- executor/api/rabbitmq/rabbitmq_client.go | 153 ++++++++++ executor/api/rabbitmq/rabbitmq_test.go | 99 +++---- executor/api/rabbitmq/server.go | 118 +++++--- executor/api/rabbitmq/server_test.go | 319 +++++++++++---------- executor/api/rabbitmq/types.go | 64 ++--- executor/api/rabbitmq/utils.go | 5 +- executor/api/rabbitmq/utils_test.go | 23 +- executor/go.mod | 6 +- executor/go.sum | 5 + 15 files changed, 815 insertions(+), 1129 deletions(-) delete mode 100644 executor/api/rabbitmq/connection.go delete mode 100644 executor/api/rabbitmq/connection_test.go delete mode 100644 executor/api/rabbitmq/consumer.go delete mode 100644 executor/api/rabbitmq/publisher.go create mode 100644 executor/api/rabbitmq/rabbitmq_client.go diff --git a/executor/api/rabbitmq/connection.go b/executor/api/rabbitmq/connection.go deleted file mode 100644 index 8697ebc236..0000000000 --- a/executor/api/rabbitmq/connection.go +++ /dev/null @@ -1,137 +0,0 @@ -package rabbitmq - -import ( - "fmt" - "time" - - "github.com/go-logr/logr" - amqp "github.com/rabbitmq/amqp091-go" -) - -/* - * mostly taken from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher.go - */ - -const ( - connectionRetryLimit = 5 - - queueDurable = true - queueAutoDelete = false - queueExclusive = false - queueNoWait = false - - amqpExchange = "" -) - -var ( - connectionRetryDelay = 5 * time.Second - queueArgs = amqp.Table{} -) - -type connection struct { - log logr.Logger - uri string - - conn Connection - channel Channel - err chan error -} - -// NewConnection creates a new AMQP connection that targets a specific broker uri and queue. -func NewConnection(uri string, logger logr.Logger) (*connection, error) { - conn := &connection{ - uri: uri, - err: make(chan error), - log: logger.WithName("Connection"), - } - - if err := conn.connect(); err != nil { - conn.log.Error(err, "error connecting", "uri", uri) - return nil, fmt.Errorf("error '%w' connecting to '%v'", err, uri) - } - return conn, nil -} - -func (c *connection) DeclareQueue(queueName string) (amqp.Queue, error) { - return c.channel.QueueDeclare( - queueName, - queueDurable, - queueAutoDelete, - queueExclusive, - queueNoWait, - queueArgs, - ) -} - -// Close will close the underlying AMQP connection if one has been set, and this operation will cascade down to any -// channels created under this connection. -func (c *connection) Close() error { - if c.conn != nil { - err := c.conn.Close() - if err != nil { - return fmt.Errorf("error '%w' closing connection '%v'", err, c) - } - } - return nil -} - -// implements retry logic with delays for establishing AMQP connections. -func (c *connection) connect() error { - - ticker := time.NewTicker(connectionRetryDelay) - defer ticker.Stop() - - for counter := 0; counter < connectionRetryLimit; <-ticker.C { - var err error - - c.conn, err = defaultDialerAdapter(c.uri) - if err != nil { - c.log.Error(err, "cannot dial rabbitmq", "uri", c.uri, "attempt", counter+1) - - counter++ - continue - } - - go func() { - closed := make(chan *amqp.Error, 1) - c.conn.NotifyClose(closed) - - reason, ok := <-closed - if ok { - c.log.Error(reason, "rabbitmq connection closed, registering err signal") - c.err <- reason - } - }() - - c.channel, err = c.conn.Channel() - if err != nil { - c.log.Error(err, "error creating rabbitmq channel", "uri", c.uri) - return fmt.Errorf("error '%w' creating rabbitmq channel to %q", err, c.uri) - } - - go func() { - closed := make(chan *amqp.Error, 1) - c.channel.NotifyClose(closed) - - reason, ok := <-closed - if ok { - c.log.Error(reason, "rabbitmq channel closed, registering err signal") - c.err <- reason - } - }() - - err = c.channel.Qos( - 1, - 0, - true, - ) - if err != nil { - c.log.Error(err, "error setting rabbitmq Qos") - return fmt.Errorf("error '%w' error setting rabbitmq Qos", err) - } - - return nil - } - - return fmt.Errorf("rabbitmq connection retry limit reached: %d", connectionRetryLimit) -} diff --git a/executor/api/rabbitmq/connection_test.go b/executor/api/rabbitmq/connection_test.go deleted file mode 100644 index 76fac8eb2a..0000000000 --- a/executor/api/rabbitmq/connection_test.go +++ /dev/null @@ -1,153 +0,0 @@ -package rabbitmq - -import ( - "errors" - "github.com/go-logr/logr/testr" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "testing" - "time" -) - -/* - * mostly taken from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher_test.go - */ - -var ( - uri = "amqp://test-rabbitmq:5672/" - queueName = "test-queue" - consumerTag = "tag" -) - -type connectFixture struct { - adapter *mockDialerAdapter - connection *mockConnection - channel *mockChannel -} - -func setupConnect(fn func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel)) ( - *connectFixture, func(), -) { - mockChan := &mockChannel{} - mockConn := &mockConnection{} - mockAdapter := &mockDialerAdapter{} - - fn(mockAdapter, mockConn, mockChan) - - origAdapter := defaultDialerAdapter - origRetryDelay := connectionRetryDelay - - defaultDialerAdapter = mockAdapter.Dial - connectionRetryDelay = 1 * time.Nanosecond - - fixture := &connectFixture{ - adapter: mockAdapter, - connection: mockConn, - channel: mockChan, - } - reset := func() { - defaultDialerAdapter = origAdapter - connectionRetryDelay = origRetryDelay - } - - return fixture, reset -} - -func TestNewConnection(t *testing.T) { - log := testr.New(t) - - t.Run("connect", func(t *testing.T) { - f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { - channel.On("Qos", 1, 0, true).Return(nil) - conn.On("Channel").Return(channel, nil) - adapter.On("Dial", uri).Return(conn, nil) - }) - defer reset() - - actual, err := NewConnection(uri, log) - require.NoError(t, err) - assert.NotNil(t, actual.conn) - assert.NotNil(t, actual.channel) - assert.Equal(t, uri, actual.uri) - - f.adapter.AssertExpectations(t) - f.connection.AssertExpectations(t) - f.channel.AssertExpectations(t) - }) - - t.Run("reconnect", func(t *testing.T) { - f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { - channel.On("Qos", 1, 0, true).Return(nil) - conn.On("Channel").Return(channel, nil) - adapter.On("Dial", uri).Return(nil, errors.New("test dial error")).Once() - adapter.On("Dial", uri).Return(conn, nil).Once() - }) - defer reset() - - actual, err := NewConnection(uri, log) - require.NoError(t, err) - assert.NotNil(t, actual.conn) - assert.NotNil(t, actual.channel) - assert.Equal(t, uri, actual.uri) - - f.adapter.AssertExpectations(t) - f.adapter.AssertNumberOfCalls(t, "Dial", 2) - f.connection.AssertExpectations(t) - f.channel.AssertExpectations(t) - }) - - t.Run("channel_failure", func(t *testing.T) { - f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { - conn.On("Channel").Return(nil, errors.New("test channel failure")) - adapter.On("Dial", uri).Return(conn, nil) - }) - defer reset() - - _, err := NewConnection(uri, log) - assert.Error(t, err) - - f.adapter.AssertExpectations(t) - f.connection.AssertExpectations(t) - }) - - t.Run("retry_limit_failure", func(t *testing.T) { - f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { - adapter.On("Dial", uri).Return(nil, errors.New("test dial error")) - }) - defer reset() - - _, err := NewConnection(uri, log) - assert.Error(t, err) - - f.adapter.AssertExpectations(t) - f.adapter.AssertNumberOfCalls(t, "Dial", connectionRetryLimit) - }) -} - -func TestConnection_Close(t *testing.T) { - t.Run("success", func(t *testing.T) { - mockConn := &mockConnection{} - mockConn.On("Close").Return(nil) - con := &connection{ - conn: mockConn, - } - - assert.NoError(t, con.Close()) - mockConn.AssertExpectations(t) - }) - - t.Run("failure", func(t *testing.T) { - mockConn := &mockConnection{} - mockConn.On("Close").Return(errors.New("test failed to close connection")) - con := &connection{ - conn: mockConn, - } - - assert.ErrorContains(t, con.Close(), "test failed to close connection") - mockConn.AssertExpectations(t) - }) - - t.Run("no_connection", func(t *testing.T) { - assert.NoError(t, (&connection{}).Close()) - }) -} diff --git a/executor/api/rabbitmq/consumer.go b/executor/api/rabbitmq/consumer.go deleted file mode 100644 index 735b90d357..0000000000 --- a/executor/api/rabbitmq/consumer.go +++ /dev/null @@ -1,138 +0,0 @@ -package rabbitmq - -import ( - "fmt" - "github.com/go-logr/logr" - amqp "github.com/rabbitmq/amqp091-go" - "os" - "os/signal" - "syscall" -) - -/* - * based on patterns from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher.go - */ - -type consumer struct { - connection - queueName string - consumerTag string -} - -func NewConsumer(uri, queueName, consumerTag string, logger logr.Logger) (*consumer, error) { - c, err := NewConnection(uri, logger.WithName("Consumer")) - if err != nil { - c.log.Error(err, "error creating connection for consumer", "uri", c.uri) - return nil, fmt.Errorf("error '%w' creating connection to '%v' for consumer", err, uri) - } - return &consumer{ - *c, - queueName, - consumerTag, - }, nil -} - -// In the event that the underlying connection was closed after connection creation, this function will attempt to -// reconnection to the AMQP broker before performing these operations. -// this is a blocking function while the consumer is running, run it in a goroutine if needed -func (c *consumer) Consume( - payloadHandler func(*SeldonPayloadWithHeaders) error, - errorHandler func(args ConsumerError) error, -) error { - select { - case <-c.err: - c.log.Info("attempting to reconnect to rabbitmq", "uri", c.uri) - - if err := c.connect(); err != nil { - c.log.Error(err, "error reconnecting to rabbitmq") - return fmt.Errorf("error '%w' reconnecting to rabbitmq", err) - } - default: - } - - // default exchange with queue name as name is the same as a direct exchange routed to the queue - deliveries, err := c.channel.Consume( - c.queueName, // name - c.consumerTag, // consumerTag, - false, // autoAck - false, // exclusive - false, // noLocal - false, // noWait - amqp.Table{}, // arguments TODO should something go here? - ) - if err != nil { - c.log.Error(err, "error consuming from rabbitmq queue") - return fmt.Errorf("error '%w' consuming from rabbitmq queue", err) - } - - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - for delivery := range deliveries { - select { - case sig := <-sigChan: - c.log.Info("terminating due to signal", "signal", sig) - break - default: - pl, err := DeliveryToPayload(delivery) - if err != nil { - errorHandlerErr := c.handleConsumerError(ConsumerError{err, delivery, pl}, errorHandler) - if errorHandlerErr != nil { - // fatal error, abort the consumer - return errorHandlerErr - } - continue - } - err = payloadHandler(pl) - if err != nil { - errorHandlerErr := c.handleConsumerError(ConsumerError{err, delivery, pl}, errorHandler) - if errorHandlerErr != nil { - // fatal error, abort the consumer - return errorHandlerErr - } - continue - } - ackErr := delivery.Ack(false) - if ackErr != nil { - // fatal error, abort the consumer - return ackErr - } - } - } - close(sigChan) - - select { - case err = <-c.err: - return fmt.Errorf("error '%w' with rabbitmq connection", err) - default: - } - - return nil -} - -type ConsumerError struct { - err error - delivery amqp.Delivery - pl *SeldonPayloadWithHeaders // might be nil -} - -func (c *consumer) handleConsumerError( - args ConsumerError, - errorHandler func(args ConsumerError) error, -) error { - delivery := args.delivery - - err := errorHandler(args) - if err != nil { - c.log.Error(err, "error handler encountered an error", "original error", args.err) - return fmt.Errorf("error handler encountered an error '%w' when handling original error '%v'", err, args.err) - } - - rejectErr := delivery.Reject(false) - if rejectErr != nil { - c.log.Error(rejectErr, "error rejecting") - return fmt.Errorf("error '%w' rejecting", rejectErr) - } - - return nil -} diff --git a/executor/api/rabbitmq/consumer_test.go b/executor/api/rabbitmq/consumer_test.go index d5d2afe8a9..c2b9edaa2e 100644 --- a/executor/api/rabbitmq/consumer_test.go +++ b/executor/api/rabbitmq/consumer_test.go @@ -1,173 +1,174 @@ package rabbitmq -import ( - "errors" - "github.com/go-logr/logr/testr" - proto2 "github.com/golang/protobuf/proto" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/seldonio/seldon-core/executor/api/grpc/seldon/proto" - "github.com/seldonio/seldon-core/executor/api/payload" - "github.com/seldonio/seldon-core/executor/api/rest" - "github.com/stretchr/testify/assert" - "testing" -) - -/* - * based on patterns from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher_test.go - */ - -func TestConsume(t *testing.T) { - log := testr.New(t) - testMessageStr := `"hello"` - seldonMessage := proto.SeldonMessage{ - Status: &proto.Status{ - Status: proto.Status_SUCCESS, - }, - Meta: nil, - DataOneof: &proto.SeldonMessage_StrData{ - StrData: testMessageStr, - }, - } - seldonMessageEnc, _ := proto2.Marshal(&seldonMessage) - seldonMessage.XXX_sizecache = 0 // to make test cases match - - t.Run("success", func(t *testing.T) { - mockChan := &mockChannel{} - - mockDeliveries := make(chan amqp.Delivery, 1) // buffer 1 so that we send returns before starting consumer - mockDeliveries <- createTestDelivery(mockChan, []byte(testMessageStr), rest.ContentTypeJSON) - close(mockDeliveries) - - mockChan.On("Consume", queueName, consumerTag, false, false, false, false, - amqp.Table{}).Return(mockDeliveries, - nil) - mockChan.On("Ack", uint64(0), false).Return(nil) - - cons := &consumer{ - connection: connection{ - log: log, - channel: mockChan, - }, - queueName: queueName, - consumerTag: consumerTag, - } - - payloadHandler := func(pl *SeldonPayloadWithHeaders) error { - assert.Equal( - t, - &SeldonPayloadWithHeaders{ - &payload.BytesPayload{ - Msg: []byte(`"hello"`), - ContentType: rest.ContentTypeJSON, - ContentEncoding: "", - }, - make(map[string][]string), - }, - pl, - "payloads not equal", - ) - return nil - } - - errorHandler := func(err ConsumerError) error { - assert.NoError(t, err.err, "unexpected error") - return nil - } - - err := cons.Consume(payloadHandler, errorHandler) - - assert.NoError(t, err) - mockChan.AssertExpectations(t) - }) - - t.Run("on failed message don't retry, but reject method", func(t *testing.T) { - mockChan := &mockChannel{} - - mockDeliveries := make(chan amqp.Delivery, 1) // buffer 1 so that we send returns before starting consumer - mockDeliveries <- createTestDelivery(mockChan, []byte(testMessageStr), rest.ContentTypeJSON) - close(mockDeliveries) - - mockChan.On("Consume", queueName, consumerTag, false, false, false, false, - amqp.Table{}).Return(mockDeliveries, - nil) - mockChan.On("Reject", uint64(0), false).Return(nil) - - cons := &consumer{ - connection: connection{ - log: log, - channel: mockChan, - }, - queueName: queueName, - consumerTag: consumerTag, - } - - payloadHandler := func(pl *SeldonPayloadWithHeaders) error { - return errors.New("Something bad happened during prediction") - } - - errorHandler := func(err ConsumerError) error { - return nil - } - - err := cons.Consume(payloadHandler, errorHandler) - - assert.NoError(t, err) - mockChan.AssertExpectations(t) - }) - - t.Run("encoded seldon msg", func(t *testing.T) { - mockChan := &mockChannel{} - - mockDeliveries := make(chan amqp.Delivery, 1) // buffer 1 so that we send returns before starting consumer - mockDeliveries <- createTestDelivery(mockChan, seldonMessageEnc, payload.APPLICATION_TYPE_PROTOBUF) - close(mockDeliveries) - - mockChan.On("Consume", queueName, consumerTag, false, false, false, false, - amqp.Table{}).Return(mockDeliveries, - nil) - mockChan.On("Ack", uint64(0), false).Return(nil) - - cons := &consumer{ - connection: connection{ - log: log, - channel: mockChan, - }, - queueName: queueName, - consumerTag: consumerTag, - } - - payloadHandler := func(pl *SeldonPayloadWithHeaders) error { - assert.Equal( - t, - &SeldonPayloadWithHeaders{ - &payload.ProtoPayload{ - Msg: &seldonMessage, - }, - make(map[string][]string), - }, - pl, - "payloads not equal", - ) - return nil - } - - errorHandler := func(err ConsumerError) error { - assert.NoError(t, err.err, "unexpected error") - return nil - } - - err := cons.Consume(payloadHandler, errorHandler) - - assert.NoError(t, err) - mockChan.AssertExpectations(t) - }) -} - -func createTestDelivery(ack amqp.Acknowledger, body []byte, contentType string) amqp.Delivery { - return amqp.Delivery{ - Acknowledger: ack, - Body: body, - ContentType: contentType, - ContentEncoding: "", - } -} +// +//import ( +// "errors" +// "github.com/go-logr/logr/testr" +// proto2 "github.com/golang/protobuf/proto" +// amqp "github.com/rabbitmq/amqp091-go" +// "github.com/seldonio/seldon-core/executor/api/grpc/seldon/proto" +// "github.com/seldonio/seldon-core/executor/api/payload" +// "github.com/seldonio/seldon-core/executor/api/rest" +// "github.com/stretchr/testify/assert" +// "testing" +//) +// +///* +// * based on patterns from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher_test.go +// */ +// +//func TestConsume(t *testing.T) { +// log := testr.New(t) +// testMessageStr := `"hello"` +// seldonMessage := proto.SeldonMessage{ +// Status: &proto.Status{ +// Status: proto.Status_SUCCESS, +// }, +// Meta: nil, +// DataOneof: &proto.SeldonMessage_StrData{ +// StrData: testMessageStr, +// }, +// } +// seldonMessageEnc, _ := proto2.Marshal(&seldonMessage) +// seldonMessage.XXX_sizecache = 0 // to make test cases match +// +// t.Run("success", func(t *testing.T) { +// mockChan := &mockChannel{} +// +// mockDeliveries := make(chan amqp.Delivery, 1) // buffer 1 so that we send returns before starting consumer +// mockDeliveries <- createTestDelivery(mockChan, []byte(testMessageStr), rest.ContentTypeJSON) +// close(mockDeliveries) +// +// mockChan.On("Consume", queueName, consumerTag, false, false, false, false, +// amqp.Table{}).Return(mockDeliveries, +// nil) +// mockChan.On("Ack", uint64(0), false).Return(nil) +// +// cons := &consumer{ +// connection: connection{ +// log: log, +// channel: mockChan, +// }, +// queueName: queueName, +// consumerTag: consumerTag, +// } +// +// payloadHandler := func(pl *SeldonPayloadWithHeaders) error { +// assert.Equal( +// t, +// &SeldonPayloadWithHeaders{ +// &payload.BytesPayload{ +// Msg: []byte(`"hello"`), +// ContentType: rest.ContentTypeJSON, +// ContentEncoding: "", +// }, +// make(map[string][]string), +// }, +// pl, +// "payloads not equal", +// ) +// return nil +// } +// +// errorHandler := func(err ConsumerError) error { +// assert.NoError(t, err.err, "unexpected error") +// return nil +// } +// +// err := cons.Consume(payloadHandler, errorHandler) +// +// assert.NoError(t, err) +// mockChan.AssertExpectations(t) +// }) +// +// t.Run("on failed message don't retry, but reject method", func(t *testing.T) { +// mockChan := &mockChannel{} +// +// mockDeliveries := make(chan amqp.Delivery, 1) // buffer 1 so that we send returns before starting consumer +// mockDeliveries <- createTestDelivery(mockChan, []byte(testMessageStr), rest.ContentTypeJSON) +// close(mockDeliveries) +// +// mockChan.On("Consume", queueName, consumerTag, false, false, false, false, +// amqp.Table{}).Return(mockDeliveries, +// nil) +// mockChan.On("Reject", uint64(0), false).Return(nil) +// +// cons := &consumer{ +// connection: connection{ +// log: log, +// channel: mockChan, +// }, +// queueName: queueName, +// consumerTag: consumerTag, +// } +// +// payloadHandler := func(pl *SeldonPayloadWithHeaders) error { +// return errors.New("Something bad happened during prediction") +// } +// +// errorHandler := func(err ConsumerError) error { +// return nil +// } +// +// err := cons.Consume(payloadHandler, errorHandler) +// +// assert.NoError(t, err) +// mockChan.AssertExpectations(t) +// }) +// +// t.Run("encoded seldon msg", func(t *testing.T) { +// mockChan := &mockChannel{} +// +// mockDeliveries := make(chan amqp.Delivery, 1) // buffer 1 so that we send returns before starting consumer +// mockDeliveries <- createTestDelivery(mockChan, seldonMessageEnc, payload.APPLICATION_TYPE_PROTOBUF) +// close(mockDeliveries) +// +// mockChan.On("Consume", queueName, consumerTag, false, false, false, false, +// amqp.Table{}).Return(mockDeliveries, +// nil) +// mockChan.On("Ack", uint64(0), false).Return(nil) +// +// cons := &consumer{ +// connection: connection{ +// log: log, +// channel: mockChan, +// }, +// queueName: queueName, +// consumerTag: consumerTag, +// } +// +// payloadHandler := func(pl *SeldonPayloadWithHeaders) error { +// assert.Equal( +// t, +// &SeldonPayloadWithHeaders{ +// &payload.ProtoPayload{ +// Msg: &seldonMessage, +// }, +// make(map[string][]string), +// }, +// pl, +// "payloads not equal", +// ) +// return nil +// } +// +// errorHandler := func(err ConsumerError) error { +// assert.NoError(t, err.err, "unexpected error") +// return nil +// } +// +// err := cons.Consume(payloadHandler, errorHandler) +// +// assert.NoError(t, err) +// mockChan.AssertExpectations(t) +// }) +//} +// +//func createTestDelivery(ack amqp.Acknowledger, body []byte, contentType string) amqp.Delivery { +// return amqp.Delivery{ +// Acknowledger: ack, +// Body: body, +// ContentType: contentType, +// ContentEncoding: "", +// } +//} diff --git a/executor/api/rabbitmq/publisher.go b/executor/api/rabbitmq/publisher.go deleted file mode 100644 index 660ebf2128..0000000000 --- a/executor/api/rabbitmq/publisher.go +++ /dev/null @@ -1,68 +0,0 @@ -package rabbitmq - -import ( - "fmt" - - "github.com/go-logr/logr" - amqp "github.com/rabbitmq/amqp091-go" -) - -/* - * mostly taken from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher.go - */ - -const ( - publishMandatory = true - publishImmediate = false -) - -type publisher struct { - connection - queueName string -} - -func NewPublisher(uri, queueName string, logger logr.Logger) (*publisher, error) { - c, err := NewConnection(uri, logger.WithName("Publisher")) - if err != nil { - c.log.Error(err, "error creating connection for publisher", "uri", c.uri) - return nil, fmt.Errorf("error '%w' creating connection to '%v' for publisher", err, uri) - } - return &publisher{ - *c, - queueName, - }, nil -} - -// In the event that the underlying connection was closed after connection creation, this function will attempt to -// reconnection to the AMQP broker before performing these operations. -func (p *publisher) Publish(payload SeldonPayloadWithHeaders) error { - select { - case <-p.err: - p.log.Info("attempting to reconnect to rabbitmq", "uri", p.uri) - - if err := p.connect(); err != nil { - p.log.Error(err, "error reconnecting to rabbitmq") - return fmt.Errorf("error '%w' reconnecting to rabbitmq", err) - } - default: - } - - body, err := payload.GetBytes() - if err != nil { - p.log.Error(err, "error retrieving payload bytes") - return fmt.Errorf("error '%w' retrieving payload bytes", err) - } - message := amqp.Publishing{ - Headers: StringMapToTable(payload.Headers), - ContentType: payload.GetContentType(), - ContentEncoding: payload.GetContentEncoding(), - DeliveryMode: amqp.Persistent, - Body: body, - } - err = p.channel.Publish(amqpExchange, p.queueName, publishMandatory, publishImmediate, message) - if err != nil { - p.log.Error(err, "error publishing rabbitmq message") - return fmt.Errorf("error '%w' publishing rabbitmq message", err) - } - return nil -} diff --git a/executor/api/rabbitmq/publisher_test.go b/executor/api/rabbitmq/publisher_test.go index 1b2030b57b..dc6e8ef377 100644 --- a/executor/api/rabbitmq/publisher_test.go +++ b/executor/api/rabbitmq/publisher_test.go @@ -1,158 +1,159 @@ package rabbitmq -import ( - "errors" - "github.com/go-logr/logr/testr" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/stretchr/testify/assert" - "testing" -) - -/* - * mostly taken from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher_test.go - */ - -func TestPublisher(t *testing.T) { - log := testr.New(t) - testMessage := SeldonPayloadWithHeaders{ - &TestPayload{Msg: `"hello"`}, - make(map[string][]string), - } - - t.Run("success", func(t *testing.T) { - mockChan := &mockChannel{} - - msg := amqp.Publishing{ - Headers: make(map[string]interface{}), - ContentType: "application/json", - DeliveryMode: amqp.Persistent, - Body: []byte(`"hello"`), - } - mockChan.On("Publish", "", queueName, true, false, msg).Return(nil) - - pub := &publisher{ - connection: connection{ - log: log, - channel: mockChan, - }, - queueName: queueName, - } - - assert.NoError(t, pub.Publish(testMessage)) - - mockChan.AssertExpectations(t) - }) - - t.Run("publish_failure", func(t *testing.T) { - mockChan := &mockChannel{} - - mockChan.On("Publish", "", queueName, true, false, amqp.Publishing{ - Headers: make(map[string]interface{}), - ContentType: "application/json", - DeliveryMode: amqp.Persistent, - Body: []byte(`"hello"`), - }).Return(errors.New("test error")) - - pub := &publisher{ - connection: connection{ - log: log, - channel: mockChan, - }, - queueName: queueName, - } - - assert.ErrorContains(t, pub.Publish(testMessage), "test error") - - mockChan.AssertExpectations(t) - }) - - t.Run("connection_closed", func(t *testing.T) { - f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { - channel.On("Qos", 1, 0, true).Return(nil) - channel.On("Publish", "", queueName, true, false, amqp.Publishing{ - Headers: make(map[string]interface{}), - ContentType: "application/json", - DeliveryMode: amqp.Persistent, - Body: []byte(`"hello"`), - }).Return(nil) - - conn.On("Channel").Return(channel, nil) - adapter.On("Dial", uri).Return(conn, nil) - }) - defer reset() - - pub := &publisher{ - connection: connection{ - uri: uri, - log: log, - err: make(chan error, 1), - }, - queueName: queueName, - } - - pub.err <- errors.New("dang, conn be broke") - - assert.NoError(t, pub.Publish(testMessage)) - - f.adapter.AssertExpectations(t) - f.connection.AssertExpectations(t) - f.channel.AssertExpectations(t) - }) - - t.Run("connection_closed_retry", func(t *testing.T) { - f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { - channel.On("Qos", 1, 0, true).Return(nil) - channel.On("Publish", "", queueName, true, false, amqp.Publishing{ - Headers: make(map[string]interface{}), - ContentType: "application/json", - DeliveryMode: amqp.Persistent, - Body: []byte(`"hello"`), - }).Return(nil) - - conn.On("Channel").Return(channel, nil) - - adapter.On("Dial", uri).Return(nil, errors.New("test dial error")).Once() - adapter.On("Dial", uri).Return(conn, nil).Once() - }) - defer reset() - - pub := &publisher{ - connection: connection{ - uri: uri, - log: log, - err: make(chan error, 1), - }, - queueName: queueName, - } - - pub.err <- errors.New("dang, conn be broke") - - assert.NoError(t, pub.Publish(testMessage)) - - f.adapter.AssertExpectations(t) - f.connection.AssertExpectations(t) - f.channel.AssertExpectations(t) - }) - - t.Run("connection_closed_retry_failure", func(t *testing.T) { - f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { - adapter.On("Dial", uri).Return(nil, errors.New("test dial error")) - }) - defer reset() - - pub := &publisher{ - connection: connection{ - uri: uri, - log: log, - err: make(chan error, 1), - }, - queueName: queueName, - } - - pub.err <- errors.New("dang, conn be broke") - - assert.Error(t, pub.Publish(testMessage)) - - f.adapter.AssertExpectations(t) - }) -} +// +//import ( +// "errors" +// "github.com/go-logr/logr/testr" +// amqp "github.com/rabbitmq/amqp091-go" +// "github.com/stretchr/testify/assert" +// "testing" +//) +// +///* +// * mostly taken from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher_test.go +// */ +// +//func TestPublisher(t *testing.T) { +// log := testr.New(t) +// testMessage := SeldonPayloadWithHeaders{ +// &TestPayload{Msg: `"hello"`}, +// make(map[string][]string), +// } +// +// t.Run("success", func(t *testing.T) { +// mockChan := &mockChannel{} +// +// msg := amqp.Publishing{ +// Headers: make(map[string]interface{}), +// ContentType: "application/json", +// DeliveryMode: amqp.Persistent, +// Body: []byte(`"hello"`), +// } +// mockChan.On("Publish", "", queueName, true, false, msg).Return(nil) +// +// pub := &publisher{ +// connection: connection{ +// log: log, +// channel: mockChan, +// }, +// queueName: queueName, +// } +// +// assert.NoError(t, pub.Publish(testMessage)) +// +// mockChan.AssertExpectations(t) +// }) +// +// t.Run("publish_failure", func(t *testing.T) { +// mockChan := &mockChannel{} +// +// mockChan.On("Publish", "", queueName, true, false, amqp.Publishing{ +// Headers: make(map[string]interface{}), +// ContentType: "application/json", +// DeliveryMode: amqp.Persistent, +// Body: []byte(`"hello"`), +// }).Return(errors.New("test error")) +// +// pub := &publisher{ +// connection: connection{ +// log: log, +// channel: mockChan, +// }, +// queueName: queueName, +// } +// +// assert.ErrorContains(t, pub.Publish(testMessage), "test error") +// +// mockChan.AssertExpectations(t) +// }) +// +// t.Run("connection_closed", func(t *testing.T) { +// f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { +// channel.On("Qos", 1, 0, true).Return(nil) +// channel.On("Publish", "", queueName, true, false, amqp.Publishing{ +// Headers: make(map[string]interface{}), +// ContentType: "application/json", +// DeliveryMode: amqp.Persistent, +// Body: []byte(`"hello"`), +// }).Return(nil) +// +// conn.On("Channel").Return(channel, nil) +// adapter.On("Dial", uri).Return(conn, nil) +// }) +// defer reset() +// +// pub := &publisher{ +// connection: connection{ +// uri: uri, +// log: log, +// err: make(chan error, 1), +// }, +// queueName: queueName, +// } +// +// pub.err <- errors.New("dang, conn be broke") +// +// assert.NoError(t, pub.Publish(testMessage)) +// +// f.adapter.AssertExpectations(t) +// f.connection.AssertExpectations(t) +// f.channel.AssertExpectations(t) +// }) +// +// t.Run("connection_closed_retry", func(t *testing.T) { +// f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { +// channel.On("Qos", 1, 0, true).Return(nil) +// channel.On("Publish", "", queueName, true, false, amqp.Publishing{ +// Headers: make(map[string]interface{}), +// ContentType: "application/json", +// DeliveryMode: amqp.Persistent, +// Body: []byte(`"hello"`), +// }).Return(nil) +// +// conn.On("Channel").Return(channel, nil) +// +// adapter.On("Dial", uri).Return(nil, errors.New("test dial error")).Once() +// adapter.On("Dial", uri).Return(conn, nil).Once() +// }) +// defer reset() +// +// pub := &publisher{ +// connection: connection{ +// uri: uri, +// log: log, +// err: make(chan error, 1), +// }, +// queueName: queueName, +// } +// +// pub.err <- errors.New("dang, conn be broke") +// +// assert.NoError(t, pub.Publish(testMessage)) +// +// f.adapter.AssertExpectations(t) +// f.connection.AssertExpectations(t) +// f.channel.AssertExpectations(t) +// }) +// +// t.Run("connection_closed_retry_failure", func(t *testing.T) { +// f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { +// adapter.On("Dial", uri).Return(nil, errors.New("test dial error")) +// }) +// defer reset() +// +// pub := &publisher{ +// connection: connection{ +// uri: uri, +// log: log, +// err: make(chan error, 1), +// }, +// queueName: queueName, +// } +// +// pub.err <- errors.New("dang, conn be broke") +// +// assert.Error(t, pub.Publish(testMessage)) +// +// f.adapter.AssertExpectations(t) +// }) +//} diff --git a/executor/api/rabbitmq/rabbitmq_client.go b/executor/api/rabbitmq/rabbitmq_client.go new file mode 100644 index 0000000000..1cee783886 --- /dev/null +++ b/executor/api/rabbitmq/rabbitmq_client.go @@ -0,0 +1,153 @@ +package rabbitmq + +import ( + "fmt" + "github.com/go-logr/logr" + "github.com/wagslane/go-rabbitmq" +) + +const ( + publishMandatory = true + publishImmediate = false +) + +type RabbitMqConnection struct { + conn *rabbitmq.Conn + log logr.Logger +} + +type RabbitMqConsumer struct { + rabbitmq.Consumer +} + +type RabbitMqPublisher struct { + rabbitmq.Publisher +} + +func createRabbitMQConnection(brokerUrl string, log logr.Logger) (ConnectionWrapper, error) { + conn, err := rabbitmq.NewConn(brokerUrl, rabbitmq.WithConnectionOptionsLogging) + if err != nil { + log.Error(err, "error connecting to rabbitmq") + return nil, fmt.Errorf("error '%w' connecting to rabbitmq", err) + } + return &RabbitMqConnection{conn, log}, nil +} + +func (c *RabbitMqConnection) Close() error { + return c.conn.Close() +} + +func (c *RabbitMqConnection) NewPublisher() (PublisherWrapper, error) { + publisher, err := rabbitmq.NewPublisher( + c.conn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName(""), //default exchange + ) + if err != nil { + c.log.Error(err, "error creating publisher") + return nil, fmt.Errorf("error '%w' creating publisher", err) + } + return &RabbitMqPublisher{*publisher}, nil +} + +func (c *RabbitMqConnection) NewConsumer(handler rabbitmq.Handler, queue string, consumerTag string) (ConsumerWrapper, error) { + consumer, err := rabbitmq.NewConsumer( + c.conn, + handler, + queue, + rabbitmq.WithConsumerOptionsConsumerName(consumerTag), + rabbitmq.WithConsumerOptionsQueueNoDeclare, + ) + if err != nil { + c.log.Error(err, "error creating publisher") + return nil, fmt.Errorf("error '%w' creating publisher", err) + } + return &RabbitMqConsumer{*consumer}, nil +} + +func (c *RabbitMqConsumer) Close() { + c.Consumer.Close() +} + +func (p *RabbitMqPublisher) Close() { + p.Publisher.Close() +} + +func (p *RabbitMqPublisher) Publish( + data []byte, + routingKeys []string, + optionFuncs ...func(*rabbitmq.PublishOptions), +) error { + return p.Publisher.Publish(data, routingKeys, optionFuncs...) +} + +func DoPublish( + publisher PublisherWrapper, + payload SeldonPayloadWithHeaders, + queueName string, + log logr.Logger, +) error { + + body, err := payload.GetBytes() + if err != nil { + log.Error(err, "error retrieving payload bytes") + return fmt.Errorf("error '%w' retrieving payload bytes", err) + } + + options := []func(options *rabbitmq.PublishOptions){ + rabbitmq.WithPublishOptionsHeaders(StringMapToTable(payload.Headers)), + rabbitmq.WithPublishOptionsContentType(payload.GetContentType()), + rabbitmq.WithPublishOptionsContentEncoding(""), + rabbitmq.WithPublishOptionsPersistentDelivery, + } + if publishMandatory { + options = append(options, rabbitmq.WithPublishOptionsMandatory) + } + if publishImmediate { + options = append(options, rabbitmq.WithPublishOptionsImmediate) + } + + err = publisher.Publish( + body, + []string{queueName}, + options..., + ) + if err != nil { + log.Error(err, "error publishing rabbitmq message") + return fmt.Errorf("error '%w' publishing rabbitmq message", err) + } + + return nil +} + +func CreateConsumerHandler( + payloadHandler func(*SeldonPayloadWithHeaders) error, + errorHandler func(args ConsumerError) error, + log logr.Logger, +) rabbitmq.Handler { + return func(delivery rabbitmq.Delivery) rabbitmq.Action { + pl, err := DeliveryToPayload(delivery) + if err != nil { + return handleConsumerError(ConsumerError{err, delivery, pl}, errorHandler, log) + } + err = payloadHandler(pl) + if err != nil { + return handleConsumerError(ConsumerError{err, delivery, pl}, errorHandler, log) + } + return rabbitmq.Ack + } +} + +func handleConsumerError( + args ConsumerError, + errorHandler func(args ConsumerError) error, + log logr.Logger, +) rabbitmq.Action { + + err := errorHandler(args) + if err != nil { + log.Error(err, "error handler encountered an error", "original error", args.err) + } + + return rabbitmq.NackDiscard +} diff --git a/executor/api/rabbitmq/rabbitmq_test.go b/executor/api/rabbitmq/rabbitmq_test.go index 007d5494b5..2158589e95 100644 --- a/executor/api/rabbitmq/rabbitmq_test.go +++ b/executor/api/rabbitmq/rabbitmq_test.go @@ -1,91 +1,66 @@ package rabbitmq import ( - amqp "github.com/rabbitmq/amqp091-go" "github.com/seldonio/seldon-core/executor/api/rest" "github.com/stretchr/testify/mock" + "github.com/wagslane/go-rabbitmq" ) -/* - * adapted from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/amqp_test.go - */ - -type mockDialerAdapter struct { - mock.Mock -} - -func (m *mockDialerAdapter) Dial(url string) (Connection, error) { - args := m.Called(url) - amqpConn, _ := args.Get(0).(Connection) - - return amqpConn, args.Error(1) -} - -type mockConnection struct { +type MockConnectionWrapper struct { mock.Mock } -func (m *mockConnection) Channel() (Channel, error) { - args := m.Called() - amqpCh, _ := args.Get(0).(Channel) - - return amqpCh, args.Error(1) -} - -func (m *mockConnection) NotifyClose(receiver chan *amqp.Error) chan *amqp.Error { - return receiver -} - -func (m *mockConnection) Close() error { - args := m.Called() - return args.Error(0) +func (m *MockConnectionWrapper) Close() error { + returnArgs := m.Called() + return returnArgs.Error(0) } -type mockChannel struct { - mock.Mock -} - -func (m *mockChannel) QueueDeclare( - name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table, -) (amqp.Queue, error) { - mArgs := m.Called(name, durable, autoDelete, exclusive, noWait, args) - return mArgs.Get(0).(amqp.Queue), mArgs.Error(1) +func (m *MockConnectionWrapper) NewPublisher() (PublisherWrapper, error) { + returnArgs := m.Called() + publisher := returnArgs.Get(0).(PublisherWrapper) + return publisher, returnArgs.Error(1) } -func (m *mockChannel) Publish(exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error { - args := m.Called(exchange, key, mandatory, immediate, msg) - return args.Error(0) +func (m *MockConnectionWrapper) NewConsumer(handler rabbitmq.Handler, queue string, consumerTag string) (ConsumerWrapper, error) { + returnArgs := m.Called(handler, inputQueue, consumerTag) + consumer := returnArgs.Get(0).(*TestConsumerWrapper) // unchecked for now + consumer.handler = handler + consumer.queue = queue + consumer.consumerTag = consumerTag + return consumer, returnArgs.Error(1) } -func (m *mockChannel) Consume( - name string, consumerTag string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp.Table, -) (<-chan amqp.Delivery, error) { - mArgs := m.Called(name, consumerTag, autoAck, exclusive, noLocal, noWait, args) - return mArgs.Get(0).(chan amqp.Delivery), mArgs.Error(1) +type TestConsumerWrapper struct { + handler rabbitmq.Handler + queue string + consumerTag string + isClosed bool } -func (m *mockChannel) Ack(tag uint64, multiple bool) error { - args := m.Called(tag, multiple) - return args.Error(0) +func (m *TestConsumerWrapper) Close() { + println("closed consumer wrapper") + m.isClosed = true } -func (m *mockChannel) Nack(tag uint64, multiple bool, requeue bool) error { - args := m.Called(tag, multiple, requeue) - return args.Error(0) +func (m *TestConsumerWrapper) SimulateDelivery(delivery rabbitmq.Delivery) rabbitmq.Action { + return m.handler(delivery) } -func (m *mockChannel) Reject(tag uint64, requeue bool) error { - args := m.Called(tag, requeue) - return args.Error(0) +type MockPublisherWrapper struct { + mock.Mock } -func (m *mockChannel) Qos(prefetchCount int, prefetchSize int, global bool) error { - args := m.Called(prefetchCount, prefetchSize, global) - return args.Error(0) +func (m *MockPublisherWrapper) Close() { + m.Called() } -func (m *mockChannel) NotifyClose(receiver chan *amqp.Error) chan *amqp.Error { - return receiver +func (m *MockPublisherWrapper) Publish( + data []byte, + routingKeys []string, + optionFuncs ...func(*rabbitmq.PublishOptions), +) error { + returnArgs := m.Called(data, routingKeys, optionFuncs) + return returnArgs.Error(0) } type TestPayload struct { diff --git a/executor/api/rabbitmq/server.go b/executor/api/rabbitmq/server.go index ae3c9bbf91..56fec75083 100644 --- a/executor/api/rabbitmq/server.go +++ b/executor/api/rabbitmq/server.go @@ -4,19 +4,21 @@ import ( "context" "errors" "fmt" - "log" "net/url" "os" + "os/signal" "strconv" + "sync" + "syscall" "time" "github.com/golang/protobuf/jsonpb" guuid "github.com/google/uuid" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" - amqp "github.com/rabbitmq/amqp091-go" "github.com/seldonio/seldon-core/executor/api/grpc/seldon/proto" "github.com/seldonio/seldon-core/executor/k8s" + "github.com/wagslane/go-rabbitmq" "github.com/go-logr/logr" "github.com/seldonio/seldon-core/executor/api" @@ -121,29 +123,61 @@ func CreateRabbitMQServer(args RabbitMQServerOptions) (*SeldonRabbitMQServer, er } func (rs *SeldonRabbitMQServer) Serve() error { - conn, err := NewConnection(rs.BrokerUrl, rs.Log.WithName("RabbitMQServerConnection")) + conn, err := createRabbitMQConnection(rs.BrokerUrl, rs.Log) if err != nil { rs.Log.Error(err, "error connecting to rabbitmq") return fmt.Errorf("error '%w' connecting to rabbitmq", err) } + defer func(conn ConnectionWrapper) { + err := conn.Close() + if err != nil { + rs.Log.Error(err, "error closing rabbitMQ connection") + } + }(conn) + + wg := new(sync.WaitGroup) + terminateChan, err := rs.serve(conn, wg) + if err != nil { + rs.Log.Error(err, "error starting rabbitmq server") + return fmt.Errorf("error '%w' starting rabbitmq server", err) + } + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + wg.Add(1) + // wait for shutdown signal and terminate if received go func() { - err := <-conn.err - log.Fatal("RabbitMQ connection died", err) // causes app to exit with error + rs.Log.Info("awaiting OS shutdown signals") + sig := <-sigs + rs.Log.Info("sending termination message due to signal", "signal", sig) + terminateChan <- true + wg.Done() }() - return rs.serve(conn) + wg.Wait() + + rs.Log.Info("RabbitMQ server terminated normally") + return nil } -func (rs *SeldonRabbitMQServer) serve(conn *connection) error { - //TODO not sure if this is the best pattern or better to pass in pod name explicitly somehow +func (rs *SeldonRabbitMQServer) serve(conn ConnectionWrapper, wg *sync.WaitGroup) (chan<- bool, error) { + // not sure if this is the best pattern or better to pass in pod name explicitly somehow consumerTag, err := os.Hostname() if err != nil { - return fmt.Errorf("error '%w' retrieving hostname", err) + return nil, fmt.Errorf("error '%w' retrieving hostname", err) } - consumer := &consumer{*conn, rs.InputQueueName, consumerTag} - rs.Log.Info("Created", "consumer", consumer, "input queue", rs.InputQueueName) + publisher, err := conn.NewPublisher() + if err != nil { + return nil, fmt.Errorf("error '%w' creating RMQ publisher", err) + } + rs.Log.Info("Created", "publisher", publisher) + + consumerHandler := CreateConsumerHandler( + func(reqPl *SeldonPayloadWithHeaders) error { return rs.predictAndPublishResponse(reqPl, publisher) }, + func(args ConsumerError) error { return rs.createAndPublishErrorResponse(args, publisher) }, + rs.Log, + ) // wait for graph to be ready ready := false @@ -156,25 +190,31 @@ func (rs *SeldonRabbitMQServer) serve(conn *connection) error { } } - producer := &publisher{*conn, rs.OutputQueueName} - - // consumer creates input queue if it doesn't exist - err = consumer.Consume( - func(reqPl *SeldonPayloadWithHeaders) error { return rs.predictAndPublishResponse(reqPl, producer) }, - func(args ConsumerError) error { return rs.createAndPublishErrorResponse(args, producer) }, - ) + consumer, err := conn.NewConsumer(consumerHandler, rs.InputQueueName, consumerTag) if err != nil { - rs.Log.Error(err, "error in consumer") - return fmt.Errorf("error '%w' in consumer", err) + return nil, fmt.Errorf("error '%w' creating RMQ consumer", err) } + rs.Log.Info("Created", "consumer", consumer, "input queue", rs.InputQueueName) - rs.Log.Info("Consumer exited without error") - return nil + // provide a channel to terminate the server + terminate := make(chan bool, 1) + + wg.Add(1) + go func() { + rs.Log.Info("awaiting group termination") + <-terminate + rs.Log.Info("termination initiated, shutting down") + consumer.Close() + publisher.Close() + wg.Done() + }() + + return terminate, nil } func (rs *SeldonRabbitMQServer) predictAndPublishResponse( reqPayload *SeldonPayloadWithHeaders, - publisher *publisher, + publisher PublisherWrapper, ) error { if reqPayload == nil { err := errors.New("missing request payload") @@ -243,10 +283,10 @@ func (rs *SeldonRabbitMQServer) predictAndPublishResponse( rs.Log.Error(err, UNHANDLED_ERROR) return fmt.Errorf("unhandled error %w from predictor process", err) } - return publishPayload(publisher, updatedPayload, seldonPuid) + return rs.publishPayload(publisher, updatedPayload, seldonPuid) } -func (rs *SeldonRabbitMQServer) createAndPublishErrorResponse(errorArgs ConsumerError, publisher *publisher) error { +func (rs *SeldonRabbitMQServer) createAndPublishErrorResponse(errorArgs ConsumerError, publisher PublisherWrapper) error { reqPayload := errorArgs.pl seldonPuid := assignAndReturnPUID(reqPayload, &errorArgs.delivery) @@ -285,10 +325,22 @@ func (rs *SeldonRabbitMQServer) createAndPublishErrorResponse(errorArgs Consumer break } - return publishPayload(publisher, resPayload, seldonPuid) + return rs.publishPayload(publisher, resPayload, seldonPuid) } -func assignAndReturnPUID(pl *SeldonPayloadWithHeaders, delivery *amqp.Delivery) string { +func (rs *SeldonRabbitMQServer) publishPayload(publisher PublisherWrapper, pl payload.SeldonPayload, seldonPuid string) error { + resHeaders := map[string][]string{payload.SeldonPUIDHeader: {seldonPuid}} + //TODO might need more headers + + resPayloadWithHeaders := SeldonPayloadWithHeaders{ + pl, + resHeaders, + } + + return DoPublish(publisher, resPayloadWithHeaders, rs.OutputQueueName, rs.Log) +} + +func assignAndReturnPUID(pl *SeldonPayloadWithHeaders, delivery *rabbitmq.Delivery) string { if pl == nil { if delivery != nil && delivery.Headers != nil && delivery.Headers[payload.SeldonPUIDHeader] != nil { return delivery.Headers[payload.SeldonPUIDHeader].(string) @@ -303,15 +355,3 @@ func assignAndReturnPUID(pl *SeldonPayloadWithHeaders, delivery *amqp.Delivery) } return pl.Headers[payload.SeldonPUIDHeader][0] } - -func publishPayload(publisher *publisher, pl payload.SeldonPayload, seldonPuid string) error { - resHeaders := map[string][]string{payload.SeldonPUIDHeader: {seldonPuid}} - //TODO might need more headers - - resPayloadWithHeaders := SeldonPayloadWithHeaders{ - pl, - resHeaders, - } - - return publisher.Publish(resPayloadWithHeaders) -} diff --git a/executor/api/rabbitmq/server_test.go b/executor/api/rabbitmq/server_test.go index 56b8633fac..95bdebd72c 100644 --- a/executor/api/rabbitmq/server_test.go +++ b/executor/api/rabbitmq/server_test.go @@ -1,15 +1,10 @@ package rabbitmq import ( - "errors" - "fmt" "github.com/go-logr/logr/testr" - guuid "github.com/google/uuid" . "github.com/onsi/gomega" - amqp "github.com/rabbitmq/amqp091-go" "github.com/seldonio/seldon-core/executor/api" "github.com/seldonio/seldon-core/executor/api/payload" - "github.com/seldonio/seldon-core/executor/api/rest" "github.com/seldonio/seldon-core/executor/api/test" v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" "github.com/stretchr/testify/assert" @@ -20,6 +15,7 @@ import ( "net/url" "strconv" "strings" + "sync" "testing" ) @@ -49,11 +45,11 @@ func TestRabbitMqServer(t *testing.T) { }) server := httptest.NewServer(handler) defer server.Close() - serverUrl, err := url.Parse(server.URL) - g.Expect(err).Should(BeNil()) + serverUrl, setupErr := url.Parse(server.URL) + g.Expect(setupErr).Should(BeNil()) urlParts := strings.Split(serverUrl.Host, ":") - port, err := strconv.Atoi(urlParts[1]) - g.Expect(err).Should(BeNil()) + port, setupErr := strconv.Atoi(urlParts[1]) + g.Expect(setupErr).Should(BeNil()) p := v1.PredictorSpec{ Name: "p", @@ -84,19 +80,19 @@ func TestRabbitMqServer(t *testing.T) { FullHealthCheck: fullHealthCheck, } - testPuid := guuid.New().String() - testHeaders := map[string]interface{}{payload.SeldonPUIDHeader: testPuid} - - invalidErrorJsonResponse := fmt.Sprintf( - `{"status":{"info":"Prediction Failed","reason":"unknown payload type 'bogus'","status":"FAILURE"},"meta":{"puid":"%v"}}`, - testPuid, - ) - invalidErrorPublishing := amqp.Publishing{ - ContentType: "application/json", - DeliveryMode: amqp.Persistent, - Body: []byte(invalidErrorJsonResponse), - Headers: testHeaders, - } + //testPuid := guuid.New().String() + //testHeaders := map[string]interface{}{payload.SeldonPUIDHeader: testPuid} + // + //invalidErrorJsonResponse := fmt.Sprintf( + // `{"status":{"info":"Prediction Failed","reason":"unknown payload type 'bogus'","status":"FAILURE"},"meta":{"puid":"%v"}}`, + // testPuid, + //) + //invalidErrorPublishing := rabbitmq.Publishing{ + // ContentType: "application/json", + // DeliveryMode: rabbitmq.Persistent, + // Body: []byte(invalidErrorJsonResponse), + // Headers: testHeaders, + //} t.Run("create server", func(t *testing.T) { server, err := CreateRabbitMQServer(RabbitMQServerOptions{ @@ -131,144 +127,171 @@ func TestRabbitMqServer(t *testing.T) { }) /* - * This makes sure the Serve() and predictAndPublishResponse() code runs and makes the proper calls - * by hacking a bunch of mocks. + * This makes sure the serve() code runs and makes the proper calls by setting up mocks. * It is not doing anything to validate the messages are properly processed. That's challenging in a * unit test since the code connects to RabbitMQ. */ t.Run("serve", func(t *testing.T) { - mockRmqConn := &mockConnection{} - mockRmqChan := &mockChannel{} - mockConn := &connection{ - conn: mockRmqConn, - channel: mockRmqChan, - } - - testDelivery := amqp.Delivery{ - Acknowledger: mockRmqChan, - ContentType: rest.ContentTypeJSON, - Body: []byte(`{ "data": { "ndarray": [[1,2,3,4]] } }`), - Headers: testHeaders, - } - - mockDeliveries := make(chan amqp.Delivery, 1) - mockDeliveries <- testDelivery - close(mockDeliveries) - - mockRmqChan.On("Consume", inputQueue, mock.Anything, false, false, false, false, - amqp.Table{}).Return(mockDeliveries, nil) - mockRmqChan.On("Publish", "", outputQueue, publishMandatory, publishImmediate, - mock.MatchedBy(func(p amqp.Publishing) bool { return true })).Return(nil) - mockRmqChan.On("Ack", uint64(0), false).Return(nil) - - err := testServer.serve(mockConn) - - assert.NoError(t, err) - - mockRmqChan.AssertExpectations(t) - }) + mockRmqConn := new(MockConnectionWrapper) - /* - * This makes sure the Serve(), predictAndPublishResponse(), and createAndPublishErrorResponse() code runs and - * makes the proper calls and returns an appropriate error message by hacking a bunch of mocks. - */ - t.Run("serveError", func(t *testing.T) { - mockRmqConn := &mockConnection{} - mockRmqChan := &mockChannel{} - mockConn := &connection{ - conn: mockRmqConn, - channel: mockRmqChan, - } - - invalidDelivery := amqp.Delivery{ - Acknowledger: mockRmqChan, - ContentType: "bogus", - Body: []byte(`bogus`), - Headers: testHeaders, - Redelivered: true, - } + mockPublisher := new(MockPublisherWrapper) + mockRmqConn.On("NewPublisher").Return(mockPublisher, nil) + mockPublisher.On("Close") - mockDeliveries := make(chan amqp.Delivery, 1) - mockDeliveries <- invalidDelivery - close(mockDeliveries) + testConsumer := new(TestConsumerWrapper) + testConsumer.isClosed = false + mockRmqConn.On( + "NewConsumer", + mock.Anything, + inputQueue, + mock.Anything, + ).Return(testConsumer, nil) - mockRmqChan.On("Consume", inputQueue, mock.Anything, false, false, false, false, - amqp.Table{}).Return(mockDeliveries, nil) - mockRmqChan.On("Publish", "", outputQueue, publishMandatory, publishImmediate, - invalidErrorPublishing).Return(nil) - mockRmqChan.On("Reject", uint64(0), false).Return(nil) + wg := new(sync.WaitGroup) + termChan, err := testServer.serve(mockRmqConn, wg) - err := testServer.serve(mockConn) + termChan <- true + t.Log("waiting") + wg.Wait() + t.Log("done waiting") assert.NoError(t, err) - mockRmqChan.AssertExpectations(t) + mockRmqConn.AssertExpectations(t) + mockPublisher.AssertExpectations(t) + assert.True(t, testConsumer.isClosed) }) - t.Run("createAndPublishErrorResponse", func(t *testing.T) { - mockRmqConn := &mockConnection{} - mockRmqChan := &mockChannel{} - mockConn := &connection{ - conn: mockRmqConn, - channel: mockRmqChan, - } - - testDelivery := amqp.Delivery{ - Acknowledger: mockRmqChan, - ContentType: rest.ContentTypeJSON, - Body: []byte(`{ "data": { "ndarray": [[1,2,3,4]] } }`), - Headers: testHeaders, - } - - publisher := &publisher{*mockConn, outputQueue} - - // valid payload - error1Text := "error 1" - error1 := errors.New(error1Text) - generatedErrorPublishing1 := amqp.Publishing{ - ContentType: "application/json", - Body: []byte(fmt.Sprintf( - `{"status":{"info":"Prediction Failed","reason":"%v","status":"FAILURE"},"meta":{"puid":"%v"}}`, - error1Text, - testPuid, - )), - DeliveryMode: amqp.Persistent, - Headers: testHeaders, - } - mockRmqChan.On("Publish", "", outputQueue, true, false, generatedErrorPublishing1).Return(nil) - pl1, _ := DeliveryToPayload(testDelivery) - consumerError1 := ConsumerError{ - err: error1, - delivery: testDelivery, - pl: pl1, - } - err1 := testServer.createAndPublishErrorResponse(consumerError1, publisher) - assert.NoError(t, err1) - - mockRmqChan.AssertExpectations(t) - - // no payload - error2Text := "error 2" - error2 := errors.New(error2Text) - generatedErrorPublishing2 := amqp.Publishing{ - ContentType: "application/json", - DeliveryMode: amqp.Persistent, - Body: []byte(fmt.Sprintf( - `{"status":{"info":"Prediction Failed","reason":"%v","status":"FAILURE"},"meta":{"puid":"%v"}}`, - error2Text, - testPuid, - )), - Headers: testHeaders, - } - mockRmqChan.On("Publish", "", outputQueue, true, false, generatedErrorPublishing2).Return(nil) - consumerError2 := ConsumerError{ - err: error2, - delivery: testDelivery, - } - err2 := testServer.createAndPublishErrorResponse(consumerError2, publisher) - assert.NoError(t, err2) - - mockRmqChan.AssertExpectations(t) - }) + ///* + // * This makes sure the serve() code runs and makes the proper calls by setting up mocks. + // * It is not doing anything to validate the messages are properly processed. That's challenging in a + // * unit test since the code connects to RabbitMQ. + // */ + //t.Run("handleMessage", func(t *testing.T) { + // mockRmqConn := &MockConnectionWrapper{} + // + // mockPublisher := &MockPublisherWrapper{} + // mockRmqConn.On("NewPublisher").Return(mockPublisher, nil) + // + // mockConsumer := &TestConsumerWrapper{} + // mockRmqConn.On( + // "NewConsumer", + // mock.Anything, + // inputQueue, + // mock.Anything, + // ).Return(mockConsumer, nil) + // + // _, termChan, err := testServer.serve(mockRmqConn) + // + // termChan <- true + // + // assert.NoError(t, err) + // + // mockRmqConn.AssertExpectations(t) + // mockPublisher.AssertExpectations(t) + //}) + // + ///* + //* This makes sure the Serve(), predictAndPublishResponse(), and createAndPublishErrorResponse() code runs and + //* makes the proper calls and returns an appropriate error message by hacking a bunch of mocks. + // */ + //t.Run("serveError", func(t *testing.T) { + // mockRmqConn := &mockConnection{} + // mockRmqChan := &mockChannel{} + // mockConn := &connection{ + // conn: mockRmqConn, + // channel: mockRmqChan, + // } + // + // invalidDelivery := rabbitmq.Delivery{ + // Acknowledger: mockRmqChan, + // ContentType: "bogus", + // Body: []byte(`bogus`), + // Headers: testHeaders, + // Redelivered: true, + // } + // + // mockDeliveries := make(chan rabbitmq.Delivery, 1) + // mockDeliveries <- invalidDelivery + // close(mockDeliveries) + // + // mockRmqChan.On("Consume", inputQueue, mock.Anything, false, false, false, false, + // rabbitmq.Table{}).Return(mockDeliveries, nil) + // mockRmqChan.On("Publish", "", outputQueue, publishMandatory, publishImmediate, + // invalidErrorPublishing).Return(nil) + // mockRmqChan.On("Reject", uint64(0), false).Return(nil) + // + // setupErr := testServer.serve(mockConn) + // + // assert.NoError(t, setupErr) + // + // mockRmqChan.AssertExpectations(t) + //}) + // + //t.Run("createAndPublishErrorResponse", func(t *testing.T) { + // mockRmqConn := &mockConnection{} + // mockRmqChan := &mockChannel{} + // mockConn := &connection{ + // conn: mockRmqConn, + // channel: mockRmqChan, + // } + // + // testDelivery := rabbitmq.Delivery{ + // Acknowledger: mockRmqChan, + // ContentType: rest.ContentTypeJSON, + // Body: []byte(`{ "data": { "ndarray": [[1,2,3,4]] } }`), + // Headers: testHeaders, + // } + // + // publisher := &publisher{*mockConn, outputQueue} + // + // // valid payload + // error1Text := "error 1" + // error1 := errors.New(error1Text) + // generatedErrorPublishing1 := rabbitmq.Publishing{ + // ContentType: "application/json", + // Body: []byte(fmt.Sprintf( + // `{"status":{"info":"Prediction Failed","reason":"%v","status":"FAILURE"},"meta":{"puid":"%v"}}`, + // error1Text, + // testPuid, + // )), + // DeliveryMode: rabbitmq.Persistent, + // Headers: testHeaders, + // } + // mockRmqChan.On("Publish", "", outputQueue, true, false, generatedErrorPublishing1).Return(nil) + // pl1, _ := DeliveryToPayload(testDelivery) + // consumerError1 := ConsumerError{ + // setupErr: error1, + // delivery: testDelivery, + // pl: pl1, + // } + // err1 := testServer.createAndPublishErrorResponse(consumerError1, publisher) + // assert.NoError(t, err1) + // + // mockRmqChan.AssertExpectations(t) + // + // // no payload + // error2Text := "error 2" + // error2 := errors.New(error2Text) + // generatedErrorPublishing2 := rabbitmq.Publishing{ + // ContentType: "application/json", + // DeliveryMode: rabbitmq.Persistent, + // Body: []byte(fmt.Sprintf( + // `{"status":{"info":"Prediction Failed","reason":"%v","status":"FAILURE"},"meta":{"puid":"%v"}}`, + // error2Text, + // testPuid, + // )), + // Headers: testHeaders, + // } + // mockRmqChan.On("Publish", "", outputQueue, true, false, generatedErrorPublishing2).Return(nil) + // consumerError2 := ConsumerError{ + // setupErr: error2, + // delivery: testDelivery, + // } + // err2 := testServer.createAndPublishErrorResponse(consumerError2, publisher) + // assert.NoError(t, err2) + // + // mockRmqChan.AssertExpectations(t) + //}) } diff --git a/executor/api/rabbitmq/types.go b/executor/api/rabbitmq/types.go index 32348061f0..885b67b390 100644 --- a/executor/api/rabbitmq/types.go +++ b/executor/api/rabbitmq/types.go @@ -1,62 +1,36 @@ package rabbitmq import ( - "fmt" "github.com/seldonio/seldon-core/executor/api/payload" + "github.com/wagslane/go-rabbitmq" "io" - - amqp "github.com/rabbitmq/amqp091-go" ) -/* - * adapted from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/amqp_test.go - */ - -// default implementation leverages the real "streadway/amqp" dialer -var defaultDialerAdapter DialerAdapter = func(url string) (Connection, error) { - conn, err := amqp.Dial(url) - if err != nil { - return nil, fmt.Errorf("error %w dialing rabbitmq at url %v", err, url) - } - - return ConnectionAdapter{conn}, nil -} - -// DialerAdapter is a function that returns a handle to a Connection type. -type DialerAdapter func(url string) (Connection, error) +// wrapper around `rabbitmq.Conn` +type ConnectionWrapper interface { + io.Closer -// ConnectionAdapter adapts the amqp.Connection type so that it adheres to our libraries interfaces. -type ConnectionAdapter struct { - *amqp.Connection + NewPublisher() (PublisherWrapper, error) + NewConsumer(handler rabbitmq.Handler, queue string, consumerTag string) (ConsumerWrapper, error) } -// Channel adapts an amqp.Channel to our Channel interface. -func (c ConnectionAdapter) Channel() (Channel, error) { - return c.Connection.Channel() +type ConsumerWrapper interface { + Close() } -// Connection defines the AMQP connections operations required by this library. -type Connection interface { - io.Closer - - Channel() (Channel, error) - NotifyClose(receiver chan *amqp.Error) chan *amqp.Error +type PublisherWrapper interface { + Close() + Publish( + data []byte, + routingKeys []string, + optionFuncs ...func(*rabbitmq.PublishOptions), + ) error } -// Channel defines the AMQP channel operations required by this library. -type Channel interface { - QueueDeclare(name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table) ( - amqp.Queue, error, - ) - Publish(exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error - Consume( - name string, consumerTag string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp.Table, - ) (<-chan amqp.Delivery, error) - Ack(tag uint64, multiple bool) error - Nack(tag uint64, multiple bool, requeue bool) error - Reject(tag uint64, requeue bool) error - Qos(prefetchCount int, prefetchSize int, global bool) error - NotifyClose(receiver chan *amqp.Error) chan *amqp.Error +type ConsumerError struct { + err error + delivery rabbitmq.Delivery + pl *SeldonPayloadWithHeaders // might be nil } type SeldonPayloadWithHeaders struct { diff --git a/executor/api/rabbitmq/utils.go b/executor/api/rabbitmq/utils.go index 960e74fb64..9c5c6aea09 100644 --- a/executor/api/rabbitmq/utils.go +++ b/executor/api/rabbitmq/utils.go @@ -8,6 +8,7 @@ import ( "github.com/seldonio/seldon-core/executor/api/grpc/seldon/proto" "github.com/seldonio/seldon-core/executor/api/payload" "github.com/seldonio/seldon-core/executor/api/rest" + rabbitmq "github.com/wagslane/go-rabbitmq" ) func TableToStringMap(t amqp.Table) map[string][]string { @@ -18,7 +19,7 @@ func TableToStringMap(t amqp.Table) map[string][]string { return stringMap } -func StringMapToTable(m map[string][]string) amqp.Table { +func StringMapToTable(m map[string][]string) rabbitmq.Table { table := make(map[string]interface{}) for key, values := range m { // just take the first value, at least for now @@ -27,7 +28,7 @@ func StringMapToTable(m map[string][]string) amqp.Table { return table } -func DeliveryToPayload(delivery amqp.Delivery) (*SeldonPayloadWithHeaders, error) { +func DeliveryToPayload(delivery rabbitmq.Delivery) (*SeldonPayloadWithHeaders, error) { var pl *SeldonPayloadWithHeaders = nil var err error = nil diff --git a/executor/api/rabbitmq/utils_test.go b/executor/api/rabbitmq/utils_test.go index 8f43ede6cb..eb934d26e4 100644 --- a/executor/api/rabbitmq/utils_test.go +++ b/executor/api/rabbitmq/utils_test.go @@ -8,6 +8,7 @@ import ( "github.com/seldonio/seldon-core/executor/api/payload" "github.com/seldonio/seldon-core/executor/api/rest" "github.com/stretchr/testify/assert" + "github.com/wagslane/go-rabbitmq" "google.golang.org/protobuf/types/known/structpb" "testing" @@ -26,7 +27,7 @@ func TestStringMapTableFunctions(t *testing.T) { "key1": {"value1", "value2"}, "key2": {"45"}, } - derivedTable1 := amqp.Table{ + derivedTable1 := rabbitmq.Table{ "key1": "value1", "key2": "45", } @@ -44,10 +45,12 @@ func TestStringMapTableFunctions(t *testing.T) { func TestDeliveryToPayload(t *testing.T) { bytesBody := []byte(`{"status":{"status":0},"strData":"\"hello\""}`) - testDeliveryRest := amqp.Delivery{ - Body: bytesBody, - ContentType: rest.ContentTypeJSON, - ContentEncoding: "", + testDeliveryRest := rabbitmq.Delivery{ + Delivery: amqp.Delivery{ + Body: bytesBody, + ContentType: rest.ContentTypeJSON, + ContentEncoding: "", + }, } protoMessage := &proto.SeldonMessage{ Status: &proto.Status{ @@ -60,10 +63,12 @@ func TestDeliveryToPayload(t *testing.T) { } protoMessageEnc, _ := proto2.Marshal(protoMessage) protoMessage.XXX_sizecache = 0 // to make test cases match - testDeliveryProto := amqp.Delivery{ - Body: protoMessageEnc, - ContentType: payload.APPLICATION_TYPE_PROTOBUF, - ContentEncoding: "", + testDeliveryProto := rabbitmq.Delivery{ + Delivery: amqp.Delivery{ + Body: protoMessageEnc, + ContentType: payload.APPLICATION_TYPE_PROTOBUF, + ContentEncoding: "", + }, } t.Run("proto payload", func(t *testing.T) { diff --git a/executor/go.mod b/executor/go.mod index ef7ea8434a..40640a3465 100644 --- a/executor/go.mod +++ b/executor/go.mod @@ -16,7 +16,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.1 github.com/prometheus/common v0.34.0 - github.com/rabbitmq/amqp091-go v1.3.4 + github.com/rabbitmq/amqp091-go v1.5.0 github.com/seldonio/seldon-core/operator v0.0.0-00010101000000-000000000000 github.com/tensorflow/tensorflow/tensorflow/go/core v0.0.0-00010101000000-000000000000 github.com/uber/jaeger-client-go v2.25.0+incompatible @@ -40,6 +40,7 @@ require ( github.com/go-logr/zapr v1.2.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/golang/mock v1.6.0 // indirect github.com/google/go-cmp v0.5.8 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/googleapis/gnostic v0.5.5 // indirect @@ -57,15 +58,18 @@ require ( github.com/stretchr/objx v0.4.0 // indirect github.com/stretchr/testify v1.8.0 // indirect github.com/uber/jaeger-lib v2.2.0+incompatible // indirect + github.com/wagslane/go-rabbitmq v0.12.1 // indirect go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.6.0 // indirect + golang.org/x/mod v0.5.1 // indirect golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect + golang.org/x/tools v0.1.8 // indirect gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect diff --git a/executor/go.sum b/executor/go.sum index 91e63b00f2..907c776fbb 100644 --- a/executor/go.sum +++ b/executor/go.sum @@ -714,6 +714,7 @@ github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -1418,6 +1419,8 @@ github.com/quasilyte/regex/syntax v0.0.0-20200407221936-30656e2c4a95/go.mod h1:r github.com/quobyte/api v0.1.8/go.mod h1:jL7lIHrmqQ7yh05OJ+eEEdHr0u/kmT1Ff9iHd+4H6VI= github.com/rabbitmq/amqp091-go v1.3.4 h1:tXuIslN1nhDqs2t6Jrz3BAoqvt4qIZzxvdbdcxWtHYU= github.com/rabbitmq/amqp091-go v1.3.4/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= +github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg= +github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M= @@ -1616,6 +1619,8 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17 github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vmware/govmomi v0.20.3/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU= github.com/wagslane/go-password-validator v0.3.0/go.mod h1:TI1XJ6T5fRdRnHqHt14pvy1tNVnrwe7m3/f1f2fDphQ= +github.com/wagslane/go-rabbitmq v0.12.1 h1:A3ec8wmP3hr1SKsbXwFaf42xXd5D7yAeJfdFZJydKlU= +github.com/wagslane/go-rabbitmq v0.12.1/go.mod h1:jTSN7opv/tmphx0MYaRR/++HQCuhxrBZEyTd0xCym2c= github.com/whilp/git-urls v0.0.0-20191001220047-6db9661140c0/go.mod h1:2rx5KE5FLD0HRfkkpyn8JwbVLBdhgeiOb2D2D9LLKM4= github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= From 5a3c9c43e191f71b823cb3bb854dcf0c678f9e28 Mon Sep 17 00:00:00 2001 From: David McWhorter Date: Wed, 22 Feb 2023 11:02:54 -0500 Subject: [PATCH 2/5] unit tests are working with wagslane client --- executor/api/rabbitmq/consumer_test.go | 174 --------- executor/api/rabbitmq/publisher_test.go | 159 --------- executor/api/rabbitmq/rabbitmq_client.go | 24 +- executor/api/rabbitmq/rabbitmq_test.go | 7 +- executor/api/rabbitmq/server.go | 15 +- executor/api/rabbitmq/server_test.go | 434 +++++++++++++++-------- executor/api/rabbitmq/types.go | 7 +- executor/api/rabbitmq/utils_test.go | 6 +- 8 files changed, 315 insertions(+), 511 deletions(-) delete mode 100644 executor/api/rabbitmq/consumer_test.go delete mode 100644 executor/api/rabbitmq/publisher_test.go diff --git a/executor/api/rabbitmq/consumer_test.go b/executor/api/rabbitmq/consumer_test.go deleted file mode 100644 index c2b9edaa2e..0000000000 --- a/executor/api/rabbitmq/consumer_test.go +++ /dev/null @@ -1,174 +0,0 @@ -package rabbitmq - -// -//import ( -// "errors" -// "github.com/go-logr/logr/testr" -// proto2 "github.com/golang/protobuf/proto" -// amqp "github.com/rabbitmq/amqp091-go" -// "github.com/seldonio/seldon-core/executor/api/grpc/seldon/proto" -// "github.com/seldonio/seldon-core/executor/api/payload" -// "github.com/seldonio/seldon-core/executor/api/rest" -// "github.com/stretchr/testify/assert" -// "testing" -//) -// -///* -// * based on patterns from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher_test.go -// */ -// -//func TestConsume(t *testing.T) { -// log := testr.New(t) -// testMessageStr := `"hello"` -// seldonMessage := proto.SeldonMessage{ -// Status: &proto.Status{ -// Status: proto.Status_SUCCESS, -// }, -// Meta: nil, -// DataOneof: &proto.SeldonMessage_StrData{ -// StrData: testMessageStr, -// }, -// } -// seldonMessageEnc, _ := proto2.Marshal(&seldonMessage) -// seldonMessage.XXX_sizecache = 0 // to make test cases match -// -// t.Run("success", func(t *testing.T) { -// mockChan := &mockChannel{} -// -// mockDeliveries := make(chan amqp.Delivery, 1) // buffer 1 so that we send returns before starting consumer -// mockDeliveries <- createTestDelivery(mockChan, []byte(testMessageStr), rest.ContentTypeJSON) -// close(mockDeliveries) -// -// mockChan.On("Consume", queueName, consumerTag, false, false, false, false, -// amqp.Table{}).Return(mockDeliveries, -// nil) -// mockChan.On("Ack", uint64(0), false).Return(nil) -// -// cons := &consumer{ -// connection: connection{ -// log: log, -// channel: mockChan, -// }, -// queueName: queueName, -// consumerTag: consumerTag, -// } -// -// payloadHandler := func(pl *SeldonPayloadWithHeaders) error { -// assert.Equal( -// t, -// &SeldonPayloadWithHeaders{ -// &payload.BytesPayload{ -// Msg: []byte(`"hello"`), -// ContentType: rest.ContentTypeJSON, -// ContentEncoding: "", -// }, -// make(map[string][]string), -// }, -// pl, -// "payloads not equal", -// ) -// return nil -// } -// -// errorHandler := func(err ConsumerError) error { -// assert.NoError(t, err.err, "unexpected error") -// return nil -// } -// -// err := cons.Consume(payloadHandler, errorHandler) -// -// assert.NoError(t, err) -// mockChan.AssertExpectations(t) -// }) -// -// t.Run("on failed message don't retry, but reject method", func(t *testing.T) { -// mockChan := &mockChannel{} -// -// mockDeliveries := make(chan amqp.Delivery, 1) // buffer 1 so that we send returns before starting consumer -// mockDeliveries <- createTestDelivery(mockChan, []byte(testMessageStr), rest.ContentTypeJSON) -// close(mockDeliveries) -// -// mockChan.On("Consume", queueName, consumerTag, false, false, false, false, -// amqp.Table{}).Return(mockDeliveries, -// nil) -// mockChan.On("Reject", uint64(0), false).Return(nil) -// -// cons := &consumer{ -// connection: connection{ -// log: log, -// channel: mockChan, -// }, -// queueName: queueName, -// consumerTag: consumerTag, -// } -// -// payloadHandler := func(pl *SeldonPayloadWithHeaders) error { -// return errors.New("Something bad happened during prediction") -// } -// -// errorHandler := func(err ConsumerError) error { -// return nil -// } -// -// err := cons.Consume(payloadHandler, errorHandler) -// -// assert.NoError(t, err) -// mockChan.AssertExpectations(t) -// }) -// -// t.Run("encoded seldon msg", func(t *testing.T) { -// mockChan := &mockChannel{} -// -// mockDeliveries := make(chan amqp.Delivery, 1) // buffer 1 so that we send returns before starting consumer -// mockDeliveries <- createTestDelivery(mockChan, seldonMessageEnc, payload.APPLICATION_TYPE_PROTOBUF) -// close(mockDeliveries) -// -// mockChan.On("Consume", queueName, consumerTag, false, false, false, false, -// amqp.Table{}).Return(mockDeliveries, -// nil) -// mockChan.On("Ack", uint64(0), false).Return(nil) -// -// cons := &consumer{ -// connection: connection{ -// log: log, -// channel: mockChan, -// }, -// queueName: queueName, -// consumerTag: consumerTag, -// } -// -// payloadHandler := func(pl *SeldonPayloadWithHeaders) error { -// assert.Equal( -// t, -// &SeldonPayloadWithHeaders{ -// &payload.ProtoPayload{ -// Msg: &seldonMessage, -// }, -// make(map[string][]string), -// }, -// pl, -// "payloads not equal", -// ) -// return nil -// } -// -// errorHandler := func(err ConsumerError) error { -// assert.NoError(t, err.err, "unexpected error") -// return nil -// } -// -// err := cons.Consume(payloadHandler, errorHandler) -// -// assert.NoError(t, err) -// mockChan.AssertExpectations(t) -// }) -//} -// -//func createTestDelivery(ack amqp.Acknowledger, body []byte, contentType string) amqp.Delivery { -// return amqp.Delivery{ -// Acknowledger: ack, -// Body: body, -// ContentType: contentType, -// ContentEncoding: "", -// } -//} diff --git a/executor/api/rabbitmq/publisher_test.go b/executor/api/rabbitmq/publisher_test.go deleted file mode 100644 index dc6e8ef377..0000000000 --- a/executor/api/rabbitmq/publisher_test.go +++ /dev/null @@ -1,159 +0,0 @@ -package rabbitmq - -// -//import ( -// "errors" -// "github.com/go-logr/logr/testr" -// amqp "github.com/rabbitmq/amqp091-go" -// "github.com/stretchr/testify/assert" -// "testing" -//) -// -///* -// * mostly taken from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher_test.go -// */ -// -//func TestPublisher(t *testing.T) { -// log := testr.New(t) -// testMessage := SeldonPayloadWithHeaders{ -// &TestPayload{Msg: `"hello"`}, -// make(map[string][]string), -// } -// -// t.Run("success", func(t *testing.T) { -// mockChan := &mockChannel{} -// -// msg := amqp.Publishing{ -// Headers: make(map[string]interface{}), -// ContentType: "application/json", -// DeliveryMode: amqp.Persistent, -// Body: []byte(`"hello"`), -// } -// mockChan.On("Publish", "", queueName, true, false, msg).Return(nil) -// -// pub := &publisher{ -// connection: connection{ -// log: log, -// channel: mockChan, -// }, -// queueName: queueName, -// } -// -// assert.NoError(t, pub.Publish(testMessage)) -// -// mockChan.AssertExpectations(t) -// }) -// -// t.Run("publish_failure", func(t *testing.T) { -// mockChan := &mockChannel{} -// -// mockChan.On("Publish", "", queueName, true, false, amqp.Publishing{ -// Headers: make(map[string]interface{}), -// ContentType: "application/json", -// DeliveryMode: amqp.Persistent, -// Body: []byte(`"hello"`), -// }).Return(errors.New("test error")) -// -// pub := &publisher{ -// connection: connection{ -// log: log, -// channel: mockChan, -// }, -// queueName: queueName, -// } -// -// assert.ErrorContains(t, pub.Publish(testMessage), "test error") -// -// mockChan.AssertExpectations(t) -// }) -// -// t.Run("connection_closed", func(t *testing.T) { -// f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { -// channel.On("Qos", 1, 0, true).Return(nil) -// channel.On("Publish", "", queueName, true, false, amqp.Publishing{ -// Headers: make(map[string]interface{}), -// ContentType: "application/json", -// DeliveryMode: amqp.Persistent, -// Body: []byte(`"hello"`), -// }).Return(nil) -// -// conn.On("Channel").Return(channel, nil) -// adapter.On("Dial", uri).Return(conn, nil) -// }) -// defer reset() -// -// pub := &publisher{ -// connection: connection{ -// uri: uri, -// log: log, -// err: make(chan error, 1), -// }, -// queueName: queueName, -// } -// -// pub.err <- errors.New("dang, conn be broke") -// -// assert.NoError(t, pub.Publish(testMessage)) -// -// f.adapter.AssertExpectations(t) -// f.connection.AssertExpectations(t) -// f.channel.AssertExpectations(t) -// }) -// -// t.Run("connection_closed_retry", func(t *testing.T) { -// f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { -// channel.On("Qos", 1, 0, true).Return(nil) -// channel.On("Publish", "", queueName, true, false, amqp.Publishing{ -// Headers: make(map[string]interface{}), -// ContentType: "application/json", -// DeliveryMode: amqp.Persistent, -// Body: []byte(`"hello"`), -// }).Return(nil) -// -// conn.On("Channel").Return(channel, nil) -// -// adapter.On("Dial", uri).Return(nil, errors.New("test dial error")).Once() -// adapter.On("Dial", uri).Return(conn, nil).Once() -// }) -// defer reset() -// -// pub := &publisher{ -// connection: connection{ -// uri: uri, -// log: log, -// err: make(chan error, 1), -// }, -// queueName: queueName, -// } -// -// pub.err <- errors.New("dang, conn be broke") -// -// assert.NoError(t, pub.Publish(testMessage)) -// -// f.adapter.AssertExpectations(t) -// f.connection.AssertExpectations(t) -// f.channel.AssertExpectations(t) -// }) -// -// t.Run("connection_closed_retry_failure", func(t *testing.T) { -// f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { -// adapter.On("Dial", uri).Return(nil, errors.New("test dial error")) -// }) -// defer reset() -// -// pub := &publisher{ -// connection: connection{ -// uri: uri, -// log: log, -// err: make(chan error, 1), -// }, -// queueName: queueName, -// } -// -// pub.err <- errors.New("dang, conn be broke") -// -// assert.Error(t, pub.Publish(testMessage)) -// -// f.adapter.AssertExpectations(t) -// }) -//} diff --git a/executor/api/rabbitmq/rabbitmq_client.go b/executor/api/rabbitmq/rabbitmq_client.go index 1cee783886..7d5c2db316 100644 --- a/executor/api/rabbitmq/rabbitmq_client.go +++ b/executor/api/rabbitmq/rabbitmq_client.go @@ -22,6 +22,7 @@ type RabbitMqConsumer struct { type RabbitMqPublisher struct { rabbitmq.Publisher + log logr.Logger } func createRabbitMQConnection(brokerUrl string, log logr.Logger) (ConnectionWrapper, error) { @@ -47,7 +48,7 @@ func (c *RabbitMqConnection) NewPublisher() (PublisherWrapper, error) { c.log.Error(err, "error creating publisher") return nil, fmt.Errorf("error '%w' creating publisher", err) } - return &RabbitMqPublisher{*publisher}, nil + return &RabbitMqPublisher{*publisher, c.log}, nil } func (c *RabbitMqConnection) NewConsumer(handler rabbitmq.Handler, queue string, consumerTag string) (ConsumerWrapper, error) { @@ -74,28 +75,19 @@ func (p *RabbitMqPublisher) Close() { } func (p *RabbitMqPublisher) Publish( - data []byte, - routingKeys []string, - optionFuncs ...func(*rabbitmq.PublishOptions), -) error { - return p.Publisher.Publish(data, routingKeys, optionFuncs...) -} - -func DoPublish( - publisher PublisherWrapper, - payload SeldonPayloadWithHeaders, + payloadWithHeaders SeldonPayloadWithHeaders, queueName string, - log logr.Logger, ) error { + payload := payloadWithHeaders.Payload body, err := payload.GetBytes() if err != nil { - log.Error(err, "error retrieving payload bytes") + p.log.Error(err, "error retrieving payload bytes") return fmt.Errorf("error '%w' retrieving payload bytes", err) } options := []func(options *rabbitmq.PublishOptions){ - rabbitmq.WithPublishOptionsHeaders(StringMapToTable(payload.Headers)), + rabbitmq.WithPublishOptionsHeaders(StringMapToTable(payloadWithHeaders.Headers)), rabbitmq.WithPublishOptionsContentType(payload.GetContentType()), rabbitmq.WithPublishOptionsContentEncoding(""), rabbitmq.WithPublishOptionsPersistentDelivery, @@ -107,13 +99,13 @@ func DoPublish( options = append(options, rabbitmq.WithPublishOptionsImmediate) } - err = publisher.Publish( + err = p.Publisher.Publish( body, []string{queueName}, options..., ) if err != nil { - log.Error(err, "error publishing rabbitmq message") + p.log.Error(err, "error publishing rabbitmq message") return fmt.Errorf("error '%w' publishing rabbitmq message", err) } diff --git a/executor/api/rabbitmq/rabbitmq_test.go b/executor/api/rabbitmq/rabbitmq_test.go index 2158589e95..9054561b01 100644 --- a/executor/api/rabbitmq/rabbitmq_test.go +++ b/executor/api/rabbitmq/rabbitmq_test.go @@ -55,11 +55,10 @@ func (m *MockPublisherWrapper) Close() { } func (m *MockPublisherWrapper) Publish( - data []byte, - routingKeys []string, - optionFuncs ...func(*rabbitmq.PublishOptions), + payload SeldonPayloadWithHeaders, + queueName string, ) error { - returnArgs := m.Called(data, routingKeys, optionFuncs) + returnArgs := m.Called(payload, queueName) return returnArgs.Error(0) } diff --git a/executor/api/rabbitmq/server.go b/executor/api/rabbitmq/server.go index 56fec75083..a0a15c48e5 100644 --- a/executor/api/rabbitmq/server.go +++ b/executor/api/rabbitmq/server.go @@ -238,7 +238,7 @@ func (rs *SeldonRabbitMQServer) predictAndPublishResponse( seldonPredictorProcess := pred.NewPredictorProcess( ctx, rs.Client, rs.Log.WithName("RabbitMqClient"), &rs.ServerUrl, rs.Namespace, reqPayload.Headers, "") - resPayload, err := seldonPredictorProcess.Predict(&rs.Predictor.Graph, reqPayload) + resPayload, err := seldonPredictorProcess.Predict(&rs.Predictor.Graph, reqPayload.Payload) if err != nil && resPayload == nil { // normal errors from the predict process contain a status failed payload // this is handling an unexpected case, so failing entirely, at least for now @@ -278,7 +278,7 @@ func (rs *SeldonRabbitMQServer) predictAndPublishResponse( } } - updatedPayload, err := UpdatePayloadWithPuid(reqPayload, resPayload) + updatedPayload, err := UpdatePayloadWithPuid(reqPayload.Payload, resPayload) if err != nil { rs.Log.Error(err, UNHANDLED_ERROR) return fmt.Errorf("unhandled error %w from predictor process", err) @@ -328,16 +328,17 @@ func (rs *SeldonRabbitMQServer) createAndPublishErrorResponse(errorArgs Consumer return rs.publishPayload(publisher, resPayload, seldonPuid) } -func (rs *SeldonRabbitMQServer) publishPayload(publisher PublisherWrapper, pl payload.SeldonPayload, seldonPuid string) error { +func addPuidHeader(pl payload.SeldonPayload, seldonPuid string) SeldonPayloadWithHeaders { resHeaders := map[string][]string{payload.SeldonPUIDHeader: {seldonPuid}} - //TODO might need more headers - - resPayloadWithHeaders := SeldonPayloadWithHeaders{ + return SeldonPayloadWithHeaders{ pl, resHeaders, } +} - return DoPublish(publisher, resPayloadWithHeaders, rs.OutputQueueName, rs.Log) +func (rs *SeldonRabbitMQServer) publishPayload(publisher PublisherWrapper, pl payload.SeldonPayload, seldonPuid string) error { + plWithHeaders := addPuidHeader(pl, seldonPuid) + return publisher.Publish(plWithHeaders, rs.OutputQueueName) } func assignAndReturnPUID(pl *SeldonPayloadWithHeaders, delivery *rabbitmq.Delivery) string { diff --git a/executor/api/rabbitmq/server_test.go b/executor/api/rabbitmq/server_test.go index 95bdebd72c..3263b73f10 100644 --- a/executor/api/rabbitmq/server_test.go +++ b/executor/api/rabbitmq/server_test.go @@ -1,14 +1,22 @@ package rabbitmq import ( + "bytes" + "fmt" "github.com/go-logr/logr/testr" + "github.com/golang/protobuf/jsonpb" + guuid "github.com/google/uuid" . "github.com/onsi/gomega" + amqp "github.com/rabbitmq/amqp091-go" "github.com/seldonio/seldon-core/executor/api" + "github.com/seldonio/seldon-core/executor/api/grpc/seldon/proto" "github.com/seldonio/seldon-core/executor/api/payload" + "github.com/seldonio/seldon-core/executor/api/rest" "github.com/seldonio/seldon-core/executor/api/test" v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/wagslane/go-rabbitmq" "io/ioutil" "net/http" "net/http/httptest" @@ -80,19 +88,82 @@ func TestRabbitMqServer(t *testing.T) { FullHealthCheck: fullHealthCheck, } - //testPuid := guuid.New().String() - //testHeaders := map[string]interface{}{payload.SeldonPUIDHeader: testPuid} - // - //invalidErrorJsonResponse := fmt.Sprintf( - // `{"status":{"info":"Prediction Failed","reason":"unknown payload type 'bogus'","status":"FAILURE"},"meta":{"puid":"%v"}}`, - // testPuid, - //) - //invalidErrorPublishing := rabbitmq.Publishing{ - // ContentType: "application/json", - // DeliveryMode: rabbitmq.Persistent, - // Body: []byte(invalidErrorJsonResponse), - // Headers: testHeaders, - //} + testPuid := guuid.New().String() + testHeaders := map[string]interface{}{payload.SeldonPUIDHeader: testPuid} + + // test predictor process returns the request payload as the response + validJson := fmt.Sprintf( + `{"meta":{"puid":"%v"},"jsonData":{"start":1,"stop":10}}`, + testPuid, + ) + validRequestDelivery := rabbitmq.Delivery{ + Delivery: amqp.Delivery{ + ContentType: "application/json", + DeliveryMode: rabbitmq.Persistent, + Body: []byte(validJson), + Headers: testHeaders, + }, + } + validPayloadResponseWithHeaders := SeldonPayloadWithHeaders{ + &payload.BytesPayload{ + Msg: []byte(validJson), + ContentType: "application/json", + }, + TableToStringMap(testHeaders), + } + + // test predictor process returns the request payload as the response + validJsonNoPuid := `{"jsonData":{"start":1,"stop":10}}` + validRequestNoPuidDelivery := rabbitmq.Delivery{ + Delivery: amqp.Delivery{ + ContentType: "application/json", + DeliveryMode: rabbitmq.Persistent, + Body: []byte(validJsonNoPuid), + Headers: map[string]interface{}{}, + }, + } + validPayloadResponseNoPuid := + &payload.BytesPayload{ + Msg: []byte(validJsonNoPuid), + ContentType: "application/json", + } + + invalidContentType := "bogus" + invalidDelivery := rabbitmq.Delivery{ + Delivery: amqp.Delivery{ + ContentType: invalidContentType, + Body: []byte(`bogus`), + Headers: testHeaders, + }, + } + invalidJsonResponse := fmt.Sprintf( + `{"status":{"info":"Prediction Failed","reason":"unknown payload type '%v'","status":"FAILURE"}}`, + invalidContentType, + ) + invalidPayloadResponse := + &payload.BytesPayload{ + Msg: []byte(invalidJsonResponse), + ContentType: "application/json", + } + + // test predictor process returns the request payload as the response + errorJson := fmt.Sprintf( + `{"status":{"info":"Prediction Failed","reason":"unknown payload type 'bogus'","status":"FAILURE"},"meta":{"puid":"%v"}}`, + testPuid, + ) + errorPayloadResponse := + &payload.BytesPayload{ + Msg: []byte(errorJson), + ContentType: "application/json", + } + errorDelivery := rabbitmq.Delivery{ + Delivery: amqp.Delivery{ + ContentType: "application/json", + DeliveryMode: rabbitmq.Persistent, + Body: []byte(errorJson), + Headers: testHeaders, + }, + } t.Run("create server", func(t *testing.T) { server, err := CreateRabbitMQServer(RabbitMQServerOptions{ @@ -162,136 +233,211 @@ func TestRabbitMqServer(t *testing.T) { assert.True(t, testConsumer.isClosed) }) - ///* - // * This makes sure the serve() code runs and makes the proper calls by setting up mocks. - // * It is not doing anything to validate the messages are properly processed. That's challenging in a - // * unit test since the code connects to RabbitMQ. - // */ - //t.Run("handleMessage", func(t *testing.T) { - // mockRmqConn := &MockConnectionWrapper{} - // - // mockPublisher := &MockPublisherWrapper{} - // mockRmqConn.On("NewPublisher").Return(mockPublisher, nil) - // - // mockConsumer := &TestConsumerWrapper{} - // mockRmqConn.On( - // "NewConsumer", - // mock.Anything, - // inputQueue, - // mock.Anything, - // ).Return(mockConsumer, nil) - // - // _, termChan, err := testServer.serve(mockRmqConn) - // - // termChan <- true - // - // assert.NoError(t, err) - // - // mockRmqConn.AssertExpectations(t) - // mockPublisher.AssertExpectations(t) - //}) - // - ///* - //* This makes sure the Serve(), predictAndPublishResponse(), and createAndPublishErrorResponse() code runs and - //* makes the proper calls and returns an appropriate error message by hacking a bunch of mocks. - // */ - //t.Run("serveError", func(t *testing.T) { - // mockRmqConn := &mockConnection{} - // mockRmqChan := &mockChannel{} - // mockConn := &connection{ - // conn: mockRmqConn, - // channel: mockRmqChan, - // } - // - // invalidDelivery := rabbitmq.Delivery{ - // Acknowledger: mockRmqChan, - // ContentType: "bogus", - // Body: []byte(`bogus`), - // Headers: testHeaders, - // Redelivered: true, - // } - // - // mockDeliveries := make(chan rabbitmq.Delivery, 1) - // mockDeliveries <- invalidDelivery - // close(mockDeliveries) - // - // mockRmqChan.On("Consume", inputQueue, mock.Anything, false, false, false, false, - // rabbitmq.Table{}).Return(mockDeliveries, nil) - // mockRmqChan.On("Publish", "", outputQueue, publishMandatory, publishImmediate, - // invalidErrorPublishing).Return(nil) - // mockRmqChan.On("Reject", uint64(0), false).Return(nil) - // - // setupErr := testServer.serve(mockConn) - // - // assert.NoError(t, setupErr) - // - // mockRmqChan.AssertExpectations(t) - //}) - // - //t.Run("createAndPublishErrorResponse", func(t *testing.T) { - // mockRmqConn := &mockConnection{} - // mockRmqChan := &mockChannel{} - // mockConn := &connection{ - // conn: mockRmqConn, - // channel: mockRmqChan, - // } - // - // testDelivery := rabbitmq.Delivery{ - // Acknowledger: mockRmqChan, - // ContentType: rest.ContentTypeJSON, - // Body: []byte(`{ "data": { "ndarray": [[1,2,3,4]] } }`), - // Headers: testHeaders, - // } - // - // publisher := &publisher{*mockConn, outputQueue} - // - // // valid payload - // error1Text := "error 1" - // error1 := errors.New(error1Text) - // generatedErrorPublishing1 := rabbitmq.Publishing{ - // ContentType: "application/json", - // Body: []byte(fmt.Sprintf( - // `{"status":{"info":"Prediction Failed","reason":"%v","status":"FAILURE"},"meta":{"puid":"%v"}}`, - // error1Text, - // testPuid, - // )), - // DeliveryMode: rabbitmq.Persistent, - // Headers: testHeaders, - // } - // mockRmqChan.On("Publish", "", outputQueue, true, false, generatedErrorPublishing1).Return(nil) - // pl1, _ := DeliveryToPayload(testDelivery) - // consumerError1 := ConsumerError{ - // setupErr: error1, - // delivery: testDelivery, - // pl: pl1, - // } - // err1 := testServer.createAndPublishErrorResponse(consumerError1, publisher) - // assert.NoError(t, err1) - // - // mockRmqChan.AssertExpectations(t) - // - // // no payload - // error2Text := "error 2" - // error2 := errors.New(error2Text) - // generatedErrorPublishing2 := rabbitmq.Publishing{ - // ContentType: "application/json", - // DeliveryMode: rabbitmq.Persistent, - // Body: []byte(fmt.Sprintf( - // `{"status":{"info":"Prediction Failed","reason":"%v","status":"FAILURE"},"meta":{"puid":"%v"}}`, - // error2Text, - // testPuid, - // )), - // Headers: testHeaders, - // } - // mockRmqChan.On("Publish", "", outputQueue, true, false, generatedErrorPublishing2).Return(nil) - // consumerError2 := ConsumerError{ - // setupErr: error2, - // delivery: testDelivery, - // } - // err2 := testServer.createAndPublishErrorResponse(consumerError2, publisher) - // assert.NoError(t, err2) - // - // mockRmqChan.AssertExpectations(t) - //}) + t.Run("process valid request", func(t *testing.T) { + mockRmqConn := new(MockConnectionWrapper) + + mockPublisher := new(MockPublisherWrapper) + mockRmqConn.On("NewPublisher").Return(mockPublisher, nil) + mockPublisher.On("Close") + + testConsumer := new(TestConsumerWrapper) + testConsumer.isClosed = false + mockRmqConn.On( + "NewConsumer", + mock.Anything, + inputQueue, + mock.Anything, + ).Return(testConsumer, nil) + + wg := new(sync.WaitGroup) + termChan, err := testServer.serve(mockRmqConn, wg) + + mockPublisher.On("Publish", validPayloadResponseWithHeaders, outputQueue).Return(nil) + action := testConsumer.SimulateDelivery(validRequestDelivery) + assert.Equal(t, rabbitmq.Ack, action) + + termChan <- true + t.Log("waiting") + wg.Wait() + t.Log("done waiting") + + assert.NoError(t, err) + + mockRmqConn.AssertExpectations(t) + mockPublisher.AssertExpectations(t) + assert.True(t, testConsumer.isClosed) + }) + + t.Run("process valid request missing puid", func(t *testing.T) { + mockRmqConn := new(MockConnectionWrapper) + + mockPublisher := new(MockPublisherWrapper) + mockRmqConn.On("NewPublisher").Return(mockPublisher, nil) + mockPublisher.On("Close") + + testConsumer := new(TestConsumerWrapper) + testConsumer.isClosed = false + mockRmqConn.On( + "NewConsumer", + mock.Anything, + inputQueue, + mock.Anything, + ).Return(testConsumer, nil) + + wg := new(sync.WaitGroup) + termChan, err := testServer.serve(mockRmqConn, wg) + + mockPublisher.On( + "Publish", + mock.MatchedBy(matchingSeldonPayloadsWithPuid(t, validPayloadResponseNoPuid, false)), + outputQueue, + ).Return(nil) + action := testConsumer.SimulateDelivery(validRequestNoPuidDelivery) + assert.Equal(t, rabbitmq.Ack, action) + + termChan <- true + t.Log("waiting") + wg.Wait() + t.Log("done waiting") + + assert.NoError(t, err) + + mockRmqConn.AssertExpectations(t) + mockPublisher.AssertExpectations(t) + assert.True(t, testConsumer.isClosed) + }) + + t.Run("process invalid request", func(t *testing.T) { + mockRmqConn := new(MockConnectionWrapper) + + mockPublisher := new(MockPublisherWrapper) + mockRmqConn.On("NewPublisher").Return(mockPublisher, nil) + mockPublisher.On("Close") + + testConsumer := new(TestConsumerWrapper) + testConsumer.isClosed = false + mockRmqConn.On( + "NewConsumer", + mock.Anything, + inputQueue, + mock.Anything, + ).Return(testConsumer, nil) + + wg := new(sync.WaitGroup) + termChan, err := testServer.serve(mockRmqConn, wg) + + // test that payload is expected and Puid is added + mockPublisher.On( + "Publish", + mock.MatchedBy(matchingSeldonPayloadsWithPuid(t, invalidPayloadResponse, true)), + outputQueue, + ).Return(nil) + action := testConsumer.SimulateDelivery(invalidDelivery) + assert.Equal(t, rabbitmq.NackDiscard, action) + + termChan <- true + t.Log("waiting") + wg.Wait() + t.Log("done waiting") + + assert.NoError(t, err) + + mockRmqConn.AssertExpectations(t) + mockPublisher.AssertExpectations(t) + assert.True(t, testConsumer.isClosed) + }) + + t.Run("process error response", func(t *testing.T) { + mockRmqConn := new(MockConnectionWrapper) + + mockPublisher := new(MockPublisherWrapper) + mockRmqConn.On("NewPublisher").Return(mockPublisher, nil) + mockPublisher.On("Close") + + testConsumer := new(TestConsumerWrapper) + testConsumer.isClosed = false + mockRmqConn.On( + "NewConsumer", + mock.Anything, + inputQueue, + mock.Anything, + ).Return(testConsumer, nil) + + wg := new(sync.WaitGroup) + termChan, err := testServer.serve(mockRmqConn, wg) + + mockPublisher.On( + "Publish", + mock.MatchedBy(matchingSeldonPayloadsWithPuid(t, errorPayloadResponse, true)), + outputQueue, + ).Return(nil) + action := testConsumer.SimulateDelivery(errorDelivery) + assert.Equal(t, rabbitmq.Ack, action) + + termChan <- true + t.Log("waiting") + wg.Wait() + t.Log("done waiting") + + assert.NoError(t, err) + + mockRmqConn.AssertExpectations(t) + mockPublisher.AssertExpectations(t) + assert.True(t, testConsumer.isClosed) + }) +} + +func addPuid(pl payload.SeldonPayload, puid string) (payload.SeldonPayload, error) { + switch pl.GetContentType() { + case rest.ContentTypeJSON: + requestBody := &proto.SeldonMessage{} + err := jsonpb.UnmarshalString(string(pl.GetPayload().([]byte)), requestBody) + if err != nil { + return nil, err + } + requestBody.Meta = &proto.Meta{ + Puid: puid, + } + ma := jsonpb.Marshaler{} + marshaled, err := ma.MarshalToString(requestBody) + if err != nil { + return nil, err + } + return &payload.BytesPayload{ + Msg: []byte(marshaled), + ContentType: rest.ContentTypeJSON, + }, nil + default: + return nil, fmt.Errorf("unsupported content type '%v'", pl.GetContentType()) + } +} + +func samePayloads(pl1 payload.SeldonPayload, pl2 payload.SeldonPayload) bool { + providedBytes, _ := pl1.GetBytes() + referenceBytes, _ := pl2.GetBytes() + return bytes.Equal(providedBytes, referenceBytes) && + pl1.GetContentType() == pl2.GetContentType() && + pl1.GetContentEncoding() == pl2.GetContentEncoding() +} +func matchingSeldonPayloadsWithPuid( + t *testing.T, + reference payload.SeldonPayload, + addPuidToPayload bool, +) func(SeldonPayloadWithHeaders) bool { + return func(pl SeldonPayloadWithHeaders) bool { + puidHeader := pl.Headers[payload.SeldonPUIDHeader] + var referencePayloadToUse payload.SeldonPayload + var err error + if addPuidToPayload { + referencePayloadToUse, err = addPuid(reference, puidHeader[0]) + if err != nil { + t.Logf("error adding puid %v", err) + return false + } + } else { + referencePayloadToUse = reference + } + return puidHeader != nil && samePayloads(pl.Payload, referencePayloadToUse) + } } diff --git a/executor/api/rabbitmq/types.go b/executor/api/rabbitmq/types.go index 885b67b390..14cb9a6d63 100644 --- a/executor/api/rabbitmq/types.go +++ b/executor/api/rabbitmq/types.go @@ -21,9 +21,8 @@ type ConsumerWrapper interface { type PublisherWrapper interface { Close() Publish( - data []byte, - routingKeys []string, - optionFuncs ...func(*rabbitmq.PublishOptions), + payload SeldonPayloadWithHeaders, + queueName string, ) error } @@ -34,6 +33,6 @@ type ConsumerError struct { } type SeldonPayloadWithHeaders struct { - payload.SeldonPayload + Payload payload.SeldonPayload Headers map[string][]string } diff --git a/executor/api/rabbitmq/utils_test.go b/executor/api/rabbitmq/utils_test.go index eb934d26e4..07f99b94f4 100644 --- a/executor/api/rabbitmq/utils_test.go +++ b/executor/api/rabbitmq/utils_test.go @@ -75,17 +75,17 @@ func TestDeliveryToPayload(t *testing.T) { pl, err := DeliveryToPayload(testDeliveryProto) assert.NoError(t, err) - assert.Equal(t, protoMessage, pl.GetPayload()) + assert.Equal(t, protoMessage, pl.Payload.GetPayload()) }) t.Run("rest payload", func(t *testing.T) { pl, err := DeliveryToPayload(testDeliveryRest) assert.NoError(t, err) - assert.Equal(t, bytesBody, pl.GetPayload()) + assert.Equal(t, bytesBody, pl.Payload.GetPayload()) body := &proto.SeldonMessage{} - err = jsonpb.UnmarshalString(string(pl.GetPayload().([]byte)), body) + err = jsonpb.UnmarshalString(string(pl.Payload.GetPayload().([]byte)), body) assert.NoError(t, err) assert.Equal(t, protoMessage, body) From 204d37b9039f6fd5c5636fd2bc15291c3e7be5d4 Mon Sep 17 00:00:00 2001 From: David McWhorter Date: Wed, 22 Feb 2023 11:19:25 -0500 Subject: [PATCH 3/5] remove unused mock package --- executor/go.mod | 1 - executor/go.sum | 1 - 2 files changed, 2 deletions(-) diff --git a/executor/go.mod b/executor/go.mod index 40640a3465..c2eaa7cd07 100644 --- a/executor/go.mod +++ b/executor/go.mod @@ -40,7 +40,6 @@ require ( github.com/go-logr/zapr v1.2.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/mock v1.6.0 // indirect github.com/google/go-cmp v0.5.8 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/googleapis/gnostic v0.5.5 // indirect diff --git a/executor/go.sum b/executor/go.sum index 907c776fbb..817414a864 100644 --- a/executor/go.sum +++ b/executor/go.sum @@ -714,7 +714,6 @@ github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= -github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= From 305ec27b93375db2a37b4f74499548e1fb6880e5 Mon Sep 17 00:00:00 2001 From: David McWhorter Date: Thu, 23 Feb 2023 15:47:04 -0500 Subject: [PATCH 4/5] fix licenses --- executor/licenses/additional_license_info.csv | 4 +- executor/licenses/dep.txt | 2 + executor/licenses/license.txt | 84 +++++++++++++++++-- executor/licenses/license_info.csv | 6 +- executor/licenses/repo.txt | 2 + ...elearning.seldon.io_seldondeployments.yaml | 1 - 6 files changed, 89 insertions(+), 10 deletions(-) diff --git a/executor/licenses/additional_license_info.csv b/executor/licenses/additional_license_info.csv index ab8a743fb3..2bc84decdb 100644 --- a/executor/licenses/additional_license_info.csv +++ b/executor/licenses/additional_license_info.csv @@ -106,4 +106,6 @@ https://api.github.com/repos/square/go-jose/license,Apache License 2.0 https://github.com/antlr/antlr4/blob/master/LICENSE.txt,BSD 3-Clause "New" or "Revised" License https://github.com/munnerz/goautoneg/blob/master/LICENSE,BSD 3-Clause "New" or "Revised" License https://github.com/ulikunitz/unixtime/blob/master/LICENSE,BSD 3-Clause "New" or "Revised" License -https://github.com/go-yaml/yaml/blob/v3/LICENSE,MIT License \ No newline at end of file +https://github.com/go-yaml/yaml/blob/v3/LICENSE,MIT License +https://github.com/rabbitmq/amqp091-go/blob/main/LICENSE,BSD 2-Clause "Simplified" License + diff --git a/executor/licenses/dep.txt b/executor/licenses/dep.txt index 157959dd5f..473ed93972 100644 --- a/executor/licenses/dep.txt +++ b/executor/licenses/dep.txt @@ -298,6 +298,7 @@ github.com/prometheus/common github.com/prometheus/procfs github.com/prometheus/statsd_exporter github.com/prometheus/tsdb +github.com/rabbitmq/amqp091-go github.com/rcrowley/go-metrics github.com/robfig/cron/v3 github.com/rogpeppe/fastuuid @@ -337,6 +338,7 @@ github.com/ulikunitz/unixtime github.com/valyala/bytebufferpool github.com/valyala/fasthttp github.com/wagslane/go-password-validator +github.com/wagslane/go-rabbitmq github.com/xdg-go/pbkdf2 github.com/xdg-go/scram github.com/xdg-go/stringprep diff --git a/executor/licenses/license.txt b/executor/licenses/license.txt index 7efb3fc20a..ebf09d3bbe 100644 --- a/executor/licenses/license.txt +++ b/executor/licenses/license.txt @@ -9505,9 +9505,9 @@ SOFTWARE. -------------------------------------------------------------------------------- -go-redis/redis BSD 2-Clause "Simplified" License https://github.com/go-redis/redis/blob/master/LICENSE +go-redis/redis BSD 2-Clause "Simplified" License https://github.com/redis/go-redis/blob/master/LICENSE -------------------------------------------------------------------------------- -Copyright (c) 2013 The github.com/go-redis/redis Authors. +Copyright (c) 2013 The github.com/redis/go-redis Authors. All rights reserved. Redistribution and use in source and binary forms, with or without @@ -14208,6 +14208,8 @@ Exhibit B - “Incompatible With Secondary Licenses” Notice -------------------------------------------------------------------------------- hashicorp/errwrap Mozilla Public License 2.0 https://github.com/hashicorp/errwrap/blob/master/LICENSE -------------------------------------------------------------------------------- +Copyright (c) 2014 HashiCorp, Inc. + Mozilla Public License, version 2.0 1. Definitions @@ -14566,6 +14568,8 @@ Exhibit B - “Incompatible With Secondary Licenses” Notice -------------------------------------------------------------------------------- hashicorp/go-cleanhttp Mozilla Public License 2.0 https://github.com/hashicorp/go-cleanhttp/blob/master/LICENSE -------------------------------------------------------------------------------- +Copyright (c) 2015 HashiCorp, Inc. + Mozilla Public License, version 2.0 1. Definitions @@ -14933,9 +14937,7 @@ Exhibit B - "Incompatible With Secondary Licenses" Notice -------------------------------------------------------------------------------- hashicorp/go-hclog MIT License https://github.com/hashicorp/go-hclog/blob/main/LICENSE -------------------------------------------------------------------------------- -MIT License - -Copyright (c) 2017 HashiCorp +Copyright (c) 2017 HashiCorp, Inc. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -14958,6 +14960,8 @@ SOFTWARE. -------------------------------------------------------------------------------- hashicorp/go-immutable-radix Mozilla Public License 2.0 https://github.com/hashicorp/go-immutable-radix/blob/master/LICENSE -------------------------------------------------------------------------------- +Copyright (c) 2015 HashiCorp, Inc. + Mozilla Public License, version 2.0 1. Definitions @@ -15351,6 +15355,8 @@ SOFTWARE. -------------------------------------------------------------------------------- hashicorp/go-multierror Mozilla Public License 2.0 https://github.com/hashicorp/go-multierror/blob/master/LICENSE -------------------------------------------------------------------------------- +Copyright (c) 2014 HashiCorp, Inc. + Mozilla Public License, version 2.0 1. Definitions @@ -15706,7 +15712,7 @@ Exhibit B - “Incompatible With Secondary Licenses” Notice the Mozilla Public License, v. 2.0. -------------------------------------------------------------------------------- -hashicorp/go-plugin Mozilla Public License 2.0 https://github.com/hashicorp/go-plugin/blob/master/LICENSE +hashicorp/go-plugin Mozilla Public License 2.0 https://github.com/hashicorp/go-plugin/blob/main/LICENSE -------------------------------------------------------------------------------- Copyright (c) 2016 HashiCorp, Inc. @@ -16067,6 +16073,8 @@ Exhibit B - “Incompatible With Secondary Licenses” Notice -------------------------------------------------------------------------------- hashicorp/go-retryablehttp Mozilla Public License 2.0 https://github.com/hashicorp/go-retryablehttp/blob/master/LICENSE -------------------------------------------------------------------------------- +Copyright (c) 2015 HashiCorp, Inc. + Mozilla Public License, version 2.0 1. Definitions @@ -16434,6 +16442,8 @@ Exhibit B - "Incompatible With Secondary Licenses" Notice -------------------------------------------------------------------------------- hashicorp/go-rootcerts Mozilla Public License 2.0 https://github.com/hashicorp/go-rootcerts/blob/master/LICENSE -------------------------------------------------------------------------------- +Copyright (c) 2016 HashiCorp, Inc. + Mozilla Public License, version 2.0 1. Definitions @@ -16801,6 +16811,8 @@ Exhibit B - "Incompatible With Secondary Licenses" Notice -------------------------------------------------------------------------------- hashicorp/go-secure-stdlib Mozilla Public License 2.0 https://github.com/hashicorp/go-secure-stdlib/blob/main/LICENSE -------------------------------------------------------------------------------- +Copyright (c) 2020 HashiCorp, Inc. + Mozilla Public License, version 2.0 1. Definitions @@ -19056,6 +19068,8 @@ Exhibit B - “Incompatible With Secondary Licenses” Notice -------------------------------------------------------------------------------- hashicorp/logutils Mozilla Public License 2.0 https://github.com/hashicorp/logutils/blob/master/LICENSE -------------------------------------------------------------------------------- +Copyright (c) 2013 HashiCorp, Inc. + Mozilla Public License, version 2.0 1. Definitions @@ -19436,6 +19450,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -------------------------------------------------------------------------------- hashicorp/memberlist Mozilla Public License 2.0 https://github.com/hashicorp/memberlist/blob/master/LICENSE -------------------------------------------------------------------------------- +Copyright (c) 2013 HashiCorp, Inc. + Mozilla Public License, version 2.0 1. Definitions @@ -19794,6 +19810,8 @@ Exhibit B - “Incompatible With Secondary Licenses” Notice -------------------------------------------------------------------------------- hashicorp/serf Mozilla Public License 2.0 https://github.com/hashicorp/serf/blob/master/LICENSE -------------------------------------------------------------------------------- +Copyright (c) 2013 HashiCorp, Inc. + Mozilla Public License, version 2.0 1. Definitions @@ -29971,6 +29989,35 @@ prometheus/tsdb Apache License 2.0 https://github.com/prometheus-junkyard/tsdb See the License for the specific language governing permissions and limitations under the License. +-------------------------------------------------------------------------------- +rabbitmq/amqp091-go BSD-2-Clause https://github.com/rabbitmq/amqp091-go/blob/main/LICENSE +-------------------------------------------------------------------------------- +AMQP 0-9-1 Go Client +Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. + +Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +Redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. + +Redistributions in binary form must reproduce the above copyright notice, this +list of conditions and the following disclaimer in the documentation and/or +other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + -------------------------------------------------------------------------------- rcrowley/go-metrics BSD 2-Clause "Simplified" License https://github.com/rcrowley/go-metrics/blob/master/LICENSE -------------------------------------------------------------------------------- @@ -32049,6 +32096,31 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +-------------------------------------------------------------------------------- +wagslane/go-rabbitmq MIT License https://github.com/wagslane/go-rabbitmq/blob/main/LICENSE +-------------------------------------------------------------------------------- +MIT License + +Copyright (c) 2021 Lane Wagner + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + -------------------------------------------------------------------------------- xdg-go/pbkdf2 Apache License 2.0 https://github.com/xdg-go/pbkdf2/blob/main/LICENSE -------------------------------------------------------------------------------- diff --git a/executor/licenses/license_info.csv b/executor/licenses/license_info.csv index ae7ee70e59..eeea242f60 100644 --- a/executor/licenses/license_info.csv +++ b/executor/licenses/license_info.csv @@ -108,7 +108,7 @@ go-playground/assert,https://github.com/go-playground/assert/blob/master/LICENSE go-playground/locales,https://github.com/go-playground/locales/blob/master/LICENSE,MIT License,https://raw.githubusercontent.com/go-playground/locales/master/LICENSE go-playground/universal-translator,https://github.com/go-playground/universal-translator/blob/master/LICENSE,MIT License,https://raw.githubusercontent.com/go-playground/universal-translator/master/LICENSE go-playground/validator,https://github.com/go-playground/validator/blob/master/LICENSE,MIT License,https://raw.githubusercontent.com/go-playground/validator/master/LICENSE -go-redis/redis,https://github.com/go-redis/redis/blob/master/LICENSE,BSD 2-Clause "Simplified" License,https://raw.githubusercontent.com/go-redis/redis/master/LICENSE +go-redis/redis,https://github.com/redis/go-redis/blob/master/LICENSE,BSD 2-Clause "Simplified" License,https://raw.githubusercontent.com/redis/go-redis/master/LICENSE go-sql-driver/mysql,https://github.com/go-sql-driver/mysql/blob/master/LICENSE,Mozilla Public License 2.0,https://raw.githubusercontent.com/go-sql-driver/mysql/master/LICENSE go-stack/stack,https://github.com/go-stack/stack/blob/master/LICENSE.md,MIT License,https://raw.githubusercontent.com/go-stack/stack/master/LICENSE.md gobuffalo/flect,https://github.com/gobuffalo/flect/blob/main/LICENSE,MIT License,https://raw.githubusercontent.com/gobuffalo/flect/main/LICENSE @@ -156,7 +156,7 @@ hashicorp/go-hclog,https://github.com/hashicorp/go-hclog/blob/main/LICENSE,MIT L hashicorp/go-immutable-radix,https://github.com/hashicorp/go-immutable-radix/blob/master/LICENSE,Mozilla Public License 2.0,https://raw.githubusercontent.com/hashicorp/go-immutable-radix/master/LICENSE hashicorp/go-msgpack,https://github.com/hashicorp/go-msgpack/blob/main/LICENSE,MIT License,https://raw.githubusercontent.com/hashicorp/go-msgpack/main/LICENSE hashicorp/go-multierror,https://github.com/hashicorp/go-multierror/blob/master/LICENSE,Mozilla Public License 2.0,https://raw.githubusercontent.com/hashicorp/go-multierror/master/LICENSE -hashicorp/go-plugin,https://github.com/hashicorp/go-plugin/blob/master/LICENSE,Mozilla Public License 2.0,https://raw.githubusercontent.com/hashicorp/go-plugin/master/LICENSE +hashicorp/go-plugin,https://github.com/hashicorp/go-plugin/blob/main/LICENSE,Mozilla Public License 2.0,https://raw.githubusercontent.com/hashicorp/go-plugin/main/LICENSE hashicorp/go-retryablehttp,https://github.com/hashicorp/go-retryablehttp/blob/master/LICENSE,Mozilla Public License 2.0,https://raw.githubusercontent.com/hashicorp/go-retryablehttp/master/LICENSE hashicorp/go-rootcerts,https://github.com/hashicorp/go-rootcerts/blob/master/LICENSE,Mozilla Public License 2.0,https://raw.githubusercontent.com/hashicorp/go-rootcerts/master/LICENSE hashicorp/go-secure-stdlib,https://github.com/hashicorp/go-secure-stdlib/blob/main/LICENSE,Mozilla Public License 2.0,https://raw.githubusercontent.com/hashicorp/go-secure-stdlib/main/LICENSE @@ -264,6 +264,7 @@ prometheus/common,https://github.com/prometheus/common/blob/main/LICENSE,Apache prometheus/procfs,https://github.com/prometheus/procfs/blob/master/LICENSE,Apache License 2.0,https://raw.githubusercontent.com/prometheus/procfs/master/LICENSE prometheus/statsd_exporter,https://github.com/prometheus/statsd_exporter/blob/master/LICENSE,Apache License 2.0,https://raw.githubusercontent.com/prometheus/statsd_exporter/master/LICENSE prometheus/tsdb,https://github.com/prometheus-junkyard/tsdb/blob/master/LICENSE,Apache License 2.0,https://raw.githubusercontent.com/prometheus-junkyard/tsdb/master/LICENSE +rabbitmq/amqp091-go,https://github.com/rabbitmq/amqp091-go/blob/main/LICENSE,BSD-2-Clause,https://raw.githubusercontent.com/rabbitmq/amqp091-go/main/LICENSE rcrowley/go-metrics,https://github.com/rcrowley/go-metrics/blob/master/LICENSE,BSD 2-Clause "Simplified" License,https://raw.githubusercontent.com/rcrowley/go-metrics/master/LICENSE robfig/cron,https://github.com/robfig/cron/blob/master/LICENSE,MIT License,https://raw.githubusercontent.com/robfig/cron/master/LICENSE rogpeppe/fastuuid,https://github.com/rogpeppe/fastuuid/blob/master/LICENSE,BSD 3-Clause "New" or "Revised" License,https://raw.githubusercontent.com/rogpeppe/fastuuid/master/LICENSE @@ -302,6 +303,7 @@ ulikunitz/unixtime,https://github.com/ulikunitz/unixtime/blob/master/LICENSE,BSD valyala/bytebufferpool,https://github.com/valyala/bytebufferpool/blob/master/LICENSE,MIT License,https://raw.githubusercontent.com/valyala/bytebufferpool/master/LICENSE valyala/fasthttp,https://github.com/valyala/fasthttp/blob/master/LICENSE,MIT License,https://raw.githubusercontent.com/valyala/fasthttp/master/LICENSE wagslane/go-password-validator,https://github.com/wagslane/go-password-validator/blob/main/LICENSE,MIT License,https://raw.githubusercontent.com/wagslane/go-password-validator/main/LICENSE +wagslane/go-rabbitmq,https://github.com/wagslane/go-rabbitmq/blob/main/LICENSE,MIT License,https://raw.githubusercontent.com/wagslane/go-rabbitmq/main/LICENSE xdg-go/pbkdf2,https://github.com/xdg-go/pbkdf2/blob/main/LICENSE,Apache License 2.0,https://raw.githubusercontent.com/xdg-go/pbkdf2/main/LICENSE xdg-go/scram,https://github.com/xdg-go/scram/blob/master/LICENSE,Apache License 2.0,https://raw.githubusercontent.com/xdg-go/scram/master/LICENSE xdg-go/stringprep,https://github.com/xdg-go/stringprep/blob/master/LICENSE,Apache License 2.0,https://raw.githubusercontent.com/xdg-go/stringprep/master/LICENSE diff --git a/executor/licenses/repo.txt b/executor/licenses/repo.txt index 6a864b12bf..2ba87b2597 100644 --- a/executor/licenses/repo.txt +++ b/executor/licenses/repo.txt @@ -265,6 +265,7 @@ prometheus/common prometheus/procfs prometheus/statsd_exporter prometheus/tsdb +rabbitmq/amqp091-go rcrowley/go-metrics robfig/cron rogpeppe/fastuuid @@ -303,6 +304,7 @@ ulikunitz/unixtime valyala/bytebufferpool valyala/fasthttp wagslane/go-password-validator +wagslane/go-rabbitmq xdg-go/pbkdf2 xdg-go/scram xdg-go/stringprep diff --git a/operator/testing/machinelearning.seldon.io_seldondeployments.yaml b/operator/testing/machinelearning.seldon.io_seldondeployments.yaml index a7fb62a2bc..49f422cfd9 100644 --- a/operator/testing/machinelearning.seldon.io_seldondeployments.yaml +++ b/operator/testing/machinelearning.seldon.io_seldondeployments.yaml @@ -4,7 +4,6 @@ metadata: annotations: cert-manager.io/inject-ca-from: seldon-system/seldon-serving-cert controller-gen.kubebuilder.io/version: v0.7.0 - creationTimestamp: null labels: app: seldon app.kubernetes.io/instance: seldon1 From 1a1947e125b6d6f4a54c50c16b72117debd51651 Mon Sep 17 00:00:00 2001 From: David McWhorter Date: Mon, 1 May 2023 11:59:49 -0400 Subject: [PATCH 5/5] go mod tidy --- executor/go.mod | 6 +----- executor/go.sum | 39 +-------------------------------------- 2 files changed, 2 insertions(+), 43 deletions(-) diff --git a/executor/go.mod b/executor/go.mod index 86d698ff45..294c08760c 100644 --- a/executor/go.mod +++ b/executor/go.mod @@ -21,6 +21,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tensorflow/tensorflow/tensorflow/go/core v0.0.0-00010101000000-000000000000 github.com/uber/jaeger-client-go v2.25.0+incompatible + github.com/wagslane/go-rabbitmq v0.12.1 go.uber.org/automaxprocs v1.4.0 go.uber.org/zap v1.19.1 golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f @@ -67,20 +68,15 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.4.0 // indirect github.com/uber/jaeger-lib v2.2.0+incompatible // indirect - github.com/wagslane/go-rabbitmq v0.12.1 // indirect go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.6.0 // indirect - golang.org/x/mod v0.5.1 // indirect - golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect - golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect - golang.org/x/tools v0.1.8 // indirect gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220628213854-d9e0b6570c03 // indirect diff --git a/executor/go.sum b/executor/go.sum index c6a4d2692c..d2ea40cc60 100644 --- a/executor/go.sum +++ b/executor/go.sum @@ -486,8 +486,6 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/rabbitmq/amqp091-go v1.3.4 h1:tXuIslN1nhDqs2t6Jrz3BAoqvt4qIZzxvdbdcxWtHYU= -github.com/rabbitmq/amqp091-go v1.3.4/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg= github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= @@ -524,8 +522,6 @@ github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= -github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -534,7 +530,6 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -546,41 +541,8 @@ github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMW github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw= github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.12.0/go.mod h1:229t1eWu9UXTPmoUkbpN/fctKPBY4IJoFXQnxHGXy6E= -github.com/valyala/fasthttp v1.15.1/go.mod h1:YOKImeEosDdBPnxc0gy7INqi3m1zK6A+xl6TwOBhHCA= -github.com/valyala/fasthttp v1.16.0/go.mod h1:YOKImeEosDdBPnxc0gy7INqi3m1zK6A+xl6TwOBhHCA= -github.com/valyala/fasthttp v1.31.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus= -github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= -github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= -github.com/valyala/quicktemplate v1.5.1/go.mod h1:v7yYWpBEiutDyNfVaph6oC/yKwejzVyTX/2cwwHxyok= -github.com/valyala/quicktemplate v1.6.2/go.mod h1:mtEJpQtUiBV0SHhMX6RtiJtqxncgrfmjcUy5T68X8TM= -github.com/valyala/quicktemplate v1.6.3/go.mod h1:fwPzK2fHuYEODzJ9pkw0ipCPNHZ2tD5KW4lOuSdPKzY= -github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= -github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= -github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw= -github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= -github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= -github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= -github.com/vmware/govmomi v0.20.3/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU= -github.com/wagslane/go-password-validator v0.3.0/go.mod h1:TI1XJ6T5fRdRnHqHt14pvy1tNVnrwe7m3/f1f2fDphQ= github.com/wagslane/go-rabbitmq v0.12.1 h1:A3ec8wmP3hr1SKsbXwFaf42xXd5D7yAeJfdFZJydKlU= github.com/wagslane/go-rabbitmq v0.12.1/go.mod h1:jTSN7opv/tmphx0MYaRR/++HQCuhxrBZEyTd0xCym2c= -github.com/whilp/git-urls v0.0.0-20191001220047-6db9661140c0/go.mod h1:2rx5KE5FLD0HRfkkpyn8JwbVLBdhgeiOb2D2D9LLKM4= -github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= -github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= -github.com/xanzy/go-gitlab v0.37.0/go.mod h1:sPLojNBn68fMUWSxIJtdVVIP8uSBYqesTfDUseX11Ug= -github.com/xanzy/go-gitlab v0.39.0/go.mod h1:sPLojNBn68fMUWSxIJtdVVIP8uSBYqesTfDUseX11Ug= -github.com/xanzy/ssh-agent v0.2.0/go.mod h1:0NyE30eGUDliuLEHJgYte/zncp2zdTStcOnWhgSqHD8= -github.com/xanzy/ssh-agent v0.2.1/go.mod h1:mLlQY/MoOhWBj+gOGMQkOeiEvkx+8pJSI+0Bx9h2kr4= -github.com/xanzy/ssh-agent v0.3.0/go.mod h1:3s9xbODqPuuhK9JV1R321M/FlMZSBvE5aY6eAcqrDh0= -github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= -github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= -github.com/xdg-go/scram v1.1.0/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= -github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= -github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= -github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= -github.com/xhit/go-str2duration/v2 v2.0.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= -github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8/go.mod h1:HUYIGzjTL3rfEspMxjDjgmT5uz5wzYJKVo23qUhYTos= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -631,6 +593,7 @@ go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhW go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=