Skip to content

Commit

Permalink
Version 0.2.0: delays, sequential execution of a list of activities, …
Browse files Browse the repository at this point in the history
…activities returning unit
  • Loading branch information
mikhailshilkov committed Jan 7, 2019
1 parent 52b3f44 commit 885ee55
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 37 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,21 @@ let workflow = orchestrator {

See [the full example](https://github.com/mikhailshilkov/DurableFunctions.FSharp/blob/master/samples/FanOutFanIn.fs).

Delays
------

You can pause the orchestrator by calling `Orchestrator.delay` function:

``` fsharp
let sendAndPause email = orchestrator {
do! Activity.call sendNewsletter email
do! Orchestrator.delay (TimeSpan.FromHours 1.0)
}
```

Note that the durable timer is used to implement this delay, so the orchestrator function will actually stop the current
execution and will resume after the delay expires. See [Timers in Durable Functions](https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-timers).

Contributions
-------------

Expand Down
36 changes: 36 additions & 0 deletions samples/Delay.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
module samples.Delay

open System
open Microsoft.Azure.WebJobs
open DurableFunctions.FSharp

let sendNewsletter =
let impl (email: string) =
Console.WriteLine (sprintf "Fake newsletter sent to %s" email)
true
Activity.define "SendNewsletter" impl

let newsletter = orchestrator {

let pauseDuration = TimeSpan.FromHours 1.0

let sendAndPause email = orchestrator {
let! response = Activity.call sendNewsletter email
do! Orchestrator.delay pauseDuration
return response
}

let! responses =
["[email protected]"; "[email protected]"; "[email protected]"]
|> List.map sendAndPause
|> Activity.seq

return responses |> List.forall id
}

[<FunctionName("SendNewsletter")>]
let SendNewsletter([<ActivityTrigger>] url) = Activity.run sendNewsletter url

[<FunctionName("NewsletterWorkflow")>]
let NewsletterWorkflow ([<OrchestrationTrigger>] context: DurableOrchestrationContext) =
Orchestrator.run (newsletter, context)
16 changes: 7 additions & 9 deletions samples/FanOutFanIn.fs
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
namespace samples
module samples.FanInFanOut

open Microsoft.Azure.WebJobs
open DurableFunctions.FSharp

module FanInFanOut =

let hardWork =
let hardWork =
fun item -> async {
do! Async.Sleep 1000
return sprintf "Worked hard on %s!" item
}
|> Activity.defineAsync "HardWork"

let workflow = orchestrator {
let workflow = orchestrator {
let! items =
["Tokyo"; "Seattle"; "London"]
|> List.map (Activity.call hardWork)
Expand All @@ -22,9 +20,9 @@ module FanInFanOut =
return String.concat ", " items
}

[<FunctionName("HardWork")>]
let HardWork([<ActivityTrigger>] name) = hardWork.run name
[<FunctionName("HardWork")>]
let HardWork([<ActivityTrigger>] name) = hardWork.run name

[<FunctionName("FanInFanOut")>]
let Run ([<OrchestrationTrigger>] context: DurableOrchestrationContext) =
[<FunctionName("FanInFanOut")>]
let FanInFanOut ([<OrchestrationTrigger>] context: DurableOrchestrationContext) =
Orchestrator.run (workflow, context)
12 changes: 5 additions & 7 deletions samples/Hello.fs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
namespace samples
module samples.HelloSequence

open Microsoft.Azure.WebJobs
open DurableFunctions.FSharp

module HelloSequence =

[<FunctionName("SayHello")>]
let SayHello([<ActivityTrigger>] name) =
[<FunctionName("SayHello")>]
let SayHello([<ActivityTrigger>] name) =
sprintf "Hello %s!" name

[<FunctionName("HelloSequence")>]
let Run ([<OrchestrationTrigger>] context: DurableOrchestrationContext) =
[<FunctionName("HelloSequence")>]
let Run ([<OrchestrationTrigger>] context: DurableOrchestrationContext) =
context |>
orchestrator {
let! hello1 = Activity.callByName<string> "SayHello" "Tokyo"
Expand Down
12 changes: 5 additions & 7 deletions samples/Parameters.fs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
namespace samples
module samples.InputParameter

open Microsoft.Azure.WebJobs
open DurableFunctions.FSharp

open TypedSequence

module InputParameter =

let workflow input = orchestrator {
let workflow input = orchestrator {
let! hello1 = Activity.call sayHello (input + " Tokyo")
let! hello2 = Activity.call sayHello (input + " Seattle")
let! hello3 = Activity.call sayHello (input + " London")

// given "Bla" returns ["Hello Bla Tokyo!", "Hello Bla Seattle!", "Hello Bla London!"]
return [hello1; hello2; hello3]
}
}

[<FunctionName("WorkflowWithInputParameter")>]
let Run ([<OrchestrationTrigger>] context: DurableOrchestrationContext) =
[<FunctionName("WorkflowWithInputParameter")>]
let Run ([<OrchestrationTrigger>] context: DurableOrchestrationContext) =
Orchestrator.run (workflow, context)
18 changes: 8 additions & 10 deletions samples/Typed.fs
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
namespace samples
module samples.TypedSequence

open Microsoft.Azure.WebJobs
open DurableFunctions.FSharp

module TypedSequence =

let sayHello =
let sayHello =
Activity.define "SayTyped" (sprintf "Hello typed %s!")

let workflow = orchestrator {
let workflow = orchestrator {
let! hello1 = Activity.call sayHello "Tokyo"
let! hello2 = Activity.call sayHello "Seattle"
let! hello3 = Activity.call sayHello "London"

// returns ["Hello typed Tokyo!", "Hello typed Seattle!", "Hello typed London!"]
return [hello1; hello2; hello3]
}
}

[<FunctionName("SayTyped")>]
let SayHello([<ActivityTrigger>] name) = sayHello.run name
[<FunctionName("SayTyped")>]
let SayHello([<ActivityTrigger>] name) = sayHello.run name

[<FunctionName("TypedSequence")>]
let Run ([<OrchestrationTrigger>] context: DurableOrchestrationContext) =
[<FunctionName("TypedSequence")>]
let Run ([<OrchestrationTrigger>] context: DurableOrchestrationContext) =
Orchestrator.run (workflow, context)
3 changes: 2 additions & 1 deletion samples/samples.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
<Compile Include="Typed.fs" />
<Compile Include="Parameters.fs" />
<Compile Include="FanOutFanIn.fs" />
<Compile Include="Delay.fs" />
<Compile Include="HttpStart.fs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="1.0.24" />
<PackageReference Include="DurableFunctions.FSharp" Version="0.1.0" />
<PackageReference Include="DurableFunctions.FSharp" Version="0.2.0" />
<PackageReference Include="TaskBuilder.fs" Version="1.0.0" />
</ItemGroup>

Expand Down
15 changes: 14 additions & 1 deletion src/DurableFunctions.FSharp/Activity.fs
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,17 @@ module Activity =
let all (tasks: OrchestratorBuilder.ContextTask<'a> seq) (c: DurableOrchestrationContext) =
let bla = tasks |> Seq.map (fun x -> x c)
let whenAll = Task.WhenAll bla
whenAll.ContinueWith(fun (xs: Task<'a []>) -> xs.Result |> List.ofArray)
whenAll.ContinueWith(fun (xs: Task<'a []>) -> xs.Result |> List.ofArray)

/// Call all specified tasks sequentially one after the other and combine the results together.
let seq (tasks: OrchestratorBuilder.ContextTask<'a> list) =
let rec work acc (rem : OrchestratorBuilder.ContextTask<'a> list) =
match rem with
| [] -> fun _ -> Task.FromResult acc
| d :: rest -> orchestrator {
let! t = d
return! work (acc @ [t]) rest
}
work [] tasks


4 changes: 2 additions & 2 deletions src/DurableFunctions.FSharp/DurableFunctions.FSharp.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

<!-- NuGet Publishing Metadata -->
<PropertyGroup>
<Title>Azure Functions F# API</Title>
<Title>Azure Durable Functions F# API</Title>
<Authors>Mikhail Shilkov</Authors>
<Description>F#-friendly API layer for Azure Durable Functions</Description>
<PackageReleaseNotes>https://github.com/mikhailshilkov/DurableFunctions.FSharp/releases/</PackageReleaseNotes>
Expand All @@ -24,6 +24,6 @@
<RepositoryUrl>https://github.com/mikhailshilkov/DurableFunctions.FSharp</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Version>0.1.0</Version>
<Version>0.2.0</Version>
</PropertyGroup>
</Project>
12 changes: 12 additions & 0 deletions src/DurableFunctions.FSharp/Orchestrator.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace DurableFunctions.FSharp

open System
open System.Threading
open System.Threading.Tasks
open Microsoft.Azure.WebJobs
open OrchestratorBuilder
Expand All @@ -16,4 +18,14 @@ type Orchestrator = class
static member run (workflow : 'a -> ContextTask<'b>, context : DurableOrchestrationContext) : Task<'b> =
let input = context.GetInput<'a> ()
workflow input context

/// Returns a fixed value as a orchestrator.
static member ret value (_: DurableOrchestrationContext) =
Task.FromResult value

/// Delays orchestrator execution by the specified timespan.
static member delay (timespan: TimeSpan) (context: DurableOrchestrationContext) =
let deadline = context.CurrentUtcDateTime.Add timespan
context.CreateTimer(deadline, CancellationToken.None)

end
17 changes: 17 additions & 0 deletions src/DurableFunctions.FSharp/OrchestratorCE.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ open System.Runtime.CompilerServices
open Microsoft.Azure.WebJobs

module OrchestratorBuilder =

type ContextTask = DurableOrchestrationContext -> Task
type ContextTask<'a> = DurableOrchestrationContext -> Task<'a>

/// Represents the state of a computation:
Expand Down Expand Up @@ -100,6 +102,13 @@ module OrchestratorBuilder =
else // Await and continue later when a result is available.
Await (awt, (fun () -> continuation(awt.GetResult())))

let inline bindTaskUnit (task : Task) (continuation : unit -> Step<'b>) =
let awt = task.GetAwaiter()
if awt.IsCompleted then // Proceed to the next step based on the result we already have.
continuation(awt.GetResult())
else // Await and continue later when a result is available.
Await (awt, (fun () -> continuation(awt.GetResult())))

/// Chains together a step with its following step.
/// Note that this requires that the first step has no result.
/// This prevents constructs like `task { return 1; return 2; }`.
Expand Down Expand Up @@ -133,6 +142,12 @@ module OrchestratorBuilder =
let a = bindTask (task c) continuation
run (fun () -> a) c)

let inline bindContextTaskUnit (task : ContextTask) (continuation : unit -> Step<'b>) =
ReturnFrom(
fun c ->
let a = bindTaskUnit (task c) continuation
run (fun () -> a) c)

/// Builds a `System.Threading.Tasks.Task<'a>` similarly to a C# async/await method, but with
/// all awaited tasks automatically configured *not* to resume on the captured context.
/// This is often preferable when writing library code that is not context-aware, but undesirable when writing
Expand All @@ -152,6 +167,8 @@ module OrchestratorBuilder =
// Everything else can use bindGenericAwaitable via an extension member (defined later).
member inline __.Bind(task : ContextTask<'a>, continuation : 'a -> 'b Step) : 'b Step =
bindContextTask task continuation
member inline __.Bind(task : ContextTask, continuation : unit -> 'b Step) : 'b Step =
bindContextTaskUnit task continuation

[<AutoOpen>]
module Builders =
Expand Down

0 comments on commit 885ee55

Please sign in to comment.