diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9681a2f2..50439cbd 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -109,7 +109,9 @@ jobs: uses: actions/upload-artifact@v4 with: name: controller-test-cover - path: ./controller/cover.out + path: | + ./controller/cover.out + ./controller/cover_mcp_output.out coverage: timeout-minutes: 10 @@ -124,7 +126,10 @@ jobs: uses: codecov/codecov-action@v4.1.0 with: fail_ci_if_error: true - file: ./cover.out,plugins/tests/integration/cover.out,./controller/cover.out + files: ./unit-test-cover/cover.out, + ./plugins-integration-test-cover/cover.out, + ./controller-test-cover/cover.out, + ./controller-test-cover/cover_mcp_output.out token: ${{ secrets.CODECOV_TOKEN }} verbose: true diff --git a/controller/Dockerfile b/controller/Dockerfile index decdd798..17b92182 100644 --- a/controller/Dockerfile +++ b/controller/Dockerfile @@ -14,7 +14,7 @@ # Dockerfile has specific requirement: https://docs.docker.com/engine/reference/builder/#understand-how-arg-and-from-interact ARG CONTROLLER_BASE_IMAGE -# Build the manager binary +# Build the controller binary FROM golang:1.21 as builder ARG TARGETOS ARG TARGETARCH @@ -42,11 +42,11 @@ COPY controller/ controller/ # was called. For example, if we call make docker-build in a local env which has the Apple Silicon M1 SO # the docker BUILDPLATFORM arg will be linux/arm64 when for Apple x86 it will be linux/amd64. Therefore, # by leaving it empty we can ensure that the container and binary shipped on it will have the same platform. -RUN cd controller && CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o /workspace/manager cmd/main.go +RUN cd controller && CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -tags ctrl -a -o /workspace/bin cmd/main.go FROM ${CONTROLLER_BASE_IMAGE} WORKDIR / -COPY --from=builder /workspace/manager . +COPY --from=builder /workspace/bin ./controller USER 65532:65532 -ENTRYPOINT ["/manager"] +ENTRYPOINT ["/controller"] diff --git a/controller/Makefile b/controller/Makefile index 4121cc61..65ef2838 100644 --- a/controller/Makefile +++ b/controller/Makefile @@ -79,6 +79,9 @@ generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and test: manifests generate envtest ## Run tests. KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test -gcflags="all=-N -l" -race \ ./... -coverprofile cover.out -covermode=atomic -coverpkg=./... +# rerun controller integration test with different output + KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test -tags mcp_output -gcflags="all=-N -l" -race \ + ./tests/integration/controller -coverprofile cover_mcp_output.out -covermode=atomic -coverpkg=./... .PHONY: benchmark benchmark: manifests generate envtest ## Run benchmarks diff --git a/controller/cmd/main.go b/controller/cmd/main.go index 443ebbd6..cb631097 100644 --- a/controller/cmd/main.go +++ b/controller/cmd/main.go @@ -18,6 +18,7 @@ package main import ( "flag" + "fmt" "os" istioscheme "istio.io/client-go/pkg/clientset/versioned/scheme" @@ -34,7 +35,9 @@ import ( mosniov1 "mosn.io/htnn/controller/api/v1" "mosn.io/htnn/controller/internal/config" "mosn.io/htnn/controller/internal/controller" + controlleroutput "mosn.io/htnn/controller/internal/controller/output" "mosn.io/htnn/controller/internal/registry" + "mosn.io/htnn/controller/pkg/procession" "mosn.io/htnn/pkg/log" ) @@ -69,8 +72,17 @@ func main() { flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") + + var outputDest string + flag.StringVar(&outputDest, "output", "k8s", "The output destination of reconciliation result, mcp or k8s. Default is k8s.") + flag.Parse() + if outputDest != "mcp" && outputDest != "k8s" { + setupLog.Error(fmt.Errorf("unknown output: %s", outputDest), "unable to start") + os.Exit(1) + } + ctrl.SetLogger(log.DefaultLogger) config.Init() @@ -103,9 +115,22 @@ func main() { os.Exit(1) } + ctx := ctrl.SetupSignalHandler() + var output procession.Output + if outputDest == "k8s" { + output = controlleroutput.NewK8sOutput(mgr.GetClient()) + } else { + output, err = controlleroutput.NewMcpOutput(ctx) + if err != nil { + setupLog.Error(err, "unable to new mcp output") + os.Exit(1) + } + } + if err = (&controller.HTTPFilterPolicyReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), + Output: output, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "HTTPFilterPolicy") os.Exit(1) @@ -113,13 +138,14 @@ func main() { if err = (&controller.ConsumerReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), + Output: output, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Consumer") os.Exit(1) } registry.InitRegistryManager(®istry.RegistryManagerOption{ - Client: mgr.GetClient(), + Output: output, }) if err = (&controller.ServiceRegistryReconciler{ Client: mgr.GetClient(), @@ -159,7 +185,7 @@ func main() { } setupLog.Info("starting manager") - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + if err := mgr.Start(ctx); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } diff --git a/controller/internal/config/config.go b/controller/internal/config/config.go index e7d01fc8..db3888f0 100644 --- a/controller/internal/config/config.go +++ b/controller/internal/config/config.go @@ -24,6 +24,13 @@ var ( logger = log.DefaultLogger.WithName("config") ) +// istio's xds server listen address plus 100 +var mcpServerListenAddress = "127.0.0.1:15110" + +func McpServerListenAddress() string { + return mcpServerListenAddress +} + func GoSoPath() string { return "/etc/libgolang.so" } @@ -54,4 +61,9 @@ func Init() { if cfgRootNamespace != "" { rootNamespace = cfgRootNamespace } + + addr := viper.GetString("mcp.listen") + if addr != "" { + mcpServerListenAddress = addr + } } diff --git a/controller/internal/config/config_test.go b/controller/internal/config/config_test.go index ac709255..57e47899 100644 --- a/controller/internal/config/config_test.go +++ b/controller/internal/config/config_test.go @@ -26,9 +26,11 @@ func TestInit(t *testing.T) { // Check default values assert.Equal(t, "istio-system", RootNamespace()) + assert.Equal(t, "127.0.0.1:15110", McpServerListenAddress()) viper.AddConfigPath("./testdata") Init() assert.Equal(t, "htnn", RootNamespace()) + assert.Equal(t, ":9989", McpServerListenAddress()) } diff --git a/controller/internal/config/testdata/config.yaml b/controller/internal/config/testdata/config.yaml index 93526a3f..b1c32458 100644 --- a/controller/internal/config/testdata/config.yaml +++ b/controller/internal/config/testdata/config.yaml @@ -1,2 +1,4 @@ istio: rootNamespace: htnn +mcp: + listen: ":9989" diff --git a/controller/internal/controller/consumer_controller.go b/controller/internal/controller/consumer_controller.go index 9763d5ca..66de6b31 100644 --- a/controller/internal/controller/consumer_controller.go +++ b/controller/internal/controller/consumer_controller.go @@ -21,7 +21,6 @@ import ( "fmt" "github.com/go-logr/logr" - istiov1a3 "istio.io/client-go/pkg/apis/networking/v1alpha3" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -36,18 +35,16 @@ import ( "mosn.io/htnn/controller/internal/config" "mosn.io/htnn/controller/internal/istio" "mosn.io/htnn/controller/internal/model" + "mosn.io/htnn/controller/pkg/procession" ) // ConsumerReconciler reconciles a Consumer object type ConsumerReconciler struct { client.Client Scheme *runtime.Scheme + Output procession.Output } -const ( - ConsumerEnvoyFilterName = "htnn-consumer" -) - //+kubebuilder:rbac:groups=mosn.io,resources=consumers,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=mosn.io,resources=consumers/status,verbs=get;update;patch //+kubebuilder:rbac:groups=mosn.io,resources=consumers/finalizers,verbs=update @@ -68,7 +65,7 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, err } - err = r.generateCustomResource(ctx, &logger, state) + err = r.generateCustomResource(ctx, state) if err != nil { return ctrl.Result{}, err } @@ -120,7 +117,7 @@ func (r *ConsumerReconciler) consumersToState(ctx context.Context, logger *logr. return state, nil } -func (r *ConsumerReconciler) generateCustomResource(ctx context.Context, logger *logr.Logger, state *consumerReconcileState) error { +func (r *ConsumerReconciler) generateCustomResource(ctx context.Context, state *consumerReconcileState) error { consumerData := map[string]interface{}{} for ns, consumers := range state.namespaceToConsumers { data := make(map[string]interface{}, len(consumers)) @@ -137,49 +134,12 @@ func (r *ConsumerReconciler) generateCustomResource(ctx context.Context, logger ef := istio.GenerateConsumers(consumerData) ef.Namespace = config.RootNamespace() - ef.Name = ConsumerEnvoyFilterName + ef.Name = model.ConsumerEnvoyFilterName if ef.Labels == nil { ef.Labels = map[string]string{} } ef.Labels[model.LabelCreatedBy] = "Consumer" - - nsName := types.NamespacedName{Name: ef.Name, Namespace: ef.Namespace} - var envoyfilters istiov1a3.EnvoyFilterList - if err := r.List(ctx, &envoyfilters, client.MatchingLabels{model.LabelCreatedBy: "Consumer"}); err != nil { - return fmt.Errorf("failed to list EnvoyFilter: %w", err) - } - - var envoyfilter *istiov1a3.EnvoyFilter - for _, e := range envoyfilters.Items { - if e.Namespace != nsName.Namespace || e.Name != nsName.Name { - logger.Info("delete EnvoyFilter", "name", e.Name, "namespace", e.Namespace) - - if err := r.Delete(ctx, e); err != nil { - return fmt.Errorf("failed to delete EnvoyFilter: %w, namespacedName: %v", - err, types.NamespacedName{Name: e.Name, Namespace: e.Namespace}) - } - } else { - envoyfilter = e - } - } - - if envoyfilter == nil { - logger.Info("create EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) - - if err := r.Create(ctx, ef.DeepCopy()); err != nil { - return fmt.Errorf("failed to create EnvoyFilter: %w, namespacedName: %v", err, nsName) - } - } else { - logger.Info("update EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) - - ef = ef.DeepCopy() - ef.SetResourceVersion(envoyfilter.ResourceVersion) - if err := r.Update(ctx, ef); err != nil { - return fmt.Errorf("failed to update EnvoyFilter: %w, namespacedName: %v", err, nsName) - } - } - - return nil + return r.Output.FromConsumer(ctx, ef) } func (r *ConsumerReconciler) updateConsumers(ctx context.Context, consumers *mosniov1.ConsumerList) error { diff --git a/controller/internal/controller/httpfilterpolicy_controller.go b/controller/internal/controller/httpfilterpolicy_controller.go index 32d9fef3..aa62581a 100644 --- a/controller/internal/controller/httpfilterpolicy_controller.go +++ b/controller/internal/controller/httpfilterpolicy_controller.go @@ -25,8 +25,6 @@ import ( "time" "github.com/go-logr/logr" - "google.golang.org/protobuf/proto" - istiov1a3 "istio.io/client-go/pkg/apis/networking/v1alpha3" istiov1b1 "istio.io/client-go/pkg/apis/networking/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/fields" @@ -43,17 +41,17 @@ import ( gwapiv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2" mosniov1 "mosn.io/htnn/controller/api/v1" - "mosn.io/htnn/controller/internal/config" "mosn.io/htnn/controller/internal/k8s" "mosn.io/htnn/controller/internal/metrics" - "mosn.io/htnn/controller/internal/model" "mosn.io/htnn/controller/internal/translation" + "mosn.io/htnn/controller/pkg/procession" ) // HTTPFilterPolicyReconciler reconciles a HTTPFilterPolicy object type HTTPFilterPolicyReconciler struct { client.Client Scheme *runtime.Scheme + Output procession.Output istioGatewayIndexer *IstioGatewayIndexer k8sGatewayIndexer *K8sGatewayIndexer @@ -102,7 +100,7 @@ func (r *HTTPFilterPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Req // We can add a configured concurrency to write to API server in parallel, if // the performance is not good. Note that the API server probably has rate limit. - err = r.translationStateToCustomResource(ctx, &logger, finalState) + err = r.translationStateToCustomResource(ctx, finalState) if err != nil { return ctrl.Result{}, err } @@ -348,65 +346,11 @@ func (r *HTTPFilterPolicyReconciler) policyToTranslationState(ctx context.Contex return initState, nil } -func fillEnvoyFilterMeta(ef *istiov1a3.EnvoyFilter) { - ef.Namespace = config.RootNamespace() - if ef.Labels == nil { - ef.Labels = map[string]string{} - } - ef.Labels[model.LabelCreatedBy] = "HTTPFilterPolicy" -} - -func (r *HTTPFilterPolicyReconciler) translationStateToCustomResource(ctx context.Context, logger *logr.Logger, +func (r *HTTPFilterPolicyReconciler) translationStateToCustomResource(ctx context.Context, finalState *translation.FinalState) error { - var envoyfilters istiov1a3.EnvoyFilterList - if err := r.List(ctx, &envoyfilters, - client.MatchingLabels{model.LabelCreatedBy: "HTTPFilterPolicy"}, - ); err != nil { - return fmt.Errorf("failed to list EnvoyFilter: %w", err) - } - - preEnvoyFilterMap := make(map[string]*istiov1a3.EnvoyFilter, len(envoyfilters.Items)) - for _, e := range envoyfilters.Items { - if _, ok := finalState.EnvoyFilters[e.Name]; !ok || e.Namespace != config.RootNamespace() { - logger.Info("delete EnvoyFilter", "name", e.Name, "namespace", e.Namespace) - if err := r.Delete(ctx, e); err != nil { - return fmt.Errorf("failed to delete EnvoyFilter: %w, namespacedName: %v", - err, types.NamespacedName{Name: e.Name, Namespace: e.Namespace}) - } - } else { - preEnvoyFilterMap[e.Name] = e - } - } - - for _, ef := range finalState.EnvoyFilters { - envoyfilter, ok := preEnvoyFilterMap[ef.Name] - if !ok { - logger.Info("create EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) - fillEnvoyFilterMeta(ef) - - if err := r.Create(ctx, ef); err != nil { - nsName := types.NamespacedName{Name: ef.Name, Namespace: ef.Namespace} - return fmt.Errorf("failed to create EnvoyFilter: %w, namespacedName: %v", err, nsName) - } - - } else { - if proto.Equal(&envoyfilter.Spec, &ef.Spec) { - continue - } - - logger.Info("update EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) - fillEnvoyFilterMeta(ef) - // Address metadata.resourceVersion: Invalid value: 0x0 error - ef.SetResourceVersion(envoyfilter.ResourceVersion) - if err := r.Update(ctx, ef); err != nil { - nsName := types.NamespacedName{Name: ef.Name, Namespace: ef.Namespace} - return fmt.Errorf("failed to update EnvoyFilter: %w, namespacedName: %v", err, nsName) - } - } - } - - return nil + generatedEnvoyFilters := finalState.EnvoyFilters + return r.Output.FromHTTPFilterPolicy(ctx, generatedEnvoyFilters) } func (r *HTTPFilterPolicyReconciler) updatePolicies(ctx context.Context, diff --git a/controller/internal/controller/output/k8s_output.go b/controller/internal/controller/output/k8s_output.go new file mode 100644 index 00000000..673455bc --- /dev/null +++ b/controller/internal/controller/output/k8s_output.go @@ -0,0 +1,155 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package output + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + "google.golang.org/protobuf/proto" + istioapi "istio.io/api/networking/v1beta1" + istiov1a3 "istio.io/client-go/pkg/apis/networking/v1alpha3" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + "mosn.io/htnn/controller/internal/config" + "mosn.io/htnn/controller/internal/model" + "mosn.io/htnn/controller/pkg/procession" + "mosn.io/htnn/pkg/log" +) + +type k8sOutput struct { + client.Client + logger logr.Logger + + serviceEntrySyncer *serviceEntrySyncer +} + +func NewK8sOutput(c client.Client) procession.Output { + o := &k8sOutput{ + Client: c, + logger: log.DefaultLogger.WithName("k8s output"), + } + o.serviceEntrySyncer = newServiceEntrySyncer(c, &o.logger) + return o +} + +func fillEnvoyFilterMeta(ef *istiov1a3.EnvoyFilter) { + ef.Namespace = config.RootNamespace() + if ef.Labels == nil { + ef.Labels = map[string]string{} + } + ef.Labels[model.LabelCreatedBy] = "HTTPFilterPolicy" +} + +func (o *k8sOutput) FromHTTPFilterPolicy(ctx context.Context, generatedEnvoyFilters map[string]*istiov1a3.EnvoyFilter) error { + logger := o.logger + + var envoyfilters istiov1a3.EnvoyFilterList + if err := o.List(ctx, &envoyfilters, + client.MatchingLabels{model.LabelCreatedBy: "HTTPFilterPolicy"}, + ); err != nil { + return fmt.Errorf("failed to list EnvoyFilter: %w", err) + } + + preEnvoyFilterMap := make(map[string]*istiov1a3.EnvoyFilter, len(envoyfilters.Items)) + for _, e := range envoyfilters.Items { + if _, ok := generatedEnvoyFilters[e.Name]; !ok || e.Namespace != config.RootNamespace() { + logger.Info("delete EnvoyFilter", "name", e.Name, "namespace", e.Namespace) + if err := o.Delete(ctx, e); err != nil { + return fmt.Errorf("failed to delete EnvoyFilter: %w, namespacedName: %v", + err, types.NamespacedName{Name: e.Name, Namespace: e.Namespace}) + } + } else { + preEnvoyFilterMap[e.Name] = e + } + } + + for _, ef := range generatedEnvoyFilters { + envoyfilter, ok := preEnvoyFilterMap[ef.Name] + if !ok { + logger.Info("create EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) + fillEnvoyFilterMeta(ef) + + if err := o.Create(ctx, ef); err != nil { + nsName := types.NamespacedName{Name: ef.Name, Namespace: ef.Namespace} + return fmt.Errorf("failed to create EnvoyFilter: %w, namespacedName: %v", err, nsName) + } + + } else { + if proto.Equal(&envoyfilter.Spec, &ef.Spec) { + continue + } + + logger.Info("update EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) + fillEnvoyFilterMeta(ef) + // Address metadata.resourceVersion: Invalid value: 0x0 error + ef.SetResourceVersion(envoyfilter.ResourceVersion) + if err := o.Update(ctx, ef); err != nil { + nsName := types.NamespacedName{Name: ef.Name, Namespace: ef.Namespace} + return fmt.Errorf("failed to update EnvoyFilter: %w, namespacedName: %v", err, nsName) + } + } + } + + return nil +} + +func (o *k8sOutput) FromConsumer(ctx context.Context, ef *istiov1a3.EnvoyFilter) error { + logger := o.logger + + nsName := types.NamespacedName{Name: ef.Name, Namespace: ef.Namespace} + var envoyfilters istiov1a3.EnvoyFilterList + if err := o.List(ctx, &envoyfilters, client.MatchingLabels{model.LabelCreatedBy: "Consumer"}); err != nil { + return fmt.Errorf("failed to list EnvoyFilter: %w", err) + } + + var envoyfilter *istiov1a3.EnvoyFilter + for _, e := range envoyfilters.Items { + if e.Namespace != nsName.Namespace || e.Name != nsName.Name { + logger.Info("delete EnvoyFilter", "name", e.Name, "namespace", e.Namespace) + + if err := o.Delete(ctx, e); err != nil { + return fmt.Errorf("failed to delete EnvoyFilter: %w, namespacedName: %v", + err, types.NamespacedName{Name: e.Name, Namespace: e.Namespace}) + } + } else { + envoyfilter = e + } + } + + if envoyfilter == nil { + logger.Info("create EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) + + if err := o.Create(ctx, ef.DeepCopy()); err != nil { + return fmt.Errorf("failed to create EnvoyFilter: %w, namespacedName: %v", err, nsName) + } + } else { + logger.Info("update EnvoyFilter", "name", ef.Name, "namespace", ef.Namespace) + + ef = ef.DeepCopy() + ef.SetResourceVersion(envoyfilter.ResourceVersion) + if err := o.Update(ctx, ef); err != nil { + return fmt.Errorf("failed to update EnvoyFilter: %w, namespacedName: %v", err, nsName) + } + } + + return nil +} + +func (o *k8sOutput) FromServiceRegistry(ctx context.Context, serviceEntries map[string]*istioapi.ServiceEntry) { + o.serviceEntrySyncer.Update(ctx, serviceEntries) +} diff --git a/controller/internal/controller/output/mcp_output.go b/controller/internal/controller/output/mcp_output.go new file mode 100644 index 00000000..228d04c9 --- /dev/null +++ b/controller/internal/controller/output/mcp_output.go @@ -0,0 +1,144 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package output + +import ( + "context" + "fmt" + "net" + "sync" + + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/go-logr/logr" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/anypb" + istioapi "istio.io/api/networking/v1beta1" + istiov1a3 "istio.io/client-go/pkg/apis/networking/v1alpha3" + + "mosn.io/htnn/controller/internal/config" + "mosn.io/htnn/controller/internal/model" + "mosn.io/htnn/controller/pkg/procession" + "mosn.io/htnn/pkg/log" +) + +type mcpOutput struct { + logger *logr.Logger + + // envoyFilters will be updated by multiple sources + envoyFilters sync.Map + + mcp *mcpServer +} + +func NewMcpOutput(ctx context.Context) (procession.Output, error) { + logger := log.DefaultLogger.WithName("mcp output") + s := grpc.NewServer() + + srv := NewMcpServer(&logger) + + discovery.RegisterAggregatedDiscoveryServiceServer(s, srv) + + addr := config.McpServerListenAddress() + logger.Info("listening as mcp server", "address", addr) + l, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("failed to listen: %w", err) + } + + go func() { + ctx, cancel := context.WithCancel(ctx) + + go func() { + defer cancel() + + logger.Info("starting mcp server") + + err := s.Serve(l) + if err != nil { + logger.Error(err, "mcp server failed") + } + }() + + <-ctx.Done() + logger.Info("stopping mcp server") + srv.CloseSubscribers() + s.GracefulStop() + logger.Info("mcp server stopped") + }() + + return &mcpOutput{ + logger: &logger, + mcp: srv, + }, nil +} + +type configSource int + +const ( + configSourceHTTPFilterPolicy configSource = iota + configSourceConsumer +) + +func (o *mcpOutput) FromHTTPFilterPolicy(ctx context.Context, generatedEnvoyFilters map[string]*istiov1a3.EnvoyFilter) error { + return o.writeEnvoyFilters(configSourceHTTPFilterPolicy, generatedEnvoyFilters) +} + +func (o *mcpOutput) FromConsumer(ctx context.Context, ef *istiov1a3.EnvoyFilter) error { + return o.writeEnvoyFilters(configSourceConsumer, map[string]*istiov1a3.EnvoyFilter{ + model.ConsumerEnvoyFilterName: ef, + }) +} + +func (o *mcpOutput) writeEnvoyFilters(src configSource, filters map[string]*istiov1a3.EnvoyFilter) error { + // Store the converted Any directly can save memory, but we keep the original EnvoyFilter here + // so that we can add observability in the future. + o.envoyFilters.Store(src, filters) + ress := make([]*anypb.Any, 0, len(filters)*2) + ok := true + o.envoyFilters.Range(func(_, value interface{}) bool { + efs := value.(map[string]*istiov1a3.EnvoyFilter) + for name, ef := range efs { + res, err := MarshalToMcpPb(name, &ef.ObjectMeta, &ef.Spec) + if err != nil { + o.logger.Error(err, "failed to marshal EnvoyFilter", "name", name) + // do not push partial configuration, this may cause service unavailable + ok = false + return false + } + ress = append(ress, res) + } + return true + }) + + if ok { + o.mcp.UpdateEnvoyFilters(ress) + } + return nil +} + +func (o *mcpOutput) FromServiceRegistry(ctx context.Context, serviceEntries map[string]*istioapi.ServiceEntry) { + ress := make([]*anypb.Any, 0, len(serviceEntries)) + for name, se := range serviceEntries { + res, err := MarshalToMcpPb(name, nil, se) + if err != nil { + o.logger.Error(err, "failed to marshal ServiceEntry", "name", name) + // do not push partial configuration, this may cause service unavailable + return + } + ress = append(ress, res) + } + + o.mcp.UpdateServiceEntries(ress) +} diff --git a/controller/internal/controller/output/mcp_server.go b/controller/internal/controller/output/mcp_server.go new file mode 100644 index 00000000..cd311436 --- /dev/null +++ b/controller/internal/controller/output/mcp_server.go @@ -0,0 +1,196 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package output + +import ( + "context" + "fmt" + "os" + "strconv" + "sync" + "sync/atomic" + "time" + + envoycfgcorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/go-logr/logr" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + mcpapi "istio.io/api/mcp/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "mosn.io/htnn/controller/internal/config" +) + +func MarshalToMcpPb(name string, meta *metav1.ObjectMeta, src proto.Message) (*anypb.Any, error) { + body := &anypb.Any{} + if err := anypb.MarshalFrom(body, src, proto.MarshalOptions{}); err != nil { + return nil, fmt.Errorf("failed to marshal mcp body: %w", err) + } + + ns := config.RootNamespace() + mcpRes := &mcpapi.Resource{ + Metadata: &mcpapi.Metadata{ + Name: fmt.Sprintf("%s/%s", ns, name), + }, + Body: body, + } + + if meta != nil { + mcpRes.Metadata.Labels = meta.Labels + mcpRes.Metadata.Annotations = meta.Annotations + } + + pb := &anypb.Any{} + if err := anypb.MarshalFrom(pb, mcpRes, proto.MarshalOptions{}); err != nil { + return nil, fmt.Errorf("failed to marshal mcp resource: %w", err) + } + + return pb, nil +} + +type ( + DiscoveryStream = discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer + DeltaDiscoveryStream = discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer +) + +type mcpServer struct { + logger *logr.Logger + + subscribers sync.Map + nextSubscriberID atomic.Uint64 + + resourceLock sync.RWMutex + envoyFilters []*anypb.Any + serviceEntries []*anypb.Any +} + +func NewMcpServer(logger *logr.Logger) *mcpServer { + return &mcpServer{ + logger: logger, + } +} + +type subscriber struct { + id uint64 + + stream DiscoveryStream + closeStream func() +} + +func (srv *mcpServer) UpdateEnvoyFilters(envoyFilters []*anypb.Any) { + srv.resourceLock.Lock() + srv.envoyFilters = envoyFilters + srv.resourceLock.Unlock() + go func() { + typeUrl := "networking.istio.io/v1alpha3/EnvoyFilter" + srv.sendToSubscribers(typeUrl, envoyFilters) + }() +} + +func (srv *mcpServer) UpdateServiceEntries(serviceEntries []*anypb.Any) { + srv.resourceLock.Lock() + srv.serviceEntries = serviceEntries + srv.resourceLock.Unlock() + go func() { + typeUrl := "networking.istio.io/v1beta1/ServiceEntry" + srv.sendToSubscribers(typeUrl, serviceEntries) + }() +} + +func (srv *mcpServer) send(sub *subscriber, typeUrl string, mcpResources []*anypb.Any) { + if err := sub.stream.Send(&discovery.DiscoveryResponse{ + TypeUrl: typeUrl, + VersionInfo: strconv.FormatInt(time.Now().UnixNano(), 10), + Resources: mcpResources, + ControlPlane: &envoycfgcorev3.ControlPlane{ + Identifier: os.Getenv("POD_NAME"), + }, + }); err != nil { + id := sub.id + srv.logger.Error(err, "failed to send to subscriber", "id", id) + // let Istio to retry + sub.closeStream() + srv.subscribers.Delete(id) + } +} + +func (srv *mcpServer) sendToSubscribers(typeUrl string, mcpResources []*anypb.Any) { + srv.resourceLock.Lock() + defer srv.resourceLock.Unlock() + + srv.subscribers.Range(func(key, value any) bool { + srv.logger.Info("sending to subscriber", "id", key, "typeUrl", typeUrl, "length", len(mcpResources)) + srv.send(value.(*subscriber), typeUrl, mcpResources) + return true + }) +} + +func (srv *mcpServer) CloseSubscribers() { + srv.subscribers.Range(func(key, value any) bool { + srv.logger.Info("close subscriber", "id", key) + value.(*subscriber).closeStream() + srv.subscribers.Delete(key) + + return true + }) +} + +func (srv *mcpServer) initSubscriberResource(sub *subscriber) { + srv.resourceLock.Lock() + defer srv.resourceLock.Unlock() + + srv.logger.Info("sending initial conf to subscriber", "id", sub.id, + "EnvoyFilter", len(srv.envoyFilters), "ServiceEntry", len(srv.serviceEntries)) + if len(srv.envoyFilters) > 0 { + typeUrl := "networking.istio.io/v1alpha3/EnvoyFilter" + srv.send(sub, typeUrl, srv.envoyFilters) + } + if len(srv.serviceEntries) > 0 { + typeUrl := "networking.istio.io/v1beta1/ServiceEntry" + srv.send(sub, typeUrl, srv.serviceEntries) + } +} + +// Implement discovery.AggregatedDiscoveryServiceServer + +func (srv *mcpServer) StreamAggregatedResources(downstream DiscoveryStream) error { + ctx, closeStream := context.WithCancel(downstream.Context()) + + sub := &subscriber{ + id: srv.nextSubscriberID.Add(1), + stream: downstream, + closeStream: closeStream, + } + srv.logger.Info("handle new subscriber", "id", sub.id) + + srv.subscribers.Store(sub.id, sub) + + go func() { + srv.initSubscriberResource(sub) + }() + + <-ctx.Done() + return nil +} + +func (srv *mcpServer) DeltaAggregatedResources(downstream DeltaDiscoveryStream) error { + // By now, Istio doesn't support MCP over delta ads + return status.Errorf(codes.Unimplemented, "not implemented") +} + +var _ discovery.AggregatedDiscoveryServiceServer = (*mcpServer)(nil) diff --git a/controller/internal/controller/output/service_entry_syncer.go b/controller/internal/controller/output/service_entry_syncer.go new file mode 100644 index 00000000..028452ac --- /dev/null +++ b/controller/internal/controller/output/service_entry_syncer.go @@ -0,0 +1,182 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package output + +import ( + "context" + "sync" + "time" + + "github.com/go-logr/logr" + "google.golang.org/protobuf/proto" + istioapi "istio.io/api/networking/v1beta1" + istiov1b1 "istio.io/client-go/pkg/apis/networking/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/controller-runtime/pkg/client" + + "mosn.io/htnn/controller/internal/config" + "mosn.io/htnn/controller/internal/model" +) + +type serviceEntrySyncer struct { + client client.Client + logger *logr.Logger + + lock sync.RWMutex + entries map[string]*istiov1b1.ServiceEntry + syncInterval time.Duration +} + +func newServiceEntrySyncer(c client.Client, logger *logr.Logger) *serviceEntrySyncer { + s := &serviceEntrySyncer{ + client: c, + logger: logger, + entries: make(map[string]*istiov1b1.ServiceEntry), + syncInterval: 20 * time.Second, + } + go s.Sync() + return s +} + +func (syncer *serviceEntrySyncer) getFromK8s(ctx context.Context, service string, se *istiov1b1.ServiceEntry) error { + err := syncer.client.Get(ctx, client.ObjectKey{ + Namespace: config.RootNamespace(), + Name: service, + }, se) + return err +} + +func (syncer *serviceEntrySyncer) deleteFromK8s(ctx context.Context, se *istiov1b1.ServiceEntry) { + c := syncer.client + syncer.logger.Info("delete ServiceEntry", "name", se.Name, "namespace", se.Namespace) + err := c.Delete(ctx, se) + if err != nil { + syncer.logger.Error(err, "failed to delete service entry from k8s", "service", se.Name) + return + } +} + +func (syncer *serviceEntrySyncer) addToK8s(ctx context.Context, service string, entry *istioapi.ServiceEntry) *istiov1b1.ServiceEntry { + c := syncer.client + ns := config.RootNamespace() + se := istiov1b1.ServiceEntry{ + Spec: *entry.DeepCopy(), + } + se.Namespace = ns + if se.Labels == nil { + se.Labels = map[string]string{} + } + se.Labels[model.LabelCreatedBy] = "ServiceRegistry" + se.Name = service + + syncer.logger.Info("create ServiceEntry", "name", service, "namespace", ns) + err := c.Create(ctx, &se) + if err != nil { + syncer.logger.Error(err, "failed to create service entry to k8s", "service", service) + } + + return &se +} + +func (syncer *serviceEntrySyncer) updateToK8s(ctx context.Context, se *istiov1b1.ServiceEntry, entry *istioapi.ServiceEntry) *istiov1b1.ServiceEntry { + if proto.Equal(&se.Spec, entry) { + return se + } + + c := syncer.client + syncer.logger.Info("update ServiceEntry", "name", se.Name, "namespace", se.Namespace) + se.SetResourceVersion(se.ResourceVersion) + se.Spec = *entry.DeepCopy() + if err := c.Update(ctx, se); err != nil { + syncer.logger.Error(err, "failed to update service entry to k8s", "service", se.Name) + return se + } + + return se +} + +func (syncer *serviceEntrySyncer) Update(ctx context.Context, entries map[string]*istioapi.ServiceEntry) { + syncer.lock.Lock() + defer syncer.lock.Unlock() + + var obj istiov1b1.ServiceEntry + for service, se := range syncer.entries { + if _, ok := entries[service]; !ok { + syncer.deleteFromK8s(ctx, se) + delete(syncer.entries, service) + } + } + + var latestServiceEntry *istiov1b1.ServiceEntry + for service, se := range entries { + if prev, ok := syncer.entries[service]; !ok { + if err := syncer.getFromK8s(ctx, service, &obj); err != nil { + if !apierrors.IsNotFound(err) { + syncer.logger.Error(err, "failed to get service entry from k8s", "service", service) + return + } + + latestServiceEntry = syncer.addToK8s(ctx, service, se) + } else { + latestServiceEntry = syncer.updateToK8s(ctx, &obj, se) + } + } else { + latestServiceEntry = syncer.updateToK8s(ctx, prev, se) + } + + syncer.entries[service] = latestServiceEntry + } +} + +func (syncer *serviceEntrySyncer) sync() { + syncer.lock.Lock() + defer syncer.lock.Unlock() + + c := syncer.client + ctx := context.Background() + var serviceEntries istiov1b1.ServiceEntryList + err := c.List(ctx, &serviceEntries, client.MatchingLabels{model.LabelCreatedBy: "ServiceRegistry"}) + if err != nil { + syncer.logger.Error(err, "failed to list service entries") + return + } + + persisted := make(map[string]*istiov1b1.ServiceEntry, len(serviceEntries.Items)) + for _, se := range serviceEntries.Items { + if _, ok := syncer.entries[se.Name]; !ok { + syncer.deleteFromK8s(ctx, se) + } else { + persisted[se.Name] = se + } + } + + for service, wrp := range syncer.entries { + entry := &wrp.Spec + if se, ok := persisted[service]; !ok { + syncer.addToK8s(ctx, service, entry) + } else { + syncer.updateToK8s(ctx, se, entry) + } + } +} + +func (syncer *serviceEntrySyncer) Sync() { + // We sync the service entries so we can retry if something wrong happened + ticker := time.NewTicker(syncer.syncInterval) + // For now we don't release the ticker + for range ticker.C { + syncer.sync() + } +} diff --git a/controller/internal/registry/store_test.go b/controller/internal/controller/output/service_entry_syncer_test.go similarity index 94% rename from controller/internal/registry/store_test.go rename to controller/internal/controller/output/service_entry_syncer_test.go index 2ac1df21..5d230d9c 100644 --- a/controller/internal/registry/store_test.go +++ b/controller/internal/controller/output/service_entry_syncer_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package registry +package output import ( "context" @@ -26,10 +26,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "mosn.io/htnn/controller/tests/pkg" + "mosn.io/htnn/pkg/log" ) func TestSync(t *testing.T) { - store := newServiceEntryStore(pkg.FakeK8sClient(t)) + logger := log.DefaultLogger.WithName("test") + store := newServiceEntrySyncer(pkg.FakeK8sClient(t), &logger) var created, updated, deleted bool diff --git a/controller/internal/model/constant.go b/controller/internal/model/constant.go index 7f1479e1..6d45692d 100644 --- a/controller/internal/model/constant.go +++ b/controller/internal/model/constant.go @@ -15,5 +15,6 @@ package model const ( - LabelCreatedBy = "htnn.mosn.io/created-by" + ConsumerEnvoyFilterName = "htnn-consumer" + LabelCreatedBy = "htnn.mosn.io/created-by" ) diff --git a/controller/internal/registry/registry.go b/controller/internal/registry/registry.go index b3a190b6..ae5d5021 100644 --- a/controller/internal/registry/registry.go +++ b/controller/internal/registry/registry.go @@ -16,9 +16,9 @@ package registry import ( "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" mosniov1 "mosn.io/htnn/controller/api/v1" + "mosn.io/htnn/controller/pkg/procession" pkgRegistry "mosn.io/htnn/controller/pkg/registry" "mosn.io/htnn/pkg/log" ) @@ -31,12 +31,11 @@ var ( ) type RegistryManagerOption struct { - Client client.Client + Output procession.Output } func InitRegistryManager(opt *RegistryManagerOption) { - store = newServiceEntryStore(opt.Client) - go store.Sync() + store = newServiceEntryStore(opt.Output) } func UpdateRegistry(registry *mosniov1.ServiceRegistry) error { diff --git a/controller/internal/registry/store.go b/controller/internal/registry/store.go index 35297e1e..9400024d 100644 --- a/controller/internal/registry/store.go +++ b/controller/internal/registry/store.go @@ -17,64 +17,40 @@ package registry import ( "context" "sync" - "time" - "google.golang.org/protobuf/proto" istioapi "istio.io/api/networking/v1beta1" - istiov1b1 "istio.io/client-go/pkg/apis/networking/v1beta1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "sigs.k8s.io/controller-runtime/pkg/client" - "mosn.io/htnn/controller/internal/config" - "mosn.io/htnn/controller/internal/model" + "mosn.io/htnn/controller/pkg/procession" pkgRegistry "mosn.io/htnn/controller/pkg/registry" ) type serviceEntryStore struct { - client client.Client + output procession.Output lock sync.RWMutex - entries map[string]*istiov1b1.ServiceEntry - - syncInterval time.Duration + entries map[string]*istioapi.ServiceEntry } -func newServiceEntryStore(client client.Client) *serviceEntryStore { +func newServiceEntryStore(output procession.Output) *serviceEntryStore { return &serviceEntryStore{ - client: client, - entries: make(map[string]*istiov1b1.ServiceEntry), - syncInterval: 20 * time.Second, + output: output, + entries: make(map[string]*istioapi.ServiceEntry), } } +// Implement ServiceEntryStore interface + func (store *serviceEntryStore) Update(service string, se *pkgRegistry.ServiceEntryWrapper) { store.lock.Lock() defer store.lock.Unlock() + logger.Info("service entry store update", "service", service, "entry", &se.ServiceEntry) ctx := context.Background() - var obj istiov1b1.ServiceEntry - var latestServiceEntry *istiov1b1.ServiceEntry - - if prev, ok := store.entries[service]; !ok { - if err := store.getFromK8s(ctx, service, &obj); err != nil { - if !apierrors.IsNotFound(err) { - logger.Error(err, "failed to get service entry from k8s", "service", service) - return - } - - latestServiceEntry = store.addToK8s(ctx, service, &se.ServiceEntry) - } else { - latestServiceEntry = store.updateToK8s(ctx, &obj, &se.ServiceEntry) - } - } else { - latestServiceEntry = store.updateToK8s(ctx, prev, &se.ServiceEntry) - } + store.entries[service] = &se.ServiceEntry - store.entries[service] = latestServiceEntry + store.output.FromServiceRegistry(ctx, store.entries) } -// Implement ServiceEntryStore interface - func (store *serviceEntryStore) Delete(service string) { store.lock.Lock() defer store.lock.Unlock() @@ -83,111 +59,7 @@ func (store *serviceEntryStore) Delete(service string) { return } + logger.Info("service entry store delete", "service", service) delete(store.entries, service) - - ctx := context.Background() - var se istiov1b1.ServiceEntry - if err := store.getFromK8s(ctx, service, &se); err != nil { - logger.Error(err, "failed to get service entry from k8s", "service", service) - return - } - store.deleteFromK8s(ctx, &se) -} - -func (store *serviceEntryStore) getFromK8s(ctx context.Context, service string, se *istiov1b1.ServiceEntry) error { - err := store.client.Get(ctx, client.ObjectKey{ - Namespace: config.RootNamespace(), - Name: service, - }, se) - return err -} - -func (store *serviceEntryStore) deleteFromK8s(ctx context.Context, se *istiov1b1.ServiceEntry) { - c := store.client - logger.Info("delete ServiceEntry", "name", se.Name, "namespace", se.Namespace) - err := c.Delete(ctx, se) - if err != nil { - logger.Error(err, "failed to delete service entry from k8s", "service", se.Name) - return - } -} - -func (store *serviceEntryStore) addToK8s(ctx context.Context, service string, entry *istioapi.ServiceEntry) *istiov1b1.ServiceEntry { - c := store.client - ns := config.RootNamespace() - se := istiov1b1.ServiceEntry{ - Spec: *entry.DeepCopy(), - } - se.Namespace = ns - if se.Labels == nil { - se.Labels = map[string]string{} - } - se.Labels[model.LabelCreatedBy] = "ServiceRegistry" - se.Name = service - - logger.Info("create ServiceEntry", "name", service, "namespace", ns) - err := c.Create(ctx, &se) - if err != nil { - logger.Error(err, "failed to create service entry to k8s", "service", service) - } - - return &se -} - -func (store *serviceEntryStore) updateToK8s(ctx context.Context, se *istiov1b1.ServiceEntry, entry *istioapi.ServiceEntry) *istiov1b1.ServiceEntry { - if proto.Equal(&se.Spec, entry) { - return se - } - - c := store.client - logger.Info("update ServiceEntry", "name", se.Name, "namespace", se.Namespace) - se.SetResourceVersion(se.ResourceVersion) - se.Spec = *entry.DeepCopy() - if err := c.Update(ctx, se); err != nil { - logger.Error(err, "failed to update service entry to k8s", "service", se.Name) - return se - } - - return se -} - -func (store *serviceEntryStore) sync() { - store.lock.Lock() - defer store.lock.Unlock() - - c := store.client - ctx := context.Background() - var serviceEntries istiov1b1.ServiceEntryList - err := c.List(ctx, &serviceEntries, client.MatchingLabels{model.LabelCreatedBy: "ServiceRegistry"}) - if err != nil { - logger.Error(err, "failed to list service entries") - return - } - - persisted := make(map[string]*istiov1b1.ServiceEntry, len(serviceEntries.Items)) - for _, se := range serviceEntries.Items { - if _, ok := store.entries[se.Name]; !ok { - store.deleteFromK8s(ctx, se) - } else { - persisted[se.Name] = se - } - } - - for service, wrp := range store.entries { - entry := &wrp.Spec - if se, ok := persisted[service]; !ok { - store.addToK8s(ctx, service, entry) - } else { - store.updateToK8s(ctx, se, entry) - } - } -} - -func (store *serviceEntryStore) Sync() { - // We sync the service entries so we can retry if something wrong happened - ticker := time.NewTicker(store.syncInterval) - // For now we don't release the ticker - for range ticker.C { - store.sync() - } + store.output.FromServiceRegistry(context.Background(), store.entries) } diff --git a/controller/pkg/procession/output.go b/controller/pkg/procession/output.go new file mode 100644 index 00000000..6ab8d2a9 --- /dev/null +++ b/controller/pkg/procession/output.go @@ -0,0 +1,31 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package procession + +import ( + "context" + + istioapi "istio.io/api/networking/v1beta1" + istiov1a3 "istio.io/client-go/pkg/apis/networking/v1alpha3" +) + +type Output interface { + FromHTTPFilterPolicy(ctx context.Context, envoyFilters map[string]*istiov1a3.EnvoyFilter) error + FromConsumer(ctx context.Context, envoyFilter *istiov1a3.EnvoyFilter) error + // FromServiceRegistry writes the generated ServiceEntries to the output. Unlike the other generators, + // it assumes the write already succeed, and don't retry on error, + // so the output should handle the retry by themselves. That's why the error is not returned here. + FromServiceRegistry(ctx context.Context, serviceEntries map[string]*istioapi.ServiceEntry) +} diff --git a/controller/tests/benchmark/suite_test.go b/controller/tests/benchmark/suite_test.go index c829864a..7bffbe3e 100644 --- a/controller/tests/benchmark/suite_test.go +++ b/controller/tests/benchmark/suite_test.go @@ -50,6 +50,7 @@ import ( mosniov1 "mosn.io/htnn/controller/api/v1" "mosn.io/htnn/controller/internal/config" "mosn.io/htnn/controller/internal/controller" + controlleroutput "mosn.io/htnn/controller/internal/controller/output" ) var cfg *rest.Config @@ -178,9 +179,13 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) + output, err := controlleroutput.NewMcpOutput(ctx) + Expect(err).ToNot(HaveOccurred()) + httpFilterPolicyReconciler = &controller.HTTPFilterPolicyReconciler{ Client: k8sManager.GetClient(), Scheme: k8sManager.GetScheme(), + Output: output, } err = httpFilterPolicyReconciler.SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) diff --git a/controller/tests/integration/controller/consumer_controller_test.go b/controller/tests/integration/controller/consumer_controller_test.go index 2163b685..7abf8ab5 100644 --- a/controller/tests/integration/controller/consumer_controller_test.go +++ b/controller/tests/integration/controller/consumer_controller_test.go @@ -29,6 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" mosniov1 "mosn.io/htnn/controller/api/v1" + "mosn.io/htnn/controller/internal/model" "mosn.io/htnn/controller/tests/integration/helper" "mosn.io/htnn/controller/tests/pkg" ) @@ -38,6 +39,11 @@ func mustReadConsumer(fn string, out *[]map[string]interface{}) { helper.MustReadInput(fn, out) } +func listConsumerEnvoyFilters(ctx context.Context, c client.Client, envoyfilters *istiov1a3.EnvoyFilterList) error { + return c.List(ctx, envoyfilters, + client.MatchingLabels{model.LabelCreatedBy: "Consumer"}) +} + var _ = Describe("Consumer controller", func() { const ( @@ -55,7 +61,7 @@ var _ = Describe("Consumer controller", func() { } var envoyfilters istiov1a3.EnvoyFilterList - if err := k8sClient.List(ctx, &envoyfilters); err == nil { + if err := listConsumerEnvoyFilters(ctx, k8sClient, &envoyfilters); err == nil { for _, e := range envoyfilters.Items { pkg.DeleteK8sResource(ctx, k8sClient, e) } @@ -99,30 +105,41 @@ var _ = Describe("Consumer controller", func() { Expect(cs[0].Reason).To(Equal(string(mosniov1.ReasonAccepted))) var envoyfilters istiov1a3.EnvoyFilterList + marshaledCfg := map[string]map[string]map[string]interface{}{} Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listConsumerEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { + return false + } + if len(envoyfilters.Items) != 1 { + return false + } + ef := envoyfilters.Items[0] + if ef.Namespace != "istio-system" || ef.Name != "htnn-consumer" { return false } - return len(envoyfilters.Items) == 1 && envoyfilters.Items[0].Namespace == "istio-system" - }, timeout, interval).Should(BeTrue()) - ef := envoyfilters.Items[0] - Expect(ef.Namespace).To(Equal("istio-system")) - Expect(ef.Name).To(Equal("htnn-consumer")) - Expect(len(ef.Spec.ConfigPatches)).To(Equal(2)) - cp := ef.Spec.ConfigPatches[0] - Expect(cp.ApplyTo).To(Equal(istioapi.EnvoyFilter_EXTENSION_CONFIG)) - value := cp.Patch.Value.AsMap() - Expect(value["name"]).To(Equal("htnn-consumer")) - typedCfg := value["typed_config"].(map[string]interface{}) - pluginCfg := typedCfg["plugin_config"].(map[string]interface{}) + if len(ef.Spec.ConfigPatches) != 2 { + return false + } + cp := ef.Spec.ConfigPatches[0] + if cp.ApplyTo != istioapi.EnvoyFilter_EXTENSION_CONFIG { + return false + } + + value := cp.Patch.Value.AsMap() + if value["name"] != "htnn-consumer" { + return false + } + typedCfg := value["typed_config"].(map[string]interface{}) + pluginCfg := typedCfg["plugin_config"].(map[string]interface{}) + + b, _ := json.Marshal(pluginCfg["value"]) + json.Unmarshal(b, &marshaledCfg) + // mapping is namespace -> name -> config + return marshaledCfg["default"]["spacewander"] != nil && + marshaledCfg["default"]["unchanged"] != nil + }, timeout, interval).Should(BeTrue()) - marshaledCfg := map[string]map[string]map[string]interface{}{} - b, _ := json.Marshal(pluginCfg["value"]) - json.Unmarshal(b, &marshaledCfg) - // mapping is namespace -> name -> config - Expect(marshaledCfg["default"]["spacewander"]).ToNot(BeNil()) - Expect(marshaledCfg["default"]["unchanged"]).ToNot(BeNil()) d := marshaledCfg["default"]["spacewander"]["d"].(string) cfg := map[string]interface{}{} err := json.Unmarshal([]byte(d), &cfg) @@ -156,20 +173,22 @@ var _ = Describe("Consumer controller", func() { // EnvoyFilter should be updated too Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listConsumerEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } - return len(envoyfilters.Items) == 1 - }, timeout, interval).Should(BeTrue()) + if len(envoyfilters.Items) != 1 { + return false + } + value := envoyfilters.Items[0].Spec.ConfigPatches[0].Patch.Value.AsMap() + typedCfg := value["typed_config"].(map[string]interface{}) + pluginCfg := typedCfg["plugin_config"].(map[string]interface{}) - value = envoyfilters.Items[0].Spec.ConfigPatches[0].Patch.Value.AsMap() - typedCfg = value["typed_config"].(map[string]interface{}) - pluginCfg = typedCfg["plugin_config"].(map[string]interface{}) + marshaledCfg = map[string]map[string]map[string]interface{}{} + b, _ := json.Marshal(pluginCfg["value"]) + json.Unmarshal(b, &marshaledCfg) + return marshaledCfg["default"]["spacewander"] == nil + }, timeout, interval).Should(BeTrue()) - marshaledCfg = map[string]map[string]map[string]interface{}{} - b, _ = json.Marshal(pluginCfg["value"]) - json.Unmarshal(b, &marshaledCfg) - Expect(marshaledCfg["default"]["spacewander"]).To(BeNil()) Expect(marshaledCfg["default"]["unchanged"]).ToNot(BeNil()) Expect(marshaledCfg["default"]["unchanged"]["v"]).To(Equal(v)) }) @@ -186,7 +205,7 @@ var _ = Describe("Consumer controller", func() { var envoyfilters istiov1a3.EnvoyFilterList marshaledCfg := map[string]map[string]map[string]interface{}{} Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listConsumerEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } if len(envoyfilters.Items) != 1 { @@ -219,6 +238,7 @@ var _ = Describe("Consumer controller", func() { cfg := map[string]interface{}{} err := json.Unmarshal([]byte(d), &cfg) Expect(err).To(BeNil()) + Expect(cfg["filters"]).ToNot(BeNil()) filter := cfg["filters"].(map[string]interface{}) Expect(filter["demo"]).ToNot(BeNil()) }) diff --git a/controller/tests/integration/controller/httpfilterpolicy_controller_test.go b/controller/tests/integration/controller/httpfilterpolicy_controller_test.go index b2942bc3..62a744fc 100644 --- a/controller/tests/integration/controller/httpfilterpolicy_controller_test.go +++ b/controller/tests/integration/controller/httpfilterpolicy_controller_test.go @@ -32,6 +32,7 @@ import ( gwapiv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2" mosniov1 "mosn.io/htnn/controller/api/v1" + "mosn.io/htnn/controller/internal/model" "mosn.io/htnn/controller/tests/integration/helper" "mosn.io/htnn/controller/tests/pkg" ) @@ -41,6 +42,11 @@ func mustReadHTTPFilterPolicy(fn string, out *[]map[string]interface{}) { helper.MustReadInput(fn, out) } +func listHFPEnvoyFilters(ctx context.Context, c client.Client, envoyfilters *istiov1a3.EnvoyFilterList) error { + return c.List(ctx, envoyfilters, + client.MatchingLabels{model.LabelCreatedBy: "HTTPFilterPolicy"}) +} + func attachGateway(ctx context.Context, httpRoute *gwapiv1.HTTPRoute, gwName string) { httpRoute.Status.Parents = []gwapiv1.RouteParentStatus{ { @@ -178,7 +184,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { } var envoyfilters istiov1a3.EnvoyFilterList - if err := k8sClient.List(ctx, &envoyfilters); err == nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err == nil { for _, e := range envoyfilters.Items { pkg.DeleteK8sResource(ctx, k8sClient, e) } @@ -216,7 +222,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { var envoyfilters istiov1a3.EnvoyFilterList Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 2 @@ -253,7 +259,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { err := k8sClient.Update(ctx, virtualService) Expect(err).NotTo(HaveOccurred()) Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 1 @@ -264,7 +270,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { err = k8sClient.Update(ctx, virtualService) Expect(err).NotTo(HaveOccurred()) Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 2 @@ -273,7 +279,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { // delete virtualservice referred by httpfilterpolicy Expect(k8sClient.Delete(ctx, virtualService)).Should(Succeed()) Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 1 @@ -304,7 +310,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { var envoyfilters istiov1a3.EnvoyFilterList Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 2 @@ -320,7 +326,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { err := k8sClient.Update(ctx, DefaultIstioGateway) Expect(err).NotTo(HaveOccurred()) Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } name := "" @@ -335,7 +341,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { Expect(k8sClient.Delete(ctx, DefaultIstioGateway)).Should(Succeed()) Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 1 @@ -355,7 +361,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { var envoyfilters istiov1a3.EnvoyFilterList Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 2 @@ -369,7 +375,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { Expect(k8sClient.Delete(ctx, DefaultVirtualService)).Should(Succeed()) Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 1 @@ -428,7 +434,12 @@ var _ = Describe("HTTPFilterPolicy controller", func() { names := []string{} for _, ef := range envoyfilters.Items { Expect(ef.Namespace).To(Equal("istio-system")) - names = append(names, ef.Name) + if l, ok := ef.Labels[model.LabelCreatedBy]; !ok || l == "HTTPFilterPolicy" { + // When the output is mcp, the operation to k8s is async. + // There is a race that when this test case is running, the EnvoyFilter created + // by Consumer is still existed. + names = append(names, ef.Name) + } if ef.Name == "htnn-http-filter" { Expect(len(ef.Spec.ConfigPatches) > 0).Should(BeTrue()) } @@ -472,7 +483,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { var envoyfilters istiov1a3.EnvoyFilterList Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 2 @@ -522,7 +533,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { var envoyfilters istiov1a3.EnvoyFilterList Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 2 @@ -546,7 +557,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { err := k8sClient.Update(ctx, virtualService) Expect(err).NotTo(HaveOccurred()) Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 1 @@ -557,7 +568,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { err = k8sClient.Update(ctx, virtualService) Expect(err).NotTo(HaveOccurred()) Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 2 @@ -566,7 +577,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { // delete virtualservice referred by httpfilterpolicy Expect(k8sClient.Delete(ctx, virtualService)).Should(Succeed()) Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 1 @@ -604,7 +615,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { } var envoyfilters istiov1a3.EnvoyFilterList - if err := k8sClient.List(ctx, &envoyfilters); err == nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err == nil { for _, e := range envoyfilters.Items { pkg.DeleteK8sResource(ctx, k8sClient, e) } @@ -642,7 +653,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { var envoyfilters istiov1a3.EnvoyFilterList Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 2 @@ -679,7 +690,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { err := k8sClient.Update(ctx, httpRoute) Expect(err).NotTo(HaveOccurred()) Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 1 @@ -690,7 +701,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { err = k8sClient.Update(ctx, httpRoute) Expect(err).NotTo(HaveOccurred()) Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 2 @@ -699,7 +710,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { // delete httproute referred by httpfilterpolicy Expect(k8sClient.Delete(ctx, httpRoute)).Should(Succeed()) Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 1 @@ -735,7 +746,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { var envoyfilters istiov1a3.EnvoyFilterList Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 2 @@ -751,7 +762,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { err := k8sClient.Update(ctx, DefaultK8sGateway) Expect(err).NotTo(HaveOccurred()) Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } name := "" @@ -766,7 +777,7 @@ var _ = Describe("HTTPFilterPolicy controller", func() { Expect(k8sClient.Delete(ctx, DefaultK8sGateway)).Should(Succeed()) Eventually(func() bool { - if err := k8sClient.List(ctx, &envoyfilters); err != nil { + if err := listHFPEnvoyFilters(ctx, k8sClient, &envoyfilters); err != nil { return false } return len(envoyfilters.Items) == 1 diff --git a/controller/tests/integration/controller/serviceregistry_controller_test.go b/controller/tests/integration/controller/serviceregistry_controller_test.go index e2d0cbea..6276f649 100644 --- a/controller/tests/integration/controller/serviceregistry_controller_test.go +++ b/controller/tests/integration/controller/serviceregistry_controller_test.go @@ -16,15 +16,21 @@ package controller import ( "context" + "encoding/json" + "net/http" + "net/url" "path/filepath" + "strings" "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + istiov1b1 "istio.io/client-go/pkg/apis/networking/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" mosniov1 "mosn.io/htnn/controller/api/v1" + "mosn.io/htnn/controller/internal/model" "mosn.io/htnn/controller/tests/integration/helper" "mosn.io/htnn/controller/tests/pkg" ) @@ -34,6 +40,54 @@ func mustReadServiceRegistry(fn string, out *[]map[string]interface{}) { helper.MustReadInput(fn, out) } +func listServiceEntries() []*istiov1b1.ServiceEntry { + var entries istiov1b1.ServiceEntryList + Expect(k8sClient.List(ctx, &entries, client.MatchingLabels{model.LabelCreatedBy: "ServiceRegistry"})).Should(Succeed()) + return entries.Items +} + +func registerNacosInstance(nacosPort string, name string, ip string, port string, metadata map[string]any) { + nacosServerURL := "http://0.0.0.0:" + nacosPort + + params := url.Values{} + params.Set("serviceName", name) + params.Set("ip", ip) + params.Set("port", port) + + if metadata != nil { + b, err := json.Marshal(metadata) + Expect(err).To(BeNil()) + params.Set("metadata", string(b)) + } + + fullURL := nacosServerURL + "/nacos/v1/ns/instance?" + params.Encode() + + req, err := http.NewRequest("POST", fullURL, strings.NewReader("")) + Expect(err).To(BeNil()) + client := &http.Client{} + resp, err := client.Do(req) + Expect(err).To(BeNil()) + Expect(resp.StatusCode).To(Equal(200)) +} + +func deregisterNacosInstance(nacosPort string, name string, ip string, port string) { + nacosServerURL := "http://0.0.0.0:" + nacosPort + + params := url.Values{} + params.Set("serviceName", name) + params.Set("ip", ip) + params.Set("port", port) + + fullURL := nacosServerURL + "/nacos/v1/ns/instance?" + params.Encode() + + req, err := http.NewRequest("DELETE", fullURL, nil) + Expect(err).To(BeNil()) + client := &http.Client{} + resp, err := client.Do(req) + Expect(err).To(BeNil()) + Expect(resp.StatusCode).To(Equal(200)) +} + var _ = Describe("ServiceRegistry controller", func() { const ( @@ -112,6 +166,19 @@ var _ = Describe("ServiceRegistry controller", func() { Expect(cs[0].Type).To(Equal(string(mosniov1.ConditionAccepted))) Expect(cs[0].Reason).To(Equal(string(mosniov1.ReasonAccepted))) + // This part of code is a little repeated with the one in registries integration tests. + // We add this code to ensure the basic feature is working. + registerNacosInstance("8848", "test", "1.2.3.4", "8080", nil) + + var entries []*istiov1b1.ServiceEntry + Eventually(func() bool { + entries = listServiceEntries() + return len(entries) == 1 + }, timeout, interval).Should(BeTrue()) + + Expect(entries[0].Name).To(Equal("test.default-group.public.earth.nacos")) + Expect(entries[0].Spec.GetHosts()).To(Equal([]string{"test.default-group.public.earth.nacos"})) + // to invalid base := client.MergeFrom(r.DeepCopy()) r.Spec.Config.Raw = []byte(`{}`) @@ -129,6 +196,17 @@ var _ = Describe("ServiceRegistry controller", func() { } return false }, timeout, interval).Should(BeTrue()) + + deregisterNacosInstance("8848", "test", "1.2.3.4", "8080") + sr := &mosniov1.ServiceRegistry{} + sr.SetName("earth") + sr.SetNamespace("default") + Expect(k8sClient.Delete(context.Background(), sr)).Should(Succeed()) + + Eventually(func() bool { + entries = listServiceEntries() + return len(entries) == 0 + }, timeout, interval).Should(BeTrue()) }) }) }) diff --git a/controller/tests/integration/controller/suite_test.go b/controller/tests/integration/controller/suite_test.go index 502adafc..afaf43b7 100644 --- a/controller/tests/integration/controller/suite_test.go +++ b/controller/tests/integration/controller/suite_test.go @@ -44,6 +44,7 @@ import ( "mosn.io/htnn/controller/internal/config" "mosn.io/htnn/controller/internal/controller" "mosn.io/htnn/controller/internal/registry" + "mosn.io/htnn/controller/tests/integration/helper" ) // These tests use Ginkgo (BDD-style Go testing framework). Refer to @@ -55,6 +56,7 @@ var testEnv *envtest.Environment var ctx context.Context var cancel context.CancelFunc var clientset *kubernetes.Clientset +var outputSuite = helper.OutputSuite{} func ptrstr(s string) *string { return &s @@ -63,7 +65,7 @@ func ptrstr(s string) *string { func TestControllers(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "Controller Suite") + RunSpecs(t, fmt.Sprintf("Controller Suite [%s]", outputSuite.Name())) } var _ = BeforeSuite(func() { @@ -129,20 +131,23 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) + output := outputSuite.Get(ctx, k8sManager.GetClient()) err = (&controller.HTTPFilterPolicyReconciler{ Client: k8sManager.GetClient(), Scheme: k8sManager.GetScheme(), + Output: output, }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) err = (&controller.ConsumerReconciler{ Client: k8sManager.GetClient(), Scheme: k8sManager.GetScheme(), + Output: output, }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) registry.InitRegistryManager(®istry.RegistryManagerOption{ - Client: k8sManager.GetClient(), + Output: output, }) err = (&controller.ServiceRegistryReconciler{ Client: k8sManager.GetClient(), diff --git a/controller/tests/integration/controller/testdata/serviceregistry/default.yml b/controller/tests/integration/controller/testdata/serviceregistry/default.yml index ec849954..e23143ac 100644 --- a/controller/tests/integration/controller/testdata/serviceregistry/default.yml +++ b/controller/tests/integration/controller/testdata/serviceregistry/default.yml @@ -6,4 +6,5 @@ spec: type: nacos config: - serverUrl: http://0.0.0.0:8848 + serverUrl: http://127.0.0.1:8848 + serviceRefreshInterval: 1s diff --git a/controller/tests/integration/helper/k8s.go b/controller/tests/integration/helper/k8s.go new file mode 100644 index 00000000..0534644d --- /dev/null +++ b/controller/tests/integration/helper/k8s.go @@ -0,0 +1,38 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !mcp_output + +package helper + +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/client" + + controlleroutput "mosn.io/htnn/controller/internal/controller/output" + "mosn.io/htnn/controller/pkg/procession" +) + +type OutputSuite struct { +} + +func (o *OutputSuite) Name() string { + return "k8s" +} + +// nolint: contextcheck +func (o *OutputSuite) Get(_ context.Context, c client.Client) procession.Output { + return controlleroutput.NewK8sOutput(c) +} diff --git a/controller/tests/integration/helper/mcp.go b/controller/tests/integration/helper/mcp.go new file mode 100644 index 00000000..43179c6f --- /dev/null +++ b/controller/tests/integration/helper/mcp.go @@ -0,0 +1,53 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build mcp_output + +package helper + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/client" + + controlleroutput "mosn.io/htnn/controller/internal/controller/output" + "mosn.io/htnn/controller/pkg/procession" +) + +type OutputSuite struct { +} + +func (o *OutputSuite) Name() string { + return "mcp" +} + +func (o *OutputSuite) Get(ctx context.Context, c client.Client) procession.Output { + output, err := controlleroutput.NewMcpOutput(ctx) + Expect(err).ToNot(HaveOccurred()) + + go func() { + defer GinkgoRecover() + + mc := NewMcpClient(c) + defer mc.Close() + // add a little sleep to simulate a client starts after server gets new conf + time.Sleep(5 * time.Second) + mc.Init() + mc.Handle() + }() + return output +} diff --git a/controller/tests/integration/helper/mcp_client.go b/controller/tests/integration/helper/mcp_client.go new file mode 100644 index 00000000..8b89ca82 --- /dev/null +++ b/controller/tests/integration/helper/mcp_client.go @@ -0,0 +1,221 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper + +import ( + "context" + "errors" + "io" + "strings" + "sync" + "time" + + "github.com/avast/retry-go" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + . "github.com/onsi/gomega" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/anypb" + mcpapi "istio.io/api/mcp/v1alpha1" + istioapi "istio.io/api/networking/v1beta1" + istiov1a3 "istio.io/client-go/pkg/apis/networking/v1alpha3" + istiov1b1 "istio.io/client-go/pkg/apis/networking/v1beta1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + "mosn.io/htnn/controller/internal/config" + "mosn.io/htnn/controller/internal/controller/output" + "mosn.io/htnn/controller/internal/model" + "mosn.io/htnn/controller/pkg/procession" +) + +type mcpClient struct { + // Stream is the GRPC connection stream, allowing direct GRPC send operations. + // Set after Dial is called. + stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient + // xds client used to create a stream + client discovery.AggregatedDiscoveryServiceClient + conn *grpc.ClientConn + + lock sync.Mutex + + k8sClient client.Client + // To simulate k8s output in the existing tests, the simplest way is to use the k8s output directly + output procession.Output +} + +func NewMcpClient(cli client.Client) *mcpClient { + c := &mcpClient{ + k8sClient: cli, + output: output.NewK8sOutput(cli), + } + return c +} + +func (c *mcpClient) dial() error { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + conn, err := grpc.DialContext(ctx, config.McpServerListenAddress(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), // ensure the connection is established + ) + if err != nil { + return err + } + c.conn = conn + return nil +} + +func (c *mcpClient) Init() { + c.lock.Lock() + defer c.lock.Unlock() + + for i := 0; i < 10; i++ { + err := c.dial() + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + + c.client = discovery.NewAggregatedDiscoveryServiceClient(c.conn) + var err error + c.stream, err = c.client.StreamAggregatedResources(context.Background()) + Expect(err).NotTo(HaveOccurred()) + + // For now we don't care about the details in DiscoveryRequest + req := &discovery.DiscoveryRequest{} + err = c.stream.Send(req) + Expect(err).NotTo(HaveOccurred()) +} + +const ( + TypeUrlEnvoyFilter = "networking.istio.io/v1alpha3/EnvoyFilter" + TypeUrlServiceEntry = "networking.istio.io/v1beta1/ServiceEntry" +) + +func (c *mcpClient) Handle() { + for { + var err error + msg, err := c.stream.Recv() + if err != nil { + if !errors.Is(err, io.EOF) { + Expect(err).NotTo(HaveOccurred()) + } + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + switch msg.TypeUrl { + case TypeUrlEnvoyFilter: + efs := map[string]*istiov1a3.EnvoyFilter{} + for _, resource := range msg.Resources { + ef := c.convertAnyToEnvoyFilter(resource) + efs[ef.Name] = ef + } + if _, ok := efs[model.ConsumerEnvoyFilterName]; ok { + c.writeEnvoyFiltersWithRetry(ctx, false, efs) + delete(efs, model.ConsumerEnvoyFilterName) + } else { + // all EnvoyFilters don't contain consumer EnvoyFilter, remove it from k8s + var ef istiov1a3.EnvoyFilter + ns := config.RootNamespace() + err := c.k8sClient.Get(ctx, types.NamespacedName{Name: model.ConsumerEnvoyFilterName, Namespace: ns}, &ef) + if err == nil { + err = c.k8sClient.Delete(ctx, &ef) + Expect(err).NotTo(HaveOccurred()) + } + } + // handle EnvoyFilters except the one from consumer + c.writeEnvoyFiltersWithRetry(ctx, true, efs) + case TypeUrlServiceEntry: + ses := map[string]*istioapi.ServiceEntry{} + for _, resource := range msg.Resources { + se := c.convertAnyToServiceEntry(resource) + ses[se.Name] = &se.Spec + } + c.output.FromServiceRegistry(ctx, ses) + default: + Expect(false).To(BeTrue(), "unknown type url: %s", msg.TypeUrl) + } + } +} + +func (c *mcpClient) convertAnyToEnvoyFilter(res *anypb.Any) *istiov1a3.EnvoyFilter { + mcpRes := &mcpapi.Resource{} + err := res.UnmarshalTo(mcpRes) + Expect(err).NotTo(HaveOccurred()) + + ef := &istiov1a3.EnvoyFilter{} + ss := strings.Split(mcpRes.Metadata.Name, "/") + ef.SetNamespace(ss[0]) + ef.SetName(ss[1]) + ef.SetAnnotations(mcpRes.Metadata.Annotations) + ef.SetLabels(mcpRes.Metadata.Labels) + err = mcpRes.Body.UnmarshalTo(&ef.Spec) + Expect(err).NotTo(HaveOccurred()) + return ef +} + +func (c *mcpClient) convertAnyToServiceEntry(res *anypb.Any) *istiov1b1.ServiceEntry { + mcpRes := &mcpapi.Resource{} + err := res.UnmarshalTo(mcpRes) + Expect(err).NotTo(HaveOccurred()) + + se := &istiov1b1.ServiceEntry{} + ss := strings.Split(mcpRes.Metadata.Name, "/") + se.SetNamespace(ss[0]) + se.SetName(ss[1]) + err = mcpRes.Body.UnmarshalTo(&se.Spec) + Expect(err).NotTo(HaveOccurred()) + return se +} + +func (c *mcpClient) writeEnvoyFiltersWithRetry(ctx context.Context, fromHTTPFilterPolicy bool, filters map[string]*istiov1a3.EnvoyFilter) { + err := retry.Do( + func() error { + // Here we simulate the reconcile when the write failed in k8s output + // We deepcopy the filters as they are regenerated in the reconcile process + efs := make(map[string]*istiov1a3.EnvoyFilter, len(filters)) + for name, ef := range filters { + efs[name] = ef.DeepCopy() + } + if fromHTTPFilterPolicy { + return c.output.FromHTTPFilterPolicy(ctx, efs) + } + return c.output.FromConsumer(ctx, efs[model.ConsumerEnvoyFilterName]) + }, + retry.RetryIf(func(err error) bool { + return true + }), + retry.Attempts(3), + // backoff delay + retry.Delay(500*time.Millisecond), + ) + Expect(err).NotTo(HaveOccurred()) +} + +func (c *mcpClient) Close() { + c.lock.Lock() + defer c.lock.Unlock() + + if c.conn == nil { + return + } + c.conn.Close() +} diff --git a/controller/tests/integration/registries/suite_test.go b/controller/tests/integration/registries/suite_test.go index 95aa8203..6df42763 100644 --- a/controller/tests/integration/registries/suite_test.go +++ b/controller/tests/integration/registries/suite_test.go @@ -43,6 +43,7 @@ import ( mosniov1 "mosn.io/htnn/controller/api/v1" "mosn.io/htnn/controller/internal/config" "mosn.io/htnn/controller/internal/controller" + controlleroutput "mosn.io/htnn/controller/internal/controller/output" "mosn.io/htnn/controller/internal/registry" ) @@ -125,8 +126,9 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) + output := controlleroutput.NewK8sOutput(k8sManager.GetClient()) registry.InitRegistryManager(®istry.RegistryManagerOption{ - Client: k8sManager.GetClient(), + Output: output, }) err = (&controller.ServiceRegistryReconciler{ Client: k8sManager.GetClient(),