Skip to content

Commit

Permalink
fix: JVM hot reload fixes (#3544)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas authored Nov 27, 2024
1 parent af8ca4c commit 4237f3e
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 34 deletions.
6 changes: 6 additions & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,7 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
moduleState := map[string]moduleStateEntry{}
moduleByDeploymentKey := map[string]string{}
mostRecentDeploymentByModule := map[string]string{}
schemaByDeploymentKey := map[string]*schemapb.Module{}

// Seed the notification channel with the current deployments.
seedDeployments, err := s.dal.GetActiveDeployments(ctx)
Expand Down Expand Up @@ -1528,15 +1529,18 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
// Deleted key
if deletion, ok := notification.Deleted.Get(); ok {
name := moduleByDeploymentKey[deletion.String()]
schema := schemaByDeploymentKey[deletion.String()]
moduleRemoved := mostRecentDeploymentByModule[name] == deletion.String()
response = &ftlv1.PullSchemaResponse{
ModuleName: name,
DeploymentKey: proto.String(deletion.String()),
ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED,
ModuleRemoved: moduleRemoved,
Schema: schema,
}
delete(moduleState, name)
delete(moduleByDeploymentKey, deletion.String())
delete(schemaByDeploymentKey, deletion.String())
if moduleRemoved {
delete(mostRecentDeploymentByModule, name)
}
Expand Down Expand Up @@ -1592,7 +1596,9 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
}
moduleState[message.Schema.Name] = newState
delete(moduleByDeploymentKey, message.Key.String()) // The deployment may have changed.
delete(schemaByDeploymentKey, message.Key.String())
moduleByDeploymentKey[message.Key.String()] = message.Schema.Name
schemaByDeploymentKey[message.Key.String()] = moduleSchema
}

if response != nil {
Expand Down
54 changes: 36 additions & 18 deletions backend/controller/scaling/localscaling/local_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,7 @@ func (l *localScaling) Start(ctx context.Context, endpoint url.URL, leaser lease
return
case devEndpoints := <-l.devModeEndpointsUpdates:
l.lock.Lock()
l.devModeEndpoints[devEndpoints.Module] = &devModeRunner{
uri: devEndpoints.Endpoint,
debugPort: devEndpoints.DebugPort,
}
if ide, ok := l.ideSupport.Get(); ok {
if devEndpoints.DebugPort != 0 {
if debug, ok := l.debugPorts[devEndpoints.Module]; ok {
debug.Port = devEndpoints.DebugPort
} else {
l.debugPorts[devEndpoints.Module] = &localdebug.DebugInfo{
Port: devEndpoints.DebugPort,
Language: devEndpoints.Language,
}
}
}
ide.SyncIDEDebugIntegrations(ctx, l.debugPorts)

}
l.updateDevModeEndpoint(ctx, devEndpoints)
l.lock.Unlock()
}
}
Expand All @@ -90,6 +73,29 @@ func (l *localScaling) Start(ctx context.Context, endpoint url.URL, leaser lease
return nil
}

// updateDevModeEndpoint updates the dev mode endpoint for a module
// Must be called under lock
func (l *localScaling) updateDevModeEndpoint(ctx context.Context, devEndpoints scaling.DevModeEndpoints) {
l.devModeEndpoints[devEndpoints.Module] = &devModeRunner{
uri: devEndpoints.Endpoint,
debugPort: devEndpoints.DebugPort,
}
if ide, ok := l.ideSupport.Get(); ok {
if devEndpoints.DebugPort != 0 {
if debug, ok := l.debugPorts[devEndpoints.Module]; ok {
debug.Port = devEndpoints.DebugPort
} else {
l.debugPorts[devEndpoints.Module] = &localdebug.DebugInfo{
Port: devEndpoints.DebugPort,
Language: devEndpoints.Language,
}
}
}
ide.SyncIDEDebugIntegrations(ctx, l.debugPorts)

}
}

