Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[full-ci] fix: always select next gateway client #10133

Merged
merged 4 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/fix-select-next-gateway-client.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Always select next gateway client

We now use the gateway selector to always select the next gateway client. This ensures that we can always connect to the gateway during up- and downscaling.

https://github.com/owncloud/ocis/pull/10133
7 changes: 7 additions & 0 deletions services/collaboration/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,10 @@ packages:
dir: "mocks"
interfaces:
LockParser:
github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool:
config:
dir: "mocks"
interfaces:
Selectable:
config:
filename: "gateway_selector.go"
104 changes: 104 additions & 0 deletions services/collaboration/mocks/gateway_selector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 12 additions & 3 deletions services/collaboration/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"fmt"
"net"

"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
registry "github.com/owncloud/ocis/v2/ocis-pkg/registry"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/services/collaboration/pkg/config"
"github.com/owncloud/ocis/v2/services/collaboration/pkg/config/parser"
Expand Down Expand Up @@ -44,17 +46,24 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gwc, err := helpers.GetCS3apiClient(cfg, false)
micbar marked this conversation as resolved.
Show resolved Hide resolved
tm, err := pool.StringToTLSMode(cfg.Commons.GRPCClientTLS.Mode)
if err != nil {
return err
}
gatewaySelector, err := pool.GatewaySelector(
cfg.CS3Api.Gateway.Name,
pool.WithTLSCACert(cfg.Commons.GRPCClientTLS.CACert),
pool.WithTLSMode(tm),
pool.WithRegistry(registry.GetRegistry()),
pool.WithTracerProvider(traceProvider),
)

appUrls, err := helpers.GetAppURLs(cfg, logger)
if err != nil {
return err
}

