diff --git a/executor/api/rabbitmq/connection.go b/executor/api/rabbitmq/connection.go deleted file mode 100644 index 8697ebc236..0000000000 --- a/executor/api/rabbitmq/connection.go +++ /dev/null @@ -1,137 +0,0 @@ -package rabbitmq - -import ( - "fmt" - "time" - - "github.com/go-logr/logr" - amqp "github.com/rabbitmq/amqp091-go" -) - -/* - * mostly taken from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher.go - */ - -const ( - connectionRetryLimit = 5 - - queueDurable = true - queueAutoDelete = false - queueExclusive = false - queueNoWait = false - - amqpExchange = "" -) - -var ( - connectionRetryDelay = 5 * time.Second - queueArgs = amqp.Table{} -) - -type connection struct { - log logr.Logger - uri string - - conn Connection - channel Channel - err chan error -} - -// NewConnection creates a new AMQP connection that targets a specific broker uri and queue. -func NewConnection(uri string, logger logr.Logger) (*connection, error) { - conn := &connection{ - uri: uri, - err: make(chan error), - log: logger.WithName("Connection"), - } - - if err := conn.connect(); err != nil { - conn.log.Error(err, "error connecting", "uri", uri) - return nil, fmt.Errorf("error '%w' connecting to '%v'", err, uri) - } - return conn, nil -} - -func (c *connection) DeclareQueue(queueName string) (amqp.Queue, error) { - return c.channel.QueueDeclare( - queueName, - queueDurable, - queueAutoDelete, - queueExclusive, - queueNoWait, - queueArgs, - ) -} - -// Close will close the underlying AMQP connection if one has been set, and this operation will cascade down to any -// channels created under this connection. -func (c *connection) Close() error { - if c.conn != nil { - err := c.conn.Close() - if err != nil { - return fmt.Errorf("error '%w' closing connection '%v'", err, c) - } - } - return nil -} - -// implements retry logic with delays for establishing AMQP connections. -func (c *connection) connect() error { - - ticker := time.NewTicker(connectionRetryDelay) - defer ticker.Stop() - - for counter := 0; counter < connectionRetryLimit; <-ticker.C { - var err error - - c.conn, err = defaultDialerAdapter(c.uri) - if err != nil { - c.log.Error(err, "cannot dial rabbitmq", "uri", c.uri, "attempt", counter+1) - - counter++ - continue - } - - go func() { - closed := make(chan *amqp.Error, 1) - c.conn.NotifyClose(closed) - - reason, ok := <-closed - if ok { - c.log.Error(reason, "rabbitmq connection closed, registering err signal") - c.err <- reason - } - }() - - c.channel, err = c.conn.Channel() - if err != nil { - c.log.Error(err, "error creating rabbitmq channel", "uri", c.uri) - return fmt.Errorf("error '%w' creating rabbitmq channel to %q", err, c.uri) - } - - go func() { - closed := make(chan *amqp.Error, 1) - c.channel.NotifyClose(closed) - - reason, ok := <-closed - if ok { - c.log.Error(reason, "rabbitmq channel closed, registering err signal") - c.err <- reason - } - }() - - err = c.channel.Qos( - 1, - 0, - true, - ) - if err != nil { - c.log.Error(err, "error setting rabbitmq Qos") - return fmt.Errorf("error '%w' error setting rabbitmq Qos", err) - } - - return nil - } - - return fmt.Errorf("rabbitmq connection retry limit reached: %d", connectionRetryLimit) -} diff --git a/executor/api/rabbitmq/connection_test.go b/executor/api/rabbitmq/connection_test.go deleted file mode 100644 index 76fac8eb2a..0000000000 --- a/executor/api/rabbitmq/connection_test.go +++ /dev/null @@ -1,153 +0,0 @@ -package rabbitmq - -import ( - "errors" - "github.com/go-logr/logr/testr" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "testing" - "time" -) - -/* - * mostly taken from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher_test.go - */ - -var ( - uri = "amqp://test-rabbitmq:5672/" - queueName = "test-queue" - consumerTag = "tag" -) - -type connectFixture struct { - adapter *mockDialerAdapter - connection *mockConnection - channel *mockChannel -} - -func setupConnect(fn func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel)) ( - *connectFixture, func(), -) { - mockChan := &mockChannel{} - mockConn := &mockConnection{} - mockAdapter := &mockDialerAdapter{} - - fn(mockAdapter, mockConn, mockChan) - - origAdapter := defaultDialerAdapter - origRetryDelay := connectionRetryDelay - - defaultDialerAdapter = mockAdapter.Dial - connectionRetryDelay = 1 * time.Nanosecond - - fixture := &connectFixture{ - adapter: mockAdapter, - connection: mockConn, - channel: mockChan, - } - reset := func() { - defaultDialerAdapter = origAdapter - connectionRetryDelay = origRetryDelay - } - - return fixture, reset -} - -func TestNewConnection(t *testing.T) { - log := testr.New(t) - - t.Run("connect", func(t *testing.T) { - f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { - channel.On("Qos", 1, 0, true).Return(nil) - conn.On("Channel").Return(channel, nil) - adapter.On("Dial", uri).Return(conn, nil) - }) - defer reset() - - actual, err := NewConnection(uri, log) - require.NoError(t, err) - assert.NotNil(t, actual.conn) - assert.NotNil(t, actual.channel) - assert.Equal(t, uri, actual.uri) - - f.adapter.AssertExpectations(t) - f.connection.AssertExpectations(t) - f.channel.AssertExpectations(t) - }) - - t.Run("reconnect", func(t *testing.T) { - f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { - channel.On("Qos", 1, 0, true).Return(nil) - conn.On("Channel").Return(channel, nil) - adapter.On("Dial", uri).Return(nil, errors.New("test dial error")).Once() - adapter.On("Dial", uri).Return(conn, nil).Once() - }) - defer reset() - - actual, err := NewConnection(uri, log) - require.NoError(t, err) - assert.NotNil(t, actual.conn) - assert.NotNil(t, actual.channel) - assert.Equal(t, uri, actual.uri) - - f.adapter.AssertExpectations(t) - f.adapter.AssertNumberOfCalls(t, "Dial", 2) - f.connection.AssertExpectations(t) - f.channel.AssertExpectations(t) - }) - - t.Run("channel_failure", func(t *testing.T) { - f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { - conn.On("Channel").Return(nil, errors.New("test channel failure")) - adapter.On("Dial", uri).Return(conn, nil) - }) - defer reset() - - _, err := NewConnection(uri, log) - assert.Error(t, err) - - f.adapter.AssertExpectations(t) - f.connection.AssertExpectations(t) - }) - - t.Run("retry_limit_failure", func(t *testing.T) { - f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { - adapter.On("Dial", uri).Return(nil, errors.New("test dial error")) - }) - defer reset() - - _, err := NewConnection(uri, log) - assert.Error(t, err) - - f.adapter.AssertExpectations(t) - f.adapter.AssertNumberOfCalls(t, "Dial", connectionRetryLimit) - }) -} - -func TestConnection_Close(t *testing.T) { - t.Run("success", func(t *testing.T) { - mockConn := &mockConnection{} - mockConn.On("Close").Return(nil) - con := &connection{ - conn: mockConn, - } - - assert.NoError(t, con.Close()) - mockConn.AssertExpectations(t) - }) - - t.Run("failure", func(t *testing.T) { - mockConn := &mockConnection{} - mockConn.On("Close").Return(errors.New("test failed to close connection")) - con := &connection{ - conn: mockConn, - } - - assert.ErrorContains(t, con.Close(), "test failed to close connection") - mockConn.AssertExpectations(t) - }) - - t.Run("no_connection", func(t *testing.T) { - assert.NoError(t, (&connection{}).Close()) - }) -} diff --git a/executor/api/rabbitmq/consumer.go b/executor/api/rabbitmq/consumer.go deleted file mode 100644 index 735b90d357..0000000000 --- a/executor/api/rabbitmq/consumer.go +++ /dev/null @@ -1,138 +0,0 @@ -package rabbitmq - -import ( - "fmt" - "github.com/go-logr/logr" - amqp "github.com/rabbitmq/amqp091-go" - "os" - "os/signal" - "syscall" -) - -/* - * based on patterns from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher.go - */ - -type consumer struct { - connection - queueName string - consumerTag string -} - -func NewConsumer(uri, queueName, consumerTag string, logger logr.Logger) (*consumer, error) { - c, err := NewConnection(uri, logger.WithName("Consumer")) - if err != nil { - c.log.Error(err, "error creating connection for consumer", "uri", c.uri) - return nil, fmt.Errorf("error '%w' creating connection to '%v' for consumer", err, uri) - } - return &consumer{ - *c, - queueName, - consumerTag, - }, nil -} - -// In the event that the underlying connection was closed after connection creation, this function will attempt to -// reconnection to the AMQP broker before performing these operations. -// this is a blocking function while the consumer is running, run it in a goroutine if needed -func (c *consumer) Consume( - payloadHandler func(*SeldonPayloadWithHeaders) error, - errorHandler func(args ConsumerError) error, -) error { - select { - case <-c.err: - c.log.Info("attempting to reconnect to rabbitmq", "uri", c.uri) - - if err := c.connect(); err != nil { - c.log.Error(err, "error reconnecting to rabbitmq") - return fmt.Errorf("error '%w' reconnecting to rabbitmq", err) - } - default: - } - - // default exchange with queue name as name is the same as a direct exchange routed to the queue - deliveries, err := c.channel.Consume( - c.queueName, // name - c.consumerTag, // consumerTag, - false, // autoAck - false, // exclusive - false, // noLocal - false, // noWait - amqp.Table{}, // arguments TODO should something go here? - ) - if err != nil { - c.log.Error(err, "error consuming from rabbitmq queue") - return fmt.Errorf("error '%w' consuming from rabbitmq queue", err) - } - - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - for delivery := range deliveries { - select { - case sig := <-sigChan: - c.log.Info("terminating due to signal", "signal", sig) - break - default: - pl, err := DeliveryToPayload(delivery) - if err != nil { - errorHandlerErr := c.handleConsumerError(ConsumerError{err, delivery, pl}, errorHandler) - if errorHandlerErr != nil { - // fatal error, abort the consumer - return errorHandlerErr - } - continue - } - err = payloadHandler(pl) - if err != nil { - errorHandlerErr := c.handleConsumerError(ConsumerError{err, delivery, pl}, errorHandler) - if errorHandlerErr != nil { - // fatal error, abort the consumer - return errorHandlerErr - } - continue - } - ackErr := delivery.Ack(false) - if ackErr != nil { - // fatal error, abort the consumer - return ackErr - } - } - } - close(sigChan) - - select { - case err = <-c.err: - return fmt.Errorf("error '%w' with rabbitmq connection", err) - default: - } - - return nil -} - -type ConsumerError struct { - err error - delivery amqp.Delivery - pl *SeldonPayloadWithHeaders // might be nil -} - -func (c *consumer) handleConsumerError( - args ConsumerError, - errorHandler func(args ConsumerError) error, -) error { - delivery := args.delivery - - err := errorHandler(args) - if err != nil { - c.log.Error(err, "error handler encountered an error", "original error", args.err) - return fmt.Errorf("error handler encountered an error '%w' when handling original error '%v'", err, args.err) - } - - rejectErr := delivery.Reject(false) - if rejectErr != nil { - c.log.Error(rejectErr, "error rejecting") - return fmt.Errorf("error '%w' rejecting", rejectErr) - } - - return nil -} diff --git a/executor/api/rabbitmq/consumer_test.go b/executor/api/rabbitmq/consumer_test.go deleted file mode 100644 index d5d2afe8a9..0000000000 --- a/executor/api/rabbitmq/consumer_test.go +++ /dev/null @@ -1,173 +0,0 @@ -package rabbitmq - -import ( - "errors" - "github.com/go-logr/logr/testr" - proto2 "github.com/golang/protobuf/proto" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/seldonio/seldon-core/executor/api/grpc/seldon/proto" - "github.com/seldonio/seldon-core/executor/api/payload" - "github.com/seldonio/seldon-core/executor/api/rest" - "github.com/stretchr/testify/assert" - "testing" -) - -/* - * based on patterns from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher_test.go - */ - -func TestConsume(t *testing.T) { - log := testr.New(t) - testMessageStr := `"hello"` - seldonMessage := proto.SeldonMessage{ - Status: &proto.Status{ - Status: proto.Status_SUCCESS, - }, - Meta: nil, - DataOneof: &proto.SeldonMessage_StrData{ - StrData: testMessageStr, - }, - } - seldonMessageEnc, _ := proto2.Marshal(&seldonMessage) - seldonMessage.XXX_sizecache = 0 // to make test cases match - - t.Run("success", func(t *testing.T) { - mockChan := &mockChannel{} - - mockDeliveries := make(chan amqp.Delivery, 1) // buffer 1 so that we send returns before starting consumer - mockDeliveries <- createTestDelivery(mockChan, []byte(testMessageStr), rest.ContentTypeJSON) - close(mockDeliveries) - - mockChan.On("Consume", queueName, consumerTag, false, false, false, false, - amqp.Table{}).Return(mockDeliveries, - nil) - mockChan.On("Ack", uint64(0), false).Return(nil) - - cons := &consumer{ - connection: connection{ - log: log, - channel: mockChan, - }, - queueName: queueName, - consumerTag: consumerTag, - } - - payloadHandler := func(pl *SeldonPayloadWithHeaders) error { - assert.Equal( - t, - &SeldonPayloadWithHeaders{ - &payload.BytesPayload{ - Msg: []byte(`"hello"`), - ContentType: rest.ContentTypeJSON, - ContentEncoding: "", - }, - make(map[string][]string), - }, - pl, - "payloads not equal", - ) - return nil - } - - errorHandler := func(err ConsumerError) error { - assert.NoError(t, err.err, "unexpected error") - return nil - } - - err := cons.Consume(payloadHandler, errorHandler) - - assert.NoError(t, err) - mockChan.AssertExpectations(t) - }) - - t.Run("on failed message don't retry, but reject method", func(t *testing.T) { - mockChan := &mockChannel{} - - mockDeliveries := make(chan amqp.Delivery, 1) // buffer 1 so that we send returns before starting consumer - mockDeliveries <- createTestDelivery(mockChan, []byte(testMessageStr), rest.ContentTypeJSON) - close(mockDeliveries) - - mockChan.On("Consume", queueName, consumerTag, false, false, false, false, - amqp.Table{}).Return(mockDeliveries, - nil) - mockChan.On("Reject", uint64(0), false).Return(nil) - - cons := &consumer{ - connection: connection{ - log: log, - channel: mockChan, - }, - queueName: queueName, - consumerTag: consumerTag, - } - - payloadHandler := func(pl *SeldonPayloadWithHeaders) error { - return errors.New("Something bad happened during prediction") - } - - errorHandler := func(err ConsumerError) error { - return nil - } - - err := cons.Consume(payloadHandler, errorHandler) - - assert.NoError(t, err) - mockChan.AssertExpectations(t) - }) - - t.Run("encoded seldon msg", func(t *testing.T) { - mockChan := &mockChannel{} - - mockDeliveries := make(chan amqp.Delivery, 1) // buffer 1 so that we send returns before starting consumer - mockDeliveries <- createTestDelivery(mockChan, seldonMessageEnc, payload.APPLICATION_TYPE_PROTOBUF) - close(mockDeliveries) - - mockChan.On("Consume", queueName, consumerTag, false, false, false, false, - amqp.Table{}).Return(mockDeliveries, - nil) - mockChan.On("Ack", uint64(0), false).Return(nil) - - cons := &consumer{ - connection: connection{ - log: log, - channel: mockChan, - }, - queueName: queueName, - consumerTag: consumerTag, - } - - payloadHandler := func(pl *SeldonPayloadWithHeaders) error { - assert.Equal( - t, - &SeldonPayloadWithHeaders{ - &payload.ProtoPayload{ - Msg: &seldonMessage, - }, - make(map[string][]string), - }, - pl, - "payloads not equal", - ) - return nil - } - - errorHandler := func(err ConsumerError) error { - assert.NoError(t, err.err, "unexpected error") - return nil - } - - err := cons.Consume(payloadHandler, errorHandler) - - assert.NoError(t, err) - mockChan.AssertExpectations(t) - }) -} - -func createTestDelivery(ack amqp.Acknowledger, body []byte, contentType string) amqp.Delivery { - return amqp.Delivery{ - Acknowledger: ack, - Body: body, - ContentType: contentType, - ContentEncoding: "", - } -} diff --git a/executor/api/rabbitmq/publisher.go b/executor/api/rabbitmq/publisher.go deleted file mode 100644 index 660ebf2128..0000000000 --- a/executor/api/rabbitmq/publisher.go +++ /dev/null @@ -1,68 +0,0 @@ -package rabbitmq - -import ( - "fmt" - - "github.com/go-logr/logr" - amqp "github.com/rabbitmq/amqp091-go" -) - -/* - * mostly taken from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher.go - */ - -const ( - publishMandatory = true - publishImmediate = false -) - -type publisher struct { - connection - queueName string -} - -func NewPublisher(uri, queueName string, logger logr.Logger) (*publisher, error) { - c, err := NewConnection(uri, logger.WithName("Publisher")) - if err != nil { - c.log.Error(err, "error creating connection for publisher", "uri", c.uri) - return nil, fmt.Errorf("error '%w' creating connection to '%v' for publisher", err, uri) - } - return &publisher{ - *c, - queueName, - }, nil -} - -// In the event that the underlying connection was closed after connection creation, this function will attempt to -// reconnection to the AMQP broker before performing these operations. -func (p *publisher) Publish(payload SeldonPayloadWithHeaders) error { - select { - case <-p.err: - p.log.Info("attempting to reconnect to rabbitmq", "uri", p.uri) - - if err := p.connect(); err != nil { - p.log.Error(err, "error reconnecting to rabbitmq") - return fmt.Errorf("error '%w' reconnecting to rabbitmq", err) - } - default: - } - - body, err := payload.GetBytes() - if err != nil { - p.log.Error(err, "error retrieving payload bytes") - return fmt.Errorf("error '%w' retrieving payload bytes", err) - } - message := amqp.Publishing{ - Headers: StringMapToTable(payload.Headers), - ContentType: payload.GetContentType(), - ContentEncoding: payload.GetContentEncoding(), - DeliveryMode: amqp.Persistent, - Body: body, - } - err = p.channel.Publish(amqpExchange, p.queueName, publishMandatory, publishImmediate, message) - if err != nil { - p.log.Error(err, "error publishing rabbitmq message") - return fmt.Errorf("error '%w' publishing rabbitmq message", err) - } - return nil -} diff --git a/executor/api/rabbitmq/publisher_test.go b/executor/api/rabbitmq/publisher_test.go deleted file mode 100644 index 1b2030b57b..0000000000 --- a/executor/api/rabbitmq/publisher_test.go +++ /dev/null @@ -1,158 +0,0 @@ -package rabbitmq - -import ( - "errors" - "github.com/go-logr/logr/testr" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/stretchr/testify/assert" - "testing" -) - -/* - * mostly taken from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/publisher_test.go - */ - -func TestPublisher(t *testing.T) { - log := testr.New(t) - testMessage := SeldonPayloadWithHeaders{ - &TestPayload{Msg: `"hello"`}, - make(map[string][]string), - } - - t.Run("success", func(t *testing.T) { - mockChan := &mockChannel{} - - msg := amqp.Publishing{ - Headers: make(map[string]interface{}), - ContentType: "application/json", - DeliveryMode: amqp.Persistent, - Body: []byte(`"hello"`), - } - mockChan.On("Publish", "", queueName, true, false, msg).Return(nil) - - pub := &publisher{ - connection: connection{ - log: log, - channel: mockChan, - }, - queueName: queueName, - } - - assert.NoError(t, pub.Publish(testMessage)) - - mockChan.AssertExpectations(t) - }) - - t.Run("publish_failure", func(t *testing.T) { - mockChan := &mockChannel{} - - mockChan.On("Publish", "", queueName, true, false, amqp.Publishing{ - Headers: make(map[string]interface{}), - ContentType: "application/json", - DeliveryMode: amqp.Persistent, - Body: []byte(`"hello"`), - }).Return(errors.New("test error")) - - pub := &publisher{ - connection: connection{ - log: log, - channel: mockChan, - }, - queueName: queueName, - } - - assert.ErrorContains(t, pub.Publish(testMessage), "test error") - - mockChan.AssertExpectations(t) - }) - - t.Run("connection_closed", func(t *testing.T) { - f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { - channel.On("Qos", 1, 0, true).Return(nil) - channel.On("Publish", "", queueName, true, false, amqp.Publishing{ - Headers: make(map[string]interface{}), - ContentType: "application/json", - DeliveryMode: amqp.Persistent, - Body: []byte(`"hello"`), - }).Return(nil) - - conn.On("Channel").Return(channel, nil) - adapter.On("Dial", uri).Return(conn, nil) - }) - defer reset() - - pub := &publisher{ - connection: connection{ - uri: uri, - log: log, - err: make(chan error, 1), - }, - queueName: queueName, - } - - pub.err <- errors.New("dang, conn be broke") - - assert.NoError(t, pub.Publish(testMessage)) - - f.adapter.AssertExpectations(t) - f.connection.AssertExpectations(t) - f.channel.AssertExpectations(t) - }) - - t.Run("connection_closed_retry", func(t *testing.T) { - f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { - channel.On("Qos", 1, 0, true).Return(nil) - channel.On("Publish", "", queueName, true, false, amqp.Publishing{ - Headers: make(map[string]interface{}), - ContentType: "application/json", - DeliveryMode: amqp.Persistent, - Body: []byte(`"hello"`), - }).Return(nil) - - conn.On("Channel").Return(channel, nil) - - adapter.On("Dial", uri).Return(nil, errors.New("test dial error")).Once() - adapter.On("Dial", uri).Return(conn, nil).Once() - }) - defer reset() - - pub := &publisher{ - connection: connection{ - uri: uri, - log: log, - err: make(chan error, 1), - }, - queueName: queueName, - } - - pub.err <- errors.New("dang, conn be broke") - - assert.NoError(t, pub.Publish(testMessage)) - - f.adapter.AssertExpectations(t) - f.connection.AssertExpectations(t) - f.channel.AssertExpectations(t) - }) - - t.Run("connection_closed_retry_failure", func(t *testing.T) { - f, reset := setupConnect(func(adapter *mockDialerAdapter, conn *mockConnection, channel *mockChannel) { - adapter.On("Dial", uri).Return(nil, errors.New("test dial error")) - }) - defer reset() - - pub := &publisher{ - connection: connection{ - uri: uri, - log: log, - err: make(chan error, 1), - }, - queueName: queueName, - } - - pub.err <- errors.New("dang, conn be broke") - - assert.Error(t, pub.Publish(testMessage)) - - f.adapter.AssertExpectations(t) - }) -} diff --git a/executor/api/rabbitmq/rabbitmq_client.go b/executor/api/rabbitmq/rabbitmq_client.go new file mode 100644 index 0000000000..7d5c2db316 --- /dev/null +++ b/executor/api/rabbitmq/rabbitmq_client.go @@ -0,0 +1,145 @@ +package rabbitmq + +import ( + "fmt" + "github.com/go-logr/logr" + "github.com/wagslane/go-rabbitmq" +) + +const ( + publishMandatory = true + publishImmediate = false +) + +type RabbitMqConnection struct { + conn *rabbitmq.Conn + log logr.Logger +} + +type RabbitMqConsumer struct { + rabbitmq.Consumer +} + +type RabbitMqPublisher struct { + rabbitmq.Publisher + log logr.Logger +} + +func createRabbitMQConnection(brokerUrl string, log logr.Logger) (ConnectionWrapper, error) { + conn, err := rabbitmq.NewConn(brokerUrl, rabbitmq.WithConnectionOptionsLogging) + if err != nil { + log.Error(err, "error connecting to rabbitmq") + return nil, fmt.Errorf("error '%w' connecting to rabbitmq", err) + } + return &RabbitMqConnection{conn, log}, nil +} + +func (c *RabbitMqConnection) Close() error { + return c.conn.Close() +} + +func (c *RabbitMqConnection) NewPublisher() (PublisherWrapper, error) { + publisher, err := rabbitmq.NewPublisher( + c.conn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName(""), //default exchange + ) + if err != nil { + c.log.Error(err, "error creating publisher") + return nil, fmt.Errorf("error '%w' creating publisher", err) + } + return &RabbitMqPublisher{*publisher, c.log}, nil +} + +func (c *RabbitMqConnection) NewConsumer(handler rabbitmq.Handler, queue string, consumerTag string) (ConsumerWrapper, error) { + consumer, err := rabbitmq.NewConsumer( + c.conn, + handler, + queue, + rabbitmq.WithConsumerOptionsConsumerName(consumerTag), + rabbitmq.WithConsumerOptionsQueueNoDeclare, + ) + if err != nil { + c.log.Error(err, "error creating publisher") + return nil, fmt.Errorf("error '%w' creating publisher", err) + } + return &RabbitMqConsumer{*consumer}, nil +} + +func (c *RabbitMqConsumer) Close() { + c.Consumer.Close() +} + +func (p *RabbitMqPublisher) Close() { + p.Publisher.Close() +} + +func (p *RabbitMqPublisher) Publish( + payloadWithHeaders SeldonPayloadWithHeaders, + queueName string, +) error { + + payload := payloadWithHeaders.Payload + body, err := payload.GetBytes() + if err != nil { + p.log.Error(err, "error retrieving payload bytes") + return fmt.Errorf("error '%w' retrieving payload bytes", err) + } + + options := []func(options *rabbitmq.PublishOptions){ + rabbitmq.WithPublishOptionsHeaders(StringMapToTable(payloadWithHeaders.Headers)), + rabbitmq.WithPublishOptionsContentType(payload.GetContentType()), + rabbitmq.WithPublishOptionsContentEncoding(""), + rabbitmq.WithPublishOptionsPersistentDelivery, + } + if publishMandatory { + options = append(options, rabbitmq.WithPublishOptionsMandatory) + } + if publishImmediate { + options = append(options, rabbitmq.WithPublishOptionsImmediate) + } + + err = p.Publisher.Publish( + body, + []string{queueName}, + options..., + ) + if err != nil { + p.log.Error(err, "error publishing rabbitmq message") + return fmt.Errorf("error '%w' publishing rabbitmq message", err) + } + + return nil +} + +func CreateConsumerHandler( + payloadHandler func(*SeldonPayloadWithHeaders) error, + errorHandler func(args ConsumerError) error, + log logr.Logger, +) rabbitmq.Handler { + return func(delivery rabbitmq.Delivery) rabbitmq.Action { + pl, err := DeliveryToPayload(delivery) + if err != nil { + return handleConsumerError(ConsumerError{err, delivery, pl}, errorHandler, log) + } + err = payloadHandler(pl) + if err != nil { + return handleConsumerError(ConsumerError{err, delivery, pl}, errorHandler, log) + } + return rabbitmq.Ack + } +} + +func handleConsumerError( + args ConsumerError, + errorHandler func(args ConsumerError) error, + log logr.Logger, +) rabbitmq.Action { + + err := errorHandler(args) + if err != nil { + log.Error(err, "error handler encountered an error", "original error", args.err) + } + + return rabbitmq.NackDiscard +} diff --git a/executor/api/rabbitmq/rabbitmq_test.go b/executor/api/rabbitmq/rabbitmq_test.go index 007d5494b5..9054561b01 100644 --- a/executor/api/rabbitmq/rabbitmq_test.go +++ b/executor/api/rabbitmq/rabbitmq_test.go @@ -1,91 +1,65 @@ package rabbitmq import ( - amqp "github.com/rabbitmq/amqp091-go" "github.com/seldonio/seldon-core/executor/api/rest" "github.com/stretchr/testify/mock" + "github.com/wagslane/go-rabbitmq" ) -/* - * adapted from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/amqp_test.go - */ - -type mockDialerAdapter struct { - mock.Mock -} - -func (m *mockDialerAdapter) Dial(url string) (Connection, error) { - args := m.Called(url) - amqpConn, _ := args.Get(0).(Connection) - - return amqpConn, args.Error(1) -} - -type mockConnection struct { +type MockConnectionWrapper struct { mock.Mock } -func (m *mockConnection) Channel() (Channel, error) { - args := m.Called() - amqpCh, _ := args.Get(0).(Channel) - - return amqpCh, args.Error(1) -} - -func (m *mockConnection) NotifyClose(receiver chan *amqp.Error) chan *amqp.Error { - return receiver -} - -func (m *mockConnection) Close() error { - args := m.Called() - return args.Error(0) +func (m *MockConnectionWrapper) Close() error { + returnArgs := m.Called() + return returnArgs.Error(0) } -type mockChannel struct { - mock.Mock -} - -func (m *mockChannel) QueueDeclare( - name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table, -) (amqp.Queue, error) { - mArgs := m.Called(name, durable, autoDelete, exclusive, noWait, args) - return mArgs.Get(0).(amqp.Queue), mArgs.Error(1) +func (m *MockConnectionWrapper) NewPublisher() (PublisherWrapper, error) { + returnArgs := m.Called() + publisher := returnArgs.Get(0).(PublisherWrapper) + return publisher, returnArgs.Error(1) } -func (m *mockChannel) Publish(exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error { - args := m.Called(exchange, key, mandatory, immediate, msg) - return args.Error(0) +func (m *MockConnectionWrapper) NewConsumer(handler rabbitmq.Handler, queue string, consumerTag string) (ConsumerWrapper, error) { + returnArgs := m.Called(handler, inputQueue, consumerTag) + consumer := returnArgs.Get(0).(*TestConsumerWrapper) // unchecked for now + consumer.handler = handler + consumer.queue = queue + consumer.consumerTag = consumerTag + return consumer, returnArgs.Error(1) } -func (m *mockChannel) Consume( - name string, consumerTag string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp.Table, -) (<-chan amqp.Delivery, error) { - mArgs := m.Called(name, consumerTag, autoAck, exclusive, noLocal, noWait, args) - return mArgs.Get(0).(chan amqp.Delivery), mArgs.Error(1) +type TestConsumerWrapper struct { + handler rabbitmq.Handler + queue string + consumerTag string + isClosed bool } -func (m *mockChannel) Ack(tag uint64, multiple bool) error { - args := m.Called(tag, multiple) - return args.Error(0) +func (m *TestConsumerWrapper) Close() { + println("closed consumer wrapper") + m.isClosed = true } -func (m *mockChannel) Nack(tag uint64, multiple bool, requeue bool) error { - args := m.Called(tag, multiple, requeue) - return args.Error(0) +func (m *TestConsumerWrapper) SimulateDelivery(delivery rabbitmq.Delivery) rabbitmq.Action { + return m.handler(delivery) } -func (m *mockChannel) Reject(tag uint64, requeue bool) error { - args := m.Called(tag, requeue) - return args.Error(0) +type MockPublisherWrapper struct { + mock.Mock } -func (m *mockChannel) Qos(prefetchCount int, prefetchSize int, global bool) error { - args := m.Called(prefetchCount, prefetchSize, global) - return args.Error(0) +func (m *MockPublisherWrapper) Close() { + m.Called() } -func (m *mockChannel) NotifyClose(receiver chan *amqp.Error) chan *amqp.Error { - return receiver +func (m *MockPublisherWrapper) Publish( + payload SeldonPayloadWithHeaders, + queueName string, +) error { + returnArgs := m.Called(payload, queueName) + return returnArgs.Error(0) } type TestPayload struct { diff --git a/executor/api/rabbitmq/server.go b/executor/api/rabbitmq/server.go index ae3c9bbf91..a0a15c48e5 100644 --- a/executor/api/rabbitmq/server.go +++ b/executor/api/rabbitmq/server.go @@ -4,19 +4,21 @@ import ( "context" "errors" "fmt" - "log" "net/url" "os" + "os/signal" "strconv" + "sync" + "syscall" "time" "github.com/golang/protobuf/jsonpb" guuid "github.com/google/uuid" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" - amqp "github.com/rabbitmq/amqp091-go" "github.com/seldonio/seldon-core/executor/api/grpc/seldon/proto" "github.com/seldonio/seldon-core/executor/k8s" + "github.com/wagslane/go-rabbitmq" "github.com/go-logr/logr" "github.com/seldonio/seldon-core/executor/api" @@ -121,29 +123,61 @@ func CreateRabbitMQServer(args RabbitMQServerOptions) (*SeldonRabbitMQServer, er } func (rs *SeldonRabbitMQServer) Serve() error { - conn, err := NewConnection(rs.BrokerUrl, rs.Log.WithName("RabbitMQServerConnection")) + conn, err := createRabbitMQConnection(rs.BrokerUrl, rs.Log) if err != nil { rs.Log.Error(err, "error connecting to rabbitmq") return fmt.Errorf("error '%w' connecting to rabbitmq", err) } + defer func(conn ConnectionWrapper) { + err := conn.Close() + if err != nil { + rs.Log.Error(err, "error closing rabbitMQ connection") + } + }(conn) + + wg := new(sync.WaitGroup) + terminateChan, err := rs.serve(conn, wg) + if err != nil { + rs.Log.Error(err, "error starting rabbitmq server") + return fmt.Errorf("error '%w' starting rabbitmq server", err) + } + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + wg.Add(1) + // wait for shutdown signal and terminate if received go func() { - err := <-conn.err - log.Fatal("RabbitMQ connection died", err) // causes app to exit with error + rs.Log.Info("awaiting OS shutdown signals") + sig := <-sigs + rs.Log.Info("sending termination message due to signal", "signal", sig) + terminateChan <- true + wg.Done() }() - return rs.serve(conn) + wg.Wait() + + rs.Log.Info("RabbitMQ server terminated normally") + return nil } -func (rs *SeldonRabbitMQServer) serve(conn *connection) error { - //TODO not sure if this is the best pattern or better to pass in pod name explicitly somehow +func (rs *SeldonRabbitMQServer) serve(conn ConnectionWrapper, wg *sync.WaitGroup) (chan<- bool, error) { + // not sure if this is the best pattern or better to pass in pod name explicitly somehow consumerTag, err := os.Hostname() if err != nil { - return fmt.Errorf("error '%w' retrieving hostname", err) + return nil, fmt.Errorf("error '%w' retrieving hostname", err) } - consumer := &consumer{*conn, rs.InputQueueName, consumerTag} - rs.Log.Info("Created", "consumer", consumer, "input queue", rs.InputQueueName) + publisher, err := conn.NewPublisher() + if err != nil { + return nil, fmt.Errorf("error '%w' creating RMQ publisher", err) + } + rs.Log.Info("Created", "publisher", publisher) + + consumerHandler := CreateConsumerHandler( + func(reqPl *SeldonPayloadWithHeaders) error { return rs.predictAndPublishResponse(reqPl, publisher) }, + func(args ConsumerError) error { return rs.createAndPublishErrorResponse(args, publisher) }, + rs.Log, + ) // wait for graph to be ready ready := false @@ -156,25 +190,31 @@ func (rs *SeldonRabbitMQServer) serve(conn *connection) error { } } - producer := &publisher{*conn, rs.OutputQueueName} - - // consumer creates input queue if it doesn't exist - err = consumer.Consume( - func(reqPl *SeldonPayloadWithHeaders) error { return rs.predictAndPublishResponse(reqPl, producer) }, - func(args ConsumerError) error { return rs.createAndPublishErrorResponse(args, producer) }, - ) + consumer, err := conn.NewConsumer(consumerHandler, rs.InputQueueName, consumerTag) if err != nil { - rs.Log.Error(err, "error in consumer") - return fmt.Errorf("error '%w' in consumer", err) + return nil, fmt.Errorf("error '%w' creating RMQ consumer", err) } + rs.Log.Info("Created", "consumer", consumer, "input queue", rs.InputQueueName) - rs.Log.Info("Consumer exited without error") - return nil + // provide a channel to terminate the server + terminate := make(chan bool, 1) + + wg.Add(1) + go func() { + rs.Log.Info("awaiting group termination") + <-terminate + rs.Log.Info("termination initiated, shutting down") + consumer.Close() + publisher.Close() + wg.Done() + }() + + return terminate, nil } func (rs *SeldonRabbitMQServer) predictAndPublishResponse( reqPayload *SeldonPayloadWithHeaders, - publisher *publisher, + publisher PublisherWrapper, ) error { if reqPayload == nil { err := errors.New("missing request payload") @@ -198,7 +238,7 @@ func (rs *SeldonRabbitMQServer) predictAndPublishResponse( seldonPredictorProcess := pred.NewPredictorProcess( ctx, rs.Client, rs.Log.WithName("RabbitMqClient"), &rs.ServerUrl, rs.Namespace, reqPayload.Headers, "") - resPayload, err := seldonPredictorProcess.Predict(&rs.Predictor.Graph, reqPayload) + resPayload, err := seldonPredictorProcess.Predict(&rs.Predictor.Graph, reqPayload.Payload) if err != nil && resPayload == nil { // normal errors from the predict process contain a status failed payload // this is handling an unexpected case, so failing entirely, at least for now @@ -238,15 +278,15 @@ func (rs *SeldonRabbitMQServer) predictAndPublishResponse( } } - updatedPayload, err := UpdatePayloadWithPuid(reqPayload, resPayload) + updatedPayload, err := UpdatePayloadWithPuid(reqPayload.Payload, resPayload) if err != nil { rs.Log.Error(err, UNHANDLED_ERROR) return fmt.Errorf("unhandled error %w from predictor process", err) } - return publishPayload(publisher, updatedPayload, seldonPuid) + return rs.publishPayload(publisher, updatedPayload, seldonPuid) } -func (rs *SeldonRabbitMQServer) createAndPublishErrorResponse(errorArgs ConsumerError, publisher *publisher) error { +func (rs *SeldonRabbitMQServer) createAndPublishErrorResponse(errorArgs ConsumerError, publisher PublisherWrapper) error { reqPayload := errorArgs.pl seldonPuid := assignAndReturnPUID(reqPayload, &errorArgs.delivery) @@ -285,10 +325,23 @@ func (rs *SeldonRabbitMQServer) createAndPublishErrorResponse(errorArgs Consumer break } - return publishPayload(publisher, resPayload, seldonPuid) + return rs.publishPayload(publisher, resPayload, seldonPuid) +} + +func addPuidHeader(pl payload.SeldonPayload, seldonPuid string) SeldonPayloadWithHeaders { + resHeaders := map[string][]string{payload.SeldonPUIDHeader: {seldonPuid}} + return SeldonPayloadWithHeaders{ + pl, + resHeaders, + } +} + +func (rs *SeldonRabbitMQServer) publishPayload(publisher PublisherWrapper, pl payload.SeldonPayload, seldonPuid string) error { + plWithHeaders := addPuidHeader(pl, seldonPuid) + return publisher.Publish(plWithHeaders, rs.OutputQueueName) } -func assignAndReturnPUID(pl *SeldonPayloadWithHeaders, delivery *amqp.Delivery) string { +func assignAndReturnPUID(pl *SeldonPayloadWithHeaders, delivery *rabbitmq.Delivery) string { if pl == nil { if delivery != nil && delivery.Headers != nil && delivery.Headers[payload.SeldonPUIDHeader] != nil { return delivery.Headers[payload.SeldonPUIDHeader].(string) @@ -303,15 +356,3 @@ func assignAndReturnPUID(pl *SeldonPayloadWithHeaders, delivery *amqp.Delivery) } return pl.Headers[payload.SeldonPUIDHeader][0] } - -func publishPayload(publisher *publisher, pl payload.SeldonPayload, seldonPuid string) error { - resHeaders := map[string][]string{payload.SeldonPUIDHeader: {seldonPuid}} - //TODO might need more headers - - resPayloadWithHeaders := SeldonPayloadWithHeaders{ - pl, - resHeaders, - } - - return publisher.Publish(resPayloadWithHeaders) -} diff --git a/executor/api/rabbitmq/server_test.go b/executor/api/rabbitmq/server_test.go index 56b8633fac..3263b73f10 100644 --- a/executor/api/rabbitmq/server_test.go +++ b/executor/api/rabbitmq/server_test.go @@ -1,25 +1,29 @@ package rabbitmq import ( - "errors" + "bytes" "fmt" "github.com/go-logr/logr/testr" + "github.com/golang/protobuf/jsonpb" guuid "github.com/google/uuid" . "github.com/onsi/gomega" amqp "github.com/rabbitmq/amqp091-go" "github.com/seldonio/seldon-core/executor/api" + "github.com/seldonio/seldon-core/executor/api/grpc/seldon/proto" "github.com/seldonio/seldon-core/executor/api/payload" "github.com/seldonio/seldon-core/executor/api/rest" "github.com/seldonio/seldon-core/executor/api/test" v1 "github.com/seldonio/seldon-core/operator/apis/machinelearning.seldon.io/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/wagslane/go-rabbitmq" "io/ioutil" "net/http" "net/http/httptest" "net/url" "strconv" "strings" + "sync" "testing" ) @@ -49,11 +53,11 @@ func TestRabbitMqServer(t *testing.T) { }) server := httptest.NewServer(handler) defer server.Close() - serverUrl, err := url.Parse(server.URL) - g.Expect(err).Should(BeNil()) + serverUrl, setupErr := url.Parse(server.URL) + g.Expect(setupErr).Should(BeNil()) urlParts := strings.Split(serverUrl.Host, ":") - port, err := strconv.Atoi(urlParts[1]) - g.Expect(err).Should(BeNil()) + port, setupErr := strconv.Atoi(urlParts[1]) + g.Expect(setupErr).Should(BeNil()) p := v1.PredictorSpec{ Name: "p", @@ -87,15 +91,78 @@ func TestRabbitMqServer(t *testing.T) { testPuid := guuid.New().String() testHeaders := map[string]interface{}{payload.SeldonPUIDHeader: testPuid} - invalidErrorJsonResponse := fmt.Sprintf( + // test predictor process returns the request payload as the response + validJson := fmt.Sprintf( + `{"meta":{"puid":"%v"},"jsonData":{"start":1,"stop":10}}`, + testPuid, + ) + validRequestDelivery := rabbitmq.Delivery{ + Delivery: amqp.Delivery{ + ContentType: "application/json", + DeliveryMode: rabbitmq.Persistent, + Body: []byte(validJson), + Headers: testHeaders, + }, + } + validPayloadResponseWithHeaders := SeldonPayloadWithHeaders{ + &payload.BytesPayload{ + Msg: []byte(validJson), + ContentType: "application/json", + }, + TableToStringMap(testHeaders), + } + + // test predictor process returns the request payload as the response + validJsonNoPuid := `{"jsonData":{"start":1,"stop":10}}` + validRequestNoPuidDelivery := rabbitmq.Delivery{ + Delivery: amqp.Delivery{ + ContentType: "application/json", + DeliveryMode: rabbitmq.Persistent, + Body: []byte(validJsonNoPuid), + Headers: map[string]interface{}{}, + }, + } + validPayloadResponseNoPuid := + &payload.BytesPayload{ + Msg: []byte(validJsonNoPuid), + ContentType: "application/json", + } + + invalidContentType := "bogus" + invalidDelivery := rabbitmq.Delivery{ + Delivery: amqp.Delivery{ + ContentType: invalidContentType, + Body: []byte(`bogus`), + Headers: testHeaders, + }, + } + invalidJsonResponse := fmt.Sprintf( + `{"status":{"info":"Prediction Failed","reason":"unknown payload type '%v'","status":"FAILURE"}}`, + invalidContentType, + ) + invalidPayloadResponse := + &payload.BytesPayload{ + Msg: []byte(invalidJsonResponse), + ContentType: "application/json", + } + + // test predictor process returns the request payload as the response + errorJson := fmt.Sprintf( `{"status":{"info":"Prediction Failed","reason":"unknown payload type 'bogus'","status":"FAILURE"},"meta":{"puid":"%v"}}`, testPuid, ) - invalidErrorPublishing := amqp.Publishing{ - ContentType: "application/json", - DeliveryMode: amqp.Persistent, - Body: []byte(invalidErrorJsonResponse), - Headers: testHeaders, + errorPayloadResponse := + &payload.BytesPayload{ + Msg: []byte(errorJson), + ContentType: "application/json", + } + errorDelivery := rabbitmq.Delivery{ + Delivery: amqp.Delivery{ + ContentType: "application/json", + DeliveryMode: rabbitmq.Persistent, + Body: []byte(errorJson), + Headers: testHeaders, + }, } t.Run("create server", func(t *testing.T) { @@ -131,144 +198,246 @@ func TestRabbitMqServer(t *testing.T) { }) /* - * This makes sure the Serve() and predictAndPublishResponse() code runs and makes the proper calls - * by hacking a bunch of mocks. + * This makes sure the serve() code runs and makes the proper calls by setting up mocks. * It is not doing anything to validate the messages are properly processed. That's challenging in a * unit test since the code connects to RabbitMQ. */ t.Run("serve", func(t *testing.T) { - mockRmqConn := &mockConnection{} - mockRmqChan := &mockChannel{} - mockConn := &connection{ - conn: mockRmqConn, - channel: mockRmqChan, - } + mockRmqConn := new(MockConnectionWrapper) - testDelivery := amqp.Delivery{ - Acknowledger: mockRmqChan, - ContentType: rest.ContentTypeJSON, - Body: []byte(`{ "data": { "ndarray": [[1,2,3,4]] } }`), - Headers: testHeaders, - } + mockPublisher := new(MockPublisherWrapper) + mockRmqConn.On("NewPublisher").Return(mockPublisher, nil) + mockPublisher.On("Close") - mockDeliveries := make(chan amqp.Delivery, 1) - mockDeliveries <- testDelivery - close(mockDeliveries) + testConsumer := new(TestConsumerWrapper) + testConsumer.isClosed = false + mockRmqConn.On( + "NewConsumer", + mock.Anything, + inputQueue, + mock.Anything, + ).Return(testConsumer, nil) - mockRmqChan.On("Consume", inputQueue, mock.Anything, false, false, false, false, - amqp.Table{}).Return(mockDeliveries, nil) - mockRmqChan.On("Publish", "", outputQueue, publishMandatory, publishImmediate, - mock.MatchedBy(func(p amqp.Publishing) bool { return true })).Return(nil) - mockRmqChan.On("Ack", uint64(0), false).Return(nil) + wg := new(sync.WaitGroup) + termChan, err := testServer.serve(mockRmqConn, wg) - err := testServer.serve(mockConn) + termChan <- true + t.Log("waiting") + wg.Wait() + t.Log("done waiting") assert.NoError(t, err) - mockRmqChan.AssertExpectations(t) + mockRmqConn.AssertExpectations(t) + mockPublisher.AssertExpectations(t) + assert.True(t, testConsumer.isClosed) }) - /* - * This makes sure the Serve(), predictAndPublishResponse(), and createAndPublishErrorResponse() code runs and - * makes the proper calls and returns an appropriate error message by hacking a bunch of mocks. - */ - t.Run("serveError", func(t *testing.T) { - mockRmqConn := &mockConnection{} - mockRmqChan := &mockChannel{} - mockConn := &connection{ - conn: mockRmqConn, - channel: mockRmqChan, - } + t.Run("process valid request", func(t *testing.T) { + mockRmqConn := new(MockConnectionWrapper) - invalidDelivery := amqp.Delivery{ - Acknowledger: mockRmqChan, - ContentType: "bogus", - Body: []byte(`bogus`), - Headers: testHeaders, - Redelivered: true, - } + mockPublisher := new(MockPublisherWrapper) + mockRmqConn.On("NewPublisher").Return(mockPublisher, nil) + mockPublisher.On("Close") - mockDeliveries := make(chan amqp.Delivery, 1) - mockDeliveries <- invalidDelivery - close(mockDeliveries) + testConsumer := new(TestConsumerWrapper) + testConsumer.isClosed = false + mockRmqConn.On( + "NewConsumer", + mock.Anything, + inputQueue, + mock.Anything, + ).Return(testConsumer, nil) - mockRmqChan.On("Consume", inputQueue, mock.Anything, false, false, false, false, - amqp.Table{}).Return(mockDeliveries, nil) - mockRmqChan.On("Publish", "", outputQueue, publishMandatory, publishImmediate, - invalidErrorPublishing).Return(nil) - mockRmqChan.On("Reject", uint64(0), false).Return(nil) + wg := new(sync.WaitGroup) + termChan, err := testServer.serve(mockRmqConn, wg) - err := testServer.serve(mockConn) + mockPublisher.On("Publish", validPayloadResponseWithHeaders, outputQueue).Return(nil) + action := testConsumer.SimulateDelivery(validRequestDelivery) + assert.Equal(t, rabbitmq.Ack, action) + + termChan <- true + t.Log("waiting") + wg.Wait() + t.Log("done waiting") assert.NoError(t, err) - mockRmqChan.AssertExpectations(t) + mockRmqConn.AssertExpectations(t) + mockPublisher.AssertExpectations(t) + assert.True(t, testConsumer.isClosed) }) - t.Run("createAndPublishErrorResponse", func(t *testing.T) { - mockRmqConn := &mockConnection{} - mockRmqChan := &mockChannel{} - mockConn := &connection{ - conn: mockRmqConn, - channel: mockRmqChan, - } + t.Run("process valid request missing puid", func(t *testing.T) { + mockRmqConn := new(MockConnectionWrapper) + + mockPublisher := new(MockPublisherWrapper) + mockRmqConn.On("NewPublisher").Return(mockPublisher, nil) + mockPublisher.On("Close") + + testConsumer := new(TestConsumerWrapper) + testConsumer.isClosed = false + mockRmqConn.On( + "NewConsumer", + mock.Anything, + inputQueue, + mock.Anything, + ).Return(testConsumer, nil) + + wg := new(sync.WaitGroup) + termChan, err := testServer.serve(mockRmqConn, wg) + + mockPublisher.On( + "Publish", + mock.MatchedBy(matchingSeldonPayloadsWithPuid(t, validPayloadResponseNoPuid, false)), + outputQueue, + ).Return(nil) + action := testConsumer.SimulateDelivery(validRequestNoPuidDelivery) + assert.Equal(t, rabbitmq.Ack, action) + + termChan <- true + t.Log("waiting") + wg.Wait() + t.Log("done waiting") - testDelivery := amqp.Delivery{ - Acknowledger: mockRmqChan, - ContentType: rest.ContentTypeJSON, - Body: []byte(`{ "data": { "ndarray": [[1,2,3,4]] } }`), - Headers: testHeaders, - } + assert.NoError(t, err) - publisher := &publisher{*mockConn, outputQueue} + mockRmqConn.AssertExpectations(t) + mockPublisher.AssertExpectations(t) + assert.True(t, testConsumer.isClosed) + }) - // valid payload - error1Text := "error 1" - error1 := errors.New(error1Text) - generatedErrorPublishing1 := amqp.Publishing{ - ContentType: "application/json", - Body: []byte(fmt.Sprintf( - `{"status":{"info":"Prediction Failed","reason":"%v","status":"FAILURE"},"meta":{"puid":"%v"}}`, - error1Text, - testPuid, - )), - DeliveryMode: amqp.Persistent, - Headers: testHeaders, - } - mockRmqChan.On("Publish", "", outputQueue, true, false, generatedErrorPublishing1).Return(nil) - pl1, _ := DeliveryToPayload(testDelivery) - consumerError1 := ConsumerError{ - err: error1, - delivery: testDelivery, - pl: pl1, - } - err1 := testServer.createAndPublishErrorResponse(consumerError1, publisher) - assert.NoError(t, err1) + t.Run("process invalid request", func(t *testing.T) { + mockRmqConn := new(MockConnectionWrapper) + + mockPublisher := new(MockPublisherWrapper) + mockRmqConn.On("NewPublisher").Return(mockPublisher, nil) + mockPublisher.On("Close") + + testConsumer := new(TestConsumerWrapper) + testConsumer.isClosed = false + mockRmqConn.On( + "NewConsumer", + mock.Anything, + inputQueue, + mock.Anything, + ).Return(testConsumer, nil) + + wg := new(sync.WaitGroup) + termChan, err := testServer.serve(mockRmqConn, wg) + + // test that payload is expected and Puid is added + mockPublisher.On( + "Publish", + mock.MatchedBy(matchingSeldonPayloadsWithPuid(t, invalidPayloadResponse, true)), + outputQueue, + ).Return(nil) + action := testConsumer.SimulateDelivery(invalidDelivery) + assert.Equal(t, rabbitmq.NackDiscard, action) + + termChan <- true + t.Log("waiting") + wg.Wait() + t.Log("done waiting") - mockRmqChan.AssertExpectations(t) + assert.NoError(t, err) - // no payload - error2Text := "error 2" - error2 := errors.New(error2Text) - generatedErrorPublishing2 := amqp.Publishing{ - ContentType: "application/json", - DeliveryMode: amqp.Persistent, - Body: []byte(fmt.Sprintf( - `{"status":{"info":"Prediction Failed","reason":"%v","status":"FAILURE"},"meta":{"puid":"%v"}}`, - error2Text, - testPuid, - )), - Headers: testHeaders, + mockRmqConn.AssertExpectations(t) + mockPublisher.AssertExpectations(t) + assert.True(t, testConsumer.isClosed) + }) + + t.Run("process error response", func(t *testing.T) { + mockRmqConn := new(MockConnectionWrapper) + + mockPublisher := new(MockPublisherWrapper) + mockRmqConn.On("NewPublisher").Return(mockPublisher, nil) + mockPublisher.On("Close") + + testConsumer := new(TestConsumerWrapper) + testConsumer.isClosed = false + mockRmqConn.On( + "NewConsumer", + mock.Anything, + inputQueue, + mock.Anything, + ).Return(testConsumer, nil) + + wg := new(sync.WaitGroup) + termChan, err := testServer.serve(mockRmqConn, wg) + + mockPublisher.On( + "Publish", + mock.MatchedBy(matchingSeldonPayloadsWithPuid(t, errorPayloadResponse, true)), + outputQueue, + ).Return(nil) + action := testConsumer.SimulateDelivery(errorDelivery) + assert.Equal(t, rabbitmq.Ack, action) + + termChan <- true + t.Log("waiting") + wg.Wait() + t.Log("done waiting") + + assert.NoError(t, err) + + mockRmqConn.AssertExpectations(t) + mockPublisher.AssertExpectations(t) + assert.True(t, testConsumer.isClosed) + }) +} + +func addPuid(pl payload.SeldonPayload, puid string) (payload.SeldonPayload, error) { + switch pl.GetContentType() { + case rest.ContentTypeJSON: + requestBody := &proto.SeldonMessage{} + err := jsonpb.UnmarshalString(string(pl.GetPayload().([]byte)), requestBody) + if err != nil { + return nil, err } - mockRmqChan.On("Publish", "", outputQueue, true, false, generatedErrorPublishing2).Return(nil) - consumerError2 := ConsumerError{ - err: error2, - delivery: testDelivery, + requestBody.Meta = &proto.Meta{ + Puid: puid, } - err2 := testServer.createAndPublishErrorResponse(consumerError2, publisher) - assert.NoError(t, err2) + ma := jsonpb.Marshaler{} + marshaled, err := ma.MarshalToString(requestBody) + if err != nil { + return nil, err + } + return &payload.BytesPayload{ + Msg: []byte(marshaled), + ContentType: rest.ContentTypeJSON, + }, nil + default: + return nil, fmt.Errorf("unsupported content type '%v'", pl.GetContentType()) + } +} - mockRmqChan.AssertExpectations(t) - }) +func samePayloads(pl1 payload.SeldonPayload, pl2 payload.SeldonPayload) bool { + providedBytes, _ := pl1.GetBytes() + referenceBytes, _ := pl2.GetBytes() + return bytes.Equal(providedBytes, referenceBytes) && + pl1.GetContentType() == pl2.GetContentType() && + pl1.GetContentEncoding() == pl2.GetContentEncoding() +} +func matchingSeldonPayloadsWithPuid( + t *testing.T, + reference payload.SeldonPayload, + addPuidToPayload bool, +) func(SeldonPayloadWithHeaders) bool { + return func(pl SeldonPayloadWithHeaders) bool { + puidHeader := pl.Headers[payload.SeldonPUIDHeader] + var referencePayloadToUse payload.SeldonPayload + var err error + if addPuidToPayload { + referencePayloadToUse, err = addPuid(reference, puidHeader[0]) + if err != nil { + t.Logf("error adding puid %v", err) + return false + } + } else { + referencePayloadToUse = reference + } + return puidHeader != nil && samePayloads(pl.Payload, referencePayloadToUse) + } } diff --git a/executor/api/rabbitmq/types.go b/executor/api/rabbitmq/types.go index 32348061f0..14cb9a6d63 100644 --- a/executor/api/rabbitmq/types.go +++ b/executor/api/rabbitmq/types.go @@ -1,65 +1,38 @@ package rabbitmq import ( - "fmt" "github.com/seldonio/seldon-core/executor/api/payload" + "github.com/wagslane/go-rabbitmq" "io" - - amqp "github.com/rabbitmq/amqp091-go" ) -/* - * adapted from https://github.com/dominodatalab/forge/blob/master/internal/message/amqp/amqp_test.go - */ - -// default implementation leverages the real "streadway/amqp" dialer -var defaultDialerAdapter DialerAdapter = func(url string) (Connection, error) { - conn, err := amqp.Dial(url) - if err != nil { - return nil, fmt.Errorf("error %w dialing rabbitmq at url %v", err, url) - } - - return ConnectionAdapter{conn}, nil -} - -// DialerAdapter is a function that returns a handle to a Connection type. -type DialerAdapter func(url string) (Connection, error) +// wrapper around `rabbitmq.Conn` +type ConnectionWrapper interface { + io.Closer -// ConnectionAdapter adapts the amqp.Connection type so that it adheres to our libraries interfaces. -type ConnectionAdapter struct { - *amqp.Connection + NewPublisher() (PublisherWrapper, error) + NewConsumer(handler rabbitmq.Handler, queue string, consumerTag string) (ConsumerWrapper, error) } -// Channel adapts an amqp.Channel to our Channel interface. -func (c ConnectionAdapter) Channel() (Channel, error) { - return c.Connection.Channel() +type ConsumerWrapper interface { + Close() } -// Connection defines the AMQP connections operations required by this library. -type Connection interface { - io.Closer - - Channel() (Channel, error) - NotifyClose(receiver chan *amqp.Error) chan *amqp.Error +type PublisherWrapper interface { + Close() + Publish( + payload SeldonPayloadWithHeaders, + queueName string, + ) error } -// Channel defines the AMQP channel operations required by this library. -type Channel interface { - QueueDeclare(name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table) ( - amqp.Queue, error, - ) - Publish(exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error - Consume( - name string, consumerTag string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp.Table, - ) (<-chan amqp.Delivery, error) - Ack(tag uint64, multiple bool) error - Nack(tag uint64, multiple bool, requeue bool) error - Reject(tag uint64, requeue bool) error - Qos(prefetchCount int, prefetchSize int, global bool) error - NotifyClose(receiver chan *amqp.Error) chan *amqp.Error +type ConsumerError struct { + err error + delivery rabbitmq.Delivery + pl *SeldonPayloadWithHeaders // might be nil } type SeldonPayloadWithHeaders struct { - payload.SeldonPayload + Payload payload.SeldonPayload Headers map[string][]string } diff --git a/executor/api/rabbitmq/utils.go b/executor/api/rabbitmq/utils.go index 960e74fb64..9c5c6aea09 100644 --- a/executor/api/rabbitmq/utils.go +++ b/executor/api/rabbitmq/utils.go @@ -8,6 +8,7 @@ import ( "github.com/seldonio/seldon-core/executor/api/grpc/seldon/proto" "github.com/seldonio/seldon-core/executor/api/payload" "github.com/seldonio/seldon-core/executor/api/rest" + rabbitmq "github.com/wagslane/go-rabbitmq" ) func TableToStringMap(t amqp.Table) map[string][]string { @@ -18,7 +19,7 @@ func TableToStringMap(t amqp.Table) map[string][]string { return stringMap } -func StringMapToTable(m map[string][]string) amqp.Table { +func StringMapToTable(m map[string][]string) rabbitmq.Table { table := make(map[string]interface{}) for key, values := range m { // just take the first value, at least for now @@ -27,7 +28,7 @@ func StringMapToTable(m map[string][]string) amqp.Table { return table } -func DeliveryToPayload(delivery amqp.Delivery) (*SeldonPayloadWithHeaders, error) { +func DeliveryToPayload(delivery rabbitmq.Delivery) (*SeldonPayloadWithHeaders, error) { var pl *SeldonPayloadWithHeaders = nil var err error = nil diff --git a/executor/api/rabbitmq/utils_test.go b/executor/api/rabbitmq/utils_test.go index 8f43ede6cb..07f99b94f4 100644 --- a/executor/api/rabbitmq/utils_test.go +++ b/executor/api/rabbitmq/utils_test.go @@ -8,6 +8,7 @@ import ( "github.com/seldonio/seldon-core/executor/api/payload" "github.com/seldonio/seldon-core/executor/api/rest" "github.com/stretchr/testify/assert" + "github.com/wagslane/go-rabbitmq" "google.golang.org/protobuf/types/known/structpb" "testing" @@ -26,7 +27,7 @@ func TestStringMapTableFunctions(t *testing.T) { "key1": {"value1", "value2"}, "key2": {"45"}, } - derivedTable1 := amqp.Table{ + derivedTable1 := rabbitmq.Table{ "key1": "value1", "key2": "45", } @@ -44,10 +45,12 @@ func TestStringMapTableFunctions(t *testing.T) { func TestDeliveryToPayload(t *testing.T) { bytesBody := []byte(`{"status":{"status":0},"strData":"\"hello\""}`) - testDeliveryRest := amqp.Delivery{ - Body: bytesBody, - ContentType: rest.ContentTypeJSON, - ContentEncoding: "", + testDeliveryRest := rabbitmq.Delivery{ + Delivery: amqp.Delivery{ + Body: bytesBody, + ContentType: rest.ContentTypeJSON, + ContentEncoding: "", + }, } protoMessage := &proto.SeldonMessage{ Status: &proto.Status{ @@ -60,27 +63,29 @@ func TestDeliveryToPayload(t *testing.T) { } protoMessageEnc, _ := proto2.Marshal(protoMessage) protoMessage.XXX_sizecache = 0 // to make test cases match - testDeliveryProto := amqp.Delivery{ - Body: protoMessageEnc, - ContentType: payload.APPLICATION_TYPE_PROTOBUF, - ContentEncoding: "", + testDeliveryProto := rabbitmq.Delivery{ + Delivery: amqp.Delivery{ + Body: protoMessageEnc, + ContentType: payload.APPLICATION_TYPE_PROTOBUF, + ContentEncoding: "", + }, } t.Run("proto payload", func(t *testing.T) { pl, err := DeliveryToPayload(testDeliveryProto) assert.NoError(t, err) - assert.Equal(t, protoMessage, pl.GetPayload()) + assert.Equal(t, protoMessage, pl.Payload.GetPayload()) }) t.Run("rest payload", func(t *testing.T) { pl, err := DeliveryToPayload(testDeliveryRest) assert.NoError(t, err) - assert.Equal(t, bytesBody, pl.GetPayload()) + assert.Equal(t, bytesBody, pl.Payload.GetPayload()) body := &proto.SeldonMessage{} - err = jsonpb.UnmarshalString(string(pl.GetPayload().([]byte)), body) + err = jsonpb.UnmarshalString(string(pl.Payload.GetPayload().([]byte)), body) assert.NoError(t, err) assert.Equal(t, protoMessage, body) diff --git a/executor/go.mod b/executor/go.mod index d687d27dc6..294c08760c 100644 --- a/executor/go.mod +++ b/executor/go.mod @@ -16,11 +16,12 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.1 github.com/prometheus/common v0.34.0 - github.com/rabbitmq/amqp091-go v1.3.4 + github.com/rabbitmq/amqp091-go v1.5.0 github.com/seldonio/seldon-core/operator v0.0.0-00010101000000-000000000000 github.com/stretchr/testify v1.8.0 github.com/tensorflow/tensorflow/tensorflow/go/core v0.0.0-00010101000000-000000000000 github.com/uber/jaeger-client-go v2.25.0+incompatible + github.com/wagslane/go-rabbitmq v0.12.1 go.uber.org/automaxprocs v1.4.0 go.uber.org/zap v1.19.1 golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f diff --git a/executor/go.sum b/executor/go.sum index 4b751fb330..d2ea40cc60 100644 --- a/executor/go.sum +++ b/executor/go.sum @@ -486,8 +486,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/rabbitmq/amqp091-go v1.3.4 h1:tXuIslN1nhDqs2t6Jrz3BAoqvt4qIZzxvdbdcxWtHYU= -github.com/rabbitmq/amqp091-go v1.3.4/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= +github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg= +github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -522,8 +522,6 @@ github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= -github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -532,7 +530,6 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -544,6 +541,8 @@ github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMW github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw= github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/wagslane/go-rabbitmq v0.12.1 h1:A3ec8wmP3hr1SKsbXwFaf42xXd5D7yAeJfdFZJydKlU= +github.com/wagslane/go-rabbitmq v0.12.1/go.mod h1:jTSN7opv/tmphx0MYaRR/++HQCuhxrBZEyTd0xCym2c= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -594,6 +593,7 @@ go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhW go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= diff --git a/executor/licenses/dep.txt b/executor/licenses/dep.txt index a938fda166..90a208afe3 100644 --- a/executor/licenses/dep.txt +++ b/executor/licenses/dep.txt @@ -342,6 +342,7 @@ github.com/ulikunitz/unixtime github.com/valyala/bytebufferpool github.com/valyala/fasthttp github.com/wagslane/go-password-validator +github.com/wagslane/go-rabbitmq github.com/xdg-go/pbkdf2 github.com/xdg-go/scram github.com/xdg-go/stringprep diff --git a/executor/licenses/license.txt b/executor/licenses/license.txt index d092d3c2a8..d3f2b81581 100644 --- a/executor/licenses/license.txt +++ b/executor/licenses/license.txt @@ -394,28 +394,28 @@ Azure/azure-storage-queue-go MIT License https://github.com/Azure/azure-storag -------------------------------------------------------------------------------- Azure/go-amqp MIT License https://github.com/Azure/go-amqp/blob/main/LICENSE -------------------------------------------------------------------------------- - MIT License - - Copyright (C) 2017 Kale Blankenship - Portions Copyright (C) Microsoft Corporation - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in all - copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - SOFTWARE + MIT License + + Copyright (C) 2017 Kale Blankenship + Portions Copyright (C) Microsoft Corporation + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE -------------------------------------------------------------------------------- Azure/go-ansiterm MIT License https://github.com/Azure/go-ansiterm/blob/master/LICENSE @@ -32531,6 +32531,31 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +-------------------------------------------------------------------------------- +wagslane/go-rabbitmq MIT License https://github.com/wagslane/go-rabbitmq/blob/main/LICENSE +-------------------------------------------------------------------------------- +MIT License + +Copyright (c) 2021 Lane Wagner + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + -------------------------------------------------------------------------------- xdg-go/pbkdf2 Apache License 2.0 https://github.com/xdg-go/pbkdf2/blob/main/LICENSE -------------------------------------------------------------------------------- diff --git a/executor/licenses/license_info.csv b/executor/licenses/license_info.csv index 7c300b67d6..5f202a806c 100644 --- a/executor/licenses/license_info.csv +++ b/executor/licenses/license_info.csv @@ -306,6 +306,7 @@ ulikunitz/unixtime,https://github.com/ulikunitz/unixtime/blob/master/LICENSE,BSD valyala/bytebufferpool,https://github.com/valyala/bytebufferpool/blob/master/LICENSE,MIT License,https://raw.githubusercontent.com/valyala/bytebufferpool/master/LICENSE valyala/fasthttp,https://github.com/valyala/fasthttp/blob/master/LICENSE,MIT License,https://raw.githubusercontent.com/valyala/fasthttp/master/LICENSE wagslane/go-password-validator,https://github.com/wagslane/go-password-validator/blob/main/LICENSE,MIT License,https://raw.githubusercontent.com/wagslane/go-password-validator/main/LICENSE +wagslane/go-rabbitmq,https://github.com/wagslane/go-rabbitmq/blob/main/LICENSE,MIT License,https://raw.githubusercontent.com/wagslane/go-rabbitmq/main/LICENSE xdg-go/pbkdf2,https://github.com/xdg-go/pbkdf2/blob/main/LICENSE,Apache License 2.0,https://raw.githubusercontent.com/xdg-go/pbkdf2/main/LICENSE xdg-go/scram,https://github.com/xdg-go/scram/blob/master/LICENSE,Apache License 2.0,https://raw.githubusercontent.com/xdg-go/scram/master/LICENSE xdg-go/stringprep,https://github.com/xdg-go/stringprep/blob/master/LICENSE,Apache License 2.0,https://raw.githubusercontent.com/xdg-go/stringprep/master/LICENSE diff --git a/executor/licenses/repo.txt b/executor/licenses/repo.txt index 0a8cbcb199..88db4723f5 100644 --- a/executor/licenses/repo.txt +++ b/executor/licenses/repo.txt @@ -308,6 +308,7 @@ ulikunitz/unixtime valyala/bytebufferpool valyala/fasthttp wagslane/go-password-validator +wagslane/go-rabbitmq xdg-go/pbkdf2 xdg-go/scram xdg-go/stringprep diff --git a/licenses/additional_license_info.csv b/licenses/additional_license_info.csv index 9125dfeca3..d6578827d7 100644 --- a/licenses/additional_license_info.csv +++ b/licenses/additional_license_info.csv @@ -114,4 +114,4 @@ https://github.com/go-httprequest/httprequest/blob/v1/LICENSE,The GNU lesser Gen https://github.com/go-mgo/mgo/blob/v2-unstable/LICENSE,BSD 2-Clause "Simplified" License https://github.com/go-retry/retry/blob/v1/LICENSE,The GNU lesser General Public License Version 3.0 with exceptions for static and dynamic linking https://github.com/kubernetes-sigs/json/blob/main/LICENSE,Apache License 2.0 -https://github.com/invopop/yaml/blob/main/LICENSE,MIT License \ No newline at end of file +https://github.com/invopop/yaml/blob/main/LICENSE,MIT License diff --git a/operator/testing/machinelearning.seldon.io_seldondeployments.yaml b/operator/testing/machinelearning.seldon.io_seldondeployments.yaml index e217c49b79..b85182bf3b 100644 --- a/operator/testing/machinelearning.seldon.io_seldondeployments.yaml +++ b/operator/testing/machinelearning.seldon.io_seldondeployments.yaml @@ -3,7 +3,7 @@ kind: CustomResourceDefinition metadata: annotations: cert-manager.io/inject-ca-from: seldon-system/seldon-serving-cert - controller-gen.kubebuilder.io/version: v0.9.2 + controller-gen.kubebuilder.io/version: v0.7.0 labels: app: seldon app.kubernetes.io/instance: seldon1