func (l *localScaling) GetEndpointForDeployment(ctx context.Context, module string, deployment string) (optional.Option[url.URL], error) {
l.lock.Lock()
defer l.lock.Unlock()
Expand Down Expand Up @@ -186,6 +192,18 @@ func (l *localScaling) handleSchemaChange(ctx context.Context, msg *ftlv1.PullSc

func (l *localScaling) reconcileRunners(ctx context.Context, deploymentRunners *deploymentInfo) error {
// Must be called under lock

// First make sure we have all endpoint updates
for {
select {
case devEndpoints := <-l.devModeEndpointsUpdates:
l.updateDevModeEndpoint(ctx, devEndpoints)
continue
default:
}
break
}

logger := log.FromContext(ctx)
if deploymentRunners.replicas > 0 && !deploymentRunners.runner.Ok() && deploymentRunners.exits < maxExits {
if err := l.startRunner(ctx, deploymentRunners.key, deploymentRunners); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion examples/java/time/src/main/java/ftl/time/Time.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ public class Time {
@Verb
@Export
public TimeResponse time() {
return new TimeResponse(OffsetDateTime.now());
return new TimeResponse(OffsetDateTime.now().plusDays(1));
}
}
2 changes: 1 addition & 1 deletion internal/buildengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,7 @@ func (e *Engine) watchForPluginEvents(originalCtx context.Context) {
e.rawEngineUpdates <- ModuleBuildStarted{Config: meta.module.Config, IsAutoRebuild: true}

case languageplugin.AutoRebuildEndedEvent:
_, deploy, err := handleBuildResult(ctx, e.projectConfig, meta.module.Config, event.Result, nil)
_, deploy, err := handleBuildResult(ctx, e.projectConfig, meta.module.Config, event.Result, e.devModeEndpointUpdates)
if err != nil {
e.rawEngineUpdates <- ModuleBuildFailed{Config: meta.module.Config, IsAutoRebuild: true, Error: err}
if errors.Is(err, errInvalidateDependencies) {
Expand Down
12 changes: 12 additions & 0 deletions jvm-runtime/plugin/common/java_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ func TestJavaConfigDefaults(t *testing.T) {
LanguageConfig: map[string]any{
"build-tool": "maven",
},
Watch: []string{
"pom.xml",
"src/**",
"build/generated",
"target/generated-sources",
},
},
},
{
Expand All @@ -50,6 +56,12 @@ func TestJavaConfigDefaults(t *testing.T) {
LanguageConfig: map[string]any{
"build-tool": "maven",
},
Watch: []string{
"pom.xml",
"src/**",
"build/generated",
"target/generated-sources",
},
},
},
} {
Expand Down
119 changes: 105 additions & 14 deletions jvm-runtime/plugin/common/jvmcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/sha256"
islices "github.com/TBD54566975/ftl/internal/slices"
"github.com/TBD54566975/ftl/internal/watch"
)

const BuildLockTimeout = time.Minute
Expand Down Expand Up @@ -161,14 +162,13 @@ func (s *Service) SyncStubReferences(ctx context.Context, req *connect.Request[l
// rebuild must include the latest build context id provided by the request or subsequent BuildContextUpdated
// calls.
func (s *Service) Build(ctx context.Context, req *connect.Request[langpb.BuildRequest], stream *connect.ServerStream[langpb.BuildEvent]) error {
if req.Msg.RebuildAutomatically {
return s.handleDevModeRequest(ctx, req, stream)
}

buildCtx, err := buildContextFromProto(req.Msg.BuildContext)
if err != nil {
return err
}
if req.Msg.RebuildAutomatically {
return s.runDevMode(ctx, req, buildCtx, stream)
}

// Initial build
if err := buildAndSend(ctx, stream, buildCtx, false); err != nil {
Expand All @@ -178,17 +178,96 @@ func (s *Service) Build(ctx context.Context, req *connect.Request[langpb.BuildRe
return nil
}

func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request[langpb.BuildRequest], stream *connect.ServerStream[langpb.BuildEvent]) error {
logger := log.FromContext(ctx)
func (s *Service) runDevMode(ctx context.Context, req *connect.Request[langpb.BuildRequest], buildCtx buildContext, stream *connect.ServerStream[langpb.BuildEvent]) error {
s.acceptsContextUpdates.Store(true)
defer s.acceptsContextUpdates.Store(false)
first := true
for {
if !first {
err := stream.Send(&langpb.BuildEvent{Event: &langpb.BuildEvent_AutoRebuildStarted{AutoRebuildStarted: &langpb.AutoRebuildStarted{ContextId: buildCtx.ID}}})
if err != nil {
return fmt.Errorf("could not send build event: %w", err)
}
}
err := s.runQuarkusDev(ctx, req, stream, first)
first = false
if err != nil {
return err
}
select {
case <-ctx.Done():
return nil
default:
}
watchPatterns, err := relativeWatchPatterns(buildCtx.Config.Dir, buildCtx.Config.Watch)
if err != nil {
return err
}

watcher := watch.NewWatcher(watchPatterns...)
if err := watchFiles(ctx, watcher, buildCtx); err != nil {
return err
}
}
}

func relativeWatchPatterns(moduleDir string, watchPaths []string) ([]string, error) {
relativePaths := make([]string, len(watchPaths))
for i, path := range watchPaths {
relative, err := filepath.Rel(moduleDir, path)
if err != nil {
return nil, fmt.Errorf("could create relative path for watch pattern: %w", err)
}
relativePaths[i] = relative
}
return relativePaths, nil
}

// watchFiles watches for file changes in the module directory and triggers a rebuild when changes are detected.
// This is only used when quarkus:dev is not running, e.g. if the module is so broken that it can't start.
func watchFiles(ctx context.Context, watcher *watch.Watcher, buildCtx buildContext) error {
watchTopic, err := watcher.Watch(ctx, time.Second, []string{buildCtx.Config.Dir})
if err != nil {
return fmt.Errorf("could not watch for file changes: %w", err)
}
log.FromContext(ctx).Debugf("Watching for file changes: %s", buildCtx.Config.Dir)
watchEvents := make(chan watch.WatchEvent, 32)
watchTopic.Subscribe(watchEvents)

// We need watcher to calculate file hashes before we do initial build so we can detect changes
select {
case e := <-watchEvents:
_, ok := e.(watch.WatchEventModuleAdded)
if !ok {
return fmt.Errorf("expected module added event, got: %T", e)
}
case <-time.After(3 * time.Second):
return fmt.Errorf("expected module added event, got no event")
case <-ctx.Done():
return fmt.Errorf("context done: %w", ctx.Err())
}

select {
case e := <-watchEvents:
if change, ok := e.(watch.WatchEventModuleChanged); ok {
log.FromContext(ctx).Infof("Found file changes: %s", change)
return nil
}
case <-ctx.Done():
return nil
}

return nil
}
func (s *Service) runQuarkusDev(ctx context.Context, req *connect.Request[langpb.BuildRequest], stream *connect.ServerStream[langpb.BuildEvent], firstAttempt bool) error {
logger := log.FromContext(ctx)
// cancel context when stream ends so that watcher can be stopped
ctx, cancel := context.WithCancel(ctx)
defer cancel()

events := make(chan buildContextUpdatedEvent, 32)
s.updatesTopic.Subscribe(events)
defer s.updatesTopic.Unsubscribe(events)
first := true
buildCtx, err := buildContextFromProto(req.Msg.BuildContext)
if err != nil {
return err
Expand Down Expand Up @@ -237,6 +316,19 @@ func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request
for {
select {
case <-ctx.Done():
if firstAttempt {
// the context is done before we notified the build engine
// we need to send a build failure event
err = stream.Send(&langpb.BuildEvent{Event: &langpb.BuildEvent_BuildFailure{
BuildFailure: &langpb.BuildFailure{
IsAutomaticRebuild: !firstAttempt,
ContextId: buildCtx.ID,
Errors: &langpb.ErrorList{Errors: []*langpb.Error{{Msg: "The dev mode process exited", Level: langpb.Error_ERROR, Type: langpb.Error_COMPILER}}},
}}})
if err != nil {
return fmt.Errorf("could not send build event: %w", err)
}
}
return nil
case bc := <-events:
buildCtx = bc.buildCtx
Expand All @@ -260,9 +352,7 @@ func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request
}
}
if changed {
if !first {
logger.Infof("New schema detected, doing fast redeploy")
}

buildErrs, err := loadProtoErrors(buildCtx.Config)
if err != nil {
// This is likely a transient error
Expand All @@ -273,14 +363,14 @@ func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request
// skip reading schema
err = stream.Send(&langpb.BuildEvent{Event: &langpb.BuildEvent_BuildFailure{
BuildFailure: &langpb.BuildFailure{
IsAutomaticRebuild: !first,
IsAutomaticRebuild: !firstAttempt,
ContextId: buildCtx.ID,
Errors: buildErrs,
}}})
if err != nil {
return fmt.Errorf("could not send build event: %w", err)
}
first = false
firstAttempt = false
continue
}

Expand All @@ -295,7 +385,7 @@ func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request
Event: &langpb.BuildEvent_BuildSuccess{
BuildSuccess: &langpb.BuildSuccess{
ContextId: req.Msg.BuildContext.Id,
IsAutomaticRebuild: !first,
IsAutomaticRebuild: !firstAttempt,
Module: moduleProto,
DevEndpoint: ptr(fmt.Sprintf("http://localhost:%d", address.Port)),
DebugPort: &debugPort32,
Expand All @@ -306,7 +396,7 @@ func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request
if err != nil {
return fmt.Errorf("could not send build event: %w", err)
}
first = false
firstAttempt = false
}

}
Expand Down Expand Up @@ -483,6 +573,7 @@ func (s *Service) ModuleConfigDefaults(ctx context.Context, req *connect.Request
defaults := langpb.ModuleConfigDefaultsResponse{
GeneratedSchemaDir: ptr("src/main/ftl-module-schema"),
LanguageConfig: &structpb.Struct{Fields: map[string]*structpb.Value{}},
Watch: []string{"pom.xml", "src/**", "build/generated", "target/generated-sources"},
}
dir := req.Msg.Dir
pom := filepath.Join(dir, "pom.xml")
Expand Down

0 comments on commit 4237f3e

Please sign in to comment.