Skip to content

Commit

Permalink
add grpc broker.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Jul 9, 2024
1 parent 6583de3 commit 3ae27f2
Show file tree
Hide file tree
Showing 13 changed files with 629 additions and 27 deletions.
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ CLIENT_SECRET ?= maestro
ENABLE_JWT ?= true
ENABLE_AUTHZ ?= true
ENABLE_OCM_MOCK ?= false
ENABLE_GRPC ?= false
ENABLE_GRPC_SERVER ?= false
ENABLE_GRPC_BROKER ?= false

# default replicas for maestro server
REPLICAS ?= 1
Expand Down Expand Up @@ -303,7 +304,8 @@ cmds:
--param="EXTERNAL_APPS_DOMAIN=${external_apps_domain}" \
--param="CONSUMER_NAME=$(consumer_name)" \
--param="ENABLE_OCM_MOCK=$(ENABLE_OCM_MOCK)" \
--param="ENABLE_GRPC=$(ENABLE_GRPC)" \
--param="ENABLE_GRPC_SERVER=$(ENABLE_GRPC_SERVER)" \
--param="ENABLE_GRPC_BROKER=$(ENABLE_GRPC_BROKER)" \
> "templates/$*-template.json"


Expand Down
9 changes: 8 additions & 1 deletion cmd/maestro/servecmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,15 @@ func runServer(cmd *cobra.Command, args []string) {
eventBroadcaster := event.NewEventBroadcaster()

// Create the servers
var grpcBroker *server.GRPCBroker
if environments.Environment().Config.GRPCServer.EnableGRPCBroker {
grpcBroker = server.NewGRPCBroker()
}
apiserver := server.NewAPIServer(eventBroadcaster)
metricsServer := server.NewMetricsServer()
healthcheckServer := server.NewHealthCheckServer()
pulseServer := server.NewPulseServer(eventBroadcaster)
controllersServer := server.NewControllersServer(pulseServer)
controllersServer := server.NewControllersServer(pulseServer, grpcBroker)

ctx, cancel := context.WithCancel(context.Background())

Expand Down Expand Up @@ -75,6 +79,9 @@ func runServer(cmd *cobra.Command, args []string) {
go healthcheckServer.Start()
go pulseServer.Start(ctx)
go controllersServer.Start(ctx)
if grpcBroker != nil {
go grpcBroker.Start(ctx)
}

<-ctx.Done()
}
20 changes: 16 additions & 4 deletions cmd/maestro/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@ import (
"github.com/openshift-online/maestro/pkg/logger"
)

func NewControllersServer(pulseServer *PulseServer) *ControllersServer {
func NewControllersServer(pulseServer *PulseServer, grpcBroker *GRPCBroker) *ControllersServer {
var lockFactory db.LockFactory
if grpcBroker != nil {
lockFactory = db.NewEventAdvisoryLockFactory(env().Database.SessionFactory, grpcBroker.IsConsumerSubscribed)
} else {
lockFactory = db.NewAdvisoryLockFactory(env().Database.SessionFactory)
}

s := &ControllersServer{
KindControllerManager: controllers.NewKindControllerManager(
db.NewAdvisoryLockFactory(env().Database.SessionFactory),
lockFactory,
env().Services.Events(),
),
StatusController: controllers.NewStatusController(
Expand All @@ -23,14 +29,20 @@ func NewControllersServer(pulseServer *PulseServer) *ControllersServer {
}

sourceClient := env().Clients.CloudEventsSource
s.KindControllerManager.Add(&controllers.ControllerConfig{
controllerConfig := &controllers.ControllerConfig{
Source: "Resources",
Handlers: map[api.EventType][]controllers.ControllerHandlerFunc{
api.CreateEventType: {sourceClient.OnCreate},
api.UpdateEventType: {sourceClient.OnUpdate},
api.DeleteEventType: {sourceClient.OnDelete},
},
})
}
if grpcBroker != nil {
controllerConfig.Handlers[api.CreateEventType] = append(controllerConfig.Handlers[api.CreateEventType], grpcBroker.OnCreate)
controllerConfig.Handlers[api.UpdateEventType] = append(controllerConfig.Handlers[api.UpdateEventType], grpcBroker.OnUpdate)
controllerConfig.Handlers[api.DeleteEventType] = append(controllerConfig.Handlers[api.DeleteEventType], grpcBroker.OnDelete)
}
s.KindControllerManager.Add(controllerConfig)

s.StatusController.Add(map[api.StatusEventType][]controllers.StatusHandlerFunc{
api.StatusUpdateEventType: {pulseServer.OnStatusUpdate},
Expand Down
Loading

0 comments on commit 3ae27f2

Please sign in to comment.