Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NBD integration into controller and replica #1109

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Dockerfile.dapper
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ RUN cd /go/src/github.com/longhorn && \
cp -r integration/rpc/ ${DAPPER_SOURCE}/integration/rpc/ && \
cp longhorn-instance-manager /usr/local/bin

# Build libnbd
RUN zypper -n install libxml2-devel
RUN cd /usr/src && \
wget -O - https://download.libguestfs.org/libnbd/1.13-development/libnbd-1.13.1.tar.gz | tar -xzf - && \
cd libnbd-1.13.1 && \
./configure --disable-ocaml --disable-python --disable-golang; \
make; \
make install

VOLUME /tmp
ENV TMPDIR /tmp
ENTRYPOINT ["./scripts/entry"]
Expand Down
24 changes: 21 additions & 3 deletions app/cmd/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ func ControllerCmd() cli.Command {
Value: int64(controller.DefaultEngineReplicaTimeout.Seconds()),
Usage: "In seconds. Timeout between engine and replica(s)",
},
cli.IntFlag{
Name: "frontend-streams",
Required: false,
Value: 1,
Usage: "Number of concurrent streams to the frontend",
},
cli.StringFlag{
Name: "data-server-protocol",
Value: "tcp",
Expand All @@ -83,6 +89,12 @@ func ControllerCmd() cli.Command {
Value: 5,
Usage: "HTTP client timeout for replica file sync server",
},
cli.IntFlag{
Name: "nbd-enabled",
Required: false,
Value: 0,
Usage: "Flag to enable NBD data server. Option 0 to disable, options >0 for number of connections",
},
},
Action: func(c *cli.Context) {
if err := startController(c); err != nil {
Expand Down Expand Up @@ -114,6 +126,7 @@ func startController(c *cli.Context) error {
dataServerProtocol := c.String("data-server-protocol")
fileSyncHTTPClientTimeout := c.Int("file-sync-http-client-timeout")
engineInstanceName := c.GlobalString("engine-instance-name")
nbdEnabled := c.Int("nbd-enabled")

size := c.String("size")
if size == "" {
Expand All @@ -138,6 +151,11 @@ func startController(c *cli.Context) error {
engineReplicaTimeout = controller.DetermineEngineReplicaTimeout(engineReplicaTimeout)
iscsiTargetRequestTimeout := controller.DetermineIscsiTargetRequestTimeout(engineReplicaTimeout)

frontendStreams := c.Int("frontend-streams")
if frontendStreams < 1 {
return errors.New("at least one stream to the frontend is required")
}

factories := map[string]types.BackendFactory{}
for _, backend := range backends {
switch backend {
Expand All @@ -152,7 +170,7 @@ func startController(c *cli.Context) error {

var frontend types.Frontend
if frontendName != "" {
f, err := controller.NewFrontend(frontendName, iscsiTargetRequestTimeout)
f, err := controller.NewFrontend(frontendName, frontendStreams, iscsiTargetRequestTimeout)
if err != nil {
return errors.Wrapf(err, "failed to find frontend: %s", frontendName)
}
Expand All @@ -161,9 +179,9 @@ func startController(c *cli.Context) error {

logrus.Infof("Creating volume %v controller with iSCSI target request timeout %v and engine to replica(s) timeout %v",
volumeName, iscsiTargetRequestTimeout, engineReplicaTimeout)
control := controller.NewController(volumeName, dynamic.New(factories), frontend, isUpgrade, disableRevCounter, salvageRequested,
control := controller.NewController(volumeName, dynamic.New(factories), frontend, frontendStreams, isUpgrade, disableRevCounter, salvageRequested,
unmapMarkSnapChainRemoved, iscsiTargetRequestTimeout, engineReplicaTimeout, types.DataServerProtocol(dataServerProtocol),
fileSyncHTTPClientTimeout)
fileSyncHTTPClientTimeout, nbdEnabled)

// need to wait for Shutdown() completion
control.ShutdownWG.Add(1)
Expand Down
9 changes: 8 additions & 1 deletion app/cmd/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ func ReplicaCmd() cli.Command {
Value: "",
Usage: "Name of the replica instance (for validation purposes)",
},
cli.IntFlag{
Name: "nbd-enabled",
Required: false,
Value: 0,
Usage: "Flag to enable NBD data server",
},
},
Action: func(c *cli.Context) {
if err := startReplica(c); err != nil {
Expand Down Expand Up @@ -115,6 +121,7 @@ func startReplica(c *cli.Context) error {
volumeName := c.GlobalString("volume-name")
replicaInstanceName := c.String("replica-instance-name")
dataServerProtocol := c.String("data-server-protocol")
nbdEnabled := c.Int("nbd-enabled")

controlAddress, dataAddress, syncAddress, syncPort, err :=
util.GetAddresses(volumeName, address, types.DataServerProtocol(dataServerProtocol))
Expand All @@ -141,7 +148,7 @@ func startReplica(c *cli.Context) error {
}()

go func() {
rpcServer := replicarpc.NewDataServer(types.DataServerProtocol(dataServerProtocol), dataAddress, s)
rpcServer := replicarpc.NewDataServer(types.DataServerProtocol(dataServerProtocol), dataAddress, s, nbdEnabled)
logrus.Infof("Listening on data server %s", dataAddress)
err := rpcServer.ListenAndServe()
logrus.WithError(err).Warnf("Replica rest server at %v is down", dataAddress)
Expand Down
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ require (
github.com/longhorn/sparse-tools v0.0.0-20230408015858-c849def39d3c
github.com/moby/moby v23.0.2+incompatible
github.com/pkg/errors v0.9.1
github.com/pmorjan/kmod v1.1.0
github.com/pojntfx/go-nbd v0.3.2
github.com/rancher/go-fibmap v0.0.0-20160418233256-5fc9f8c1ed47
github.com/rancher/go-rancher v0.1.1-0.20190307222549-9756097e5e4c
github.com/sirupsen/logrus v1.9.0
Expand All @@ -23,6 +25,7 @@ require (
google.golang.org/grpc v1.53.0
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
gopkg.in/cheggaaa/pb.v2 v2.0.0-20190301131520-f907f6f5dd81
libguestfs.org/libnbd v1.13.1
)

require (
Expand Down Expand Up @@ -59,6 +62,7 @@ require (
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/slok/goresilience v0.2.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/protobuf v1.30.0 // indirect
Expand All @@ -69,3 +73,5 @@ require (
k8s.io/mount-utils v0.27.1 // indirect
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
)

replace github.com/pojntfx/go-nbd => github.com/chazapis/go-nbd v0.0.0-20231031233644-40daa63e22c3
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ github.com/c9s/goprocinfo v0.0.0-20190309065803-0b2ad9ac246b h1:4yfM1Zm+7U+m0inJ
github.com/c9s/goprocinfo v0.0.0-20190309065803-0b2ad9ac246b/go.mod h1:uEyr4WpAH4hio6LFriaPkL938XnrvLpNPmQHBdrmbIE=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chazapis/go-nbd v0.0.0-20231031233644-40daa63e22c3 h1:5DIm8VSCX68W/CehdyRTPnGMY+W807gpHDSlukpBSO4=
github.com/chazapis/go-nbd v0.0.0-20231031233644-40daa63e22c3/go.mod h1:6rBj1gu9NwR0nOEYJgdafavnKlVBitsdRmAi1GG34Es=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down Expand Up @@ -96,6 +98,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmorjan/kmod v1.1.0 h1:ZLb0WalLhz4ENECpySrXMgRqFfBkoqWju680MWL5X94=
github.com/pmorjan/kmod v1.1.0/go.mod h1:iGxkdcq8DCjMw61SXKPMxG7taOrEqjNTIQPPgfvgX88=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
github.com/prometheus/client_golang v1.15.0 h1:5fCgGYogn0hFdhyhLbw7hEsWxufKtY9klyvdNfFlFhM=
github.com/prometheus/client_golang v1.15.0/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk=
Expand Down Expand Up @@ -156,6 +160,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -217,3 +223,5 @@ k8s.io/mount-utils v0.27.1 h1:RSd0wslbIuwLRaGGNAGMZ3m9FLcvukxJ3FWlOm76W2A=
k8s.io/mount-utils v0.27.1/go.mod h1:vmcjYdi2Vg1VTWY7KkhvwJVY6WDHxb/QQhiQKkR8iNs=
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 h1:qY1Ad8PODbnymg2pRbkyMT/ylpTrCM8P2RJ0yroCyIk=
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
libguestfs.org/libnbd v1.13.1 h1:5E77fcDS3DRJiSK5JVMvR6IYADPvIxLw05C/IYMv86E=
libguestfs.org/libnbd v1.13.1/go.mod h1:Qd8vaULc6nlNgj9+6qDNm1vSD/J7/wgoIlwmCh8uxAc=
23 changes: 22 additions & 1 deletion package/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,23 @@
make; \
make install

RUN wget -O - https://s3-us-west-1.amazonaws.com/rancher-longhorn/libqcow-alpha-20181117.tar.gz | tar xvzf - -C /usr/src

Check notice on line 30 in package/Dockerfile

View check run for this annotation

codefactor.io / CodeFactor

package/Dockerfile#L30

Avoid use of wget without progress bar. Use `wget --progress=dot:giga <url>`. Or consider using `-q` or `-nv` (shorthands for `--quiet` or `--no-verbose`). (DL3047)
RUN cd /usr/src/libqcow-20181117 && \
./configure
RUN cd /usr/src/libqcow-20181117 && \
make -j$(nproc) && \
make install

RUN zypper -n install libxml2-devel nbd

Check warning on line 37 in package/Dockerfile

View check run for this annotation

codefactor.io / CodeFactor

package/Dockerfile#L37

Specify version with `zypper install -y <package>=<version>`. (DL3037)

Check warning on line 37 in package/Dockerfile

View check run for this annotation

codefactor.io / CodeFactor

package/Dockerfile#L37

`zypper clean` missing after zypper use. (DL3036)
RUN cd /usr/src && \

Check notice on line 38 in package/Dockerfile

View check run for this annotation

codefactor.io / CodeFactor

package/Dockerfile#L38

Avoid use of wget without progress bar. Use `wget --progress=dot:giga <url>`. Or consider using `-q` or `-nv` (shorthands for `--quiet` or `--no-verbose`). (DL3047)
wget -O - https://download.libguestfs.org/libnbd/1.13-development/libnbd-1.13.1.tar.gz | tar -xzf - && \
cd libnbd-1.13.1 && \
./configure --disable-ocaml --disable-python --disable-golang; \
make; \
make install

RUN ldconfig

# Install grpc_health_probe
RUN wget https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/v0.4.21/grpc_health_probe-linux-${ARCH} -O /usr/local/bin/grpc_health_probe && \
chmod +x /usr/local/bin/grpc_health_probe
Expand All @@ -39,7 +56,7 @@
zypper -n addrepo --refresh https://download.opensuse.org/repositories/network:/utilities/SLE_15_SP5/network:utilities.repo && \
zypper --gpg-auto-import-keys ref

RUN zypper -n install nfs-client nfs4-acl-tools cifs-utils libaio1 sg3_utils \
RUN zypper -n install nfs-client nfs4-acl-tools cifs-utils libaio1 sg3_utils nbd \

Check warning on line 59 in package/Dockerfile

View check run for this annotation

codefactor.io / CodeFactor

package/Dockerfile#L59

Specify version with `zypper install -y <package>=<version>`. (DL3037)

Check warning on line 59 in package/Dockerfile

View check run for this annotation

codefactor.io / CodeFactor

package/Dockerfile#L59

`zypper clean` missing after zypper use. (DL3036)
iputils iproute2 qemu-tools e2fsprogs && \
rm -rf /var/cache/zypp/*

Expand All @@ -57,6 +74,10 @@

COPY package/launch-simple-longhorn package/engine-manager package/launch-simple-file /usr/local/bin/

COPY --from=builder /usr/sbin/nbd-client /usr/sbin/nbd-client
COPY --from=builder /usr/local/lib /usr/local/lib
ENV LD_LIBRARY_PATH=/usr/local/lib

VOLUME /usr/local/bin

# Add Tini
Expand Down
23 changes: 21 additions & 2 deletions package/launch-simple-longhorn
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ mount --rbind /host/dev /dev
volume=$1
size=$2
frontend=$3
frontendStreams=$4
nbdEnabled=$5

if [ -z $volume ]
then
Expand All @@ -28,6 +30,23 @@ then
frontend="tgt-blockdev"
fi

if [ -z $frontendStreams ]
then
echo Use default frontend streams. No frontend nbd streams
frontendStreams=""
else
frontendStreams="--frontend-streams "$frontendStreams
fi

if [ -z $nbdEnabled ]
then
echo Use default dataserver. Dataconn dataserver
nbdEnabled=""
else
nbdEnabled="--nbd-enabled "$nbdEnabled
fi


function start() {
set +e
while true;
Expand All @@ -44,12 +63,12 @@ function start() {

tgtd -f 2>&1 | tee /var/log/tgtd.log &

longhorn-instance-manager process create --name "$volume-r" --binary /usr/local/bin/longhorn --port-count 15 --port-args "--listen,localhost:" -- replica /volume/ "--size" $size
longhorn-instance-manager process create --name "$volume-r" --binary /usr/local/bin/longhorn --port-count 15 --port-args "--listen,localhost:" -- replica /volume/ "--size" $size $nbdEnabled

# wait for the replica to be started
sleep 5

longhorn-instance-manager process create --name "$volume-e" --binary /usr/local/bin/longhorn --port-count 1 --port-args "--listen,localhost:" -- controller $volume --frontend $frontend "--size" $size "--current-size" $size --replica tcp://localhost:10000
longhorn-instance-manager process create --name "$volume-e" --binary /usr/local/bin/longhorn --port-count 1 --port-args "--listen,localhost:" -- controller $volume --frontend $frontend "--size" $size "--current-size" $size --replica tcp://localhost:10000 $nbdEnabled $frontendStreams
}

start &
Expand Down
4 changes: 2 additions & 2 deletions pkg/backend/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ func New(factories map[string]types.BackendFactory) types.BackendFactory {
}
}

func (d *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
func (d *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration, nbdEnabled int) (types.Backend, error) {
parts := strings.SplitN(address, "://", 2)

if len(parts) == 2 {
if factory, ok := d.factories[parts[0]]; ok {
return factory.Create(volumeName, parts[1], dataServerProtocol, engineToReplicaTimeout)
return factory.Create(volumeName, parts[1], dataServerProtocol, engineToReplicaTimeout, nbdEnabled)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/backend/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (f *Wrapper) ResetRebuild() error {
return nil
}

func (ff *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
func (ff *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration, nbdEnabled int) (types.Backend, error) {
logrus.Infof("Creating file: %s", address)
file, err := os.OpenFile(address, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
Expand Down
17 changes: 12 additions & 5 deletions pkg/backend/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (r *Remote) info() (*types.ReplicaInfo, error) {
return replicaClient.GetReplicaInfo(resp.Replica), nil
}

func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration) (types.Backend, error) {
func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.DataServerProtocol, engineToReplicaTimeout time.Duration, nbdEnabled int) (types.Backend, error) {
logrus.Infof("Connecting to remote: %s (%v)", address, dataServerProtocol)

controlAddress, dataAddress, _, _, err := util.GetAddresses(volumeName, address, dataServerProtocol)
Expand Down Expand Up @@ -360,15 +360,22 @@ func (rf *Factory) Create(volumeName, address string, dataServerProtocol types.D
if err != nil {
return nil, err
}

dataConnClient := dataconn.NewClient(conn, engineToReplicaTimeout)
r.ReaderWriterUnmapperAt = dataConnClient
var dataConnClient *dataconn.Client
if nbdEnabled > 0 {
dataConnClientNBD := dataconn.NewNbdClientWrapper(conn, engineToReplicaTimeout, nbdEnabled)
r.ReaderWriterUnmapperAt = dataConnClientNBD
} else {
dataConnClient = dataconn.NewClient(conn, engineToReplicaTimeout)
r.ReaderWriterUnmapperAt = dataConnClient
}

if err := r.open(); err != nil {
return nil, err
}

go r.monitorPing(dataConnClient)
if nbdEnabled == 0 {
go r.monitorPing(dataConnClient)
}

return r, nil
}
Expand Down
24 changes: 14 additions & 10 deletions pkg/controller/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ type Controller struct {
factory types.BackendFactory
backend *replicator
frontend types.Frontend
frontendStreams int
isUpgrade bool
iscsiTargetRequestTimeout time.Duration
engineReplicaTimeout time.Duration
DataServerProtocol types.DataServerProtocol
nbdEnabled int

isExpanding bool
revisionCounterDisabled bool
Expand Down Expand Up @@ -66,14 +68,15 @@ const (
lastModifyCheckPeriod = 5 * time.Second
)

func NewController(name string, factory types.BackendFactory, frontend types.Frontend, isUpgrade, disableRevCounter, salvageRequested, unmapMarkSnapChainRemoved bool,
iscsiTargetRequestTimeout, engineReplicaTimeout time.Duration, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout int) *Controller {
func NewController(name string, factory types.BackendFactory, frontend types.Frontend, frontendStreams int, isUpgrade, disableRevCounter, salvageRequested, unmapMarkSnapChainRemoved bool,
iscsiTargetRequestTimeout, engineReplicaTimeout time.Duration, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout int, nbdEnabled int) *Controller {
c := &Controller{
factory: factory,
VolumeName: name,
frontend: frontend,
metrics: &types.Metrics{},
latestMetrics: &types.Metrics{},
factory: factory,
VolumeName: name,
frontend: frontend,
frontendStreams: frontendStreams,
metrics: &types.Metrics{},
latestMetrics: &types.Metrics{},

isUpgrade: isUpgrade,
revisionCounterDisabled: disableRevCounter,
Expand All @@ -85,6 +88,7 @@ func NewController(name string, factory types.BackendFactory, frontend types.Fro
DataServerProtocol: dataServerProtocol,

fileSyncHTTPClientTimeout: fileSyncHTTPClientTimeout,
nbdEnabled: nbdEnabled,
}
c.reset()
c.metricsStart()
Expand Down Expand Up @@ -164,7 +168,7 @@ func (c *Controller) addReplica(address string, snapshotRequired bool, mode type
return err
}

newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout)
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout, c.nbdEnabled)
if err != nil {
return err
}
Expand Down Expand Up @@ -484,7 +488,7 @@ func (c *Controller) StartFrontend(frontend string) error {
}
}

f, err := NewFrontend(frontend, c.iscsiTargetRequestTimeout)
f, err := NewFrontend(frontend, c.frontendStreams, c.iscsiTargetRequestTimeout)
if err != nil {
return errors.Wrapf(err, "failed to find frontend: %s", frontend)
}
Expand Down Expand Up @@ -728,7 +732,7 @@ func (c *Controller) Start(volumeSize, volumeCurrentSize int64, addresses ...str
errorCodes := map[string]codes.Code{}
first := true
for _, address := range addresses {
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout)
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout, c.nbdEnabled)
if err != nil {
if strings.Contains(err.Error(), "rpc error: code = Unavailable") {
errorCodes[address] = codes.Unavailable
Expand Down
Loading
Loading