diff --git a/src/Angara.Flow/Execution.fs b/src/Angara.Flow/Execution.fs index d02790e..9a3cb67 100644 --- a/src/Angara.Flow/Execution.fs +++ b/src/Angara.Flow/Execution.fs @@ -289,19 +289,19 @@ type internal Progress<'v>(v:'v, i:VertexIndex, progressReported : ObservableSou // TODO: it is the Scheduler who should prepare the RuntimeContext for the target function. [] -type Scheduler() = - abstract Start : (unit -> unit) -> unit +type Scheduler<'m when 'm:>ExecutableMethod and 'm:comparison> () = + abstract Start : 'm * (unit -> unit) -> unit - static member ThreadPool() : Scheduler = upcast ThreadPoolScheduler() + static member ThreadPool() : Scheduler<'m> = upcast ThreadPoolScheduler<'m>() -and [] ThreadPoolScheduler() = - inherit Scheduler() +and [] ThreadPoolScheduler<'m when 'm:>ExecutableMethod and 'm:comparison>() = + inherit Scheduler<'m>() static let scheduler = LimitedConcurrencyLevelTaskScheduler(System.Environment.ProcessorCount) static do Trace.Runtime.TraceInformation(sprintf "ThreadPoolScheduler limits concurrency level with %d" scheduler.MaximumConcurrencyLevel) static let mutable ExSeq = 0L - override x.Start (f: unit -> unit) = + override x.Start (_, f: unit -> unit) = let id = System.Threading.Interlocked.Increment(&ExSeq) System.Threading.Tasks.Task.Factory.StartNew((fun() -> try @@ -313,7 +313,7 @@ and [] ThreadPoolScheduler() = raise ex), CancellationToken.None, Tasks.TaskCreationOptions.LongRunning, scheduler) |> ignore [] -type Runtime<'m when 'm:>ExecutableMethod and 'm:comparison> (source:IObservable * RuntimeAction<'m> list>, scheduler : Scheduler) = +type Runtime<'m when 'm:>ExecutableMethod and 'm:comparison> (source:IObservable * RuntimeAction<'m> list>, scheduler : Scheduler<'m>) = let messages = ObservableSource>() let cancels = Dictionary<'m*VertexIndex,CancellationTokenSource>() let progressReported = ObservableSource<'m*VertexIndex*float>() @@ -417,14 +417,14 @@ type Runtime<'m when 'm:>ExecutableMethod and 'm:comparison> (source:IObservable | Execute (v,slice,time) -> let cts = new CancellationTokenSource() let func = buildEvaluation (v,slice,time,state) cts false - scheduler.Start func + scheduler.Start (v, func) cancel (v,slice) cancels cancels.Add((v,slice), cts) | Continue (v,slice,time) -> let cts = new CancellationTokenSource() let func = buildEvaluation (v,slice,time,state) cts true - scheduler.Start func + scheduler.Start (v, func) cancel (v,slice) cancels cancels.Add((v,slice), cts) @@ -470,7 +470,7 @@ module internal Helpers = open Helpers [] -type Engine<'m when 'm:>ExecutableMethod and 'm:comparison>(initialState : State<'m, MethodOutput>, scheduler:Scheduler) = +type Engine<'m when 'm:>ExecutableMethod and 'm:comparison>(initialState : State<'m, MethodOutput>, scheduler:Scheduler<'m>) = let messages = ObservableSource() @@ -573,8 +573,11 @@ module Control = let output = extractOutput m outRef state unbox output - let runToFinal<'m when 'm:>ExecutableMethod and 'm:comparison>(state:State<'m, MethodOutput>) = - use engine = new Engine<'m>(state, Scheduler.ThreadPool()) + let runToFinalIn<'m when 'm:>ExecutableMethod and 'm:comparison> (scheduler:Scheduler<'m>) (state:State<'m, MethodOutput>) = + use engine = new Engine<'m>(state, scheduler) let final = pickFinal engine.Changes engine.Start() - final.GetResult() \ No newline at end of file + final.GetResult() + + let runToFinal<'m when 'm:>ExecutableMethod and 'm:comparison> (state:State<'m, MethodOutput>) = + runToFinalIn (ThreadPoolScheduler.ThreadPool()) state \ No newline at end of file diff --git a/src/Angara.Flow/Execution.fsi b/src/Angara.Flow/Execution.fsi index 45b32e4..81e6bec 100644 --- a/src/Angara.Flow/Execution.fsi +++ b/src/Angara.Flow/Execution.fsi @@ -71,12 +71,12 @@ type MethodOutput = /// Runs methods. [] -type Scheduler = - abstract Start : (unit -> unit) -> unit +type Scheduler<'m when 'm:>ExecutableMethod and 'm:comparison> = + abstract Start : 'm * (unit -> unit) -> unit /// Creates a scheduler which runs methods in the thread pool and limits concurrency level /// depending on number of CPU cores. - static member ThreadPool : unit -> Scheduler + static member ThreadPool : unit -> Scheduler<'m> /// Represents an engine which maintains a combination of the StateMachine and Runtime. @@ -84,7 +84,7 @@ type Scheduler = /// makes the messages flow from the runtime to the state machine. [] type Engine<'m when 'm:>ExecutableMethod and 'm:comparison> = - new : State<'m, MethodOutput> * Scheduler -> Engine<'m> + new : State<'m, MethodOutput> * Scheduler<'m> -> Engine<'m> interface IDisposable member State : State<'m, MethodOutput> @@ -96,8 +96,8 @@ type Engine<'m when 'm:>ExecutableMethod and 'm:comparison> = member Post : Messages.Message<'m, MethodOutput> -> unit module Control = - open System.Reactive.Linq; - + open System.Reactive.Linq + /// Asynchronously returns a successful final state of the state updates sequence. /// If the state has failed method, throws an exception. val pickFinal<'m when 'm:>ExecutableMethod and 'm:comparison> : IObservable> -> Reactive.Subjects.AsyncSubject> @@ -108,6 +108,9 @@ module Control = /// Starts the flow from the given flow state blocking until the final state is reached and then returns that state. val runToFinal<'m when 'm:>ExecutableMethod and 'm:comparison> : State<'m,MethodOutput> -> State<'m,MethodOutput> + /// Starts the flow from the given flow state blocking until the final state is reached and then returns that state. + val runToFinalIn<'m when 'm:>ExecutableMethod and 'm:comparison> : Scheduler<'m> -> State<'m,MethodOutput> -> State<'m,MethodOutput> + type FlowFailedException = new : exn seq -> FlowFailedException inherit System.AggregateException \ No newline at end of file