diff --git a/src/WorkflowCore/Interface/IActivityController.cs b/src/WorkflowCore/Interface/IActivityController.cs index 46464e8e8..362bd90cc 100644 --- a/src/WorkflowCore/Interface/IActivityController.cs +++ b/src/WorkflowCore/Interface/IActivityController.cs @@ -15,6 +15,7 @@ public class PendingActivity public interface IActivityController { Task GetPendingActivity(string activityName, string workerId, TimeSpan? timeout = null); + Task GetPendingActivity(string activityName, string workflowId, string workerId, TimeSpan? timeout = null); Task ReleaseActivityToken(string token); Task SubmitActivitySuccess(string token, object result); Task SubmitActivityFailure(string token, object result); diff --git a/src/WorkflowCore/Interface/Persistence/ISubscriptionRepository.cs b/src/WorkflowCore/Interface/Persistence/ISubscriptionRepository.cs index 2f22a45f4..cf835c0ff 100644 --- a/src/WorkflowCore/Interface/Persistence/ISubscriptionRepository.cs +++ b/src/WorkflowCore/Interface/Persistence/ISubscriptionRepository.cs @@ -17,7 +17,9 @@ public interface ISubscriptionRepository Task GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default); Task GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default); - + + Task GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken cancellationToken = default); + Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default); Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default); diff --git a/src/WorkflowCore/Services/ActivityController.cs b/src/WorkflowCore/Services/ActivityController.cs index e37481521..48a331d84 100644 --- a/src/WorkflowCore/Services/ActivityController.cs +++ b/src/WorkflowCore/Services/ActivityController.cs @@ -66,6 +66,47 @@ public async Task GetPendingActivity(string activityName, strin } + public async Task GetPendingActivity(string activityName, string workflowId, string workerId, TimeSpan? timeout = null) + { + var endTime = _dateTimeProvider.UtcNow.Add(timeout ?? TimeSpan.Zero); + var firstPass = true; + EventSubscription subscription = null; + while ((subscription == null && _dateTimeProvider.UtcNow < endTime) || firstPass) + { + if (!firstPass) + await Task.Delay(100); + subscription = await _subscriptionRepository.GetFirstOpenSubscription(Event.EventTypeActivity, activityName, workflowId, _dateTimeProvider.Now); + if (subscription != null) + if (!await _lockProvider.AcquireLock($"sub:{subscription.Id}", CancellationToken.None)) + subscription = null; + firstPass = false; + } + if (subscription == null) + return null; + + try + { + var token = Token.Create(subscription.Id, subscription.EventKey); + var result = new PendingActivity + { + Token = token.Encode(), + ActivityName = subscription.EventKey, + Parameters = subscription.SubscriptionData, + TokenExpiry = DateTime.MaxValue + }; + + if (!await _subscriptionRepository.SetSubscriptionToken(subscription.Id, result.Token, workerId, result.TokenExpiry)) + return null; + + return result; + } + finally + { + await _lockProvider.ReleaseLock($"sub:{subscription.Id}"); + } + + } + public async Task ReleaseActivityToken(string token) { var tokenObj = Token.Decode(token); diff --git a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs index 96ea772af..8199e79df 100644 --- a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs +++ b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs @@ -152,6 +152,16 @@ public Task GetFirstOpenSubscription(string eventName, string } } + public Task GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken _ = default) + { + lock (_subscriptions) + { + var result = _subscriptions + .FirstOrDefault(x => x.ExternalToken == null && x.EventName == eventName && x.EventKey == eventKey && x.WorkflowId == workflowId && x.SubscribeAsOf <= asOf); + return Task.FromResult(result); + } + } + public Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default) { lock (_subscriptions) diff --git a/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs b/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs index c2fab66e1..4b87e73cd 100644 --- a/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs +++ b/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs @@ -53,6 +53,8 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService) public Task GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken _ = default) => _innerService.GetFirstOpenSubscription(eventName, eventKey, asOf); + public Task GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken _ = default) => _innerService.GetFirstOpenSubscription(eventName, eventKey, workflowId, asOf); + public Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry); public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token); diff --git a/src/WorkflowCore/Services/WorkflowHost.cs b/src/WorkflowCore/Services/WorkflowHost.cs index ca5e04b2f..8a670ff06 100644 --- a/src/WorkflowCore/Services/WorkflowHost.cs +++ b/src/WorkflowCore/Services/WorkflowHost.cs @@ -170,6 +170,11 @@ public Task GetPendingActivity(string activityName, string work return _activityController.GetPendingActivity(activityName, workerId, timeout); } + public Task GetPendingActivity(string activityName, string workflowId, string workerId, TimeSpan? timeout = null) + { + return _activityController.GetPendingActivity(activityName, workflowId, workerId, timeout); + } + public Task ReleaseActivityToken(string token) { return _activityController.ReleaseActivityToken(token); diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs index e0c7082fd..7eefd966b 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs @@ -327,6 +327,16 @@ public async Task GetFirstOpenSubscription(string eventName, } } + public async Task GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken cancellationToken = default) + { + using (var db = ConstructDbContext()) + { + var raw = await db.Set().FirstOrDefaultAsync(x => x.EventName == eventName && x.EventKey == eventKey && x.WorkflowId == workflowId && x.SubscribeAsOf <= asOf && x.ExternalToken == null, cancellationToken); + + return raw?.ToEventSubscription(); + } + } + public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default) { using (var db = ConstructDbContext()) diff --git a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs index 79e9fe5b7..9803a227c 100644 --- a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs @@ -205,6 +205,14 @@ public async Task GetFirstOpenSubscription(string eventName, return await query.FirstOrDefaultAsync(cancellationToken); } + public async Task GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken cancellationToken = default) + { + var query = EventSubscriptions + .Find(x => x.EventName == eventName && x.EventKey == eventKey && x.WorkflowId == workflowId && x.SubscribeAsOf <= asOf && x.ExternalToken == null); + + return await query.FirstOrDefaultAsync(cancellationToken); + } + public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default) { var update = Builders.Update diff --git a/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs index d115f1eb2..7d50196a3 100644 --- a/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs @@ -172,6 +172,22 @@ public async Task GetFirstOpenSubscription(string eventName, } } + public async Task GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken cancellationToken = default) + { + using (var session = _database.OpenAsyncSession()) + { + var q = session.Query().Where(x => + x.EventName == eventKey + && x.EventKey == eventKey + && x.WorkflowId == workflowId + && x.SubscribeAsOf <= asOf + && x.ExternalToken == null + ); + + return await q.FirstOrDefaultAsync(cancellationToken); + } + } + public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default) { try diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs index d3828389a..8fcccbaa6 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs @@ -421,6 +421,45 @@ public async Task GetFirstOpenSubscription(string eventName, return result.FirstOrDefault(); } + public async Task GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken cancellationToken = default) + { + var result = new List(); + var asOfTicks = asOf.ToUniversalTime().Ticks; + + var request = new QueryRequest + { + TableName = $"{_tablePrefix}-{SUBCRIPTION_TABLE}", + IndexName = "ix_slug", + Select = "ALL_PROJECTED_ATTRIBUTES", + KeyConditionExpression = "event_slug = :slug and workflow_id = :workflow_id and subscribe_as_of <= :as_of", + FilterExpression = "attribute_not_exists(external_token)", + Limit = 1, + ExpressionAttributeValues = new Dictionary + { + { + ":slug", new AttributeValue($"{eventName}:{eventKey}") + }, + { + ":workflow_id", new AttributeValue(workflowId) + }, + { + ":as_of", new AttributeValue + { + N = Convert.ToString(asOfTicks) + } + } + }, + ScanIndexForward = true + }; + + var response = await _client.QueryAsync(request, cancellationToken); + + foreach (var item in response.Items) + result.Add(item.ToEventSubscription()); + + return result.FirstOrDefault(); + } + public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default) { var request = new UpdateItemRequest diff --git a/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs index 36e51b86c..d5d86f0e4 100644 --- a/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs @@ -119,6 +119,25 @@ public async Task GetFirstOpenSubscription(string eventName, return eventSubscription; } + public async Task GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken cancellationToken) + { + EventSubscription eventSubscription = null; + using (FeedIterator feedIterator = _subscriptionContainer.Value.GetItemLinqQueryable() + .Where(x => x.ExternalToken == null && x.EventName == eventName && x.EventKey == eventKey && x.WorkflowId == workflowId && x.SubscribeAsOf <= asOf) + .ToFeedIterator()) + { + while (feedIterator.HasMoreResults && eventSubscription == null) + { + foreach (var item in await feedIterator.ReadNextAsync(cancellationToken)) + { + eventSubscription = PersistedSubscription.ToInstance(item); + } + } + } + + return eventSubscription; + } + public async Task> GetRunnableEvents(DateTime asAt, CancellationToken cancellationToken) { var events = new List(); diff --git a/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs index dcd932af0..702ba702d 100644 --- a/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs @@ -117,7 +117,7 @@ public async Task> GetSubscriptions(string eventN } return result; - } + } public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default) { @@ -138,6 +138,11 @@ public async Task GetFirstOpenSubscription(string eventName, return (await GetSubscriptions(eventName, eventKey, asOf, cancellationToken)).FirstOrDefault(sub => string.IsNullOrEmpty(sub.ExternalToken)); } + public async Task GetFirstOpenSubscription(string eventName, string eventKey, string workflowId, DateTime asOf, CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default) { var item = JsonConvert.DeserializeObject(await _redis.HashGetAsync($"{_prefix}.{SUBSCRIPTION_SET}", eventSubscriptionId), _serializerSettings); diff --git a/src/samples/WorkflowCore.Sample18/Program.cs b/src/samples/WorkflowCore.Sample18/Program.cs index aa7d3f2f7..e7ff550b8 100644 --- a/src/samples/WorkflowCore.Sample18/Program.cs +++ b/src/samples/WorkflowCore.Sample18/Program.cs @@ -20,7 +20,7 @@ static void Main(string[] args) var workflowId = host.StartWorkflow("activity-sample", new MyData { Request = "Spend $1,000,000" }).Result; - var approval = host.GetPendingActivity("get-approval", "worker1", TimeSpan.FromMinutes(1)).Result; + var approval = host.GetPendingActivity("get-approval", workflowId, "worker1", TimeSpan.FromMinutes(1)).Result; if (approval != null) { @@ -37,8 +37,8 @@ private static IServiceProvider ConfigureServices() //setup dependency injection IServiceCollection services = new ServiceCollection(); //services.AddWorkflow(); - services.AddWorkflow(x => x.UseMongoDB(@"mongodb://localhost:27017", "workflow")); - //services.AddWorkflow(x => x.UseSqlServer(@"Server=.;Database=WorkflowCore;Trusted_Connection=True;", true, true)); + //services.AddWorkflow(x => x.UseMongoDB(@"mongodb://localhost:27017", "workflow")); + services.AddWorkflow(x => x.UseSqlServer(@"Server=.\\SqlExpress;Database=WFCore;Trusted_Connection=True;", true, true)); //services.AddWorkflow(x => x.UsePostgreSQL(@"Server=127.0.0.1;Port=5432;Database=workflow;User Id=postgres;", true, true)); services.AddLogging(cfg => { diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/ActivityScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/ActivityScenario.cs index 1e082f4db..8a8eb794a 100644 --- a/test/WorkflowCore.IntegrationTests/Scenarios/ActivityScenario.cs +++ b/test/WorkflowCore.IntegrationTests/Scenarios/ActivityScenario.cs @@ -50,7 +50,7 @@ public ActivityScenario() public void Scenario() { var workflowId = StartWorkflow(new MyDataClass { ActivityInput = new ActivityInput { Value1 = "a", Value2 = 1 } }); - var activity = Host.GetPendingActivity("act-1", "worker1", TimeSpan.FromSeconds(30)).Result; + var activity = Host.GetPendingActivity("act-1", workflowId, "worker1", TimeSpan.FromSeconds(30)).Result; if (activity != null) {