Skip to content

Commit

Permalink
Allows to provide custom scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitry Voytsekhovskiy committed Oct 3, 2017
1 parent 1ebffec commit 3849244
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 19 deletions.
29 changes: 16 additions & 13 deletions src/Angara.Flow/Execution.fs
Original file line number Diff line number Diff line change
Expand Up @@ -289,19 +289,19 @@ type internal Progress<'v>(v:'v, i:VertexIndex, progressReported : ObservableSou

// TODO: it is the Scheduler who should prepare the RuntimeContext for the target function.
[<AbstractClass>]
type Scheduler() =
abstract Start : (unit -> unit) -> unit
type Scheduler<'m when 'm:>ExecutableMethod and 'm:comparison> () =
abstract Start : 'm * (unit -> unit) -> unit

static member ThreadPool() : Scheduler = upcast ThreadPoolScheduler()
static member ThreadPool() : Scheduler<'m> = upcast ThreadPoolScheduler<'m>()

and [<Class>] ThreadPoolScheduler() =
inherit Scheduler()
and [<Class>] ThreadPoolScheduler<'m when 'm:>ExecutableMethod and 'm:comparison>() =
inherit Scheduler<'m>()

static let scheduler = LimitedConcurrencyLevelTaskScheduler(System.Environment.ProcessorCount)
static do Trace.Runtime.TraceInformation(sprintf "ThreadPoolScheduler limits concurrency level with %d" scheduler.MaximumConcurrencyLevel)
static let mutable ExSeq = 0L

override x.Start (f: unit -> unit) =
override x.Start (_, f: unit -> unit) =
let id = System.Threading.Interlocked.Increment(&ExSeq)
System.Threading.Tasks.Task.Factory.StartNew((fun() ->
try
Expand All @@ -313,7 +313,7 @@ and [<Class>] ThreadPoolScheduler() =
raise ex), CancellationToken.None, Tasks.TaskCreationOptions.LongRunning, scheduler) |> ignore

[<Sealed>]
type Runtime<'m when 'm:>ExecutableMethod and 'm:comparison> (source:IObservable<State<'m,MethodOutput> * RuntimeAction<'m> list>, scheduler : Scheduler) =
type Runtime<'m when 'm:>ExecutableMethod and 'm:comparison> (source:IObservable<State<'m,MethodOutput> * RuntimeAction<'m> list>, scheduler : Scheduler<'m>) =
let messages = ObservableSource<Message<'m,MethodOutput>>()
let cancels = Dictionary<'m*VertexIndex,CancellationTokenSource>()
let progressReported = ObservableSource<'m*VertexIndex*float>()
Expand Down Expand Up @@ -417,14 +417,14 @@ type Runtime<'m when 'm:>ExecutableMethod and 'm:comparison> (source:IObservable
| Execute (v,slice,time) ->
let cts = new CancellationTokenSource()
let func = buildEvaluation (v,slice,time,state) cts false
scheduler.Start func
scheduler.Start (v, func)
cancel (v,slice) cancels
cancels.Add((v,slice), cts)

| Continue (v,slice,time) ->
let cts = new CancellationTokenSource()
let func = buildEvaluation (v,slice,time,state) cts true
scheduler.Start func
scheduler.Start (v, func)
cancel (v,slice) cancels
cancels.Add((v,slice), cts)

Expand Down Expand Up @@ -470,7 +470,7 @@ module internal Helpers =
open Helpers

[<Class>]
type Engine<'m when 'm:>ExecutableMethod and 'm:comparison>(initialState : State<'m, MethodOutput>, scheduler:Scheduler) =
type Engine<'m when 'm:>ExecutableMethod and 'm:comparison>(initialState : State<'m, MethodOutput>, scheduler:Scheduler<'m>) =

let messages = ObservableSource()

Expand Down Expand Up @@ -573,8 +573,11 @@ module Control =
let output = extractOutput m outRef state
unbox output

let runToFinal<'m when 'm:>ExecutableMethod and 'm:comparison>(state:State<'m, MethodOutput>) =
use engine = new Engine<'m>(state, Scheduler.ThreadPool())
let runToFinalIn<'m when 'm:>ExecutableMethod and 'm:comparison> (scheduler:Scheduler<'m>) (state:State<'m, MethodOutput>) =
use engine = new Engine<'m>(state, scheduler)
let final = pickFinal engine.Changes
engine.Start()
final.GetResult()
final.GetResult()

let runToFinal<'m when 'm:>ExecutableMethod and 'm:comparison> (state:State<'m, MethodOutput>) =
runToFinalIn (ThreadPoolScheduler.ThreadPool()) state
15 changes: 9 additions & 6 deletions src/Angara.Flow/Execution.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,20 @@ type MethodOutput =

/// Runs methods.
[<AbstractClass>]
type Scheduler =
abstract Start : (unit -> unit) -> unit
type Scheduler<'m when 'm:>ExecutableMethod and 'm:comparison> =
abstract Start : 'm * (unit -> unit) -> unit

/// Creates a scheduler which runs methods in the thread pool and limits concurrency level
/// depending on number of CPU cores.
static member ThreadPool : unit -> Scheduler
static member ThreadPool : unit -> Scheduler<'m>


/// Represents an engine which maintains a combination of the StateMachine and Runtime.
/// It makes the changes flow from the state machine to the runtime and
/// makes the messages flow from the runtime to the state machine.
[<Class>]
type Engine<'m when 'm:>ExecutableMethod and 'm:comparison> =
new : State<'m, MethodOutput> * Scheduler -> Engine<'m>
new : State<'m, MethodOutput> * Scheduler<'m> -> Engine<'m>
interface IDisposable

member State : State<'m, MethodOutput>
Expand All @@ -96,8 +96,8 @@ type Engine<'m when 'm:>ExecutableMethod and 'm:comparison> =
member Post : Messages.Message<'m, MethodOutput> -> unit

module Control =
open System.Reactive.Linq;

open System.Reactive.Linq
/// Asynchronously returns a successful final state of the state updates sequence.
/// If the state has failed method, throws an exception.
val pickFinal<'m when 'm:>ExecutableMethod and 'm:comparison> : IObservable<StateUpdate<'m,MethodOutput>> -> Reactive.Subjects.AsyncSubject<State<'m,MethodOutput>>
Expand All @@ -108,6 +108,9 @@ module Control =
/// Starts the flow from the given flow state blocking until the final state is reached and then returns that state.
val runToFinal<'m when 'm:>ExecutableMethod and 'm:comparison> : State<'m,MethodOutput> -> State<'m,MethodOutput>

/// Starts the flow from the given flow state blocking until the final state is reached and then returns that state.
val runToFinalIn<'m when 'm:>ExecutableMethod and 'm:comparison> : Scheduler<'m> -> State<'m,MethodOutput> -> State<'m,MethodOutput>

type FlowFailedException =
new : exn seq -> FlowFailedException
inherit System.AggregateException

0 comments on commit 3849244

Please sign in to comment.