Skip to content
This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Commit

Permalink
IVar.get deadlock fix
Browse files Browse the repository at this point in the history
  • Loading branch information
eulerfx committed Mar 23, 2017
1 parent 01c390a commit badc864
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 59 deletions.
3 changes: 2 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
### 0.0.36-alpha001 - 10.03.2017
### 0.0.36-alpha001 - 23.03.2017

* Improve produce and fetch codec performance.
* Fix possible deadlock with Async.AwaitTask usage.

### 0.0.35-alpha001 - 08.04.2017

Expand Down
54 changes: 26 additions & 28 deletions src/kafunk/Utility/Async.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,31 @@ open System.Collections.Concurrent
open Kafunk


let private awaitTaskUnit (t:Task) =
Async.FromContinuations <| fun (ok,err,cnc) ->
t.ContinueWith(fun t ->
if t.IsFaulted then err(t.Exception)
elif t.IsCanceled then cnc(OperationCanceledException("Task wrapped with Async.AwaitTask has been cancelled.", t.Exception))
elif t.IsCompleted then ok()
else failwith "invalid Task state!") |> ignore

let private awaitTaskCancellationAsError (t:Task<'a>) : Async<'a> =
Async.FromContinuations <| fun (ok,err,_) ->
t.ContinueWith (fun (t:Task<'a>) ->
if t.IsFaulted then err t.Exception
elif t.IsCanceled then err (OperationCanceledException("Task wrapped with Async has been cancelled."))
elif t.IsCompleted then ok t.Result
else failwith "invalid Task state!") |> ignore

let private awaitTaskUnitCancellationAsError (t:Task) : Async<unit> =
Async.FromContinuations <| fun (ok,err,_) ->
t.ContinueWith (fun (t:Task) ->
if t.IsFaulted then err t.Exception
elif t.IsCanceled then err (OperationCanceledException("Task wrapped with Async has been cancelled."))
elif t.IsCompleted then ok ()
else failwith "invalid Task state!") |> ignore


/// A write-once concurrent variable.
type IVar<'a> = TaskCompletionSource<'a>

Expand Down Expand Up @@ -51,7 +76,7 @@ module IVar =

/// Creates an async computation which returns the value contained in an IVar.
let inline get (i:IVar<'a>) : Async<'a> =
i.Task |> Async.AwaitTask
i.Task |> awaitTaskCancellationAsError

/// Returns a cancellation token which is cancelled when the IVar is set.
let inline toCancellationToken (i:IVar<_>) =
Expand Down Expand Up @@ -93,33 +118,6 @@ module Task =
ivar.Task)
|> join




let private awaitTaskUnit (t:Task) =
Async.FromContinuations <| fun (ok,err,cnc) ->
t.ContinueWith(fun t ->
if t.IsFaulted then err(t.Exception)
elif t.IsCanceled then cnc(OperationCanceledException("Task wrapped with Async.AwaitTask has been cancelled.", t.Exception))
elif t.IsCompleted then ok()
else failwith "invalid Task state!") |> ignore

let private awaitTaskCancellationAsError (t:Task<'a>) : Async<'a> =
Async.FromContinuations <| fun (ok,err,_) ->
t.ContinueWith (fun (t:Task<'a>) ->
if t.IsFaulted then err t.Exception
elif t.IsCanceled then err (OperationCanceledException("Task wrapped with Async has been cancelled."))
elif t.IsCompleted then ok t.Result
else failwith "invalid Task state!") |> ignore

let private awaitTaskUnitCancellationAsError (t:Task) : Async<unit> =
Async.FromContinuations <| fun (ok,err,_) ->
t.ContinueWith (fun (t:Task) ->
if t.IsFaulted then err t.Exception
elif t.IsCanceled then err (OperationCanceledException("Task wrapped with Async has been cancelled."))
elif t.IsCompleted then ok ()
else failwith "invalid Task state!") |> ignore

[<Compile(Module)>]
module Async =

Expand Down
74 changes: 44 additions & 30 deletions tests/kafunk.Tests/Async.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,31 @@ open System.Collections.Concurrent
open Kafunk


let private awaitTaskUnit (t:Task) =
Async.FromContinuations <| fun (ok,err,cnc) ->
t.ContinueWith(fun t ->
if t.IsFaulted then err(t.Exception)
elif t.IsCanceled then cnc(OperationCanceledException("Task wrapped with Async.AwaitTask has been cancelled.", t.Exception))
elif t.IsCompleted then ok()
else failwith "invalid Task state!") |> ignore

let private awaitTaskCancellationAsError (t:Task<'a>) : Async<'a> =
Async.FromContinuations <| fun (ok,err,_) ->
t.ContinueWith (fun (t:Task<'a>) ->
if t.IsFaulted then err t.Exception
elif t.IsCanceled then err (OperationCanceledException("Task wrapped with Async has been cancelled."))
elif t.IsCompleted then ok t.Result
else failwith "invalid Task state!") |> ignore

let private awaitTaskUnitCancellationAsError (t:Task) : Async<unit> =
Async.FromContinuations <| fun (ok,err,_) ->
t.ContinueWith (fun (t:Task) ->
if t.IsFaulted then err t.Exception
elif t.IsCanceled then err (OperationCanceledException("Task wrapped with Async has been cancelled."))
elif t.IsCompleted then ok ()
else failwith "invalid Task state!") |> ignore


/// A write-once concurrent variable.
type IVar<'a> = TaskCompletionSource<'a>

Expand Down Expand Up @@ -51,7 +76,7 @@ module IVar =

/// Creates an async computation which returns the value contained in an IVar.
let inline get (i:IVar<'a>) : Async<'a> =
i.Task |> Async.AwaitTask
i.Task |> awaitTaskCancellationAsError

/// Returns a cancellation token which is cancelled when the IVar is set.
let inline toCancellationToken (i:IVar<_>) =
Expand Down Expand Up @@ -93,33 +118,6 @@ module Task =
ivar.Task)
|> join




let private awaitTaskUnit (t:Task) =
Async.FromContinuations <| fun (ok,err,cnc) ->
t.ContinueWith(fun t ->
if t.IsFaulted then err(t.Exception)
elif t.IsCanceled then cnc(OperationCanceledException("Task wrapped with Async.AwaitTask has been cancelled.", t.Exception))
elif t.IsCompleted then ok()
else failwith "invalid Task state!") |> ignore

let private awaitTaskCancellationAsError (t:Task<'a>) : Async<'a> =
Async.FromContinuations <| fun (ok,err,_) ->
t.ContinueWith (fun (t:Task<'a>) ->
if t.IsFaulted then err t.Exception
elif t.IsCanceled then err (OperationCanceledException("Task wrapped with Async has been cancelled."))
elif t.IsCompleted then ok t.Result
else failwith "invalid Task state!") |> ignore

let private awaitTaskUnitCancellationAsError (t:Task) : Async<unit> =
Async.FromContinuations <| fun (ok,err,_) ->
t.ContinueWith (fun (t:Task) ->
if t.IsFaulted then err t.Exception
elif t.IsCanceled then err (OperationCanceledException("Task wrapped with Async has been cancelled."))
elif t.IsCompleted then ok ()
else failwith "invalid Task state!") |> ignore

[<Compile(Module)>]
module Async =

Expand Down Expand Up @@ -227,7 +225,7 @@ module Async =
IVar.tryPut () res |> ignore
let ok _ =
sm.Release() |> ignore
tryComplete()
tryComplete ()
let err (ex:exn) =
sm.Release() |> ignore
IVar.tryError ex res |> ignore
Expand All @@ -239,7 +237,7 @@ module Async =
sm.Wait()
Interlocked.Increment count |> ignore
startThreadPoolWithContinuations (en.Current, ok, err, cnc, ct)
tryComplete() |> ignore
tryComplete () |> ignore
do! res.Task |> awaitTaskCancellationAsError }

/// Creates an async computation which completes when any of the argument computations completes.
Expand Down Expand Up @@ -271,6 +269,22 @@ module Async =
let chooseChoice (a:Async<'a>) (b:Async<'b>) : Async<Choice<'a, 'b>> =
choose (a |> map Choice1Of2) (b |> map Choice2Of2)

/// Cancels a computation and returns None if the CancellationToken is cancelled before the
/// computation completes.
let withCancellation (ct:CancellationToken) (a:Async<'a>) : Async<'a> = async {
let! ct2 = Async.CancellationToken
use cts = CancellationTokenSource.CreateLinkedTokenSource (ct, ct2)
let tcs = new TaskCompletionSource<'a>()
use _reg = cts.Token.Register (fun () -> tcs.SetCanceled())
let a = async {
try
let! a = a
tcs.SetResult a
with ex ->
tcs.SetException ex }
Async.Start (a, cts.Token)
return! tcs.Task |> Async.AwaitTask }

/// Cancels a computation and returns None if the CancellationToken is cancelled before the
/// computation completes.
let cancelTokenWith (ct:CancellationToken) (f:unit -> 'a) (a:Async<'a>) : Async<'a> = async {
Expand Down

0 comments on commit badc864

Please sign in to comment.