Skip to content

Commit

Permalink
Refactored AsyncVal to use PooledResizeArray under the hood
Browse files Browse the repository at this point in the history
  • Loading branch information
xperiandri committed Mar 24, 2024
1 parent d9746f0 commit 693ea56
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type GraphQLWebSocketMiddleware<'Root>
let receiveMessageViaSocket (cancellationToken : CancellationToken) (serializerOptions : JsonSerializerOptions) (socket : WebSocket) = taskResult {
let buffer = ArrayPool.Shared.Rent options.ReadBufferSize
try
let completeMessage = new PooledList<byte> ()
use completeMessage = new PooledResizeArray<byte> ()
let mutable segmentResponse : WebSocketReceiveResult = null
while (not cancellationToken.IsCancellationRequested)
&& socket |> isSocketOpen
Expand Down
6 changes: 2 additions & 4 deletions src/FSharp.Data.GraphQL.Server/Execution.fs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ let deferResults path (res : ResolverResult<obj>) : IObservable<GQLDeferredRespo
| Error errs -> Observable.singleton <| DeferredErrors (null, errs, formattedPath)

/// Collect together an array of results using the appropriate execution strategy.
let collectFields (strategy : ExecutionStrategy) (rs : AsyncVal<ResolverResult<KeyValuePair<string, obj>>> []) : AsyncVal<ResolverResult<KeyValuePair<string, obj> []>> = asyncVal {
let collectFields (strategy : ExecutionStrategy) (rs : AsyncVal<ResolverResult<KeyValuePair<string, obj>>> seq) : AsyncVal<ResolverResult<KeyValuePair<string, obj> []>> = asyncVal {
let! collected =
match strategy with
| Parallel -> AsyncVal.collectParallel rs
Expand Down Expand Up @@ -354,8 +354,7 @@ let rec private direct (returnDef : OutputDef) (ctx : ResolveFieldContext) (path
| :? System.Collections.IEnumerable as enumerable ->
enumerable
|> Seq.cast<obj>
|> Seq.toArray
|> Array.mapi resolveItem
|> Seq.mapi resolveItem
|> collectFields Parallel
|> AsyncVal.map(ResolverResult.mapValue(fun items -> KeyValuePair(name, items |> Array.map(fun d -> d.Value) |> box)))
| _ -> raise <| GQLMessageException (ErrorMessages.expectedEnumerableValue ctx.ExecutionInfo.Identifier (value.GetType()))
Expand Down Expand Up @@ -543,7 +542,6 @@ and executeObjectFields (fields : ExecutionInfo list) (objName : string) (objDef
let! res =
fields
|> Seq.map executeField
|> Seq.toArray
|> collectFields Parallel
match res with
| Error errs -> return Error errs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Collections.Pooled" />
<PackageReference Include="FSharp.Control.Reactive" />
<PackageReference Include="System.Reactive" />
</ItemGroup>
Expand Down
52 changes: 30 additions & 22 deletions src/FSharp.Data.GraphQL.Shared/AsyncVal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -130,49 +130,55 @@ module AsyncVal =
/// executed asynchronously, one by one with regard to their order in array.
/// Returned array maintain order of values.
/// If the array contains a Failure, then the entire array will not resolve
let collectSequential (values : AsyncVal<'T>[]) : AsyncVal<'T[]> =
if values.Length = 0 then Value [||]
elif values |> Array.exists isAsync then
let collectSequential (values : AsyncVal<'T> seq) : AsyncVal<'T[]> =
let values = new PooledResizeArray<_> (values)
let length = values.Count
if length = 0 then Value [||]
elif values.Exists isAsync then
Async (async {
let results = Array.zeroCreate values.Length
let exceptions = ResizeArray values.Length
for i = 0 to values.Length - 1 do
let results = Array.zeroCreate length
use exceptions = new PooledResizeArray<_> (length)
for i = 0 to length - 1 do
let v = values.[i]
match v with
| Value v -> results.[i] <- v
| Async a ->
let! r = a
results.[i] <- r
| Failure f -> exceptions.Add f
values.Dispose()
match exceptions.Count with
| 0 -> return results
| 1 -> return exceptions.First().Reraise ()
| _ -> return AggregateException exceptions |> raise
| _ -> return AggregateException (exceptions.AsReadOnly()) |> raise
})
else
let exceptions =
use values = values
use exceptions =
values
|> Array.choose (function
| Failure f -> Some f
| _ -> None)
match exceptions.Length with
| 0 -> Value (values |> Array.map (fun (Value v) -> v))
|> PooledResizeArray.vChoose (function
| Failure f -> ValueSome f
| _ -> ValueNone)
match exceptions.Count with
| 0 -> Value (values |> Seq.map (fun (Value v) -> v) |> Seq.toArray)
| 1 -> Failure (exceptions.First ())
| _ -> Failure (AggregateException exceptions)
| _ -> Failure (AggregateException (exceptions.AsReadOnly()))

/// Converts array of AsyncVals into AsyncVal with array results.
/// In case when are non-immediate values in provided array, they are
/// executed all in parallel, in unordered fashion. Order of values
/// inside returned array is maintained.
/// If the array contains a Failure, then the entire array will not resolve
let collectParallel (values : AsyncVal<'T>[]) : AsyncVal<'T[]> =
if values.Length = 0 then Value [||]
let collectParallel (values : AsyncVal<'T> seq) : AsyncVal<'T[]> =
use values = new PooledResizeArray<_> (values)
let length = values.Count
if length = 0 then Value [||]
else
let indexes = List<_> (0)
let continuations = List<_> (0)
let results = Array.zeroCreate values.Length
let exceptions = ResizeArray values.Length
for i = 0 to values.Length - 1 do
let indexes = new PooledResizeArray<_> (length)
let continuations = new PooledResizeArray<_> (length)
let results = Array.zeroCreate length
use exceptions = new PooledResizeArray<_> (length)
for i = 0 to length - 1 do
let value = values.[i]
match value with
| Value v -> results.[i] <- v
Expand All @@ -182,13 +188,15 @@ module AsyncVal =
| Failure f -> exceptions.Add f
match exceptions.Count with
| 1 -> AsyncVal.Failure (exceptions.First ())
| count when count > 1 -> AsyncVal.Failure (AggregateException exceptions)
| count when count > 1 -> AsyncVal.Failure (AggregateException (exceptions.AsReadOnly()))
| _ ->
if indexes.Count = 0 then Value (results)
else Async (async {
let! vals = continuations |> Async.Parallel
for i = 0 to indexes.Count - 1 do
results.[indexes.[i]] <- vals.[i]
indexes.Dispose()
continuations.Dispose()
return results
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>FSharp.Data.GraphQL.Server</_Parameter1>
</AssemblyAttribute>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>FSharp.Data.GraphQL.Server.AspNetCore</_Parameter1>
</AssemblyAttribute>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>FSharp.Data.GraphQL.Client</_Parameter1>
</AssemblyAttribute>
Expand All @@ -29,6 +32,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Collections.Pooled" />
<PackageReference Include="FParsec" />
<PackageReference Include="FSharp.SystemTextJson" />
<PackageReference Include="FsToolkit.ErrorHandling" />
Expand All @@ -44,6 +48,7 @@
<Compile Include="Helpers\Extensions.fs" />
<Compile Include="Helpers\Reflection.fs" />
<Compile Include="Helpers\MemoryCache.fs" />
<Compile Include="Helpers\PooledResizeArray.fs" />
<Compile Include="Errors.fs" />
<Compile Include="Exception.fs" />
<Compile Include="ValidationTypes.fs" />
Expand Down
36 changes: 36 additions & 0 deletions src/FSharp.Data.GraphQL.Shared/Helpers/PooledResizeArray.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
namespace FSharp.Data.GraphQL

open Collections.Pooled

type internal PooledResizeArray<'T> = PooledList<'T>

module internal PooledResizeArray =

let ofSeqWithCapacity capacity items =
let list = new PooledResizeArray<'T> (capacity = capacity)
list.AddRange (collection = items)
list

let exists f (list : PooledResizeArray<'T>) = list.Exists (f)

let length (list : PooledResizeArray<'T>) = list.Count

let vChoose f (list : PooledResizeArray<'T>) =
let res = new PooledResizeArray<_> (list.Count)

for i = 0 to length list - 1 do
match f list[i] with
| ValueNone -> ()
| ValueSome b -> res.Add (b)

res

let choose f (list : PooledResizeArray<'T>) =
let res = new PooledResizeArray<_> (list.Count)

for i = 0 to length list - 1 do
match f list[i] with
| None -> ()
| Some b -> res.Add (b)

res

0 comments on commit 693ea56

Please sign in to comment.