if err := helpers.RegisterAppProvider(ctx, cfg, logger, gwc, appUrls); err != nil {
if err := helpers.RegisterAppProvider(ctx, cfg, logger, gatewaySelector, appUrls); err != nil {
return err
}

Expand Down Expand Up @@ -101,7 +110,7 @@ func Server(cfg *config.Config) *cli.Command {

// start HTTP server
httpServer, err := http.Server(
http.Adapter(connector.NewHttpAdapter(gwc, cfg)),
http.Adapter(connector.NewHttpAdapter(gatewaySelector, cfg)),
http.Logger(logger),
http.Config(cfg),
http.Context(ctx),
Expand Down
7 changes: 5 additions & 2 deletions services/collaboration/pkg/config/cs3api.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package config

import "github.com/owncloud/ocis/v2/ocis-pkg/shared"

// CS3Api defines the available configuration in order to access to the CS3 gateway.
type CS3Api struct {
Gateway Gateway `yaml:"gateway"`
DataGateway DataGateway `yaml:"datagateway"`
Gateway Gateway `yaml:"gateway"`
DataGateway DataGateway `yaml:"datagateway"`
GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"`
}

// Gateway defines the available configuration for the CS3 API gateway
Expand Down
4 changes: 4 additions & 0 deletions services/collaboration/pkg/config/defaults/defaultconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package defaults

import (
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
"github.com/owncloud/ocis/v2/ocis-pkg/structs"
"github.com/owncloud/ocis/v2/services/collaboration/pkg/config"
)

Expand Down Expand Up @@ -93,6 +94,9 @@ func EnsureDefaults(cfg *config.Config) {
} else if cfg.TokenManager == nil {
cfg.TokenManager = &config.TokenManager{}
}
if cfg.CS3Api.GRPCClientTLS == nil && cfg.Commons != nil {
cfg.CS3Api.GRPCClientTLS = structs.CopyOrZeroValue(cfg.Commons.GRPCClientTLS)
}
}

// Sanitize sanitized the configuration
Expand Down
37 changes: 29 additions & 8 deletions services/collaboration/pkg/connector/contentconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
revactx "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/services/collaboration/pkg/config"
"github.com/owncloud/ocis/v2/services/collaboration/pkg/middleware"
Expand Down Expand Up @@ -42,14 +43,14 @@ type ContentConnectorService interface {
// uploads (PutFile)
// Note that operations might return any kind of error, not just ConnectorError
type ContentConnector struct {
gwc gatewayv1beta1.GatewayAPIClient
gws pool.Selectable[gatewayv1beta1.GatewayAPIClient]
cfg *config.Config
}

// NewContentConnector creates a new content connector
func NewContentConnector(gwc gatewayv1beta1.GatewayAPIClient, cfg *config.Config) *ContentConnector {
func NewContentConnector(gws pool.Selectable[gatewayv1beta1.GatewayAPIClient], cfg *config.Config) *ContentConnector {
return &ContentConnector{
gwc: gwc,
gws: gws,
cfg: cfg,
}
}
Expand All @@ -76,7 +77,11 @@ func (c *ContentConnector) GetFile(ctx context.Context, w http.ResponseWriter) e
Logger()
logger.Debug().Msg("GetFile: start")

sResp, err := c.gwc.Stat(ctx, &providerv1beta1.StatRequest{
gwc, err := c.gws.Next()
if err != nil {
return err
}
sResp, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
Ref: wopiContext.FileReference,
})
if err != nil {
Expand All @@ -94,7 +99,11 @@ func (c *ContentConnector) GetFile(ctx context.Context, w http.ResponseWriter) e
if wopiContext.ViewMode == appproviderv1beta1.ViewMode_VIEW_MODE_VIEW_ONLY && wopiContext.ViewOnlyToken != "" {
ctx = revactx.ContextSetToken(ctx, wopiContext.ViewOnlyToken)
}
resp, err := c.gwc.InitiateFileDownload(ctx, req)
gwc, err = c.gws.Next()
if err != nil {
return err
}
resp, err := gwc.InitiateFileDownload(ctx, req)
if err != nil {
logger.Error().Err(err).Msg("GetFile: InitiateFileDownload failed")
return err
Expand Down Expand Up @@ -227,9 +236,13 @@ func (c *ContentConnector) PutFile(ctx context.Context, stream io.Reader, stream
Logger()
logger.Debug().Msg("PutFile: start")

gwc, err := c.gws.Next()
if err != nil {
return nil, err
}
// We need a stat call on the target file in order to get both the lock
// (if any) and the current size of the file
statRes, err := c.gwc.Stat(ctx, &providerv1beta1.StatRequest{
statRes, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
Ref: wopiContext.FileReference,
})
if err != nil {
Expand Down Expand Up @@ -286,8 +299,12 @@ func (c *ContentConnector) PutFile(ctx context.Context, stream io.Reader, stream
},
}

gwc, err = c.gws.Next()
if err != nil {
return nil, err
}
// Initiate the upload request
resp, err := c.gwc.InitiateFileUpload(ctx, req)
resp, err := gwc.InitiateFileUpload(ctx, req)
if err != nil {
logger.Error().Err(err).Msg("UploadHelper: InitiateFileUpload failed")
return nil, err
Expand Down Expand Up @@ -383,9 +400,13 @@ func (c *ContentConnector) PutFile(ctx context.Context, stream io.Reader, stream
Msg("UploadHelper: Put request to the upload endpoint failed with unexpected status")
return NewResponse(500), nil
}
gwc, err = c.gws.Next()
if err != nil {
return nil, err
}
// We need a stat call on the target file after the upload to get the
// new mtime
statResAfter, err := c.gwc.Stat(ctx, &providerv1beta1.StatRequest{
statResAfter, err := gwc.Stat(ctx, &providerv1beta1.StatRequest{
Ref: wopiContext.FileReference,
})
if err != nil {
Expand Down
20 changes: 14 additions & 6 deletions services/collaboration/pkg/connector/contentconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cs3org/reva/v2/pkg/utils"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/owncloud/ocis/v2/services/collaboration/mocks"
"github.com/stretchr/testify/mock"

appproviderv1beta1 "github.com/cs3org/go-cs3apis/cs3/app/provider/v1beta1"
Expand All @@ -27,10 +28,11 @@ import (

var _ = Describe("ContentConnector", func() {
var (
cc *connector.ContentConnector
gatewayClient *cs3mocks.GatewayAPIClient
cfg *config.Config
wopiCtx middleware.WopiContext
cc *connector.ContentConnector
gatewayClient *cs3mocks.GatewayAPIClient
gatewaySelector *mocks.Selectable[gateway.GatewayAPIClient]
cfg *config.Config
wopiCtx middleware.WopiContext

srv *httptest.Server
srvReqHeader http.Header
Expand All @@ -40,8 +42,11 @@ var _ = Describe("ContentConnector", func() {
BeforeEach(func() {
// contentConnector only uses "cfg.CS3Api.DataGateway.Insecure", which is irrelevant for the tests
cfg = &config.Config{}
gatewayClient = &cs3mocks.GatewayAPIClient{}
cc = connector.NewContentConnector(gatewayClient, cfg)
gatewayClient = cs3mocks.NewGatewayAPIClient(GinkgoT())

gatewaySelector = mocks.NewSelectable[gateway.GatewayAPIClient](GinkgoT())
gatewaySelector.On("Next").Return(gatewayClient, nil)
cc = connector.NewContentConnector(gatewaySelector, cfg)

wopiCtx = middleware.WopiContext{
AccessToken: "abcdef123456",
Expand Down Expand Up @@ -91,6 +96,8 @@ var _ = Describe("ContentConnector", func() {
}, nil)
})
It("No valid context", func() {
gatewaySelector.EXPECT().Next().Unset()
gatewayClient.EXPECT().Stat(mock.Anything, mock.Anything).Unset()
sb := httptest.NewRecorder()
ctx := context.Background()
err := cc.GetFile(ctx, sb)
Expand Down Expand Up @@ -226,6 +233,7 @@ var _ = Describe("ContentConnector", func() {

Describe("PutFile", func() {
It("No valid context", func() {
gatewaySelector.EXPECT().Next().Unset()
reader := strings.NewReader("Content to upload is here!")
ctx := context.Background()
response, err := cc.PutFile(ctx, reader, reader.Size(), "notARandomLockId")
Expand Down
Loading