Skip to content

Fixed the WorkflowInstanceController not to claim instances of workflows explicitly assigned to other operators #526

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/operator/Synapse.Operator/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

services.AddScoped<WorkflowController>();
services.AddScoped<IResourceController<Workflow>>(provider => provider.GetRequiredService<WorkflowController>());
services.AddScoped<IWorkflowController>(provider => provider.GetRequiredService<WorkflowController>());

services.AddScoped<WorkflowInstanceController>();
services.AddScoped<IResourceController<WorkflowInstance>>(provider => provider.GetRequiredService<WorkflowInstanceController>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
namespace Synapse.Operator.Services;

/// <summary>
/// Defines the fundamentals of the service used to access the current Synapse Operator
/// Defines the fundamentals of a service used to access the current Synapse Operator
/// </summary>
public interface IOperatorController
: IHostedService
Expand All @@ -25,4 +25,4 @@ public interface IOperatorController
/// </summary>
IResourceMonitor<Resources.Operator> Operator { get; }

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Synapse.Operator.Services;

/// <summary>
/// Defines the fundamentals of a service used to access all monitored workflows
/// </summary>
public interface IWorkflowController
{

/// <summary>
/// Gets a dictionary containing all monitored workflows
/// </summary>
IReadOnlyDictionary<string, Workflow> Workflows { get; }

}
5 changes: 4 additions & 1 deletion src/operator/Synapse.Operator/Services/WorkflowController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace Synapse.Operator.Services;
/// <param name="operatorOptions">The current <see cref="Configuration.OperatorOptions"/></param>
/// <param name="operatorAccessor">The service used to access the current <see cref="Resources.Operator"/></param>
public class WorkflowController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<Workflow>> controllerOptions, IResourceRepository resources, IOptions<OperatorOptions> operatorOptions, IOperatorController operatorAccessor)
: ResourceController<Workflow>(loggerFactory, controllerOptions, resources)
: ResourceController<Workflow>(loggerFactory, controllerOptions, resources), IWorkflowController
{

/// <summary>
Expand All @@ -48,6 +48,9 @@ public class WorkflowController(IServiceProvider serviceProvider, ILoggerFactory
/// </summary>
protected ConcurrentDictionary<string, WorkflowScheduler> Schedulers { get; } = [];

/// <inheritdoc/>
public IReadOnlyDictionary<string, Workflow> Workflows => this.Resources.AsReadOnly();

/// <inheritdoc/>
public override async Task StartAsync(CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia.Data.Infrastructure.ResourceOriented;
using Neuroglia.Data.Infrastructure.Services;

namespace Synapse.Operator.Services;
Expand All @@ -23,8 +24,9 @@ namespace Synapse.Operator.Services;
/// <param name="controllerOptions">The service used to access the current <see cref="IOptions{TOptions}"/></param>
/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
/// <param name="operatorController">The service used to access the current <see cref="Resources.Operator"/></param>
/// <param name="workflowController">The service used to access all monitored <see cref="Workflow"/>s</param>
/// <param name="documents">The <see cref="IRepository"/> used to manage <see cref="Document"/>s</param>
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController operatorController, IRepository<Document, string> documents)
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController operatorController, IWorkflowController workflowController, IRepository<Document, string> documents)
: ResourceController<WorkflowInstance>(loggerFactory, controllerOptions, repository)
{

Expand All @@ -38,6 +40,11 @@ public class WorkflowInstanceController(IServiceProvider serviceProvider, ILogge
/// </summary>
protected IResourceMonitor<Resources.Operator> Operator => operatorController.Operator;

/// <summary>
/// Gets a dictionary containing all monitored <see cref="Workflow"/>s
/// </summary>
protected IReadOnlyDictionary<string, Workflow> Workflows => workflowController.Workflows;

/// <summary>
/// Gets the <see cref="IRepository"/> used to manage <see cref="Document"/>s
/// </summary>
Expand Down Expand Up @@ -87,7 +94,8 @@ protected virtual async Task<WorkflowInstanceHandler> CreateWorkflowInstanceHand
protected virtual async Task<bool> TryClaimAsync(WorkflowInstance resource, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(resource);
if (resource.Metadata.Labels != null && resource.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out var operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName();
var isClaimable = this.IsWorkflowInstanceClaimable(resource);
if (isClaimable.HasValue) return isClaimable.Value;
try
{
var originalResource = resource.Clone();
Expand All @@ -112,7 +120,8 @@ protected virtual async Task<bool> TryClaimAsync(WorkflowInstance resource, Canc
protected virtual async Task<bool> TryReleaseAsync(WorkflowInstance resource, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(resource);
if (resource.Metadata.Labels != null && resource.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out var operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName();
var isClaimable = this.IsWorkflowInstanceClaimable(resource);
if (isClaimable.HasValue) return isClaimable.Value;
try
{
var originalResource = resource.Clone();
Expand Down Expand Up @@ -205,6 +214,20 @@ protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowIn
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual Task OnResourceSelectorChangedAsync(IDictionary<string, string>? selector) => this.ReconcileAsync(this.CancellationTokenSource.Token);

/// <summary>
/// Determines whether or not the specified <see cref="WorkflowInstance"/> can be claimed by the current <see cref="Resources.Operator"/>
/// </summary>
/// <param name="workflowInstance">The <see cref="WorkflowInstance"/> to check</param>
/// <returns>A boolean indicating whether or not the specified <see cref="WorkflowInstance"/> can be claimed by the current <see cref="Resources.Operator"/></returns>
protected virtual bool? IsWorkflowInstanceClaimable(WorkflowInstance workflowInstance)
{
ArgumentNullException.ThrowIfNull(workflowInstance);
if (workflowInstance.Metadata.Labels != null && workflowInstance.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out var operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName();
if (this.Workflows.TryGetValue(this.GetResourceCacheKey(workflowInstance.Spec.Definition.Name, workflowInstance.Spec.Definition.Namespace), out var workflow) && workflow != null
&& workflow.Metadata.Labels != null && workflow.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out operatorQualifiedName)) return operatorQualifiedName == this.Operator.Resource.GetQualifiedName();
return null;
}

/// <inheritdoc/>
protected override async ValueTask DisposeAsync(bool disposing)
{
Expand Down
Loading