From edecbfe26dccd0de2a7557cb7a361beb16d5780f Mon Sep 17 00:00:00 2001 From: Artem Chernyshev Date: Fri, 9 Aug 2024 22:35:26 +0300 Subject: [PATCH] fix: improve emulator cloud provider reliability Copy the labels from machine request to the machine request status. Use a single client for rtnetlink watches and management. Retry service account creation. Signed-off-by: Artem Chernyshev --- .gitignore | 3 +- .kres.yaml | 72 ++++++++++ Dockerfile | 4 +- Makefile | 24 ++-- cmd/talemu-cloud-provider/main.go | 29 +++- cmd/talemu/main.go | 15 +- hack/compose/docker-compose-provider.yml | 36 +++++ internal/pkg/emu/runtime.go | 17 ++- .../pkg/machine/controllers/address_spec.go | 29 +--- internal/pkg/machine/controllers/link_spec.go | 74 ++++++---- .../pkg/machine/controllers/link_status.go | 57 +------- .../pkg/machine/controllers/siderolink.go | 15 +- internal/pkg/machine/events/events.go | 49 ++++--- internal/pkg/machine/machine.go | 13 +- internal/pkg/machine/network/client.go | 131 ++++++++++++++++++ internal/pkg/machine/options.go | 10 ++ internal/pkg/machine/runtime/runtime.go | 16 ++- internal/pkg/machine/services/apid.go | 8 +- .../pkg/provider/clientconfig/clientconfig.go | 77 ++-------- internal/pkg/provider/controllers/machine.go | 10 +- .../provider/controllers/machine/machine.go | 5 +- .../controllers/machine_request_status.go | 7 + internal/pkg/provider/provider.go | 5 +- 23 files changed, 480 insertions(+), 226 deletions(-) create mode 100644 hack/compose/docker-compose-provider.yml create mode 100644 internal/pkg/machine/network/client.go diff --git a/.gitignore b/.gitignore index b206700..b44c1cf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-04-23T19:38:57Z by kres 0610b40-dirty. +# Generated on 2024-08-14T14:55:16Z by kres 7be2a05. _out hack/compose/docker-compose.override.yml +hack/compose/docker-compose-provider.override.yml diff --git a/.kres.yaml b/.kres.yaml index 82c49ee..b0a2690 100644 --- a/.kres.yaml +++ b/.kres.yaml @@ -3,6 +3,7 @@ kind: common.Build spec: ignoredPaths: - "hack/compose/docker-compose.override.yml" + - "hack/compose/docker-compose-provider.override.yml" --- kind: service.CodeCov spec: @@ -24,6 +25,10 @@ spec: toplevel: true - name: docker-compose-down toplevel: true + - name: docker-compose-provider-up + toplevel: true + - name: docker-compose-provider-down + toplevel: true --- kind: custom.Step name: docker-compose-up @@ -91,3 +96,70 @@ spec: DOCKER_BUILDKIT=1 GO_LDFLAGS="$(GO_LDFLAGS)" docker compose -p talemu --file ./hack/compose/docker-compose.yml --file ./hack/compose/docker-compose.override.yml down --rmi local --remove-orphans --volumes=$(REMOVE_VOLUMES) +--- +kind: custom.Step +name: docker-compose-provider-up +spec: + makefile: + enabled: true + phony: true + script: + - >- + ARTIFACTS="$(ARTIFACTS)" + SHA="$(SHA)" + TAG="$(TAG)" + USERNAME="$(USERNAME)" + REGISTRY="$(REGISTRY)" + PROTOBUF_TS_VERSION="$(PROTOBUF_TS_VERSION)" + NODE_BUILD_ARGS="$(NODE_BUILD_ARGS)" + TOOLCHAIN="$(TOOLCHAIN)" + CGO_ENABLED="$(CGO_ENABLED)" + GO_BUILDFLAGS="$(GO_BUILDFLAGS)" + GOLANGCILINT_VERSION="$(GOLANGCILINT_VERSION)" + GOFUMPT_VERSION="$(GOFUMPT_VERSION)" + GOIMPORTS_VERSION="$(GOIMPORTS_VERSION)" + PROTOBUF_GO_VERSION="$(PROTOBUF_GO_VERSION)" + GRPC_GO_VERSION="$(GRPC_GO_VERSION)" + GRPC_GATEWAY_VERSION="$(GRPC_GATEWAY_VERSION)" + VTPROTOBUF_VERSION="$(VTPROTOBUF_VERSION)" + DEEPCOPY_VERSION="$(DEEPCOPY_VERSION)" + TESTPKGS="$(TESTPKGS)" + COMPOSE_DOCKER_CLI_BUILD=1 + DOCKER_BUILDKIT=1 + GO_LDFLAGS="$(GO_LDFLAGS)" + docker compose -p talemu-cloud-provider --file ./hack/compose/docker-compose-provider.yml --file ./hack/compose/docker-compose-provider.override.yml up --build +--- +kind: custom.Step +name: docker-compose-provider-down +spec: + makefile: + enabled: true + phony: true + variables: + - name: REMOVE_VOLUMES + defaultValue: false + script: + - >- + ARTIFACTS="$(ARTIFACTS)" + SHA="$(SHA)" + TAG="$(TAG)" + USERNAME="$(USERNAME)" + REGISTRY="$(REGISTRY)" + PROTOBUF_TS_VERSION="$(PROTOBUF_TS_VERSION)" + NODE_BUILD_ARGS="$(NODE_BUILD_ARGS)" + TOOLCHAIN="$(TOOLCHAIN)" + CGO_ENABLED="$(CGO_ENABLED)" + GO_BUILDFLAGS="$(GO_BUILDFLAGS)" + GOLANGCILINT_VERSION="$(GOLANGCILINT_VERSION)" + GOFUMPT_VERSION="$(GOFUMPT_VERSION)" + GOIMPORTS_VERSION="$(GOIMPORTS_VERSION)" + PROTOBUF_GO_VERSION="$(PROTOBUF_GO_VERSION)" + GRPC_GO_VERSION="$(GRPC_GO_VERSION)" + GRPC_GATEWAY_VERSION="$(GRPC_GATEWAY_VERSION)" + VTPROTOBUF_VERSION="$(VTPROTOBUF_VERSION)" + DEEPCOPY_VERSION="$(DEEPCOPY_VERSION)" + TESTPKGS="$(TESTPKGS)" + COMPOSE_DOCKER_CLI_BUILD=1 + DOCKER_BUILDKIT=1 + GO_LDFLAGS="$(GO_LDFLAGS)" + docker compose -p talemu-cloud-provider --file ./hack/compose/docker-compose-provider.yml --file ./hack/compose/docker-compose-provider.override.yml down --rmi local --remove-orphans --volumes=$(REMOVE_VOLUMES) diff --git a/Dockerfile b/Dockerfile index ae2e584..f62bc96 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,7 +2,7 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-07-31T12:19:21Z by kres faf91e3. +# Generated on 2024-08-14T14:55:16Z by kres 7be2a05. ARG TOOLCHAIN @@ -11,7 +11,7 @@ FROM ghcr.io/siderolabs/ca-certificates:v1.7.0 AS image-ca-certificates FROM ghcr.io/siderolabs/fhs:v1.7.0 AS image-fhs # runs markdownlint -FROM docker.io/oven/bun:1.1.20-alpine AS lint-markdown +FROM docker.io/oven/bun:1.1.22-alpine AS lint-markdown WORKDIR /src RUN bun i markdownlint-cli@0.41.0 sentences-per-line@0.2.1 COPY .markdownlint.json . diff --git a/Makefile b/Makefile index 8918c9c..e360328 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-07-31T12:19:21Z by kres faf91e3. +# Generated on 2024-08-14T14:58:43Z by kres 7be2a05. # common variables @@ -18,14 +18,14 @@ REGISTRY ?= ghcr.io USERNAME ?= siderolabs REGISTRY_AND_USERNAME ?= $(REGISTRY)/$(USERNAME) PROTOBUF_GO_VERSION ?= 1.34.2 -GRPC_GO_VERSION ?= 1.4.0 -GRPC_GATEWAY_VERSION ?= 2.20.0 +GRPC_GO_VERSION ?= 1.5.1 +GRPC_GATEWAY_VERSION ?= 2.21.0 VTPROTOBUF_VERSION ?= 0.6.0 -GOIMPORTS_VERSION ?= 0.23.0 +GOIMPORTS_VERSION ?= 0.24.0 DEEPCOPY_VERSION ?= v0.5.6 -GOLANGCILINT_VERSION ?= v1.59.1 +GOLANGCILINT_VERSION ?= v1.60.1 GOFUMPT_VERSION ?= v0.6.0 -GO_VERSION ?= 1.22.5 +GO_VERSION ?= 1.23.0 GO_BUILDFLAGS ?= GO_LDFLAGS ?= CGO_ENABLED ?= 0 @@ -67,7 +67,7 @@ COMMON_ARGS += --build-arg=DEEPCOPY_VERSION="$(DEEPCOPY_VERSION)" COMMON_ARGS += --build-arg=GOLANGCILINT_VERSION="$(GOLANGCILINT_VERSION)" COMMON_ARGS += --build-arg=GOFUMPT_VERSION="$(GOFUMPT_VERSION)" COMMON_ARGS += --build-arg=TESTPKGS="$(TESTPKGS)" -TOOLCHAIN ?= docker.io/golang:1.22-alpine +TOOLCHAIN ?= docker.io/golang:1.23-alpine # extra variables @@ -135,7 +135,7 @@ else GO_LDFLAGS += -s endif -all: unit-tests talemu image-talemu talemu-cloud-provider image-talemu-cloud-provider docker-compose-up docker-compose-down lint +all: unit-tests talemu image-talemu talemu-cloud-provider image-talemu-cloud-provider docker-compose-up docker-compose-down docker-compose-provider-up docker-compose-provider-down lint $(ARTIFACTS): ## Creates artifacts directory. @mkdir -p $(ARTIFACTS) @@ -225,6 +225,14 @@ docker-compose-up: docker-compose-down: ARTIFACTS="$(ARTIFACTS)" SHA="$(SHA)" TAG="$(TAG)" USERNAME="$(USERNAME)" REGISTRY="$(REGISTRY)" PROTOBUF_TS_VERSION="$(PROTOBUF_TS_VERSION)" NODE_BUILD_ARGS="$(NODE_BUILD_ARGS)" TOOLCHAIN="$(TOOLCHAIN)" CGO_ENABLED="$(CGO_ENABLED)" GO_BUILDFLAGS="$(GO_BUILDFLAGS)" GOLANGCILINT_VERSION="$(GOLANGCILINT_VERSION)" GOFUMPT_VERSION="$(GOFUMPT_VERSION)" GOIMPORTS_VERSION="$(GOIMPORTS_VERSION)" PROTOBUF_GO_VERSION="$(PROTOBUF_GO_VERSION)" GRPC_GO_VERSION="$(GRPC_GO_VERSION)" GRPC_GATEWAY_VERSION="$(GRPC_GATEWAY_VERSION)" VTPROTOBUF_VERSION="$(VTPROTOBUF_VERSION)" DEEPCOPY_VERSION="$(DEEPCOPY_VERSION)" TESTPKGS="$(TESTPKGS)" COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 GO_LDFLAGS="$(GO_LDFLAGS)" docker compose -p talemu --file ./hack/compose/docker-compose.yml --file ./hack/compose/docker-compose.override.yml down --rmi local --remove-orphans --volumes=$(REMOVE_VOLUMES) +.PHONY: docker-compose-provider-up +docker-compose-provider-up: + ARTIFACTS="$(ARTIFACTS)" SHA="$(SHA)" TAG="$(TAG)" USERNAME="$(USERNAME)" REGISTRY="$(REGISTRY)" PROTOBUF_TS_VERSION="$(PROTOBUF_TS_VERSION)" NODE_BUILD_ARGS="$(NODE_BUILD_ARGS)" TOOLCHAIN="$(TOOLCHAIN)" CGO_ENABLED="$(CGO_ENABLED)" GO_BUILDFLAGS="$(GO_BUILDFLAGS)" GOLANGCILINT_VERSION="$(GOLANGCILINT_VERSION)" GOFUMPT_VERSION="$(GOFUMPT_VERSION)" GOIMPORTS_VERSION="$(GOIMPORTS_VERSION)" PROTOBUF_GO_VERSION="$(PROTOBUF_GO_VERSION)" GRPC_GO_VERSION="$(GRPC_GO_VERSION)" GRPC_GATEWAY_VERSION="$(GRPC_GATEWAY_VERSION)" VTPROTOBUF_VERSION="$(VTPROTOBUF_VERSION)" DEEPCOPY_VERSION="$(DEEPCOPY_VERSION)" TESTPKGS="$(TESTPKGS)" COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 GO_LDFLAGS="$(GO_LDFLAGS)" docker compose -p talemu-cloud-provider --file ./hack/compose/docker-compose-provider.yml --file ./hack/compose/docker-compose-provider.override.yml up --build + +.PHONY: docker-compose-provider-down +docker-compose-provider-down: + ARTIFACTS="$(ARTIFACTS)" SHA="$(SHA)" TAG="$(TAG)" USERNAME="$(USERNAME)" REGISTRY="$(REGISTRY)" PROTOBUF_TS_VERSION="$(PROTOBUF_TS_VERSION)" NODE_BUILD_ARGS="$(NODE_BUILD_ARGS)" TOOLCHAIN="$(TOOLCHAIN)" CGO_ENABLED="$(CGO_ENABLED)" GO_BUILDFLAGS="$(GO_BUILDFLAGS)" GOLANGCILINT_VERSION="$(GOLANGCILINT_VERSION)" GOFUMPT_VERSION="$(GOFUMPT_VERSION)" GOIMPORTS_VERSION="$(GOIMPORTS_VERSION)" PROTOBUF_GO_VERSION="$(PROTOBUF_GO_VERSION)" GRPC_GO_VERSION="$(GRPC_GO_VERSION)" GRPC_GATEWAY_VERSION="$(GRPC_GATEWAY_VERSION)" VTPROTOBUF_VERSION="$(VTPROTOBUF_VERSION)" DEEPCOPY_VERSION="$(DEEPCOPY_VERSION)" TESTPKGS="$(TESTPKGS)" COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 GO_LDFLAGS="$(GO_LDFLAGS)" docker compose -p talemu-cloud-provider --file ./hack/compose/docker-compose-provider.yml --file ./hack/compose/docker-compose-provider.override.yml down --rmi local --remove-orphans --volumes=$(REMOVE_VOLUMES) + .PHONY: rekres rekres: @docker pull $(KRES_IMAGE) diff --git a/cmd/talemu-cloud-provider/main.go b/cmd/talemu-cloud-provider/main.go index 2d45501..9c9c956 100644 --- a/cmd/talemu-cloud-provider/main.go +++ b/cmd/talemu-cloud-provider/main.go @@ -25,6 +25,7 @@ import ( emuruntime "github.com/siderolabs/talemu/internal/pkg/emu" "github.com/siderolabs/talemu/internal/pkg/kubefactory" + "github.com/siderolabs/talemu/internal/pkg/machine/network" "github.com/siderolabs/talemu/internal/pkg/machine/runtime" "github.com/siderolabs/talemu/internal/pkg/machine/runtime/resources/emu" "github.com/siderolabs/talemu/internal/pkg/provider" @@ -50,9 +51,19 @@ var rootCmd = &cobra.Command{ } if cfg.createServiceAccount { - err = createServiceAccount(cmd.Context()) - if err != nil { - return err + for { + err = createServiceAccount(cmd.Context()) + if err == nil { + break + } + + logger.Error("failed to create service account", zap.Error(err)) + + select { + case <-cmd.Context().Done(): + return err + case <-time.After(time.Second * 5): + } } } @@ -97,7 +108,15 @@ var rootCmd = &cobra.Command{ return err } - if err = provider.RegisterControllers(runtime, kubernetes); err != nil { + nc := network.NewClient() + + if err = nc.Run(cmd.Context()); err != nil { + return err + } + + defer nc.Close() //nolint:errcheck + + if err = provider.RegisterControllers(runtime, kubernetes, nc); err != nil { return err } @@ -113,6 +132,8 @@ func createServiceAccount(ctx context.Context) error { return err } + defer rootClient.Close() //nolint:errcheck + name := access.CloudProviderServiceAccountPrefix + meta.ProviderID sa := access.ParseServiceAccountFromName(name) diff --git a/cmd/talemu/main.go b/cmd/talemu/main.go index 0e19628..5ea67fa 100644 --- a/cmd/talemu/main.go +++ b/cmd/talemu/main.go @@ -22,6 +22,7 @@ import ( emuruntime "github.com/siderolabs/talemu/internal/pkg/emu" "github.com/siderolabs/talemu/internal/pkg/kubefactory" "github.com/siderolabs/talemu/internal/pkg/machine" + "github.com/siderolabs/talemu/internal/pkg/machine/network" "github.com/siderolabs/talemu/internal/pkg/machine/runtime" "github.com/siderolabs/talemu/internal/pkg/machine/runtime/resources/emu" ) @@ -81,17 +82,25 @@ var rootCmd = &cobra.Command{ return runtime.Run(ctx) }) + nc := network.NewClient() + + if err = nc.Run(cmd.Context()); err != nil { + return err + } + + defer nc.Close() //nolint:errcheck + for i := range cfg.machinesCount { - machine, err := machine.NewMachine(fmt.Sprintf("%04d1802-c798-4da7-a410-f09abb48c8d8", i+1000), logger, emulatorState) + m, err := machine.NewMachine(fmt.Sprintf("%04d1802-c798-4da7-a410-f09abb48c8d8", i+1000), logger, emulatorState) if err != nil { return err } eg.Go(func() error { - return machine.Run(ctx, params, i+1000, kubernetes) + return m.Run(ctx, params, i+1000, kubernetes, machine.WithNetworkClient(nc)) }) - machines = append(machines, machine) + machines = append(machines, m) } var errors error diff --git a/hack/compose/docker-compose-provider.yml b/hack/compose/docker-compose-provider.yml new file mode 100644 index 0000000..8f13871 --- /dev/null +++ b/hack/compose/docker-compose-provider.yml @@ -0,0 +1,36 @@ +version: '3.8' +services: + talemu-cloud-provider: + volumes: + - state:/_out/provider + container_name: talemu-cloud-provider + restart: on-failure + cap_add: + - NET_ADMIN + build: + target: image-talemu-cloud-provider + context: ../../ + dockerfile: Dockerfile + args: + - ARTIFACTS=${ARTIFACTS:?error} + - SHA=${SHA:?error} + - TAG=${TAG:?error} + - USERNAME=${USERNAME:?error} + - REGISTRY=${REGISTRY:?error} + - NODE_BUILD_ARGS=${NODE_BUILD_ARGS} + - TOOLCHAIN=${TOOLCHAIN:?error} + - CGO_ENABLED=${CGO_ENABLED:?error} + - GO_BUILDFLAGS=${GO_BUILDFLAGS} + - GOLANGCILINT_VERSION=${GOLANGCILINT_VERSION:?error} + - GOFUMPT_VERSION=${GOFUMPT_VERSION:?error} + - GOIMPORTS_VERSION=${GOIMPORTS_VERSION:?error} + - PROTOBUF_GO_VERSION=${PROTOBUF_GO_VERSION:?error} + - GRPC_GO_VERSION=${GRPC_GO_VERSION:?error} + - GRPC_GATEWAY_VERSION=${GRPC_GATEWAY_VERSION:?error} + - VTPROTOBUF_VERSION=${VTPROTOBUF_VERSION:?error} + - DEEPCOPY_VERSION=${DEEPCOPY_VERSION:?error} + - TESTPKGS=${TESTPKGS:?error} + - GO_LDFLAGS=${GO_LDFLAGS} + +volumes: + state: diff --git a/internal/pkg/emu/runtime.go b/internal/pkg/emu/runtime.go index 6a7d49a..9966b4c 100644 --- a/internal/pkg/emu/runtime.go +++ b/internal/pkg/emu/runtime.go @@ -6,6 +6,7 @@ package emu import ( "context" + "time" "github.com/cosi-project/runtime/pkg/controller" "github.com/cosi-project/runtime/pkg/controller/runtime" @@ -70,5 +71,19 @@ func (rt *Runtime) RegisterController(ctrl controller.Controller) error { func (rt *Runtime) Run(ctx context.Context) error { rt.logger.Info("starting global runtime") - return rt.runtime.Run(ctx) + for { + err := rt.runtime.Run(ctx) + + if err == nil { + return nil + } + + rt.logger.Error("global runtime crashed", zap.Error(err)) + + select { + case <-ctx.Done(): + return err + case <-time.After(time.Second * 10): + } + } } diff --git a/internal/pkg/machine/controllers/address_spec.go b/internal/pkg/machine/controllers/address_spec.go index 657ec97..5d47053 100644 --- a/internal/pkg/machine/controllers/address_spec.go +++ b/internal/pkg/machine/controllers/address_spec.go @@ -17,17 +17,17 @@ import ( "github.com/cosi-project/runtime/pkg/safe" "github.com/jsimonetti/rtnetlink" "github.com/mdlayher/arp" + machinenetwork "github.com/siderolabs/talemu/internal/pkg/machine/network" "github.com/siderolabs/talos/pkg/machinery/nethelpers" "github.com/siderolabs/talos/pkg/machinery/resources/network" "go.uber.org/zap" "go4.org/netipx" - "golang.org/x/sys/unix" - - "github.com/siderolabs/talemu/internal/pkg/machine/network/watch" ) // AddressSpecController applies network.AddressSpec to the actual interfaces. -type AddressSpecController struct{} +type AddressSpecController struct { + NC *machinenetwork.Client +} // Name implements controller.Controller interface. func (ctrl *AddressSpecController) Name() string { @@ -59,21 +59,6 @@ func (ctrl *AddressSpecController) Outputs() []controller.Output { // //nolint:gocognit func (ctrl *AddressSpecController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error { - // watch link changes as some address might need to be re-applied if the link appears - watcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_LINK) - if err != nil { - return err - } - - defer watcher.Done() - - conn, err := rtnetlink.Dial(nil) - if err != nil { - return fmt.Errorf("error dialing rtnetlink socket: %w", err) - } - - defer conn.Close() //nolint:errcheck - for { select { case <-ctx.Done(): @@ -103,13 +88,13 @@ func (ctrl *AddressSpecController) Run(ctx context.Context, r controller.Runtime } // list rtnetlink links (interfaces) - links, err := conn.Link.List() + links, err := ctrl.NC.Conn().Link.List() if err != nil { return fmt.Errorf("error listing links: %w", err) } // list rtnetlink addresses - addrs, err := conn.Address.List() + addrs, err := ctrl.NC.Conn().Address.List() if err != nil { return fmt.Errorf("error listing addresses: %w", err) } @@ -120,7 +105,7 @@ func (ctrl *AddressSpecController) Run(ctx context.Context, r controller.Runtime for _, res := range list.Items { address := res.(*network.AddressSpec) //nolint:forcetypeassert,errcheck - if err = ctrl.syncAddress(ctx, r, logger, conn, links, addrs, address); err != nil { + if err = ctrl.syncAddress(ctx, r, logger, ctrl.NC.Conn(), links, addrs, address); err != nil { return err } diff --git a/internal/pkg/machine/controllers/link_spec.go b/internal/pkg/machine/controllers/link_spec.go index 3a9e053..8bad24d 100644 --- a/internal/pkg/machine/controllers/link_spec.go +++ b/internal/pkg/machine/controllers/link_spec.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "log" "net" "net/netip" "slices" @@ -18,6 +19,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/jsimonetti/rtnetlink" "github.com/siderolabs/gen/xslices" + machinenetwork "github.com/siderolabs/talemu/internal/pkg/machine/network" "github.com/siderolabs/talos/pkg/machinery/nethelpers" "github.com/siderolabs/talos/pkg/machinery/resources/network" "go.uber.org/zap" @@ -25,12 +27,12 @@ import ( "golang.org/x/sys/unix" "golang.zx2c4.com/wireguard/wgctrl" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" - - "github.com/siderolabs/talemu/internal/pkg/machine/network/watch" ) // LinkSpecController applies network.LinkSpec to the actual interfaces. -type LinkSpecController struct{} +type LinkSpecController struct { + NC *machinenetwork.Client +} // Name implements controller.Controller interface. func (ctrl *LinkSpecController) Name() string { @@ -61,27 +63,6 @@ func (ctrl *LinkSpecController) Outputs() []controller.Output { // Run implements controller.Controller interface. func (ctrl *LinkSpecController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error { // watch link changes as some routes might need to be re-applied if the link appears - watcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_LINK) - if err != nil { - return err - } - - defer watcher.Done() - - conn, err := rtnetlink.Dial(nil) - if err != nil { - return fmt.Errorf("error dialing rtnetlink socket: %w", err) - } - - defer conn.Close() //nolint:errcheck - - wgClient, err := wgctrl.New() - if err != nil { - logger.Warn("error creating wireguard client", zap.Error(err)) - } else { - defer wgClient.Close() //nolint:errcheck - } - for { select { case <-ctx.Done(): @@ -107,7 +88,7 @@ func (ctrl *LinkSpecController) Run(ctx context.Context, r controller.Runtime, l } // list rtnetlink links (interfaces) - links, err := conn.Link.List() + links, err := ctrl.NC.Conn().Link.List() if err != nil { return fmt.Errorf("error listing links: %w", err) } @@ -118,7 +99,7 @@ func (ctrl *LinkSpecController) Run(ctx context.Context, r controller.Runtime, l for _, res := range list.Items { link := res.(*network.LinkSpec) //nolint:forcetypeassert,errcheck - if err = ctrl.syncLink(ctx, r, logger, conn, wgClient, &links, link); err != nil { + if err = ctrl.syncLink(ctx, r, logger, ctrl.NC.Conn(), ctrl.NC.Wg(), &links, link); err != nil { multiErr = multierror.Append(multiErr, err) } } @@ -294,7 +275,7 @@ func (ctrl *LinkSpecController) syncLink(ctx context.Context, r controller.Runti link.TypedSpec().Wireguard.Sort() // order here is important: we allow listenPort to be zero in the configuration - if !existingSpec.Equal(&link.TypedSpec().Wireguard) { + if !Equal(existingSpec, link.TypedSpec().Wireguard) { config, err := WireguardSpec(&link.TypedSpec().Wireguard).Encode(&existingSpec) if err != nil { return fmt.Errorf("error creating wireguard config patch for %q: %w", link.TypedSpec().Name, err) @@ -361,6 +342,45 @@ func (ctrl *LinkSpecController) syncLink(ctx context.Context, r controller.Runti return nil } +// Equal checks two WireguardSpecs for equality. +// +// Both specs should be sorted before calling this method. +// +// `spec` is considered to be the result of getting current Wireguard configuration, +// while `other` is the new (updated configuration). +func Equal(spec, other network.WireguardSpec) bool { + if spec.PrivateKey != other.PrivateKey { + log.Printf("key differs %s <> %s", spec.PrivateKey, other.PrivateKey) + return false + } + + // listenPort of '0' means use any available port, so we shouldn't consider this to be a "change" + if spec.ListenPort != other.ListenPort && other.ListenPort != 0 { + log.Printf("port differs %d <> %d", spec.ListenPort, other.ListenPort) + return false + } + + if spec.FirewallMark != other.FirewallMark { + log.Printf("fw mark differs %d <> %d", spec.FirewallMark, other.FirewallMark) + return false + } + + if len(spec.Peers) != len(other.Peers) { + log.Printf("number of peers change %d <> %d", len(spec.Peers), len(other.Peers)) + return false + } + + for i := range spec.Peers { + if !spec.Peers[i].Equal(&other.Peers[i]) { + log.Printf(">> peer %d differs", i) + + return false + } + } + + return true +} + // WireguardSpec adapter provides encoding/decoding to netlink structures. // //nolint:revive,golint diff --git a/internal/pkg/machine/controllers/link_status.go b/internal/pkg/machine/controllers/link_status.go index e286a9b..e795db8 100644 --- a/internal/pkg/machine/controllers/link_status.go +++ b/internal/pkg/machine/controllers/link_status.go @@ -17,22 +17,22 @@ import ( "github.com/cosi-project/runtime/pkg/resource" "github.com/cosi-project/runtime/pkg/safe" "github.com/cosi-project/runtime/pkg/state" - "github.com/jsimonetti/rtnetlink/v2" + "github.com/jsimonetti/rtnetlink" "github.com/mdlayher/ethtool" ethtoolioctl "github.com/safchain/ethtool" + machinenetwork "github.com/siderolabs/talemu/internal/pkg/machine/network" "github.com/siderolabs/talos/pkg/machinery/constants" "github.com/siderolabs/talos/pkg/machinery/nethelpers" "github.com/siderolabs/talos/pkg/machinery/resources/network" "go.uber.org/zap" - "golang.org/x/sys/unix" "golang.zx2c4.com/wireguard/wgctrl" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" - - "github.com/siderolabs/talemu/internal/pkg/machine/network/watch" ) // LinkStatusController manages secrets.Etcd based on configuration. -type LinkStatusController struct{} +type LinkStatusController struct { + NC *machinenetwork.Client +} // Name implements controller.Controller interface. func (ctrl *LinkStatusController) Name() string { @@ -62,51 +62,6 @@ func (ctrl *LinkStatusController) Outputs() []controller.Output { // Run implements controller.Controller interface. func (ctrl *LinkStatusController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error { - // create watch connections to rtnetlink and ethtool via genetlink - // these connections are used only to join multicast groups and receive notifications on changes - // other connections are used to send requests and receive responses, as we can't mix the notifications and request/responses - rtnetlinkWatcher, err := watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, r), unix.RTMGRP_LINK) - if err != nil { - return err - } - - defer rtnetlinkWatcher.Done() - - ethtoolWatcher, err := watch.NewEthtool(watch.NewDefaultRateLimitedTrigger(ctx, r)) - if err != nil { - logger.Warn("ethtool watcher failed to start", zap.Error(err)) - } else { - defer ethtoolWatcher.Done() - } - - conn, err := rtnetlink.Dial(nil) - if err != nil { - return fmt.Errorf("error dialing rtnetlink socket: %w", err) - } - - defer conn.Close() //nolint:errcheck - - ethClient, err := ethtool.New() - if err != nil { - logger.Warn("error dialing ethtool socket", zap.Error(err)) - } else { - defer ethClient.Close() //nolint:errcheck - } - - ethIoctlClient, err := ethtoolioctl.NewEthtool() - if err != nil { - logger.Warn("error dialing ethtool ioctl socket", zap.Error(err)) - } else { - defer ethIoctlClient.Close() - } - - wgClient, err := wgctrl.New() - if err != nil { - logger.Warn("error creating wireguard client", zap.Error(err)) - } else { - defer wgClient.Close() //nolint:errcheck - } - for { select { case <-ctx.Done(): @@ -114,7 +69,7 @@ func (ctrl *LinkStatusController) Run(ctx context.Context, r controller.Runtime, case <-r.EventCh(): } - if err = ctrl.reconcile(ctx, r, logger, conn, ethClient, ethIoctlClient, wgClient); err != nil { + if err := ctrl.reconcile(ctx, r, logger, ctrl.NC.Conn(), ctrl.NC.Eth(), ctrl.NC.EthIoCtl(), ctrl.NC.Wg()); err != nil { return err } diff --git a/internal/pkg/machine/controllers/siderolink.go b/internal/pkg/machine/controllers/siderolink.go index 6ea312d..8cdd730 100644 --- a/internal/pkg/machine/controllers/siderolink.go +++ b/internal/pkg/machine/controllers/siderolink.go @@ -22,6 +22,7 @@ import ( pointer "github.com/siderolabs/go-pointer" pb "github.com/siderolabs/siderolink/api/siderolink" "github.com/siderolabs/siderolink/pkg/wireguard" + machinenetwork "github.com/siderolabs/talemu/internal/pkg/machine/network" "github.com/siderolabs/talos/pkg/machinery/constants" "github.com/siderolabs/talos/pkg/machinery/nethelpers" "github.com/siderolabs/talos/pkg/machinery/resources/config" @@ -40,6 +41,7 @@ import ( // ManagerController interacts with SideroLink API and brings up the SideroLink Wireguard interface. type ManagerController struct { + NC *machinenetwork.Client pd provisionData nodeKey wgtypes.Key Slot int @@ -100,17 +102,6 @@ func (ctrl *ManagerController) Outputs() []controller.Output { // //nolint:gocyclo,cyclop,gocognit func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error { - wgClient, wgClientErr := wgctrl.New() - if wgClientErr != nil { - return wgClientErr - } - - defer func() { - if closeErr := wgClient.Close(); closeErr != nil { - logger.Error("failed to close wg client", zap.Error(closeErr)) - } - }() - var zeroKey wgtypes.Key if bytes.Equal(ctrl.nodeKey[:], zeroKey[:]) { @@ -130,7 +121,7 @@ func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, lo case <-ctx.Done(): return nil case <-ticker.C: - reconnect, err := ctrl.shouldReconnect(wgClient) + reconnect, err := ctrl.shouldReconnect(ctrl.NC.Wg()) if err != nil { return err } diff --git a/internal/pkg/machine/events/events.go b/internal/pkg/machine/events/events.go index d0f090a..67ecfc1 100644 --- a/internal/pkg/machine/events/events.go +++ b/internal/pkg/machine/events/events.go @@ -8,6 +8,7 @@ package events import ( "context" "fmt" + "math/rand" "net" "net/netip" "strings" @@ -39,15 +40,24 @@ import ( // Handler watches machine status resource and turns each resource change into an event. type Handler struct { - state state.State - client events.EventSinkServiceClient + state state.State } // NewHandler creates new events handler. func NewHandler(ctx context.Context, st state.State) (*Handler, error) { - config, err := safe.ReaderGetByID[*runtime.EventSinkConfig](ctx, st, runtime.EventSinkConfigID) + return &Handler{ + state: st, + }, nil +} + +// Run starts the events handler. +func (h *Handler) Run(ctx context.Context, logger *zap.Logger) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + config, err := safe.ReaderGetByID[*runtime.EventSinkConfig](ctx, h.state, runtime.EventSinkConfigID) if err != nil { - return nil, err + return err } var ( @@ -69,7 +79,7 @@ func NewHandler(ctx context.Context, st state.State) (*Handler, error) { ) if bindAddress == nil { - list, e := safe.ReaderListAll[*network.AddressStatus](ctx, st) + list, e := safe.ReaderListAll[*network.AddressStatus](ctx, h.state) if err != nil { return nil, e } @@ -99,23 +109,19 @@ func NewHandler(ctx context.Context, st state.State) (*Handler, error) { }), ) if err != nil { - return nil, fmt.Errorf("error establishing connection to event sink: %w", err) + return fmt.Errorf("error establishing connection to event sink: %w", err) } - return &Handler{ - state: st, - client: events.NewEventSinkServiceClient(conn), - }, nil -} + defer conn.Close() //nolint:errcheck -// Run starts the events handler. -func (h *Handler) Run(ctx context.Context, logger *zap.Logger) error { var eg errgroup.Group + client := events.NewEventSinkServiceClient(conn) + for _, id := range []string{emuconst.APIDService, emuconst.ETCDService, emuconst.KubeletService} { eg.Go(func() error { return h.runWithRetries(ctx, logger, func() error { - return generateEvents(ctx, h, v1alpha1.NewService(id), func(res *v1alpha1.Service) (*events.EventRequest, error) { + return generateEvents(ctx, h, v1alpha1.NewService(id), client, func(res *v1alpha1.Service) (*events.EventRequest, error) { id := xid.NewWithTime(res.Metadata().Updated()) state := "Stopped" @@ -148,7 +154,7 @@ func (h *Handler) Run(ctx context.Context, logger *zap.Logger) error { eg.Go(func() error { return h.runWithRetries(ctx, logger, func() error { - return generateEvents(ctx, h, runtime.NewMachineStatus(), func(res *runtime.MachineStatus) (*events.EventRequest, error) { + return generateEvents(ctx, h, runtime.NewMachineStatus(), client, func(res *runtime.MachineStatus) (*events.EventRequest, error) { id := xid.NewWithTime(res.Metadata().Updated()) payload := &machine.MachineStatusEvent{ @@ -183,12 +189,14 @@ func (h *Handler) Run(ctx context.Context, logger *zap.Logger) error { } func (h *Handler) runWithRetries(ctx context.Context, logger *zap.Logger, cb func() error) error { + backoff := time.Second + for { err := cb() if err != nil { logger.WithOptions(zap.AddStacktrace(zap.PanicLevel)).Warn("event sink connector crashed", zap.Error(err)) - time.Sleep(time.Second) + time.Sleep(backoff) select { case <-ctx.Done(): @@ -196,6 +204,11 @@ func (h *Handler) runWithRetries(ctx context.Context, logger *zap.Logger, cb fun default: } + backoff = backoff*2 + time.Second*time.Duration(rand.Intn(10)) + if backoff > time.Second*30 { + backoff = time.Second * 30 + } + continue } @@ -204,7 +217,7 @@ func (h *Handler) runWithRetries(ctx context.Context, logger *zap.Logger, cb fun } //nolint:gocognit,gocyclo,cyclop -func generateEvents[T resource.Resource](ctx context.Context, h *Handler, res T, callback func(res T) (*events.EventRequest, error), logger *zap.Logger) error { +func generateEvents[T resource.Resource](ctx context.Context, h *Handler, res T, client events.EventSinkServiceClient, callback func(res T) (*events.EventRequest, error), logger *zap.Logger) error { latest, err := h.state.Get(ctx, res.Metadata()) if err != nil && !state.IsNotFoundError(err) { return err @@ -277,7 +290,7 @@ func generateEvents[T resource.Resource](ctx context.Context, h *Handler, res T, return err } - if _, err = h.client.Publish(ctx, event); err != nil { + if _, err = client.Publish(ctx, event); err != nil { return err } diff --git a/internal/pkg/machine/machine.go b/internal/pkg/machine/machine.go index 30d0cc3..849c4f3 100644 --- a/internal/pkg/machine/machine.go +++ b/internal/pkg/machine/machine.go @@ -31,6 +31,7 @@ import ( "github.com/siderolabs/talemu/internal/pkg/machine/controllers" "github.com/siderolabs/talemu/internal/pkg/machine/events" "github.com/siderolabs/talemu/internal/pkg/machine/logging" + machinenetwork "github.com/siderolabs/talemu/internal/pkg/machine/network" truntime "github.com/siderolabs/talemu/internal/pkg/machine/runtime" "github.com/siderolabs/talemu/internal/pkg/machine/runtime/resources/talos" ) @@ -65,6 +66,16 @@ func (m *Machine) Run(ctx context.Context, siderolinkParams *SideroLinkParams, s o(&opts) } + if opts.nc == nil { + opts.nc = machinenetwork.NewClient() + + if err := opts.nc.Run(ctx); err != nil { + return err + } + + defer opts.nc.Close() //nolint:errcheck + } + logSink, err := logging.NewZapCore(siderolinkParams.LogsEndpoint) if err != nil { return err @@ -76,7 +87,7 @@ func (m *Machine) Run(ctx context.Context, siderolinkParams *SideroLinkParams, s m.logger = zap.New(core).With(zap.String("machine", m.uuid)) - rt, err := truntime.NewRuntime(ctx, m.logger, slot, m.uuid, m.globalState, kubernetes, logSink) + rt, err := truntime.NewRuntime(ctx, m.logger, slot, m.uuid, m.globalState, kubernetes, opts.nc, logSink) if err != nil { return err } diff --git a/internal/pkg/machine/network/client.go b/internal/pkg/machine/network/client.go new file mode 100644 index 0000000..40d6619 --- /dev/null +++ b/internal/pkg/machine/network/client.go @@ -0,0 +1,131 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package network + +import ( + "context" + "fmt" + + "github.com/jsimonetti/rtnetlink" + "github.com/mdlayher/ethtool" + ethtoolioctl "github.com/safchain/ethtool" + "golang.org/x/sys/unix" + "golang.zx2c4.com/wireguard/wgctrl" + + "github.com/siderolabs/talemu/internal/pkg/machine/network/watch" +) + +// Client defines a shared network client for all machines. +type Client struct { + rtnetlinkWatcher watch.Watcher + ethtoolWatcher watch.Watcher + rtnetlinkConn *rtnetlink.Conn + ethIoctlClient *ethtoolioctl.Ethtool + ethClient *ethtool.Client + wgClient *wgctrl.Client + + listeners map[string]func() +} + +// NewClient creates a new network client. +func NewClient() *Client { + return &Client{ + listeners: make(map[string]func()), + } +} + +// QueueReconcile implements watch.Trigger. +func (nc *Client) QueueReconcile() { + for _, listener := range nc.listeners { + listener() + } +} + +// AddListener adds watches listener. +func (nc *Client) AddListener(id string, cb func()) { + nc.listeners[id] = cb +} + +// RemoveListener removes watches listener. +func (nc *Client) RemoveListener(id string) { + delete(nc.listeners, id) +} + +// Run starts the network clients. +func (nc *Client) Run(ctx context.Context) error { + var err error + + // create watch connections to rtnetlink and ethtool via genetlink + // these connections are used only to join multicast groups and receive notifications on changes + // other connections are used to send requests and receive responses, as we can't mix the notifications and request/responses + nc.rtnetlinkWatcher, err = watch.NewRtNetlink(watch.NewDefaultRateLimitedTrigger(ctx, nc), unix.RTMGRP_LINK) + if err != nil { + return err + } + + nc.ethtoolWatcher, err = watch.NewEthtool(watch.NewDefaultRateLimitedTrigger(ctx, nc)) + if err != nil { + return err + } + + nc.rtnetlinkConn, err = rtnetlink.Dial(nil) + if err != nil { + return fmt.Errorf("error dialing rtnetlink socket: %w", err) + } + + nc.ethClient, err = ethtool.New() + if err != nil { + return err + } + + nc.ethIoctlClient, err = ethtoolioctl.NewEthtool() + if err != nil { + return err + } + + nc.wgClient, err = wgctrl.New() + if err != nil { + return err + } else { + } + + return nil +} + +func (nc *Client) Close() error { + if err := nc.ethClient.Close(); err != nil { + return err + } + + if err := nc.wgClient.Close(); err != nil { + return err + } + + nc.ethIoctlClient.Close() + nc.ethtoolWatcher.Done() + nc.rtnetlinkWatcher.Done() + + return nc.rtnetlinkConn.Close() +} + +// Conn returns rtnetlink conn. +func (nc *Client) Conn() *rtnetlink.Conn { + return nc.rtnetlinkConn +} + +// EthIoCtl client. +func (nc *Client) EthIoCtl() *ethtoolioctl.Ethtool { + return nc.ethIoctlClient +} + +// Eth client. +func (nc *Client) Eth() *ethtool.Client { + return nc.ethClient +} + +// Wg client. +func (nc *Client) Wg() *wgctrl.Client { + return nc.wgClient +} diff --git a/internal/pkg/machine/options.go b/internal/pkg/machine/options.go index 99dac3e..b172fc1 100644 --- a/internal/pkg/machine/options.go +++ b/internal/pkg/machine/options.go @@ -4,8 +4,11 @@ package machine +import "github.com/siderolabs/talemu/internal/pkg/machine/network" + // Options is the extra machine options. type Options struct { + nc *network.Client talosVersion string schematic string } @@ -26,3 +29,10 @@ func WithSchematic(schematic string) Option { o.schematic = schematic } } + +// WithNetworkClient explicitly sets the network client to use in the machine controllers. +func WithNetworkClient(nc *network.Client) Option { + return func(o *Options) { + o.nc = nc + } +} diff --git a/internal/pkg/machine/runtime/runtime.go b/internal/pkg/machine/runtime/runtime.go index 4cc082f..efecf27 100644 --- a/internal/pkg/machine/runtime/runtime.go +++ b/internal/pkg/machine/runtime/runtime.go @@ -20,6 +20,7 @@ import ( "github.com/siderolabs/talemu/internal/pkg/kubefactory" "github.com/siderolabs/talemu/internal/pkg/machine/controllers" "github.com/siderolabs/talemu/internal/pkg/machine/logging" + "github.com/siderolabs/talemu/internal/pkg/machine/network" "github.com/siderolabs/talemu/internal/pkg/machine/runtime/resources/emu" "github.com/siderolabs/talemu/internal/pkg/machine/runtime/resources/talos" "github.com/siderolabs/talemu/internal/pkg/machine/services" @@ -36,7 +37,7 @@ type Runtime struct { // NewRuntime creates new runtime. func NewRuntime(ctx context.Context, logger *zap.Logger, slot int, id string, globalState state.State, - kubernetes *kubefactory.Kubernetes, logSink *logging.ZapCore, + kubernetes *kubefactory.Kubernetes, nc *network.Client, logSink *logging.ZapCore, ) (*Runtime, error) { stateDir := GetStateDir(id) @@ -66,13 +67,20 @@ func NewRuntime(ctx context.Context, logger *zap.Logger, slot int, id string, gl controllers := []controller.Controller{ &controllers.ManagerController{ Slot: slot, + NC: nc, + }, + &controllers.LinkSpecController{ + NC: nc, + }, + &controllers.LinkStatusController{ + NC: nc, }, - &controllers.LinkSpecController{}, - &controllers.LinkStatusController{}, &controllers.APIDController{ APID: services.NewAPID(id, st, globalState), }, - &controllers.AddressSpecController{}, + &controllers.AddressSpecController{ + NC: nc, + }, &controllers.GRPCTLSController{}, &controllers.MachineTypeController{}, &controllers.HostnameConfigController{ diff --git a/internal/pkg/machine/services/apid.go b/internal/pkg/machine/services/apid.go index f7a9c2a..4aa2f8d 100644 --- a/internal/pkg/machine/services/apid.go +++ b/internal/pkg/machine/services/apid.go @@ -101,8 +101,6 @@ func (apid *APID) Run(ctx context.Context, endpoint netip.Prefix, logger *zap.Lo tlsCredentials := credentials.NewTLS(cfg) - eg, ctx := errgroup.WithContext(ctx) - backendFactory := backend.NewAPIDFactory(provider) remoteFactory := backendFactory.Get @@ -171,6 +169,10 @@ func (apid *APID) Run(ctx context.Context, endpoint netip.Prefix, logger *zap.Lo storage.RegisterStorageServiceServer(localServer, machineSrv) cosiv1alpha1.RegisterStateServer(localServer, resourceState) + eg, ctx := errgroup.WithContext(ctx) + + apid.eg = eg + eg.Go(func() error { listener, err := memconn.Listener() if err != nil { @@ -213,8 +215,6 @@ func (apid *APID) Run(ctx context.Context, endpoint netip.Prefix, logger *zap.Lo return s.Serve(lis) }) - apid.eg = eg - return nil } diff --git a/internal/pkg/provider/clientconfig/clientconfig.go b/internal/pkg/provider/clientconfig/clientconfig.go index ab5bade..dda1130 100644 --- a/internal/pkg/provider/clientconfig/clientconfig.go +++ b/internal/pkg/provider/clientconfig/clientconfig.go @@ -7,6 +7,7 @@ package clientconfig import ( "context" + "crypto/tls" "fmt" "os" "path/filepath" @@ -14,34 +15,19 @@ import ( "time" "github.com/adrg/xdg" - "github.com/hashicorp/go-multierror" - "github.com/siderolabs/gen/containers" - authpb "github.com/siderolabs/go-api-signature/api/auth" authcli "github.com/siderolabs/go-api-signature/pkg/client/auth" "github.com/siderolabs/go-api-signature/pkg/client/interceptor" "github.com/siderolabs/go-api-signature/pkg/message" "github.com/siderolabs/go-api-signature/pkg/pgp" "github.com/siderolabs/omni/client/pkg/client" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) const ( defaultEmail = "test-user@siderolabs.com" ) -type clientCacheKey struct { - role string - email string - skipUserRole bool -} - -type clientOrError struct { - client *client.Client - err error -} - -var clientCache = containers.ConcurrentMap[clientCacheKey, clientOrError]{} - // ClientConfig is a test client. type ClientConfig struct { endpoint string @@ -67,54 +53,17 @@ func (t *ClientConfig) GetClient(publicKeyOpts ...authcli.RegisterPGPPublicKeyOp // Clients are cached by their configuration, so if a client with the // given configuration was created before, the cached one will be returned. func (t *ClientConfig) GetClientForEmail(email string, publicKeyOpts ...authcli.RegisterPGPPublicKeyOption) (*client.Client, error) { - cacheKey := t.buildCacheKey(email, publicKeyOpts) - - cliOrErr, _ := clientCache.GetOrCall(cacheKey, func() clientOrError { - signatureInterceptor := buildSignatureInterceptor(email, publicKeyOpts...) - - cli, err := client.New(t.endpoint, - client.WithGrpcOpts( - grpc.WithUnaryInterceptor(signatureInterceptor.Unary()), - grpc.WithStreamInterceptor(signatureInterceptor.Stream()), - ), - ) - - return clientOrError{ - client: cli, - err: err, - } - }) - - return cliOrErr.client, cliOrErr.err -} - -// Close closes all the clients created by this config. -func (t *ClientConfig) Close() error { - var multiErr error - - clientCache.ForEach(func(_ clientCacheKey, cliOrErr clientOrError) { - if cliOrErr.client != nil { - if err := cliOrErr.client.Close(); err != nil { - multiErr = multierror.Append(multiErr, err) - } - } - }) - - return multiErr -} - -func (t *ClientConfig) buildCacheKey(email string, publicKeyOpts []authcli.RegisterPGPPublicKeyOption) clientCacheKey { - var req authpb.RegisterPublicKeyRequest - - for _, o := range publicKeyOpts { - o(&req) - } - - return clientCacheKey{ - role: req.Role, - email: email, - skipUserRole: req.SkipUserRole, - } + signatureInterceptor := buildSignatureInterceptor(email, publicKeyOpts...) + + return client.New(t.endpoint, + client.WithGrpcOpts( + grpc.WithUnaryInterceptor(signatureInterceptor.Unary()), + grpc.WithStreamInterceptor(signatureInterceptor.Stream()), + grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{ + InsecureSkipVerify: true, + })), + ), + ) } var talosAPIKeyMutex sync.Mutex diff --git a/internal/pkg/provider/controllers/machine.go b/internal/pkg/provider/controllers/machine.go index 4b84377..bb3cbf0 100644 --- a/internal/pkg/provider/controllers/machine.go +++ b/internal/pkg/provider/controllers/machine.go @@ -24,6 +24,7 @@ import ( "github.com/siderolabs/talemu/internal/pkg/kubefactory" "github.com/siderolabs/talemu/internal/pkg/machine" + "github.com/siderolabs/talemu/internal/pkg/machine/network" "github.com/siderolabs/talemu/internal/pkg/machine/runtime" "github.com/siderolabs/talemu/internal/pkg/machine/runtime/resources/emu" machinetask "github.com/siderolabs/talemu/internal/pkg/provider/controllers/machine" @@ -34,15 +35,17 @@ import ( type MachineController struct { runner *task.Runner[any, machinetask.TaskSpec] kubernetes *kubefactory.Kubernetes + nc *network.Client globalState state.State } // NewMachineController creates new machine controller. -func NewMachineController(globalState state.State, kubernetes *kubefactory.Kubernetes) *MachineController { +func NewMachineController(globalState state.State, kubernetes *kubefactory.Kubernetes, nc *network.Client) *MachineController { return &MachineController{ runner: task.NewEqualRunner[machinetask.TaskSpec](), globalState: globalState, kubernetes: kubernetes, + nc: nc, } } @@ -89,6 +92,8 @@ func (ctrl *MachineController) Run(ctx context.Context, r controller.Runtime, lo for { select { case <-ctx.Done(): + ctrl.runner.Stop() + return nil case <-r.EventCh(): } @@ -114,6 +119,8 @@ func (ctrl *MachineController) Run(ctx context.Context, r controller.Runtime, lo } if err = safe.WriterModify(ctx, r, cloud.NewMachineRequestStatus(m.Metadata().ID()), func(r *cloud.MachineRequestStatus) error { + *r.Metadata().Labels() = *m.Metadata().Labels() + r.TypedSpec().Value.Id = m.TypedSpec().Value.Uuid r.TypedSpec().Value.Stage = cloudspecs.MachineRequestStatusSpec_PROVISIONED @@ -146,6 +153,7 @@ func (ctrl *MachineController) Run(ctx context.Context, r controller.Runtime, lo GlobalState: ctrl.globalState, Kubernetes: ctrl.kubernetes, Params: params, + NC: ctrl.nc, }, nil) touchedIDs[m.Metadata().ID()] = struct{}{} diff --git a/internal/pkg/provider/controllers/machine/machine.go b/internal/pkg/provider/controllers/machine/machine.go index f66071a..071d844 100644 --- a/internal/pkg/provider/controllers/machine/machine.go +++ b/internal/pkg/provider/controllers/machine/machine.go @@ -14,6 +14,7 @@ import ( "github.com/siderolabs/talemu/internal/pkg/kubefactory" "github.com/siderolabs/talemu/internal/pkg/machine" + "github.com/siderolabs/talemu/internal/pkg/machine/network" "github.com/siderolabs/talemu/internal/pkg/provider/resources" ) @@ -25,6 +26,7 @@ type TaskSpec struct { GlobalState state.State Params *machine.SideroLinkParams Kubernetes *kubefactory.Kubernetes + NC *network.Client } // ID implements task.TaskSpec. @@ -34,7 +36,7 @@ func (s TaskSpec) ID() task.ID { // Equal implements task.TaskSpec. func (s TaskSpec) Equal(other TaskSpec) bool { - return s.Machine.Metadata().Equal(*other.Machine.Metadata()) && s.Machine.TypedSpec().Value.EqualVT(other.Machine.TypedSpec().Value) + return s.Machine.Metadata().ID() == other.Machine.Metadata().ID() && s.Machine.TypedSpec().Value.EqualVT(other.Machine.TypedSpec().Value) } // RunTask implements task.TaskSpec. @@ -53,5 +55,6 @@ func (s TaskSpec) RunTask(ctx context.Context, logger *zap.Logger, _ any) error s.Kubernetes, machine.WithTalosVersion(s.Machine.TypedSpec().Value.TalosVersion), machine.WithSchematic(s.Machine.TypedSpec().Value.Schematic), + machine.WithNetworkClient(s.NC), ) } diff --git a/internal/pkg/provider/controllers/machine_request_status.go b/internal/pkg/provider/controllers/machine_request_status.go index 9b09635..ed3f7b9 100644 --- a/internal/pkg/provider/controllers/machine_request_status.go +++ b/internal/pkg/provider/controllers/machine_request_status.go @@ -8,6 +8,7 @@ package controllers import ( "context" "fmt" + "strings" "github.com/cosi-project/runtime/pkg/controller" "github.com/cosi-project/runtime/pkg/controller/generic/qtransform" @@ -49,6 +50,10 @@ func NewMachineRequestStatusController() *MachineRequestStatusController { schematicID := request.TypedSpec().Value.SchematicId talosVersion := request.TypedSpec().Value.TalosVersion + if strings.HasPrefix(talosVersion, "v") { + talosVersion = "v" + talosVersion + } + logger.Info("received machine request", zap.String("schematic_id", schematicID), zap.String("talos_version", talosVersion)) var err error @@ -71,6 +76,8 @@ func NewMachineRequestStatusController() *MachineRequestStatusController { machine.TypedSpec().Value.TalosVersion = talosVersion machine.TypedSpec().Value.Uuid = uuid + *machine.Metadata().Labels() = *request.Metadata().Labels() + return nil }, }, diff --git a/internal/pkg/provider/provider.go b/internal/pkg/provider/provider.go index 3f6d1ed..8864dd1 100644 --- a/internal/pkg/provider/provider.go +++ b/internal/pkg/provider/provider.go @@ -10,17 +10,18 @@ import ( "github.com/siderolabs/talemu/internal/pkg/emu" "github.com/siderolabs/talemu/internal/pkg/kubefactory" + "github.com/siderolabs/talemu/internal/pkg/machine/network" "github.com/siderolabs/talemu/internal/pkg/provider/controllers" ) // RegisterControllers registers additional controllers required for the cloud provider. -func RegisterControllers(runtime *emu.Runtime, kubernetes *kubefactory.Kubernetes) error { +func RegisterControllers(runtime *emu.Runtime, kubernetes *kubefactory.Kubernetes, nc *network.Client) error { qcontrollers := []controller.QController{ controllers.NewMachineRequestStatusController(), } controllers := []controller.Controller{ - controllers.NewMachineController(runtime.State(), kubernetes), + controllers.NewMachineController(runtime.State(), kubernetes, nc), } for _, ctrl := range qcontrollers {