From 946666e7b520ffe7db36dc1bc80773d77e0ff0d4 Mon Sep 17 00:00:00 2001 From: Irina Mihai Date: Fri, 21 Jun 2024 13:00:59 -0400 Subject: [PATCH] MGMT-16066: Resource Subscription Server Description: - update the alarm subscription handler to a generic subscription handler that can also be used for resource subscriptions - add a new command for the resource subscription server - update the alarm and resource subscription servers to also set the subscription type --- .vscode/launch.json | 13 + README.md | 75 +++- .../server/start_alarm_subscription_server.go | 5 +- ...structure_inventory_subscription_server.go | 335 ++++++++++++++++++ internal/cmd/start_cmd.go | 1 + internal/persiststorage/persiststorage.go | 12 +- .../persiststorage_configmap.go | 38 +- internal/service/constants.go | 13 + ...ion_handler.go => subscription_handler.go} | 307 +++++++++------- ...r_test.go => subscription_handler_test.go} | 81 +++-- 10 files changed, 696 insertions(+), 184 deletions(-) create mode 100644 internal/cmd/server/start_infrastructure_inventory_subscription_server.go create mode 100644 internal/service/constants.go rename internal/service/{alarm_subscription_handler.go => subscription_handler.go} (50%) rename internal/service/{alarm_subscription_handler_test.go => subscription_handler_test.go} (75%) diff --git a/.vscode/launch.json b/.vscode/launch.json index e3f573282..05e2fc2e0 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -85,6 +85,19 @@ "--resource-server-url=${env:RESOURCE_SERVER_URL}", ] }, + { + "name": "start infrastructure-inventory-subscription-server", + "type": "go", + "request": "launch", + "mode": "auto", + "program": "${workspaceFolder}", + "args": [ + "start", + "infrastructure-inventory-subscription-server", + "--log-level=debug", + "--cloud-id=6575154c-72fc-4ed8-9a87-a81885ab38bb" + ] + }, { "name": "test", "type": "go", diff --git a/README.md b/README.md index 28b6b194e..9f08063cd 100644 --- a/README.md +++ b/README.md @@ -167,6 +167,79 @@ $ curl -s http://localhost:8002/o2ims-infrastructureInventory/v1/resourcePools/{ /resources | jq ``` +#### Infrastructure Inventory Subscription Server (Resource Server) + +The infrastructure inventory subscription server exposes endpoints for creating, retrieving +and deleting resource subscriptions. + +***Notes:*** +- No URL or token are required +- A connection to an ACM hub cluster is required + +Start the infrastructure inventory subscription server with a command like this: + +``` +$ ./oran-o2ims start infrastructure-inventory-subscription-server \ +--log-level=debug \ +--log-file=stdout \ +--log-field="server=resource-subscriptions" \ +--api-listener-address=localhost:8004 \ +--metrics-listener-address=localhost:8008 \ +--cloud-id=123 +``` + +For more information about other command line flags use the `--help` command: + +``` +$ ./oran-o2ims start infrastructure-inventory-subscription-server --help +``` + +##### Run and Debug + +Inside _VS Code_ use the _Run and Debug_ option with the `start +infrastructure-inventory-subscription-server` [configuration](.vscode/launch.json). + +##### Request Examples + +###### GET Infrastructure Inventory Subscription List + +To get a list of resource subscriptions: +``` +$ curl -s http://localhost:8004/o2ims-infrastructureInventory/v1/subscriptions | jq +``` + +###### GET Infrastructure Inventory Subscription Information + +To get all the information about an existing resource subscription: +``` +$ curl -s http://localhost:8004/o2ims-infrastructureInventory/v1/subscriptions/ | jq +``` + +###### POST a new Infrastructure Inventory Subscription Information + +To add a new resource subscription: +``` +$ curl -s -X POST \ +--header "Content-Type: application/json" \ +-d @infra-sub.json http://127.0.0.1:8004/o2ims-infrastructureInventory/v1/subscriptions | jq +``` +Where the content of `infra-sub.json` is as follows: +``` +{ + "consumerSubscriptionId": "69253c4b-8398-4602-855d-783865f5f25c", + "filter": "(eq,extensions/country,US);", + "callback": "https://128.224.115.15:1081/smo/v1/o2ims_inventory_observer" +} +``` + +###### DELETE an Infrastructure Inventory Subscription + +To delete an existing resource subscription: +``` +$ curl -s -X DELETE \ +http://localhost:8000/o2ims-infrastructureInventory/v1/subscriptions/ | jq +``` + #### Alarm server The alarm server exposes endpoints for retrieving alarms (AlarmEventRecord objects). @@ -282,4 +355,4 @@ $ curl -s http://localhost:8001/o2ims-infrastructureMonitoring/v1/alarmSubscript Above example will get a list of existing alarm subscriptions Inside _VS Code_ use the _Run and Debug_ option with the `start -alarm-subscription-server` [configuration](.vscode/launch.json). \ No newline at end of file +alarm-subscription-server` [configuration](.vscode/launch.json). diff --git a/internal/cmd/server/start_alarm_subscription_server.go b/internal/cmd/server/start_alarm_subscription_server.go index efa4d5271..71fdb9649 100644 --- a/internal/cmd/server/start_alarm_subscription_server.go +++ b/internal/cmd/server/start_alarm_subscription_server.go @@ -220,12 +220,13 @@ func (c *AlarmSubscriptionServerCommand) run(cmd *cobra.Command, argv []string) } // Create the handler: - handler, err := service.NewAlarmSubscriptionHandler(). + handler, err := service.NewSubscriptionHandler(). SetLogger(logger). SetLoggingWrapper(loggingWrapper). SetCloudID(cloudID). SetExtensions(extensions...). SetKubeClient(kubeClient). + SetSubscriptionType(service.SubscriptionTypeAlarm). Build(ctx) if err != nil { @@ -269,7 +270,7 @@ func (c *AlarmSubscriptionServerCommand) run(cmd *cobra.Command, argv []string) if err != nil { logger.ErrorContext( ctx, - "Failed to to create API listener", + "Failed to create API listener", slog.String("error", err.Error()), ) return exit.Error(1) diff --git a/internal/cmd/server/start_infrastructure_inventory_subscription_server.go b/internal/cmd/server/start_infrastructure_inventory_subscription_server.go new file mode 100644 index 000000000..fe8b459d8 --- /dev/null +++ b/internal/cmd/server/start_infrastructure_inventory_subscription_server.go @@ -0,0 +1,335 @@ +/* +Copyright 2024 Red Hat Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in +compliance with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under the License is +distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing permissions and limitations under the +License. +*/ + +package server + +import ( + "errors" + "log/slog" + "net/http" + + "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spf13/cobra" + + "github.com/openshift-kni/oran-o2ims/internal" + "github.com/openshift-kni/oran-o2ims/internal/authentication" + "github.com/openshift-kni/oran-o2ims/internal/authorization" + "github.com/openshift-kni/oran-o2ims/internal/exit" + "github.com/openshift-kni/oran-o2ims/internal/k8s" + "github.com/openshift-kni/oran-o2ims/internal/logging" + "github.com/openshift-kni/oran-o2ims/internal/metrics" + "github.com/openshift-kni/oran-o2ims/internal/network" + "github.com/openshift-kni/oran-o2ims/internal/service" +) + +// InfrastructureInventorySubscriptionServer creates and returns the +// `start infrastructure-inventory-subscription-server` command. +func InfrastructureInventorySubscriptionServer() *cobra.Command { + c := NewInfrastructureInventorySubscriptionServer() + result := &cobra.Command{ + Use: "infrastructure-inventory-subscription-server", + Short: "Starts the infrastructure inventory subscription server", + Args: cobra.NoArgs, + RunE: c.run, + } + flags := result.Flags() + + authentication.AddFlags(flags) + authorization.AddFlags(flags) + + network.AddListenerFlags(flags, network.APIListener, network.APIAddress) + network.AddListenerFlags(flags, network.MetricsListener, network.MetricsAddress) + _ = flags.String( + cloudIDFlagName, + "", + "O-Cloud identifier.", + ) + _ = flags.StringArray( + extensionsFlagName, + []string{}, + "Extension to add to infrastructure inventory subscriptions.", + ) + + return result +} + +// InfrastructureInventorySubscriptionServerCommand contains the data and logic needed to run the +// `start infrastructure-inventory-subscription-server` command. +type InfrastructureInventorySubscriptionServerCommand struct { +} + +// NewInfrastructureInventorySubscriptionServer creates a new runner that knows how to execute the +// `start infrastructure-inventory-subscription-server` command. +func NewInfrastructureInventorySubscriptionServer() *InfrastructureInventorySubscriptionServerCommand { + return &InfrastructureInventorySubscriptionServerCommand{} +} + +// run executes the `start infrastructure-inventory-subscription-server` command. +func (c *InfrastructureInventorySubscriptionServerCommand) run(cmd *cobra.Command, argv []string) error { + // Get the context: + ctx := cmd.Context() + + // Get the dependencies from the context: + logger := internal.LoggerFromContext(ctx) + + // Get the flags: + flags := cmd.Flags() + + // Create the exit handler: + exitHandler, err := exit.NewHandler(). + SetLogger(logger). + Build() + if err != nil { + logger.ErrorContext( + ctx, + "Failed to create exit handler", + slog.String("error", err.Error()), + ) + return exit.Error(1) + } + + // Get the cloud identifier: + cloudID, err := flags.GetString(cloudIDFlagName) + if err != nil { + logger.ErrorContext( + ctx, + "Failed to get cloud identifier flag", + "flag", cloudIDFlagName, + "error", err.Error(), + ) + return exit.Error(1) + } + if cloudID == "" { + logger.ErrorContext( + ctx, + "Cloud identifier is empty", + "flag", cloudIDFlagName, + ) + return exit.Error(1) + } + logger.InfoContext( + ctx, + "Cloud identifier", + "value", cloudID, + ) + + // Get the extensions details: + extensions, err := flags.GetStringArray(extensionsFlagName) + if err != nil { + logger.ErrorContext( + ctx, + "Failed to extension flag", + "flag", extensionsFlagName, + "error", err.Error(), + ) + return exit.Error(1) + } + logger.InfoContext( + ctx, + "infrastructure inventory subscription extensions details", + slog.Any("extensions", extensions), + ) + + // Create the logging wrapper: + loggingWrapper, err := logging.NewTransportWrapper(). + SetLogger(logger). + SetFlags(flags). + Build() + if err != nil { + logger.ErrorContext( + ctx, + "Failed to create transport wrapper", + "error", err.Error(), + ) + return exit.Error(1) + } + + // Create the authentication and authorization wrappers: + authenticationWrapper, err := authentication.NewHandlerWrapper(). + SetLogger(logger). + SetFlags(flags). + Build() + if err != nil { + logger.ErrorContext( + ctx, + "Failed to create authentication wrapper", + slog.String("error", err.Error()), + ) + return exit.Error(1) + } + authorizationWrapper, err := authorization.NewHandlerWrapper(). + SetLogger(logger). + SetFlags(flags). + Build() + if err != nil { + logger.ErrorContext( + ctx, + "Failed to create authorization wrapper", + slog.String("error", err.Error()), + ) + return exit.Error(1) + } + + // Create the metrics wrapper: + metricsWrapper, err := metrics.NewHandlerWrapper(). + AddPaths( + "/o2ims-infrastructureInventory/-/subscriptions/-", + ). + SetSubsystem("inbound"). + Build() + if err != nil { + logger.ErrorContext( + ctx, + "Failed to create metrics wrapper", + slog.String("error", err.Error()), + ) + return exit.Error(1) + } + + // Create the router: + router := mux.NewRouter() + router.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + service.SendError(w, http.StatusNotFound, "Not found") + }) + router.MethodNotAllowedHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + service.SendError(w, http.StatusMethodNotAllowed, "Method not allowed") + }) + router.Use(metricsWrapper, authenticationWrapper, authorizationWrapper) + + // Get the K8S client (from the environment first): + kubeClient, err := k8s.NewClient().SetLogger(logger).SetLoggingWrapper(loggingWrapper).Build() + if err != nil { + logger.ErrorContext( + ctx, + "Failed to create kubeClient", + "error", err, + ) + return exit.Error(1) + } + + // Create the handler: + handler, err := service.NewSubscriptionHandler(). + SetLogger(logger). + SetLoggingWrapper(loggingWrapper). + SetCloudID(cloudID). + SetExtensions(extensions...). + SetKubeClient(kubeClient). + SetSubscriptionType(service.SubscriptionTypeInfrastructureInventory). + Build(ctx) + + if err != nil { + logger.ErrorContext( + ctx, + "Failed to create handler", + slog.String("error", err.Error()), + ) + return exit.Error(1) + } + + // Create the routes: + adapter, err := service.NewAdapter(). + SetLogger(logger). + SetPathVariables("subscriptionId"). + SetHandler(handler). + Build() + if err != nil { + logger.ErrorContext( + ctx, + "Failed to create adapter", + "error", err, + ) + return exit.Error(1) + } + router.Handle( + "/o2ims-infrastructureInventory/{version}/subscriptions", + adapter, + ).Methods(http.MethodGet, http.MethodPost) + router.Handle( + "/o2ims-infrastructureInventory/{version}/subscriptions/{subscriptionId}", + adapter, + ).Methods(http.MethodGet, http.MethodDelete) + + // Start the API server: + apiListener, err := network.NewListener(). + SetLogger(logger). + SetFlags(flags, network.APIListener). + Build() + if err != nil { + logger.ErrorContext( + ctx, + "Failed to create API listener", + slog.String("error", err.Error()), + ) + return exit.Error(1) + } + logger.InfoContext( + ctx, + "API server listening", + slog.String("address", apiListener.Addr().String()), + ) + apiServer := &http.Server{ + Addr: apiListener.Addr().String(), + Handler: router, + } + exitHandler.AddServer(apiServer) + go func() { + err = apiServer.Serve(apiListener) + if err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.ErrorContext( + ctx, + "API server finished with error", + slog.String("error", err.Error()), + ) + } + }() + + // Start the metrics server: + metricsListener, err := network.NewListener(). + SetLogger(logger). + SetFlags(flags, network.MetricsListener). + Build() + if err != nil { + logger.ErrorContext( + ctx, + "Failed to create metrics listener", + slog.String("error", err.Error()), + ) + return exit.Error(1) + } + logger.InfoContext( + ctx, + "Metrics server listening", + slog.String("address", metricsListener.Addr().String()), + ) + metricsHandler := promhttp.Handler() + metricsServer := &http.Server{ + Addr: metricsListener.Addr().String(), + Handler: metricsHandler, + } + exitHandler.AddServer(metricsServer) + go func() { + err = metricsServer.Serve(metricsListener) + if err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.ErrorContext( + ctx, + "Metrics server finished with error", + slog.String("error", err.Error()), + ) + } + }() + + // Wait for exit signals: + return exitHandler.Wait(ctx) +} diff --git a/internal/cmd/start_cmd.go b/internal/cmd/start_cmd.go index c0721e186..d10362d21 100644 --- a/internal/cmd/start_cmd.go +++ b/internal/cmd/start_cmd.go @@ -33,6 +33,7 @@ func Start() *cobra.Command { result.AddCommand(server.ResourceServer()) result.AddCommand(server.AlarmServer()) result.AddCommand(server.AlarmSubscriptionServer()) + result.AddCommand(server.InfrastructureInventorySubscriptionServer()) result.AddCommand(operator.ControllerManager()) return result } diff --git a/internal/persiststorage/persiststorage.go b/internal/persiststorage/persiststorage.go index b29504e7b..01211facf 100644 --- a/internal/persiststorage/persiststorage.go +++ b/internal/persiststorage/persiststorage.go @@ -11,7 +11,7 @@ import ( var ErrNotFound = errors.New("not found") // interface for persistent storage -type StorageOperations interface { +type Storage interface { //notification from db to application about db entry changes //currently assume the notification is granular to indivial entry ReadEntry(ctx context.Context, key string) (value string, err error) @@ -21,18 +21,18 @@ type StorageOperations interface { ProcessChanges(ctx context.Context, dataMap **map[string]data.Object, lock *sync.Mutex) (err error) } -func Add(so StorageOperations, ctx context.Context, key string, value string) (err error) { +func Add(so Storage, ctx context.Context, key string, value string) (err error) { return so.AddEntry(ctx, key, value) } -func Get(so StorageOperations, ctx context.Context, key string) (value string, err error) { +func Get(so Storage, ctx context.Context, key string) (value string, err error) { return so.ReadEntry(ctx, key) } -func GetAll(so StorageOperations, ctx context.Context) (result map[string]data.Object, err error) { +func GetAll(so Storage, ctx context.Context) (result map[string]data.Object, err error) { return so.ReadAllEntries(ctx) } -func Delete(so StorageOperations, ctx context.Context, key string) (err error) { +func Delete(so Storage, ctx context.Context, key string) (err error) { return so.DeleteEntry(ctx, key) } -func ProcessChanges(so StorageOperations, ctx context.Context, dataMap **map[string]data.Object, lock *sync.Mutex) (err error) { +func ProcessChanges(so Storage, ctx context.Context, dataMap **map[string]data.Object, lock *sync.Mutex) (err error) { return so.ProcessChanges(ctx, dataMap, lock) } diff --git a/internal/persiststorage/persiststorage_configmap.go b/internal/persiststorage/persiststorage_configmap.go index cd4b395bd..928e74441 100644 --- a/internal/persiststorage/persiststorage_configmap.go +++ b/internal/persiststorage/persiststorage_configmap.go @@ -15,8 +15,8 @@ import ( ) type KubeConfigMapStore struct { - nameSpace string - name string + Name string + namespace string fieldOwner string jsonAPI *jsoniter.API client *k8s.Client @@ -28,16 +28,16 @@ func NewKubeConfigMapStore() *KubeConfigMapStore { func (b *KubeConfigMapStore) SetNameSpace( ns string) *KubeConfigMapStore { - b.nameSpace = ns + b.namespace = ns return b } func (b *KubeConfigMapStore) SetName( name string) *KubeConfigMapStore { - b.name = name + b.Name = name return b } -func (b *KubeConfigMapStore) SetFieldOwnder( +func (b *KubeConfigMapStore) SetFieldOwner( owner string) *KubeConfigMapStore { b.fieldOwner = owner return b @@ -61,8 +61,8 @@ func (s *KubeConfigMapStore) AddEntry(ctx context.Context, entryKey string, valu configmap := &corev1.ConfigMap{} key := clnt.ObjectKey{ - Namespace: s.nameSpace, - Name: s.name, + Namespace: s.namespace, + Name: s.Name, } err = (*s.client).Get(ctx, key, configmap) @@ -84,8 +84,8 @@ func (s *KubeConfigMapStore) AddEntry(ctx context.Context, entryKey string, valu APIVersion: "v1", }, ObjectMeta: metav1.ObjectMeta{ - Namespace: s.nameSpace, - Name: s.name, + Namespace: s.namespace, + Name: s.Name, }, Data: savedData, } @@ -100,8 +100,8 @@ func (s *KubeConfigMapStore) DeleteEntry(ctx context.Context, entryKey string) ( configmap := &corev1.ConfigMap{} key := clnt.ObjectKey{ - Namespace: s.nameSpace, - Name: s.name, + Namespace: s.namespace, + Name: s.Name, } err = (*s.client).Get(ctx, key, configmap) @@ -129,8 +129,8 @@ func (s *KubeConfigMapStore) DeleteEntry(ctx context.Context, entryKey string) ( APIVersion: "v1", }, ObjectMeta: metav1.ObjectMeta{ - Namespace: s.nameSpace, - Name: s.name, + Namespace: s.namespace, + Name: s.Name, }, Data: configmap.Data, } @@ -145,8 +145,8 @@ func (s *KubeConfigMapStore) ReadEntry(ctx context.Context, entryKey string) (va configmap := &corev1.ConfigMap{} key := clnt.ObjectKey{ - Namespace: s.nameSpace, - Name: s.name, + Namespace: s.namespace, + Name: s.Name, } err = (*s.client).Get(ctx, key, configmap) @@ -176,8 +176,8 @@ func (s *KubeConfigMapStore) ReadAllEntries(ctx context.Context) (result map[str configmap := &corev1.ConfigMap{} key := clnt.ObjectKey{ - Namespace: s.nameSpace, - Name: s.name, + Namespace: s.namespace, + Name: s.Name, } err = (*s.client).Get(ctx, key, configmap) @@ -206,8 +206,8 @@ func (s *KubeConfigMapStore) ReadAllEntries(ctx context.Context) (result map[str func (s *KubeConfigMapStore) ProcessChanges(ctx context.Context, dataMap **map[string]data.Object, lock *sync.Mutex) (err error) { raw_opt := metav1.SingleObject(metav1.ObjectMeta{ - Namespace: s.nameSpace, - Name: s.name, + Namespace: s.namespace, + Name: s.Name, }) opt := clnt.ListOptions{} opt.Raw = &raw_opt diff --git a/internal/service/constants.go b/internal/service/constants.go new file mode 100644 index 000000000..ca4a62a86 --- /dev/null +++ b/internal/service/constants.go @@ -0,0 +1,13 @@ +package service + +const ( + TestNamespace = "orantest" + AlarmConfigMapName = "oran-alarms-sub" + InfraInventoryConfigMapName = "oran-infra-inventory-sub" + FieldOwner = "oran-o2ims" +) + +const ( + SubscriptionTypeAlarm = "ALARM" + SubscriptionTypeInfrastructureInventory = "INFRA-INV" +) diff --git a/internal/service/alarm_subscription_handler.go b/internal/service/subscription_handler.go similarity index 50% rename from internal/service/alarm_subscription_handler.go rename to internal/service/subscription_handler.go index 2cf6ead21..850ffa221 100644 --- a/internal/service/alarm_subscription_handler.go +++ b/internal/service/subscription_handler.go @@ -17,6 +17,7 @@ package service import ( "context" "errors" + "fmt" "log/slog" "net/http" "slices" @@ -24,94 +25,100 @@ import ( "github.com/google/uuid" jsoniter "github.com/json-iterator/go" - "github.com/openshift-kni/oran-o2ims/internal/data" "github.com/openshift-kni/oran-o2ims/internal/jq" "github.com/openshift-kni/oran-o2ims/internal/k8s" - "github.com/openshift-kni/oran-o2ims/internal/search" - "github.com/openshift-kni/oran-o2ims/internal/persiststorage" + "github.com/openshift-kni/oran-o2ims/internal/search" ) -const ( - TestNamespace = "orantest" - TestConfigmapName = "orantestconfigmapalarmsub" - FieldOwner = "oran-o2ims" -) - -// alarmSubscriptionHandlerBuilder contains the data and logic needed to create a new deployment -// manager collection handler. Don't create instances of this type directly, use the -// NewAlarmSubscriptionHandler function instead. -type alarmSubscriptionHandlerBuilder struct { - logger *slog.Logger - loggingWrapper func(http.RoundTripper) http.RoundTripper - cloudID string - extensions []string - kubeClient *k8s.Client +// SubscriptionHandlerBuilder contains the data and logic needed to create a new +// subscription handler. Don't create instances of this type directly, use the +// NewSubscriptionHandler function instead. +type SubscriptionHandlerBuilder struct { + logger *slog.Logger + loggingWrapper func(http.RoundTripper) http.RoundTripper + cloudID string + extensions []string + kubeClient *k8s.Client + subscriptionType string } -// alarmSubscriptionHander knows how to respond to requests to list deployment managers. -// Don't create instances of this type directly, use the NewAlarmSubscriptionHandler function -// instead. -type alarmSubscriptionHandler struct { - logger *slog.Logger - loggingWrapper func(http.RoundTripper) http.RoundTripper - cloudID string - extensions []string - kubeClient *k8s.Client - jsonAPI jsoniter.API - selectorEvaluator *search.SelectorEvaluator - jqTool *jq.Tool - subscritionMapMemoryLock *sync.Mutex - subscriptionMap *map[string]data.Object - persistStore *persiststorage.KubeConfigMapStore +// SubscriptionHandler knows how to respond to requests to list subscriptions. +// Don't create instances of this type directly, use the NewSubscriptionHandler +// function instead. +type SubscriptionHandler struct { + logger *slog.Logger + loggingWrapper func(http.RoundTripper) http.RoundTripper + cloudID string + extensions []string + kubeClient *k8s.Client + jsonAPI jsoniter.API + selectorEvaluator *search.SelectorEvaluator + jqTool *jq.Tool + subscriptionMapLock *sync.Mutex + subscriptionMap *map[string]data.Object + persistStore *persiststorage.KubeConfigMapStore } -// NewAlarmSubscriptionHandler creates a builder that can then be used to configure and create a +// NewSubscriptionHandler creates a builder that can then be used to configure and create a // handler for the collection of deployment managers. -func NewAlarmSubscriptionHandler() *alarmSubscriptionHandlerBuilder { - return &alarmSubscriptionHandlerBuilder{} +func NewSubscriptionHandler() *SubscriptionHandlerBuilder { + return &SubscriptionHandlerBuilder{} +} + +func GetSubcriptionId() (subId string) { + subId = uuid.New().String() + return } // SetLogger sets the logger that the handler will use to write to the log. This is mandatory. -func (b *alarmSubscriptionHandlerBuilder) SetLogger( - value *slog.Logger) *alarmSubscriptionHandlerBuilder { +func (b *SubscriptionHandlerBuilder) SetLogger( + value *slog.Logger) *SubscriptionHandlerBuilder { b.logger = value return b } // SetLoggingWrapper sets the wrapper that will be used to configure logging for the HTTP clients // used to connect to other servers, including the backend server. This is optional. -func (b *alarmSubscriptionHandlerBuilder) SetLoggingWrapper( - value func(http.RoundTripper) http.RoundTripper) *alarmSubscriptionHandlerBuilder { +func (b *SubscriptionHandlerBuilder) SetLoggingWrapper( + value func(http.RoundTripper) http.RoundTripper) *SubscriptionHandlerBuilder { b.loggingWrapper = value return b } // SetCloudID sets the identifier of the O-Cloud of this handler. This is mandatory. -func (b *alarmSubscriptionHandlerBuilder) SetCloudID( - value string) *alarmSubscriptionHandlerBuilder { +func (b *SubscriptionHandlerBuilder) SetCloudID( + value string) *SubscriptionHandlerBuilder { b.cloudID = value return b } // SetExtensions sets the fields that will be added to the extensions. -func (b *alarmSubscriptionHandlerBuilder) SetExtensions( - values ...string) *alarmSubscriptionHandlerBuilder { +func (b *SubscriptionHandlerBuilder) SetExtensions( + values ...string) *SubscriptionHandlerBuilder { b.extensions = values return b } -// SetExtensions sets the fields that will be added to the extensions. -func (b *alarmSubscriptionHandlerBuilder) SetKubeClient( - kubeClient *k8s.Client) *alarmSubscriptionHandlerBuilder { +// SetKubeClient sets the K8S client. +func (b *SubscriptionHandlerBuilder) SetKubeClient( + kubeClient *k8s.Client) *SubscriptionHandlerBuilder { b.kubeClient = kubeClient return b } +// SetSubscriptionType sets the purpose of the subscription. +// So far alarm and infrastructure-inventory as supported. +func (b *SubscriptionHandlerBuilder) SetSubscriptionType( + subscriptionType string) *SubscriptionHandlerBuilder { + b.subscriptionType = subscriptionType + return b +} + // Build uses the data stored in the builder to create anad configure a new handler. -func (b *alarmSubscriptionHandlerBuilder) Build(ctx context.Context) ( - result *alarmSubscriptionHandler, err error) { +func (b *SubscriptionHandlerBuilder) Build(ctx context.Context) ( + result *SubscriptionHandler, err error) { // Check parameters: if b.logger == nil { err = errors.New("logger is mandatory") @@ -127,6 +134,17 @@ func (b *alarmSubscriptionHandlerBuilder) Build(ctx context.Context) ( return } + if b.subscriptionType != SubscriptionTypeAlarm && + b.subscriptionType != SubscriptionTypeInfrastructureInventory { + err = fmt.Errorf( + fmt.Sprintf( + "subscription type can only be %s or %s", + SubscriptionTypeAlarm, SubscriptionTypeInfrastructureInventory, + ), + ) + return + } + // Prepare the JSON iterator API: jsonConfig := jsoniter.Config{ IndentionStep: 2, @@ -164,45 +182,52 @@ func (b *alarmSubscriptionHandlerBuilder) Build(ctx context.Context) ( } } - // create persist storeage option + configMapName := "" + if b.subscriptionType == SubscriptionTypeAlarm { + configMapName = AlarmConfigMapName + } else if b.subscriptionType == SubscriptionTypeInfrastructureInventory { + configMapName = InfraInventoryConfigMapName + } + + // Setup persistent storage: persistStore := persiststorage.NewKubeConfigMapStore(). SetNameSpace(TestNamespace). - SetName(TestConfigmapName). - SetFieldOwnder(FieldOwner). + SetName(configMapName). + SetFieldOwner(FieldOwner). SetJsonAPI(&jsonAPI). SetClient(b.kubeClient) // Create and populate the object: - result = &alarmSubscriptionHandler{ - logger: b.logger, - loggingWrapper: b.loggingWrapper, - cloudID: b.cloudID, - kubeClient: b.kubeClient, - extensions: slices.Clone(b.extensions), - selectorEvaluator: selectorEvaluator, - jsonAPI: jsonAPI, - jqTool: jqTool, - subscritionMapMemoryLock: &sync.Mutex{}, - subscriptionMap: &map[string]data.Object{}, - persistStore: persistStore, + result = &SubscriptionHandler{ + logger: b.logger, + loggingWrapper: b.loggingWrapper, + cloudID: b.cloudID, + kubeClient: b.kubeClient, + extensions: slices.Clone(b.extensions), + selectorEvaluator: selectorEvaluator, + jsonAPI: jsonAPI, + jqTool: jqTool, + subscriptionMapLock: &sync.Mutex{}, + subscriptionMap: &map[string]data.Object{}, + persistStore: persistStore, } b.logger.Debug( - "alarmSubscriptionHandler build:", + "SubscriptionHandler build:", "CloudID", b.cloudID, ) - err = result.recoveryFromPersistStore(ctx) + err = result.getFromPersistentStorage(ctx) if err != nil { b.logger.Error( - "alarmSubscriptionHandler failed to recovery from persistStore ", err, + "SubscriptionHandler failed to recovery from persistStore ", err, ) } err = result.watchPersistStore(ctx) if err != nil { b.logger.Error( - "alarmSubscriptionHandler failed to watch persist store changes ", err, + "SubscriptionHandler failed to watch persist store changes ", err, ) } @@ -210,7 +235,7 @@ func (b *alarmSubscriptionHandlerBuilder) Build(ctx context.Context) ( } // List is the implementation of the collection handler interface. -func (h *alarmSubscriptionHandler) List(ctx context.Context, +func (h *SubscriptionHandler) List(ctx context.Context, request *ListRequest) (response *ListResponse, err error) { // Create the stream that will fetch the items: var items data.Stream @@ -243,12 +268,12 @@ func (h *alarmSubscriptionHandler) List(ctx context.Context, } // Get is the implementation of the object handler interface. -func (h *alarmSubscriptionHandler) Get(ctx context.Context, +func (h *SubscriptionHandler) Get(ctx context.Context, request *GetRequest) (response *GetResponse, err error) { h.logger.DebugContext( ctx, - "alarmSubscriptionHandler Get:", + "SubscriptionHandler Get:", ) item, err := h.fetchItem(ctx, request.Variables[0]) @@ -260,18 +285,18 @@ func (h *alarmSubscriptionHandler) Get(ctx context.Context, } // Add is the implementation of the object handler ADD interface. -func (h *alarmSubscriptionHandler) Add(ctx context.Context, +func (h *SubscriptionHandler) Add(ctx context.Context, request *AddRequest) (response *AddResponse, err error) { h.logger.DebugContext( ctx, - "alarmSubscriptionHandler Add:", + "SubscriptionHandler Add:", ) id, err := h.addItem(ctx, *request) if err != nil { h.logger.Debug( - "alarmSubscriptionHandler Add:", + "SubscriptionHandler Add:", "err", err.Error(), ) return @@ -280,7 +305,7 @@ func (h *alarmSubscriptionHandler) Add(ctx context.Context, //add subscription Id in the response obj := request.Object - obj, err = h.encodeSubId(ctx, id, obj) + obj, err = h.encodeSubId(id, obj) if err != nil { return @@ -294,12 +319,12 @@ func (h *alarmSubscriptionHandler) Add(ctx context.Context, } // Delete is the implementation of the object handler delete interface. -func (h *alarmSubscriptionHandler) Delete(ctx context.Context, +func (h *SubscriptionHandler) Delete(ctx context.Context, request *DeleteRequest) (response *DeleteResponse, err error) { h.logger.DebugContext( ctx, - "alarmSubscriptionHandler delete:", + "SubscriptionHandler delete:", ) err = h.deleteItem(ctx, *request) @@ -309,39 +334,39 @@ func (h *alarmSubscriptionHandler) Delete(ctx context.Context, return } -func (h *alarmSubscriptionHandler) fetchItem(ctx context.Context, +func (h *SubscriptionHandler) fetchItem(ctx context.Context, id string) (result data.Object, err error) { - h.subscritionMapMemoryLock.Lock() - defer h.subscritionMapMemoryLock.Unlock() + h.subscriptionMapLock.Lock() + defer h.subscriptionMapLock.Unlock() obj, ok := (*h.subscriptionMap)[id] if !ok { err = ErrNotFound return } - result, _ = h.encodeSubId(ctx, id, obj) + result, _ = h.encodeSubId(id, obj) return } -func (h *alarmSubscriptionHandler) fetchItems(ctx context.Context) (result data.Stream, err error) { - h.subscritionMapMemoryLock.Lock() - defer h.subscritionMapMemoryLock.Unlock() +func (h *SubscriptionHandler) fetchItems(ctx context.Context) (result data.Stream, err error) { + h.subscriptionMapLock.Lock() + defer h.subscriptionMapLock.Unlock() ar := make([]data.Object, 0, len(*h.subscriptionMap)) for key, value := range *h.subscriptionMap { - obj, _ := h.encodeSubId(ctx, key, value) + obj, _ := h.encodeSubId(key, value) ar = append(ar, obj) } h.logger.DebugContext( ctx, - "alarmSubscriptionHandler fetchItems:", + "SubscriptionHandler fetchItems:", ) result = data.Pour(ar...) return } -func (h *alarmSubscriptionHandler) addItem( +func (h *SubscriptionHandler) addItem( ctx context.Context, input_data AddRequest) (subId string, err error) { subId = h.getSubcriptionId() @@ -352,10 +377,10 @@ func (h *alarmSubscriptionHandler) addItem( if err != nil { return } - err = h.persistStoreAddEntry(ctx, subId, string(value)) + err = persiststorage.Add(h.persistStore, ctx, subId, string(value)) if err != nil { h.logger.Debug( - "alarmSubscriptionHandler addItem:", + "SubscriptionHandler addItem:", "err", err.Error(), ) return @@ -366,10 +391,10 @@ func (h *alarmSubscriptionHandler) addItem( return } -func (h *alarmSubscriptionHandler) deleteItem( +func (h *SubscriptionHandler) deleteItem( ctx context.Context, delete_req DeleteRequest) (err error) { - err = h.persistStoreDeleteEntry(ctx, delete_req.Variables[0]) + err = persiststorage.Delete(h.persistStore, ctx, delete_req.Variables[0]) if err != nil { return } @@ -379,20 +404,20 @@ func (h *alarmSubscriptionHandler) deleteItem( return } -func (h *alarmSubscriptionHandler) mapItem(ctx context.Context, +func (h *SubscriptionHandler) mapItem(ctx context.Context, input data.Object) (output data.Object, err error) { //TBD only save related attributes in the future return input, nil } -func (h *alarmSubscriptionHandler) addToSubscriptionMap(key string, value data.Object) { - h.subscritionMapMemoryLock.Lock() - defer h.subscritionMapMemoryLock.Unlock() +func (h *SubscriptionHandler) addToSubscriptionMap(key string, value data.Object) { + h.subscriptionMapLock.Lock() + defer h.subscriptionMapLock.Unlock() (*h.subscriptionMap)[key] = value } -func (h *alarmSubscriptionHandler) deleteToSubscriptionMap(key string) { - h.subscritionMapMemoryLock.Lock() - defer h.subscritionMapMemoryLock.Unlock() +func (h *SubscriptionHandler) deleteToSubscriptionMap(key string) { + h.subscriptionMapLock.Lock() + defer h.subscriptionMapLock.Unlock() //test if the key in the map _, ok := (*h.subscriptionMap)[key] @@ -403,59 +428,71 @@ func (h *alarmSubscriptionHandler) deleteToSubscriptionMap(key string) { delete(*h.subscriptionMap, key) } -func (h *alarmSubscriptionHandler) assignSubscriptionMap(newMap map[string]data.Object) { - h.subscritionMapMemoryLock.Lock() - defer h.subscritionMapMemoryLock.Unlock() +func (h *SubscriptionHandler) assignSubscriptionMap(newMap map[string]data.Object) { + h.subscriptionMapLock.Lock() + defer h.subscriptionMapLock.Unlock() h.subscriptionMap = &newMap } -func (h *alarmSubscriptionHandler) getSubcriptionId() (subId string) { +func (h *SubscriptionHandler) getSubcriptionId() (subId string) { subId = uuid.New().String() return } -func (h *alarmSubscriptionHandler) encodeSubId(ctx context.Context, +func (h *SubscriptionHandler) encodeSubId( subId string, input data.Object) (output data.Object, err error) { - //get consumer name, subscriptions - err = h.jqTool.Evaluate( - `{ - "alarmSubscriptionId": $alarmSubId, - "consumerSubscriptionId": .consumerSubscriptionId, - "callback": .callback, - "filter": .filter - }`, - input, &output, - jq.String("$alarmSubId", subId), - ) - if err != nil { - return + + switch h.persistStore.Name { + case AlarmConfigMapName: + // Get consumer name, subscriptions. + err = h.jqTool.Evaluate( + `{ + "alarmSubscriptionId": $alarmSubId, + "consumerSubscriptionId": .consumerSubscriptionId, + "callback": .callback, + "filter": .filter + }`, + input, &output, + jq.String("$alarmSubId", subId), + ) + case InfraInventoryConfigMapName: + // Get consumer name, subscriptions. + err = h.jqTool.Evaluate( + `{ + "subscriptionId": $subId, + "consumerSubscriptionId": .consumerSubscriptionId, + "callback": .callback, + "filter": .filter + }`, + input, &output, + jq.String("$subId", subId), + ) + default: + err = nil } + return } -func (h *alarmSubscriptionHandler) decodeSubId(ctx context.Context, +func (h *SubscriptionHandler) decodeSubId( input data.Object) (output string, err error) { - //get cluster name, subscriptions - err = h.jqTool.Evaluate( - `.alarmSubscriptionId`, input, &output) - if err != nil { - return - } - return -} -func (h *alarmSubscriptionHandler) persistStoreAddEntry( - ctx context.Context, entryKey string, value string) (err error) { - return persiststorage.Add(h.persistStore, ctx, entryKey, value) -} + // get cluster name, subscriptions + switch h.persistStore.Name { + case AlarmConfigMapName: + err = h.jqTool.Evaluate( + `.alarmSubscriptionId`, input, &output) + case InfraInventoryConfigMapName: + err = h.jqTool.Evaluate( + `.subscriptionId`, input, &output) + default: + err = nil + } -func (h *alarmSubscriptionHandler) persistStoreDeleteEntry( - ctx context.Context, entryKey string) (err error) { - err = persiststorage.Delete(h.persistStore, ctx, entryKey) return } -func (h *alarmSubscriptionHandler) recoveryFromPersistStore(ctx context.Context) (err error) { +func (h *SubscriptionHandler) getFromPersistentStorage(ctx context.Context) (err error) { newMap, err := persiststorage.GetAll(h.persistStore, ctx) if err != nil { return @@ -464,8 +501,8 @@ func (h *alarmSubscriptionHandler) recoveryFromPersistStore(ctx context.Context) return } -func (h *alarmSubscriptionHandler) watchPersistStore(ctx context.Context) (err error) { - err = persiststorage.ProcessChanges(h.persistStore, ctx, &h.subscriptionMap, h.subscritionMapMemoryLock) +func (h *SubscriptionHandler) watchPersistStore(ctx context.Context) (err error) { + err = persiststorage.ProcessChanges(h.persistStore, ctx, &h.subscriptionMap, h.subscriptionMapLock) if err != nil { panic("failed to launch watcher") diff --git a/internal/service/alarm_subscription_handler_test.go b/internal/service/subscription_handler_test.go similarity index 75% rename from internal/service/alarm_subscription_handler_test.go rename to internal/service/subscription_handler_test.go index 1c0178934..af81084ff 100644 --- a/internal/service/alarm_subscription_handler_test.go +++ b/internal/service/subscription_handler_test.go @@ -16,6 +16,7 @@ package service import ( "context" + "fmt" . "github.com/onsi/ginkgo/v2/dsl/core" . "github.com/onsi/gomega" @@ -27,7 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -var _ = Describe("alarm Subscription handler", func() { +var _ = Describe("Subscription handler", func() { Describe("Creation", func() { var ( ctx context.Context @@ -40,10 +41,29 @@ var _ = Describe("alarm Subscription handler", func() { fakeClient = k8s.NewFakeClient() }) + It("Needs a subscription type", func() { + handler, err := NewSubscriptionHandler(). + SetLogger(logger). + SetCloudID("123"). + SetKubeClient(fakeClient). + Build(ctx) + Expect(err).To(HaveOccurred()) + Expect(handler).To(BeNil()) + msg := err.Error() + Expect(msg).To( + ContainSubstring( + fmt.Sprintf("subscription type can only be %s or %s", + SubscriptionTypeAlarm, + SubscriptionTypeInfrastructureInventory), + ), + ) + }) + It("Can't be created without a logger", func() { - handler, err := NewAlarmSubscriptionHandler(). + handler, err := NewSubscriptionHandler(). SetCloudID("123"). SetKubeClient(fakeClient). + SetSubscriptionType(SubscriptionTypeInfrastructureInventory). Build(ctx) Expect(err).To(HaveOccurred()) Expect(handler).To(BeNil()) @@ -53,9 +73,10 @@ var _ = Describe("alarm Subscription handler", func() { }) It("Can't be created without a cloud identifier", func() { - handler, err := NewAlarmSubscriptionHandler(). + handler, err := NewSubscriptionHandler(). SetLogger(logger). SetKubeClient(fakeClient). + SetSubscriptionType(SubscriptionTypeInfrastructureInventory). Build(ctx) Expect(err).To(HaveOccurred()) Expect(handler).To(BeNil()) @@ -83,18 +104,32 @@ var _ = Describe("alarm Subscription handler", func() { } err := fakeClient.Create(ctx, namespace, &client.CreateOptions{}, client.FieldOwner(FieldOwner)) Expect(err).ToNot(HaveOccurred()) - configmap := &corev1.ConfigMap{ + alarmSubConfigMap := &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: TestNamespace, + Name: AlarmConfigMapName, + }, + Data: nil, + } + err = fakeClient.Create(ctx, alarmSubConfigMap, &client.CreateOptions{}, client.FieldOwner(FieldOwner)) + Expect(err).ToNot(HaveOccurred()) + + resourceSubConfigMap := &corev1.ConfigMap{ TypeMeta: metav1.TypeMeta{ Kind: "ConfigMap", APIVersion: "v1", }, ObjectMeta: metav1.ObjectMeta{ Namespace: TestNamespace, - Name: TestConfigmapName, + Name: InfraInventoryConfigMapName, }, Data: nil, } - err = fakeClient.Create(ctx, configmap, &client.CreateOptions{}, client.FieldOwner(FieldOwner)) + err = fakeClient.Create(ctx, resourceSubConfigMap, &client.CreateOptions{}, client.FieldOwner(FieldOwner)) Expect(err).ToNot(HaveOccurred()) }) @@ -103,10 +138,11 @@ var _ = Describe("alarm Subscription handler", func() { It("Translates empty list of results", func() { // Create the handler: - handler, err := NewAlarmSubscriptionHandler(). + handler, err := NewSubscriptionHandler(). SetLogger(logger). SetCloudID("123"). SetKubeClient(fakeClient). + SetSubscriptionType(SubscriptionTypeAlarm). Build(ctx) Expect(err).ToNot(HaveOccurred()) Expect(handler).ToNot(BeNil()) @@ -122,10 +158,11 @@ var _ = Describe("alarm Subscription handler", func() { It("Translates non empty list of results", func() { // Create the handler: - handler, err := NewAlarmSubscriptionHandler(). + handler, err := NewSubscriptionHandler(). SetLogger(logger). SetCloudID("123"). SetKubeClient(fakeClient). + SetSubscriptionType(SubscriptionTypeInfrastructureInventory). Build(ctx) Expect(err).ToNot(HaveOccurred()) Expect(handler).ToNot(BeNil()) @@ -135,7 +172,7 @@ var _ = Describe("alarm Subscription handler", func() { "customerId": "test_customer_id_prime", } obj_2 := data.Object{ - "customerId": "test_custer_id", + "customerId": "test_cluster_id", "filter": data.Object{ "notificationType": "1", "nsInstanceId": "test_instance_id", @@ -151,9 +188,9 @@ var _ = Describe("alarm Subscription handler", func() { subId_2, err := handler.addItem(ctx, req_2) Expect(err).ToNot(HaveOccurred()) - obj_1, err = handler.encodeSubId(ctx, subId_1, obj_1) + obj_1, err = handler.encodeSubId(subId_1, obj_1) Expect(err).ToNot(HaveOccurred()) - obj_2, err = handler.encodeSubId(ctx, subId_2, obj_2) + obj_2, err = handler.encodeSubId(subId_2, obj_2) Expect(err).ToNot(HaveOccurred()) subIdMap := map[string]data.Object{} @@ -169,10 +206,10 @@ var _ = Describe("alarm Subscription handler", func() { items, err := data.Collect(ctx, response.Items) Expect(err).ToNot(HaveOccurred()) Expect(items).To(HaveLen(2)) - id, err := handler.decodeSubId(ctx, items[0]) + id, err := handler.decodeSubId(items[0]) Expect(err).ToNot(HaveOccurred()) Expect(items[0]).To(Equal(subIdMap[id])) - id, err = handler.decodeSubId(ctx, items[1]) + id, err = handler.decodeSubId(items[1]) Expect(err).ToNot(HaveOccurred()) Expect(items[1]).To(Equal(subIdMap[id])) }) @@ -185,10 +222,11 @@ var _ = Describe("alarm Subscription handler", func() { Describe("Get", func() { It("Test Get functions", func() { // Create the handler: - handler, err := NewAlarmSubscriptionHandler(). + handler, err := NewSubscriptionHandler(). SetLogger(logger). SetCloudID("123"). SetKubeClient(fakeClient). + SetSubscriptionType(SubscriptionTypeInfrastructureInventory). Build(ctx) Expect(err).ToNot(HaveOccurred()) @@ -207,10 +245,11 @@ var _ = Describe("alarm Subscription handler", func() { It("Uses the right search id ", func() { // Create the handler: - handler, err := NewAlarmSubscriptionHandler(). + handler, err := NewSubscriptionHandler(). SetLogger(logger). SetCloudID("123"). SetKubeClient(fakeClient). + SetSubscriptionType(SubscriptionTypeInfrastructureInventory). Build(ctx) Expect(err).ToNot(HaveOccurred()) obj_1 := data.Object{ @@ -225,7 +264,7 @@ var _ = Describe("alarm Subscription handler", func() { subId_1, err := handler.addItem(ctx, req_1) Expect(err).ToNot(HaveOccurred()) - obj_1, err = handler.encodeSubId(ctx, subId_1, obj_1) + obj_1, err = handler.encodeSubId(subId_1, obj_1) Expect(err).ToNot(HaveOccurred()) // Send the request. Note that we ignore the error here because @@ -241,12 +280,13 @@ var _ = Describe("alarm Subscription handler", func() { }) Describe("Add + Delete", func() { - It("Create the alart subscription and add a subscription", func() { + It("Create the subscription handler and add a subscription", func() { // Create the handler: - handler, err := NewAlarmSubscriptionHandler(). + handler, err := NewSubscriptionHandler(). SetLogger(logger). SetCloudID("123"). SetKubeClient(fakeClient). + SetSubscriptionType(SubscriptionTypeInfrastructureInventory). Build(ctx) Expect(err).ToNot(HaveOccurred()) obj := data.Object{ @@ -264,7 +304,7 @@ var _ = Describe("alarm Subscription handler", func() { Expect(err).ToNot(HaveOccurred()) //decode the subId - sub_id, err := handler.decodeSubId(ctx, resp.Object) + sub_id, err := handler.decodeSubId(resp.Object) Expect(err).ToNot(HaveOccurred()) //use Get to verify the addrequest @@ -273,7 +313,7 @@ var _ = Describe("alarm Subscription handler", func() { }) Expect(err).ToNot(HaveOccurred()) //extract sub_id and verify - sub_id_get, err := handler.decodeSubId(ctx, get_resp.Object) + sub_id_get, err := handler.decodeSubId(get_resp.Object) Expect(err).ToNot(HaveOccurred()) Expect(sub_id).To(Equal(sub_id_get)) @@ -291,7 +331,6 @@ var _ = Describe("alarm Subscription handler", func() { Expect(msg).To(Equal("not found")) Expect(get_resp.Object).To(BeEmpty()) }) - }) }) })