diff --git a/README.md b/README.md index 823fa75..8de2d66 100644 --- a/README.md +++ b/README.md @@ -212,6 +212,21 @@ let workflow = orchestrator { See [the full example](https://github.com/mikhailshilkov/DurableFunctions.FSharp/blob/master/samples/FanOutFanIn.fs). +Delays +------ + +You can pause the orchestrator by calling `Orchestrator.delay` function: + +``` fsharp +let sendAndPause email = orchestrator { + do! Activity.call sendNewsletter email + do! Orchestrator.delay (TimeSpan.FromHours 1.0) +} +``` + +Note that the durable timer is used to implement this delay, so the orchestrator function will actually stop the current +execution and will resume after the delay expires. See [Timers in Durable Functions](https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-timers). + Contributions ------------- diff --git a/samples/Delay.fs b/samples/Delay.fs new file mode 100644 index 0000000..911e875 --- /dev/null +++ b/samples/Delay.fs @@ -0,0 +1,36 @@ +module samples.Delay + +open System +open Microsoft.Azure.WebJobs +open DurableFunctions.FSharp + +let sendNewsletter = + let impl (email: string) = + Console.WriteLine (sprintf "Fake newsletter sent to %s" email) + true + Activity.define "SendNewsletter" impl + +let newsletter = orchestrator { + + let pauseDuration = TimeSpan.FromHours 1.0 + + let sendAndPause email = orchestrator { + let! response = Activity.call sendNewsletter email + do! Orchestrator.delay pauseDuration + return response + } + + let! responses = + ["joe@foo.com"; "alex@bar.com"; "john@buzz.com"] + |> List.map sendAndPause + |> Activity.seq + + return responses |> List.forall id +} + +[] +let SendNewsletter([] url) = Activity.run sendNewsletter url + +[] +let NewsletterWorkflow ([] context: DurableOrchestrationContext) = + Orchestrator.run (newsletter, context) \ No newline at end of file diff --git a/samples/FanOutFanIn.fs b/samples/FanOutFanIn.fs index 6611e31..6bccced 100644 --- a/samples/FanOutFanIn.fs +++ b/samples/FanOutFanIn.fs @@ -1,18 +1,16 @@ -namespace samples +module samples.FanInFanOut open Microsoft.Azure.WebJobs open DurableFunctions.FSharp -module FanInFanOut = - - let hardWork = +let hardWork = fun item -> async { do! Async.Sleep 1000 return sprintf "Worked hard on %s!" item } |> Activity.defineAsync "HardWork" - let workflow = orchestrator { +let workflow = orchestrator { let! items = ["Tokyo"; "Seattle"; "London"] |> List.map (Activity.call hardWork) @@ -22,9 +20,9 @@ module FanInFanOut = return String.concat ", " items } - [] - let HardWork([] name) = hardWork.run name +[] +let HardWork([] name) = hardWork.run name - [] - let Run ([] context: DurableOrchestrationContext) = +[] +let FanInFanOut ([] context: DurableOrchestrationContext) = Orchestrator.run (workflow, context) \ No newline at end of file diff --git a/samples/Hello.fs b/samples/Hello.fs index 7a77703..2bc471a 100644 --- a/samples/Hello.fs +++ b/samples/Hello.fs @@ -1,16 +1,14 @@ -namespace samples +module samples.HelloSequence open Microsoft.Azure.WebJobs open DurableFunctions.FSharp -module HelloSequence = - - [] - let SayHello([] name) = +[] +let SayHello([] name) = sprintf "Hello %s!" name - [] - let Run ([] context: DurableOrchestrationContext) = +[] +let Run ([] context: DurableOrchestrationContext) = context |> orchestrator { let! hello1 = Activity.callByName "SayHello" "Tokyo" diff --git a/samples/Parameters.fs b/samples/Parameters.fs index 42bd0fe..bfecbb0 100644 --- a/samples/Parameters.fs +++ b/samples/Parameters.fs @@ -1,21 +1,19 @@ -namespace samples +module samples.InputParameter open Microsoft.Azure.WebJobs open DurableFunctions.FSharp open TypedSequence -module InputParameter = - - let workflow input = orchestrator { +let workflow input = orchestrator { let! hello1 = Activity.call sayHello (input + " Tokyo") let! hello2 = Activity.call sayHello (input + " Seattle") let! hello3 = Activity.call sayHello (input + " London") // given "Bla" returns ["Hello Bla Tokyo!", "Hello Bla Seattle!", "Hello Bla London!"] return [hello1; hello2; hello3] - } +} - [] - let Run ([] context: DurableOrchestrationContext) = +[] +let Run ([] context: DurableOrchestrationContext) = Orchestrator.run (workflow, context) \ No newline at end of file diff --git a/samples/Typed.fs b/samples/Typed.fs index 54a444a..522e0b6 100644 --- a/samples/Typed.fs +++ b/samples/Typed.fs @@ -1,25 +1,23 @@ -namespace samples +module samples.TypedSequence open Microsoft.Azure.WebJobs open DurableFunctions.FSharp -module TypedSequence = - - let sayHello = +let sayHello = Activity.define "SayTyped" (sprintf "Hello typed %s!") - let workflow = orchestrator { +let workflow = orchestrator { let! hello1 = Activity.call sayHello "Tokyo" let! hello2 = Activity.call sayHello "Seattle" let! hello3 = Activity.call sayHello "London" // returns ["Hello typed Tokyo!", "Hello typed Seattle!", "Hello typed London!"] return [hello1; hello2; hello3] - } +} - [] - let SayHello([] name) = sayHello.run name +[] +let SayHello([] name) = sayHello.run name - [] - let Run ([] context: DurableOrchestrationContext) = +[] +let Run ([] context: DurableOrchestrationContext) = Orchestrator.run (workflow, context) \ No newline at end of file diff --git a/samples/samples.fsproj b/samples/samples.fsproj index fb1df58..a4bab42 100644 --- a/samples/samples.fsproj +++ b/samples/samples.fsproj @@ -10,12 +10,13 @@ + - + diff --git a/src/DurableFunctions.FSharp/Activity.fs b/src/DurableFunctions.FSharp/Activity.fs index 940cbac..bd97fba 100644 --- a/src/DurableFunctions.FSharp/Activity.fs +++ b/src/DurableFunctions.FSharp/Activity.fs @@ -41,4 +41,17 @@ module Activity = let all (tasks: OrchestratorBuilder.ContextTask<'a> seq) (c: DurableOrchestrationContext) = let bla = tasks |> Seq.map (fun x -> x c) let whenAll = Task.WhenAll bla - whenAll.ContinueWith(fun (xs: Task<'a []>) -> xs.Result |> List.ofArray) \ No newline at end of file + whenAll.ContinueWith(fun (xs: Task<'a []>) -> xs.Result |> List.ofArray) + + /// Call all specified tasks sequentially one after the other and combine the results together. + let seq (tasks: OrchestratorBuilder.ContextTask<'a> list) = + let rec work acc (rem : OrchestratorBuilder.ContextTask<'a> list) = + match rem with + | [] -> fun _ -> Task.FromResult acc + | d :: rest -> orchestrator { + let! t = d + return! work (acc @ [t]) rest + } + work [] tasks + + \ No newline at end of file diff --git a/src/DurableFunctions.FSharp/DurableFunctions.FSharp.fsproj b/src/DurableFunctions.FSharp/DurableFunctions.FSharp.fsproj index 1cec0a3..993632f 100644 --- a/src/DurableFunctions.FSharp/DurableFunctions.FSharp.fsproj +++ b/src/DurableFunctions.FSharp/DurableFunctions.FSharp.fsproj @@ -13,7 +13,7 @@ - Azure Functions F# API + Azure Durable Functions F# API Mikhail Shilkov F#-friendly API layer for Azure Durable Functions https://github.com/mikhailshilkov/DurableFunctions.FSharp/releases/ @@ -24,6 +24,6 @@ https://github.com/mikhailshilkov/DurableFunctions.FSharp git true - 0.1.0 + 0.2.0 \ No newline at end of file diff --git a/src/DurableFunctions.FSharp/Orchestrator.fs b/src/DurableFunctions.FSharp/Orchestrator.fs index fc3dc19..e526b7d 100644 --- a/src/DurableFunctions.FSharp/Orchestrator.fs +++ b/src/DurableFunctions.FSharp/Orchestrator.fs @@ -1,5 +1,7 @@ namespace DurableFunctions.FSharp +open System +open System.Threading open System.Threading.Tasks open Microsoft.Azure.WebJobs open OrchestratorBuilder @@ -16,4 +18,14 @@ type Orchestrator = class static member run (workflow : 'a -> ContextTask<'b>, context : DurableOrchestrationContext) : Task<'b> = let input = context.GetInput<'a> () workflow input context + + /// Returns a fixed value as a orchestrator. + static member ret value (_: DurableOrchestrationContext) = + Task.FromResult value + + /// Delays orchestrator execution by the specified timespan. + static member delay (timespan: TimeSpan) (context: DurableOrchestrationContext) = + let deadline = context.CurrentUtcDateTime.Add timespan + context.CreateTimer(deadline, CancellationToken.None) + end \ No newline at end of file diff --git a/src/DurableFunctions.FSharp/OrchestratorCE.fs b/src/DurableFunctions.FSharp/OrchestratorCE.fs index beedb48..f68ce0c 100644 --- a/src/DurableFunctions.FSharp/OrchestratorCE.fs +++ b/src/DurableFunctions.FSharp/OrchestratorCE.fs @@ -6,6 +6,8 @@ open System.Runtime.CompilerServices open Microsoft.Azure.WebJobs module OrchestratorBuilder = + + type ContextTask = DurableOrchestrationContext -> Task type ContextTask<'a> = DurableOrchestrationContext -> Task<'a> /// Represents the state of a computation: @@ -100,6 +102,13 @@ module OrchestratorBuilder = else // Await and continue later when a result is available. Await (awt, (fun () -> continuation(awt.GetResult()))) + let inline bindTaskUnit (task : Task) (continuation : unit -> Step<'b>) = + let awt = task.GetAwaiter() + if awt.IsCompleted then // Proceed to the next step based on the result we already have. + continuation(awt.GetResult()) + else // Await and continue later when a result is available. + Await (awt, (fun () -> continuation(awt.GetResult()))) + /// Chains together a step with its following step. /// Note that this requires that the first step has no result. /// This prevents constructs like `task { return 1; return 2; }`. @@ -133,6 +142,12 @@ module OrchestratorBuilder = let a = bindTask (task c) continuation run (fun () -> a) c) + let inline bindContextTaskUnit (task : ContextTask) (continuation : unit -> Step<'b>) = + ReturnFrom( + fun c -> + let a = bindTaskUnit (task c) continuation + run (fun () -> a) c) + /// Builds a `System.Threading.Tasks.Task<'a>` similarly to a C# async/await method, but with /// all awaited tasks automatically configured *not* to resume on the captured context. /// This is often preferable when writing library code that is not context-aware, but undesirable when writing @@ -152,6 +167,8 @@ module OrchestratorBuilder = // Everything else can use bindGenericAwaitable via an extension member (defined later). member inline __.Bind(task : ContextTask<'a>, continuation : 'a -> 'b Step) : 'b Step = bindContextTask task continuation + member inline __.Bind(task : ContextTask, continuation : unit -> 'b Step) : 'b Step = + bindContextTaskUnit task continuation [] module Builders =