diff --git a/src/FSharp.Data.GraphQL.Server.AspNetCore/Giraffe/HttpHandlers.fs b/src/FSharp.Data.GraphQL.Server.AspNetCore/Giraffe/HttpHandlers.fs index 6434951af..c1527893d 100644 --- a/src/FSharp.Data.GraphQL.Server.AspNetCore/Giraffe/HttpHandlers.fs +++ b/src/FSharp.Data.GraphQL.Server.AspNetCore/Giraffe/HttpHandlers.fs @@ -1,6 +1,7 @@ namespace FSharp.Data.GraphQL.Server.AspNetCore.Giraffe open System +open System.Buffers open System.IO open System.Text.Json open System.Text.Json.Serialization @@ -154,10 +155,13 @@ module HttpHandlers = else request.EnableBuffering() let body = request.Body - let buffer = Array.zeroCreate 1 - let! bytesRead = body.ReadAsync(buffer, 0, 1) - body.Seek(0, SeekOrigin.Begin) |> ignore - return bytesRead > 0 + let buffer = ArrayPool.Shared.Rent 1 + try + let! bytesRead = body.ReadAsync(buffer, 0, 1) + body.Seek(0, SeekOrigin.Begin) |> ignore + return bytesRead > 0 + finally + ArrayPool.Shared.Return buffer } /// Check if the request is an introspection query diff --git a/src/FSharp.Data.GraphQL.Server.AspNetCore/GraphQLWebsocketMiddleware.fs b/src/FSharp.Data.GraphQL.Server.AspNetCore/GraphQLWebsocketMiddleware.fs index 7e225cf6a..8e130f075 100644 --- a/src/FSharp.Data.GraphQL.Server.AspNetCore/GraphQLWebsocketMiddleware.fs +++ b/src/FSharp.Data.GraphQL.Server.AspNetCore/GraphQLWebsocketMiddleware.fs @@ -82,7 +82,7 @@ type GraphQLWebSocketMiddleware<'Root> let receiveMessageViaSocket (cancellationToken : CancellationToken) (serializerOptions : JsonSerializerOptions) (socket : WebSocket) = taskResult { let buffer = ArrayPool.Shared.Rent options.ReadBufferSize try - let completeMessage = new PooledList () + use completeMessage = new PooledResizeArray () let mutable segmentResponse : WebSocketReceiveResult = null while (not cancellationToken.IsCancellationRequested) && socket |> isSocketOpen diff --git a/src/FSharp.Data.GraphQL.Server/Execution.fs b/src/FSharp.Data.GraphQL.Server/Execution.fs index ea4053c81..cb6b6ee30 100644 --- a/src/FSharp.Data.GraphQL.Server/Execution.fs +++ b/src/FSharp.Data.GraphQL.Server/Execution.fs @@ -254,13 +254,13 @@ let private resolveField (execute: ExecuteField) (ctx: ResolveFieldContext) (par |> AsyncVal.map(fun v -> if isNull v then None else Some v) -type ResolverResult<'T> = Result<'T * IObservable option * GQLProblemDetails list, GQLProblemDetails list> +type ResolverResult<'T> = Result<'T * IObservable voption * GQLProblemDetails list, GQLProblemDetails list> [] module ResolverResult = - let data data = Ok (data, None, []) - let defered data deferred = Ok (data, Some deferred, []) + let data data = Ok (data, ValueNone, []) + let defered data deferred = Ok (data, ValueSome deferred, []) let mapValue (f : 'T -> 'U) (r : ResolverResult<'T>) : ResolverResult<'U> = Result.map(fun (data, deferred, errs) -> (f data, deferred, errs)) r @@ -296,11 +296,11 @@ let deferResults path (res : ResolverResult) : IObservable DeferredResult (data, formattedPath) | _ -> DeferredErrors (data, errs, formattedPath) |> Observable.singleton - Option.foldBack Observable.concat deferred deferredData + ValueOption.foldBack Observable.concat deferred deferredData | Error errs -> Observable.singleton <| DeferredErrors (null, errs, formattedPath) /// Collect together an array of results using the appropriate execution strategy. -let collectFields (strategy : ExecutionStrategy) (rs : AsyncVal>> []) : AsyncVal []>> = asyncVal { +let collectFields (strategy : ExecutionStrategy) (rs : AsyncVal>> seq) : AsyncVal []>> = asyncVal { let! collected = match strategy with | Parallel -> AsyncVal.collectParallel rs @@ -312,12 +312,12 @@ let collectFields (strategy : ExecutionStrategy) (rs : AsyncVal Array.set data i field - Ok(i - 1, Option.mergeWith Observable.merge deferred d, e @ errs) + Ok(i - 1, ValueOption.mergeWith Observable.merge deferred d, e @ errs) | Error e, Ok (_, _, errs) -> Error (e @ errs) | Ok (_, _, e), Error errs -> Error (e @ errs) | Error e, Error errs -> Error (e @ errs) return - Array.foldBack merge collected (Ok (data.Length - 1, None, [])) + Array.foldBack merge collected (Ok (data.Length - 1, ValueNone, [])) |> ResolverResult.mapValue(fun _ -> data) } @@ -354,8 +354,7 @@ let rec private direct (returnDef : OutputDef) (ctx : ResolveFieldContext) (path | :? System.Collections.IEnumerable as enumerable -> enumerable |> Seq.cast - |> Seq.toArray - |> Array.mapi resolveItem + |> Seq.mapi resolveItem |> collectFields Parallel |> AsyncVal.map(ResolverResult.mapValue(fun items -> KeyValuePair(name, items |> Array.map(fun d -> d.Value) |> box))) | _ -> raise <| GQLMessageException (ErrorMessages.expectedEnumerableValue ctx.ExecutionInfo.Identifier (value.GetType())) @@ -363,7 +362,7 @@ let rec private direct (returnDef : OutputDef) (ctx : ResolveFieldContext) (path | Nullable (Output innerDef) -> let innerCtx = { ctx with ExecutionInfo = { ctx.ExecutionInfo with IsNullable = true; ReturnDef = innerDef } } executeResolvers innerCtx path parent (toOption value |> AsyncVal.wrap) - |> AsyncVal.map(Result.valueOr (fun errs -> (KeyValuePair(name, null), None, errs)) >> Ok) + |> AsyncVal.map(Result.valueOr (fun errs -> (KeyValuePair(name, null), ValueNone, errs)) >> Ok) | Interface iDef -> let possibleTypesFn = ctx.Schema.GetPossibleTypes @@ -398,7 +397,7 @@ and deferred (ctx : ResolveFieldContext) (path : FieldPath) (parent : obj) (valu executeResolvers ctx path parent (toOption value |> AsyncVal.wrap) |> Observable.ofAsyncVal |> Observable.bind(ResolverResult.mapValue(fun d -> d.Value) >> deferResults path) - ResolverResult.defered (KeyValuePair (info.Identifier, null)) deferred |> AsyncVal.wrap + ResolverResult.defered (KeyValuePair (name, null)) deferred |> AsyncVal.wrap and private streamed (options : BufferedStreamOptions) (innerDef : OutputDef) (ctx : ResolveFieldContext) (path : FieldPath) (parent : obj) (value : obj) = let info = ctx.ExecutionInfo @@ -420,9 +419,9 @@ and private streamed (options : BufferedStreamOptions) (innerDef : OutputDef) (c match r with | Ok (item, d, e) -> Array.set data i item.Value - (i - 1, box index :: indicies, Option.mergeWith Observable.merge deferred d, e @ errs) + (i - 1, box index :: indicies, ValueOption.mergeWith Observable.merge deferred d, e @ errs) | Error e -> (i - 1, box index :: indicies, deferred, e @ errs) - let (_, indicies, deferred, errs) = List.foldBack merge chunk (chunk.Length - 1, [], None, []) + let (_, indicies, deferred, errs) = List.foldBack merge chunk (chunk.Length - 1, [], ValueNone, []) deferResults (box indicies :: path) (Ok (box data, deferred, errs)) let buffer (items : IObservable>>) : IObservable = @@ -449,8 +448,8 @@ and private streamed (options : BufferedStreamOptions) (innerDef : OutputDef) (c |> Array.mapi resolveItem |> Observable.ofAsyncValSeq |> buffer - ResolverResult.defered (KeyValuePair (info.Identifier, box [])) stream |> AsyncVal.wrap - | _ -> raise <| GQLMessageException (ErrorMessages.expectedEnumerableValue ctx.ExecutionInfo.Identifier (value.GetType())) + ResolverResult.defered (KeyValuePair (name, box [])) stream |> AsyncVal.wrap + | _ -> raise <| GQLMessageException (ErrorMessages.expectedEnumerableValue name (value.GetType())) and private live (ctx : ResolveFieldContext) (path : FieldPath) (parent : obj) (value : obj) = let info = ctx.ExecutionInfo @@ -485,7 +484,7 @@ and private live (ctx : ResolveFieldContext) (path : FieldPath) (parent : obj) ( executeResolvers ctx path parent (value |> Some |> AsyncVal.wrap) // TODO: Add tests for `Observable.merge deferred updates` correct order - |> AsyncVal.map(Result.map(fun (data, deferred, errs) -> (data, Some <| Option.foldBack Observable.merge deferred updates, errs))) + |> AsyncVal.map(Result.map(fun (data, deferred, errs) -> (data, ValueSome <| ValueOption.foldBack Observable.merge deferred updates, errs))) /// Actually execute the resolvers. and private executeResolvers (ctx : ResolveFieldContext) (path : FieldPath) (parent : obj) (value : AsyncVal) : AsyncVal>> = @@ -505,8 +504,8 @@ and private executeResolvers (ctx : ResolveFieldContext) (path : FieldPath) (par let resolveWith (ctx : ResolveFieldContext) (onSuccess : ResolveFieldContext -> FieldPath -> obj -> obj -> AsyncVal>>) : AsyncVal>> = asyncVal { let! resolved = value |> AsyncVal.rescue path ctx.Schema.ParseError match resolved with - | Error errs when ctx.ExecutionInfo.IsNullable -> return Ok (KeyValuePair(name, null), None, errs) - | Ok None when ctx.ExecutionInfo.IsNullable -> return Ok (KeyValuePair(name, null), None, []) + | Error errs when ctx.ExecutionInfo.IsNullable -> return Ok (KeyValuePair(name, null), ValueNone, errs) + | Ok None when ctx.ExecutionInfo.IsNullable -> return Ok (KeyValuePair(name, null), ValueNone, []) | Error errs -> return Error errs | Ok None -> return Error (nullResolverError name path ctx) | Ok (Some v) -> return! onSuccess ctx path parent v @@ -543,7 +542,6 @@ and executeObjectFields (fields : ExecutionInfo list) (objName : string) (objDef let! res = fields |> Seq.map executeField - |> Seq.toArray |> collectFields Parallel match res with | Error errs -> return Error errs @@ -604,16 +602,16 @@ let private executeQueryOrMutation (resultSet: (string * ExecutionInfo) []) (ctx | Ok (Error errs) | Error errs -> Error errs match result with - | Error errs when info.IsNullable -> return Ok (KeyValuePair(name, null), None, errs) + | Error errs when info.IsNullable -> return Ok (KeyValuePair(name, null), ValueNone, errs) | Error errs -> return Error errs | Ok r -> return Ok r } asyncVal { let documentId = ctx.ExecutionPlan.DocumentId - match! resultSet |> Array.map executeRootOperation |> collectFields ctx.ExecutionPlan.Strategy with - | Ok (data, Some deferred, errs) -> return GQLExecutionResult.Deferred(documentId, NameValueLookup(data), errs, deferred, ctx.Metadata) - | Ok (data, None, errs) -> return GQLExecutionResult.Direct(documentId, NameValueLookup(data), errs, ctx.Metadata) + match! resultSet |> Seq.map executeRootOperation |> collectFields ctx.ExecutionPlan.Strategy with + | Ok (data, ValueSome deferred, errs) -> return GQLExecutionResult.Deferred(documentId, NameValueLookup(data), errs, deferred, ctx.Metadata) + | Ok (data, ValueNone, errs) -> return GQLExecutionResult.Direct(documentId, NameValueLookup(data), errs, ctx.Metadata) | Error errs -> return GQLExecutionResult.RequestError(documentId, errs, ctx.Metadata) } @@ -635,9 +633,9 @@ let private executeSubscription (resultSet: (string * ExecutionInfo) []) (ctx: E Path = fieldPath |> List.rev } let onValue v = asyncVal { match! executeResolvers fieldCtx fieldPath value (toOption v |> AsyncVal.wrap) with - | Ok (data, None, []) -> return SubscriptionResult (NameValueLookup.ofList [nameOrAlias, data.Value]) - | Ok (data, None, errs) -> return SubscriptionErrors (NameValueLookup.ofList [nameOrAlias, data.Value], errs) - | Ok (_, Some _, _) -> return failwith "Deferred/Streamed/Live are not supported for subscriptions!" + | Ok (data, ValueNone, []) -> return SubscriptionResult (NameValueLookup.ofList [nameOrAlias, data.Value]) + | Ok (data, ValueNone, errs) -> return SubscriptionErrors (NameValueLookup.ofList [nameOrAlias, data.Value], errs) + | Ok (_, ValueSome _, _) -> return failwith "Deferred/Streamed/Live are not supported for subscriptions!" | Error errs -> return SubscriptionErrors (null, errs) } return diff --git a/src/FSharp.Data.GraphQL.Server/FSharp.Data.GraphQL.Server.fsproj b/src/FSharp.Data.GraphQL.Server/FSharp.Data.GraphQL.Server.fsproj index cca4e1812..ddd43933f 100644 --- a/src/FSharp.Data.GraphQL.Server/FSharp.Data.GraphQL.Server.fsproj +++ b/src/FSharp.Data.GraphQL.Server/FSharp.Data.GraphQL.Server.fsproj @@ -23,6 +23,7 @@ + diff --git a/src/FSharp.Data.GraphQL.Shared/AsyncVal.fs b/src/FSharp.Data.GraphQL.Shared/AsyncVal.fs index 80b29fe05..ec8c4cfef 100644 --- a/src/FSharp.Data.GraphQL.Shared/AsyncVal.fs +++ b/src/FSharp.Data.GraphQL.Shared/AsyncVal.fs @@ -130,13 +130,15 @@ module AsyncVal = /// executed asynchronously, one by one with regard to their order in array. /// Returned array maintain order of values. /// If the array contains a Failure, then the entire array will not resolve - let collectSequential (values : AsyncVal<'T>[]) : AsyncVal<'T[]> = - if values.Length = 0 then Value [||] - elif values |> Array.exists isAsync then + let collectSequential (values : AsyncVal<'T> seq) : AsyncVal<'T[]> = + let values = new PooledResizeArray<_> (values) + let length = values.Count + if length = 0 then Value [||] + elif values.Exists isAsync then Async (async { - let results = Array.zeroCreate values.Length - let exceptions = ResizeArray values.Length - for i = 0 to values.Length - 1 do + let results = Array.zeroCreate length + use exceptions = new PooledResizeArray<_> (length) + for i = 0 to length - 1 do let v = values.[i] match v with | Value v -> results.[i] <- v @@ -144,35 +146,39 @@ module AsyncVal = let! r = a results.[i] <- r | Failure f -> exceptions.Add f + values.Dispose() match exceptions.Count with | 0 -> return results | 1 -> return exceptions.First().Reraise () - | _ -> return AggregateException exceptions |> raise + | _ -> return AggregateException (exceptions.AsReadOnly()) |> raise }) else - let exceptions = + use values = values + use exceptions = values - |> Array.choose (function - | Failure f -> Some f - | _ -> None) - match exceptions.Length with - | 0 -> Value (values |> Array.map (fun (Value v) -> v)) + |> PooledResizeArray.vChoose (function + | Failure f -> ValueSome f + | _ -> ValueNone) + match exceptions.Count with + | 0 -> Value (values |> Seq.map (fun (Value v) -> v) |> Seq.toArray) | 1 -> Failure (exceptions.First ()) - | _ -> Failure (AggregateException exceptions) + | _ -> Failure (AggregateException (exceptions.AsReadOnly())) /// Converts array of AsyncVals into AsyncVal with array results. /// In case when are non-immediate values in provided array, they are /// executed all in parallel, in unordered fashion. Order of values /// inside returned array is maintained. /// If the array contains a Failure, then the entire array will not resolve - let collectParallel (values : AsyncVal<'T>[]) : AsyncVal<'T[]> = - if values.Length = 0 then Value [||] + let collectParallel (values : AsyncVal<'T> seq) : AsyncVal<'T[]> = + use values = new PooledResizeArray<_> (values) + let length = values.Count + if length = 0 then Value [||] else - let indexes = List<_> (0) - let continuations = List<_> (0) - let results = Array.zeroCreate values.Length - let exceptions = ResizeArray values.Length - for i = 0 to values.Length - 1 do + let indexes = new PooledResizeArray<_> (length) + let continuations = new PooledResizeArray<_> (length) + let results = Array.zeroCreate length + use exceptions = new PooledResizeArray<_> (length) + for i = 0 to length - 1 do let value = values.[i] match value with | Value v -> results.[i] <- v @@ -182,13 +188,15 @@ module AsyncVal = | Failure f -> exceptions.Add f match exceptions.Count with | 1 -> AsyncVal.Failure (exceptions.First ()) - | count when count > 1 -> AsyncVal.Failure (AggregateException exceptions) + | count when count > 1 -> AsyncVal.Failure (AggregateException (exceptions.AsReadOnly())) | _ -> if indexes.Count = 0 then Value (results) else Async (async { let! vals = continuations |> Async.Parallel for i = 0 to indexes.Count - 1 do results.[indexes.[i]] <- vals.[i] + indexes.Dispose() + continuations.Dispose() return results }) diff --git a/src/FSharp.Data.GraphQL.Shared/FSharp.Data.GraphQL.Shared.fsproj b/src/FSharp.Data.GraphQL.Shared/FSharp.Data.GraphQL.Shared.fsproj index a201c4d9a..1d3614a3b 100644 --- a/src/FSharp.Data.GraphQL.Shared/FSharp.Data.GraphQL.Shared.fsproj +++ b/src/FSharp.Data.GraphQL.Shared/FSharp.Data.GraphQL.Shared.fsproj @@ -17,6 +17,9 @@ <_Parameter1>FSharp.Data.GraphQL.Server + + <_Parameter1>FSharp.Data.GraphQL.Server.AspNetCore + <_Parameter1>FSharp.Data.GraphQL.Client @@ -29,6 +32,7 @@ + @@ -44,6 +48,7 @@ + diff --git a/src/FSharp.Data.GraphQL.Shared/Helpers/Extensions.fs b/src/FSharp.Data.GraphQL.Shared/Helpers/Extensions.fs index 603ba60fc..ec72a0ad6 100644 --- a/src/FSharp.Data.GraphQL.Shared/Helpers/Extensions.fs +++ b/src/FSharp.Data.GraphQL.Shared/Helpers/Extensions.fs @@ -39,20 +39,6 @@ type TypeInfo with x.GetDeclaredMethod(first + propertyName.Substring(1)) | prop, _ -> prop -module Option = - - let mergeWith (f: 'T -> 'T -> 'T) (o1 : 'T option) (o2 : 'T option) : 'T option = - match (o1, o2) with - | Some a, Some b -> Some (f a b) - | Some a, _ -> Some a - | _, Some b -> Some b - | _, _ -> None - - let unwrap (defaultValue : 'U) (onSome : 'T -> 'U) (o : 'T option) : 'U = - match o with - | Some t -> onSome t - | None -> defaultValue - module Skippable = let ofList list = diff --git a/src/FSharp.Data.GraphQL.Shared/Helpers/ObjAndStructConversions.fs b/src/FSharp.Data.GraphQL.Shared/Helpers/ObjAndStructConversions.fs index 7614aa9a8..e23376565 100644 --- a/src/FSharp.Data.GraphQL.Shared/Helpers/ObjAndStructConversions.fs +++ b/src/FSharp.Data.GraphQL.Shared/Helpers/ObjAndStructConversions.fs @@ -9,6 +9,18 @@ module internal ValueOption = let ofOption value = Option.toVOption value + let mergeWith (f: 'T -> 'T -> 'T) (o1 : 'T voption) (o2 : 'T voption) : 'T voption = + match (o1, o2) with + | ValueSome a, ValueSome b -> ValueSome (f a b) + | ValueSome a, _ -> ValueSome a + | _, ValueSome b -> ValueSome b + | _, _ -> ValueNone + + let unwrap (defaultValue : 'U) (onSome : 'T -> 'U) (o : 'T voption) : 'U = + match o with + | ValueSome t -> onSome t + | ValueNone -> defaultValue + module internal Option = let toVOption voption = @@ -18,6 +30,18 @@ module internal Option = let ofVOption voption = voption |> ValueOption.toOption + let mergeWith (f: 'T -> 'T -> 'T) (o1 : 'T option) (o2 : 'T option) : 'T option = + match (o1, o2) with + | Some a, Some b -> Some (f a b) + | Some a, _ -> Some a + | _, Some b -> Some b + | _, _ -> None + + let unwrap (defaultValue : 'U) (onSome : 'T -> 'U) (o : 'T option) : 'U = + match o with + | Some t -> onSome t + | None -> defaultValue + [] module internal ValueTuple = diff --git a/src/FSharp.Data.GraphQL.Shared/Helpers/PooledResizeArray.fs b/src/FSharp.Data.GraphQL.Shared/Helpers/PooledResizeArray.fs new file mode 100644 index 000000000..a8a6271c0 --- /dev/null +++ b/src/FSharp.Data.GraphQL.Shared/Helpers/PooledResizeArray.fs @@ -0,0 +1,36 @@ +namespace FSharp.Data.GraphQL + +open Collections.Pooled + +type internal PooledResizeArray<'T> = PooledList<'T> + +module internal PooledResizeArray = + + let ofSeqWithCapacity capacity items = + let list = new PooledResizeArray<'T> (capacity = capacity) + list.AddRange (collection = items) + list + + let exists f (list : PooledResizeArray<'T>) = list.Exists (f) + + let length (list : PooledResizeArray<'T>) = list.Count + + let vChoose f (list : PooledResizeArray<'T>) = + let res = new PooledResizeArray<_> (list.Count) + + for i = 0 to length list - 1 do + match f list[i] with + | ValueNone -> () + | ValueSome b -> res.Add (b) + + res + + let choose f (list : PooledResizeArray<'T>) = + let res = new PooledResizeArray<_> (list.Count) + + for i = 0 to length list - 1 do + match f list[i] with + | None -> () + | Some b -> res.Add (b) + + res diff --git a/src/FSharp.Data.GraphQL.Shared/TypeSystem.fs b/src/FSharp.Data.GraphQL.Shared/TypeSystem.fs index e77d2291e..7bd13e96e 100644 --- a/src/FSharp.Data.GraphQL.Shared/TypeSystem.fs +++ b/src/FSharp.Data.GraphQL.Shared/TypeSystem.fs @@ -747,7 +747,8 @@ and ExecutionInfoKind = /// Reduce the current field as a live query. | ResolveLive of ExecutionInfo -/// Buffered stream options. Used to specify how the buffer will behavior in a stream. +// TODO: Migrate to voption +/// Buffered stream options. Used to specify how the buffer will behave in a stream. and BufferedStreamOptions = { /// The maximum time in milliseconds that the buffer will be filled before being sent to the subscriber. Interval : int option