Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Control Loop #225

Merged
merged 22 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a99b3a6
Migrate to kube_codegen.sh
samuelattwood Nov 20, 2024
4c499e0
Migrate to reconciliation loop using controller-runtime (#208)
adriandieter Dec 9, 2024
e9dd49c
feat(controller-runtime): Implement stream controller (#211)
adriandieter Dec 19, 2024
c353ccc
feat(controller-runtime): Add consumer controller (#212)
adriandieter Dec 31, 2024
5bd0945
feat(controller-runtime): Add keyvalue store spec and controller (#215)
samuelattwood Jan 7, 2025
60d50f8
Add initial object store types. Update Stream config and reorg to kee…
samuelattwood Jan 7, 2025
f15f695
Add ObjectStore tests and remaining options
samuelattwood Jan 7, 2025
726b0d5
Add test for sealed stream option
samuelattwood Jan 7, 2025
180099a
Add Account Controller (#224)
samuelattwood Jan 16, 2025
af63722
Deps
samuelattwood Jan 16, 2025
4b8a530
Create configured cache dir if DNE
samuelattwood Jan 21, 2025
aee234b
Move stream controller to jsm.go for pedantic mode
samuelattwood Jan 21, 2025
f9e8121
Move consumer controller to jsm.go for pedantic mode
samuelattwood Jan 21, 2025
b0f8736
Remove debug log entry
samuelattwood Jan 21, 2025
5db0cff
deps
samuelattwood Jan 24, 2025
ad4cf12
Improve connection config priority. Add missing option from consumer …
samuelattwood Jan 31, 2025
18dec43
Deps. Fix placement config enforcement
samuelattwood Jan 31, 2025
6ddcced
Bump jsm.go. Fix typo
samuelattwood Jan 31, 2025
577ce79
Log diff on resource update
samuelattwood Jan 31, 2025
ad9693f
Merge branch 'main' into feature/controller-runtime
samuelattwood Feb 18, 2025
fc8ad13
Improve README. Modernize examples.
samuelattwood Feb 18, 2025
7a196bb
Avoid excess disk writes to cache directory. README tweaks.
samuelattwood Feb 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
/nats-boot-config
/nats-boot-config.docker
/tools
/bin
/.idea
67 changes: 50 additions & 17 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
export GO111MODULE := on

SHELL=/usr/bin/env bash

ENVTEST_K8S_VERSION = 1.31.0

now := $(shell date -u +%Y-%m-%dT%H:%M:%S%z)
gitBranch := $(shell git rev-parse --abbrev-ref HEAD)
gitCommit := $(shell git rev-parse --short HEAD)
Expand All @@ -9,8 +13,7 @@ VERSION ?= version-not-set
linkerVars := -X main.BuildTime=$(now) -X main.GitInfo=$(gitBranch)-$(gitCommit)$(repoDirty) -X main.Version=$(VERSION)
drepo ?= natsio

jetstreamGenIn:= $(shell grep -l -R -F "// +k8s:" pkg/jetstream/apis)
jetstreamSrc := $(shell find cmd/jetstream-controller pkg/jetstream controllers/jetstream -name "*.go") pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go
jetstreamSrc := $(shell find cmd/jetstream-controller pkg/jetstream internal/controller controllers/jetstream -name "*.go") pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go

configReloaderSrc := $(shell find cmd/nats-server-config-reloader/ pkg/natsreloader/ -name "*.go")

Expand All @@ -27,20 +30,21 @@ default:
# make nats-server-config-reloader
# make nats-boot-config

pkg/jetstream/generated pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go: fetch-modules $(jetstreamGenIn) pkg/k8scodegen/file-header.txt
generate: fetch-modules pkg/k8scodegen/file-header.txt
rm -rf pkg/jetstream/generated
# Temporary chmod fix until we migrate to kube_codegen.sh
D="$(codeGeneratorDir)"; : "$${D:=`go list -m -f '{{.Dir}}' k8s.io/code-generator`}"; \
chmod u+x "$$D/generate-internal-groups.sh"; \
GOFLAGS='' bash "$$D/generate-groups.sh" all \
github.com/nats-io/nack/pkg/jetstream/generated \
github.com/nats-io/nack/pkg/jetstream/apis \
"jetstream:v1beta2" \
--output-base . \
--go-header-file pkg/k8scodegen/file-header.txt
mv github.com/nats-io/nack/pkg/jetstream/generated pkg/jetstream/generated
mv github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go
rm -rf github.com
source "$$D/kube_codegen.sh" ; \
kube::codegen::gen_helpers \
--boilerplate pkg/k8scodegen/file-header.txt \
pkg/jetstream/apis; \
kube::codegen::gen_client \
--with-watch \
--with-applyconfig \
--boilerplate pkg/k8scodegen/file-header.txt \
--output-dir pkg/jetstream/generated \
--output-pkg github.com/nats-io/nack/pkg/jetstream/generated \
--one-input-api jetstream/v1beta2 \
pkg/jetstream/apis

jetstream-controller: $(jetstreamSrc)
go build -race -o $@ \
Expand Down Expand Up @@ -173,10 +177,39 @@ fetch-modules:
.PHONY: build
build: jetstream-controller nats-server-config-reloader nats-boot-config

# Setup envtest tools based on a operator-sdk project makefile
LOCALBIN ?= $(shell pwd)/bin
$(LOCALBIN):
mkdir -p $(LOCALBIN)

# go-install-tool will 'go install' any package with custom target and name of binary, if it doesn't exist
# $1 - target path with name of binary (ideally with version)
# $2 - package url which can be installed
# $3 - specific version of package
define go-install-tool
@[ -f $(1) ] || { \
set -e; \
package=$(2)@$(3) ;\
echo "Downloading $${package}" ;\
GOBIN=$(LOCALBIN) go install $${package} ;\
mv "$$(echo "$(1)" | sed "s/-$(3)$$//")" $(1) ;\
}
endef

ENVTEST ?= $(LOCALBIN)/setup-envtest-$(ENVTEST_VERSION)
ENVTEST_VERSION ?= release-0.19

.PHONY: envtest
envtest: $(ENVTEST) ## Download setup-envtest locally if necessary.
$(ENVTEST): $(LOCALBIN)
$(call go-install-tool,$(ENVTEST),sigs.k8s.io/controller-runtime/tools/setup-envtest,$(ENVTEST_VERSION))


.PHONY: test
test:
go vet ./controllers/... ./pkg/natsreloader/...
go test -race -cover -count=1 -timeout 10s ./controllers/... ./pkg/natsreloader/...
test: envtest
go vet ./controllers/... ./pkg/natsreloader/... ./internal/controller/...
$(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path ## Get k8s binaries
go test -race -cover -count=1 -timeout 10s ./controllers/... ./pkg/natsreloader/... ./internal/controller/...

.PHONY: clean
clean:
Expand Down
2 changes: 1 addition & 1 deletion cicd/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#syntax=docker/dockerfile-upstream:1.5
#syntax=docker/dockerfile-upstream:1.13
ARG GO_APP

FROM alpine:3.21.2 as deps
Expand Down
2 changes: 1 addition & 1 deletion cicd/Dockerfile_goreleaser
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#syntax=docker/dockerfile-upstream:1.5
#syntax=docker/dockerfile-upstream:1.13
FROM --platform=$BUILDPLATFORM golang:1.23.5-bullseye as build

RUN <<EOT
Expand Down
112 changes: 96 additions & 16 deletions cmd/jetstream-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@ import (
"syscall"
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/nats-io/nack/controllers/jetstream"
"github.com/nats-io/nack/internal/controller"
v1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
clientset "github.com/nats-io/nack/pkg/jetstream/generated/clientset/versioned"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
klog "k8s.io/klog/v2"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log"
)

var (
Expand All @@ -49,7 +55,10 @@ func main() {

func run() error {
klog.InitFlags(nil)
kubeConfig := flag.String("kubeconfig", "", "Path to kubeconfig")

// Explicitly register controller-runtime flags
ctrl.RegisterFlags(nil)

namespace := flag.String("namespace", v1.NamespaceAll, "Restrict to a namespace")
version := flag.Bool("version", false, "Print the version and exit")
creds := flag.String("creds", "", "NATS Credentials")
Expand All @@ -59,9 +68,13 @@ func run() error {
ca := flag.String("tlsca", "", "NATS TLS certificate authority chain")
tlsfirst := flag.Bool("tlsfirst", false, "If enabled, forces explicit TLS without waiting for Server INFO")
server := flag.String("s", "", "NATS Server URL")
crdConnect := flag.Bool("crd-connect", false, "If true, then NATS connections will be made from CRD config, not global config")
crdConnect := flag.Bool("crd-connect", false, "If true, then NATS connections will be made from CRD config, not global config. Ignored if running with control loop, CRD options will always override global config")
cleanupPeriod := flag.Duration("cleanup-period", 30*time.Second, "Period to run object cleanup")
readOnly := flag.Bool("read-only", false, "Starts the controller without causing changes to the NATS resources")
cacheDir := flag.String("cache-dir", "", "Directory to store cached credential and TLS files")
controlLoop := flag.Bool("control-loop", false, "Experimental: Run controller with a full reconciliation control loop.")
controlLoopSyncInterval := flag.Duration("sync-interval", 5*time.Minute, "Interval to perform scheduled reconcile")

flag.Parse()

if *version {
Expand All @@ -73,18 +86,37 @@ func run() error {
return errors.New("NATS Server URL is required")
}

var config *rest.Config
var err error
if *kubeConfig == "" {
config, err = rest.InClusterConfig()
if err != nil {
return err
config, err := ctrl.GetConfig()
if err != nil {
return fmt.Errorf("get kubernetes rest config: %w", err)
}

if *controlLoop {
klog.Warning("Starting JetStream controller in experimental control loop mode")

natsCfg := &controller.NatsConfig{
ClientName: "jetstream-controller",
Credentials: *creds,
NKey: *nkey,
ServerURL: *server,
CAs: []string{},
Certificate: *cert,
Key: *key,
TLSFirst: *tlsfirst,
}
} else {
config, err = clientcmd.BuildConfigFromFlags("", *kubeConfig)
if err != nil {
return err

if *ca != "" {
natsCfg.CAs = []string{*ca}
}

controllerCfg := &controller.Config{
ReadOnly: *readOnly,
Namespace: *namespace,
CacheDir: *cacheDir,
RequeueInterval: *controlLoopSyncInterval,
}

return runControlLoop(config, natsCfg, controllerCfg)
}

// K8S API Client.
Expand Down Expand Up @@ -129,6 +161,54 @@ func run() error {
return ctrl.Run()
}

func runControlLoop(config *rest.Config, natsCfg *controller.NatsConfig, controllerCfg *controller.Config) error {
// Setup scheme
scheme := runtime.NewScheme()
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(v1beta2.AddToScheme(scheme))

log.SetLogger(klog.NewKlogr())

mgr, err := ctrl.NewManager(config, ctrl.Options{
Scheme: scheme,
Logger: log.Log,
})
if err != nil {
return fmt.Errorf("unable to start manager: %w", err)
}

if controllerCfg.CacheDir == "" {
cacheDir, err := os.MkdirTemp(".", "nack")
if err != nil {
return fmt.Errorf("create cache dir: %w", err)
}
defer os.RemoveAll(cacheDir)
controllerCfg.CacheDir = cacheDir
} else {
if _, err := os.Stat(controllerCfg.CacheDir); os.IsNotExist(err) {
err = os.MkdirAll(controllerCfg.CacheDir, 0o755)
if err != nil {
return fmt.Errorf("create cache dir: %w", err)
}
}
}

err = controller.RegisterAll(mgr, natsCfg, controllerCfg)
if err != nil {
return fmt.Errorf("register jetstream controllers: %w", err)
}

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
return fmt.Errorf("unable to set up health check: %w", err)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
return fmt.Errorf("unable to set up ready check: %w", err)
}

klog.Info("starting manager")
return mgr.Start(ctrl.SetupSignalHandler())
}

func handleSignals(cancel context.CancelFunc) {
sigc := make(chan os.Signal, 2)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
Expand Down
8 changes: 4 additions & 4 deletions controllers/jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func consumerSpecToOpts(spec apis.ConsumerSpec) ([]jsm.ConsumerOption, error) {
opts = append(opts, jsm.AcknowledgeExplicit())
case "":
default:
return nil, fmt.Errorf("invalid value for 'ackPolicy': '%s'. Must be one of 'none', 'all', 'explicit'.", spec.AckPolicy)
return nil, fmt.Errorf("invalid value for 'ackPolicy': '%s'. Must be one of 'none', 'all', 'explicit'", spec.AckPolicy)
}

if spec.AckWait != "" {
Expand All @@ -262,7 +262,7 @@ func consumerSpecToOpts(spec apis.ConsumerSpec) ([]jsm.ConsumerOption, error) {
opts = append(opts, jsm.ReplayAsReceived())
case "":
default:
return nil, fmt.Errorf("invalid value for 'replayPolicy': '%s'. Must be one of 'instant', 'original'.", spec.ReplayPolicy)
return nil, fmt.Errorf("invalid value for 'replayPolicy': '%s'. Must be one of 'instant', 'original'", spec.ReplayPolicy)
}

if spec.SampleFreq != "" {
Expand Down Expand Up @@ -354,7 +354,7 @@ func setConsumerOK(ctx context.Context, s *apis.Consumer, i typed.ConsumerInterf
sc := s.DeepCopy()

sc.Status.ObservedGeneration = s.Generation
sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{
sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{
Type: readyCondType,
Status: k8sapi.ConditionTrue,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand Down Expand Up @@ -382,7 +382,7 @@ func setConsumerErrored(ctx context.Context, s *apis.Consumer, sif typed.Consume
}

sc := s.DeepCopy()
sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{
sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{
Type: readyCondType,
Status: k8sapi.ConditionFalse,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand Down
Loading