From fe62d3ef82631d26af23860f53e95c8e85b21d92 Mon Sep 17 00:00:00 2001 From: Johan 't Hart Date: Sun, 28 Feb 2021 23:41:48 +0100 Subject: [PATCH 1/3] Add possibility for caller proxy to return an observable When subscribed to it will call a progressive RPC #238 --- .../Integration/RpcProgressTests.cs | 82 ++++++++++++++++++- .../Api/CalleeProxy/CalleeProxyInterceptor.cs | 5 +- .../CalleeProxyInterceptorFactory.cs | 10 ++- .../ObservableCalleeProxyInterceptor.cs | 22 +++++ 4 files changed, 111 insertions(+), 8 deletions(-) create mode 100644 src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/ObservableCalleeProxyInterceptor.cs diff --git a/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs b/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs index 368555d79..fa207074d 100644 --- a/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs +++ b/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Reactive.Disposables; +using System.Reactive.Linq; using System.Threading.Tasks; using NUnit.Framework; using WampSharp.Core.Serialization; @@ -64,6 +66,42 @@ public async Task ProgressiveCallsCalleeProxyProgress() Assert.That(result.Result, Is.EqualTo(10)); } + [Test] + public async Task ProgressiveCallsCalleeProxyObservable() + { + WampPlayground playground = new WampPlayground(); + + CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel(); + IWampChannel calleeChannel = dualChannel.CalleeChannel; + IWampChannel callerChannel = dualChannel.CallerChannel; + + MyOperation myOperation = new MyOperation(); + + await calleeChannel.RealmProxy.RpcCatalog.Register(myOperation, new RegisterOptions()); + ILongOpObsService proxy = callerChannel.RealmProxy.Services.GetCalleeProxy(); + + IEnumerable results = proxy.LongOp(9, false).ToEnumerable(); // it will emit one more than asked + + CollectionAssert.AreEquivalent(Enumerable.Range(0, 10), results); + } + + [Test] + public async Task ProgressiveCallsCalleeProxyObservableError() + { + WampPlayground playground = new WampPlayground(); + + CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel(); + IWampChannel calleeChannel = dualChannel.CalleeChannel; + IWampChannel callerChannel = dualChannel.CallerChannel; + + MyOperation myOperation = new MyOperation(); + + await calleeChannel.RealmProxy.RpcCatalog.Register(myOperation, new RegisterOptions()); + ILongOpObsService proxy = callerChannel.RealmProxy.Services.GetCalleeProxy(); + + Assert.Throws(typeof(WampException), () => proxy.LongOp(9, true).ToEnumerable().Count()); + } + public class MyOperation : IWampRpcOperation { public string Procedure => "com.myapp.longop"; @@ -80,6 +118,8 @@ public IWampCancellableInvocation Invoke(IWampRawRpcOperationRouterCal TMessage number = arguments[0]; int n = formatter.Deserialize(number); + bool endWithError = arguments.Length > 1 && formatter.Deserialize(arguments[1]); + for (int i = 0; i < n; i++) { caller.Result(WampObjectFormatter.Value, @@ -87,9 +127,18 @@ public IWampCancellableInvocation Invoke(IWampRawRpcOperationRouterCal new object[] {i}); } - caller.Result(WampObjectFormatter.Value, - new YieldOptions(), - new object[] {n}); + if (endWithError) + { + caller.Error(WampObjectFormatter.Value, + new Dictionary(), + "Something bad happened"); + } + else + { + caller.Result(WampObjectFormatter.Value, + new YieldOptions(), + new object[] { n }); + } return null; } @@ -122,6 +171,31 @@ public async Task LongOp(int n, IProgress progress) } } + public interface ILongOpObsService + { + [WampProcedure("com.myapp.longop")] + [WampProgressiveResultProcedure] + IObservable LongOp(int n, bool endWithError); + } + + public class LongOpObsService : ILongOpObsService + { + public IObservable LongOp(int n, bool endWithError) => Observable.Create(async obs => + { + for (int i = 0; i < n; i++) + { + obs.OnNext(i); + await Task.Delay(100); + } + if (endWithError) + obs.OnError(new WampException("wamp.error", "Something bad happened")); + else + obs.OnCompleted(); + + return Disposable.Empty; + }); + } + public class MyCallback : IWampRawRpcOperationClientCallback { private readonly TaskCompletionSource mTask = new TaskCompletionSource(); @@ -187,4 +261,4 @@ public void Report(T value) mAction(value); } } -} \ No newline at end of file +} diff --git a/src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptor.cs b/src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptor.cs index f5af5f540..dc7ef01e2 100644 --- a/src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptor.cs +++ b/src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptor.cs @@ -1,3 +1,4 @@ +using System; using System.Reflection; using WampSharp.V2.Core.Contracts; using WampSharp.V2.Rpc; @@ -37,7 +38,7 @@ public virtual CallOptions GetCallOptions(MethodInfo method) public virtual string GetProcedureUri(MethodInfo method) { - WampProcedureAttribute attribute = + WampProcedureAttribute attribute = method.GetCustomAttribute(); if (attribute == null) @@ -48,4 +49,4 @@ public virtual string GetProcedureUri(MethodInfo method) return attribute.Procedure; } } -} \ No newline at end of file +} diff --git a/src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptorFactory.cs b/src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptorFactory.cs index 6d7a12081..d35699cd8 100644 --- a/src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptorFactory.cs +++ b/src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptorFactory.cs @@ -1,4 +1,5 @@ using System; +using System.Reactive; using System.Reflection; using System.Threading.Tasks; using WampSharp.V2.Rpc; @@ -28,7 +29,12 @@ private static Type GetRelevantInterceptorType(MethodInfo method) Type genericArgument; Type interceptorType; - if (!typeof(Task).IsAssignableFrom(returnType)) + if (returnType.IsGenericType && returnType.GetGenericTypeDefinition() == typeof(IObservable<>)) + { + genericArgument = returnType.GetGenericArguments()[0]; + interceptorType = typeof(ObservableCalleeProxyInterceptor<>); + } + else if (!typeof(Task).IsAssignableFrom(returnType)) { MethodInfoValidation.ValidateSyncMethod(method); @@ -55,4 +61,4 @@ private static Type GetRelevantInterceptorType(MethodInfo method) return closedGenericType; } } -} \ No newline at end of file +} diff --git a/src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/ObservableCalleeProxyInterceptor.cs b/src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/ObservableCalleeProxyInterceptor.cs new file mode 100644 index 000000000..701ee15bf --- /dev/null +++ b/src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/ObservableCalleeProxyInterceptor.cs @@ -0,0 +1,22 @@ +using System.Reactive; +using System.Reactive.Linq; +using System.Reflection; + +namespace WampSharp.V2.CalleeProxy +{ + internal class ObservableCalleeProxyInterceptor : CalleeProxyInterceptorBase + { + public ObservableCalleeProxyInterceptor(MethodInfo method, IWampCalleeProxyInvocationHandler handler, ICalleeProxyInterceptor interceptor) : base(method, handler, interceptor) + { + } + + public override object Invoke(MethodInfo method, object[] arguments) => Observable.Create(async (obs, cancellationToken) => + { + var last = await Handler.InvokeProgressiveAsync + (Interceptor, method, Extractor, arguments, obs.ToProgress(), cancellationToken); + if (last != null) + obs.OnNext(last); + obs.OnCompleted(); + }); + } +} From f1f54e71b6361d5bc10b182f051500ff47d5d5ee Mon Sep 17 00:00:00 2001 From: Johan 't Hart Date: Wed, 3 Mar 2021 11:26:51 +0100 Subject: [PATCH 2/3] Add possibility for callee instance to return an observable #238 --- .../Integration/RpcProgressTests.cs | 77 +++++++++++++++++-- .../WAMP2/V2/Rpc/Callee/LocalRpcOperation.cs | 4 +- .../Callee/Reflection/OperationExtractor.cs | 28 ++++++- ...ressiveObservableMethodInfoRpcOperation.cs | 38 +++++++++ .../V2/Rpc/Callee/SyncLocalRpcOperation.cs | 34 +++++--- 5 files changed, 163 insertions(+), 18 deletions(-) create mode 100644 src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/ProgressiveObservableMethodInfoRpcOperation.cs diff --git a/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs b/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs index fa207074d..eb590a817 100644 --- a/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs +++ b/src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs @@ -41,6 +41,57 @@ public async Task ProgressiveCallsCallerProgress() Assert.That(callback.Task.Result, Is.EqualTo(10)); } + [Test] + public async Task ProgressiveCallsCallerProgressObservable() + { + WampPlayground playground = new WampPlayground(); + + CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel(); + IWampChannel calleeChannel = dualChannel.CalleeChannel; + IWampChannel callerChannel = dualChannel.CallerChannel; + + var service = new LongOpObsService(); + await calleeChannel.RealmProxy.Services.RegisterCallee(service); + + MyCallback callback = new MyCallback(); + + callerChannel.RealmProxy.RpcCatalog.Invoke + (callback, + new CallOptions() { ReceiveProgress = true }, + "com.myapp.longop", + new object[] { 10, false }); + + Assert.That(service.State, Is.EqualTo(LongOpObsService.EState.Called)); + Assert.That(callback.Task.Result, Is.EqualTo(-1)); + CollectionAssert.AreEquivalent(Enumerable.Range(0, 10), callback.ProgressiveResults); + Assert.That(service.State, Is.EqualTo(LongOpObsService.EState.Completed)); + } + + [Test] + public async Task ProgressiveCallsCallerProgressCancelObservable() + { + WampPlayground playground = new WampPlayground(); + + CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel(); + IWampChannel calleeChannel = dualChannel.CalleeChannel; + IWampChannel callerChannel = dualChannel.CallerChannel; + + var service = new LongOpObsService(); + await calleeChannel.RealmProxy.Services.RegisterCallee(service); + + MyCallback callback = new MyCallback(); + + var invocation = callerChannel.RealmProxy.RpcCatalog.Invoke + (callback, + new CallOptions() { ReceiveProgress = true }, + "com.myapp.longop", + new object[] { 10, false }); + + Assert.That(service.State, Is.EqualTo(LongOpObsService.EState.Called)); + invocation.Cancel(new CancelOptions()); + Assert.That(service.State, Is.EqualTo(LongOpObsService.EState.Cancelled)); + } + [Test] public async Task ProgressiveCallsCalleeProxyProgress() { @@ -180,19 +231,35 @@ public interface ILongOpObsService public class LongOpObsService : ILongOpObsService { - public IObservable LongOp(int n, bool endWithError) => Observable.Create(async obs => + public enum EState + { + Nothing, + Called, + Completed, + Cancelled + } + + public EState State { get; set; } = EState.Nothing; + + public IObservable LongOp(int n, bool endWithError) => Observable.Create(async (obs, ct) => { + State = EState.Called; + ct.Register(() => + { + if (State == EState.Called) + State = EState.Cancelled; + }); for (int i = 0; i < n; i++) { obs.OnNext(i); - await Task.Delay(100); + await Task.Delay(100, ct); + ct.ThrowIfCancellationRequested(); } + State = EState.Completed; if (endWithError) obs.OnError(new WampException("wamp.error", "Something bad happened")); else obs.OnCompleted(); - - return Disposable.Empty; }); } @@ -206,7 +273,7 @@ public class MyCallback : IWampRawRpcOperationClientCallback public void Result(IWampFormatter formatter, ResultDetails details) { - throw new NotImplementedException(); + mTask.SetResult(-1); // -1 indicates no final return value } public void Result(IWampFormatter formatter, ResultDetails details, TMessage[] arguments) diff --git a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/LocalRpcOperation.cs b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/LocalRpcOperation.cs index ab55aabb4..4e939b122 100644 --- a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/LocalRpcOperation.cs +++ b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/LocalRpcOperation.cs @@ -77,7 +77,7 @@ protected void CallResult(IWampRawRpcOperationRouterCallback caller, YieldOption { caller.Result(ObjectFormatter, options, arguments, argumentKeywords); } - else if (!this.HasResult) + else if (!this.HasResult || arguments == null) { caller.Result(ObjectFormatter, options); } @@ -93,7 +93,7 @@ protected IEnumerable UnpackParameters(IWampFormatter result = + IEnumerable result = unpacker.UnpackParameters(formatter, arguments, argumentsKeywords); return result; diff --git a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/OperationExtractor.cs b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/OperationExtractor.cs index a6cf41815..7713f2af6 100644 --- a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/OperationExtractor.cs +++ b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/OperationExtractor.cs @@ -57,7 +57,11 @@ protected IWampRpcOperation CreateRpcMethod(Func instanceProvider, ICall string procedureUri = interceptor.GetProcedureUri(method); - if (!typeof (Task).IsAssignableFrom(method.ReturnType)) + if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(IObservable<>)) + { + return CreateProgressiveObservableOperation(instanceProvider, method, procedureUri); + } + else if (!typeof (Task).IsAssignableFrom(method.ReturnType)) { MethodInfoValidation.ValidateSyncMethod(method); return new SyncMethodInfoRpcOperation(instanceProvider, method, procedureUri); @@ -97,5 +101,25 @@ private static IWampRpcOperation CreateProgressiveOperation(Func instanc return operation; } + + private static IWampRpcOperation CreateProgressiveObservableOperation(Func instanceProvider, MethodInfo method, string procedureUri) + { + //return new ProgressiveObservableMethodInfoRpcOperation + // (instance, method, procedureUri); + + Type returnType = method.ReturnType.GetGenericArguments()[0]; + + Type operationType = + typeof(ProgressiveObservableMethodInfoRpcOperation<>) + .MakeGenericType(returnType); + + IWampRpcOperation operation = + (IWampRpcOperation)Activator.CreateInstance(operationType, + instanceProvider, + method, + procedureUri); + + return operation; + } } -} \ No newline at end of file +} diff --git a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/ProgressiveObservableMethodInfoRpcOperation.cs b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/ProgressiveObservableMethodInfoRpcOperation.cs new file mode 100644 index 000000000..0c62196a9 --- /dev/null +++ b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/ProgressiveObservableMethodInfoRpcOperation.cs @@ -0,0 +1,38 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Threading; +using WampSharp.Core.Utilities; +using WampSharp.Core.Serialization; +using WampSharp.V2.Core.Contracts; + +namespace WampSharp.V2.Rpc +{ + public class ProgressiveObservableMethodInfoRpcOperation : SyncMethodInfoRpcOperation + { + public ProgressiveObservableMethodInfoRpcOperation(Func instanceProvider, MethodInfo method, string procedureName) : + base(instanceProvider, method, procedureName) + { + } + + protected override IWampCancellableInvocation OnResult(IWampRawRpcOperationRouterCallback caller, object result, IDictionary outputs) + { + var ctSource = new CancellationTokenSource(); + ((IObservable)result).Subscribe( + it => CallResult(caller, it, outputs, new YieldOptions { Progress = true }), + ex => + { + if (ex is WampException wampex) + HandleException(caller, wampex); + else + HandleException(caller, ex); + }, + // An observable does not emit any value when completing, so result without arguments + () => caller.Result(ObjectFormatter, new YieldOptions()), + ctSource.Token + ); + return new CancellationTokenSourceInvocation(ctSource); + } + } +} diff --git a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/SyncLocalRpcOperation.cs b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/SyncLocalRpcOperation.cs index 3cbe45717..893564425 100644 --- a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/SyncLocalRpcOperation.cs +++ b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/SyncLocalRpcOperation.cs @@ -29,30 +29,46 @@ protected override IWampCancellableInvocation InnerInvoke(IWampRawRpcO argumentsKeywords, out IDictionary outputs); - CallResult(caller, result, outputs); + return OnResult(caller, result, outputs); } catch (WampException ex) { - mLogger.ErrorFormat(ex, "An error occurred while calling {ProcedureUri}", this.Procedure); - IWampErrorCallback callback = new WampRpcErrorCallback(caller); - callback.Error(ex); + HandleException(caller, ex); } catch (Exception ex) { - WampException wampException = ConvertExceptionToRuntimeException(ex); - IWampErrorCallback callback = new WampRpcErrorCallback(caller); - callback.Error(wampException); + HandleException(caller, ex); } return null; } + protected void HandleException(IWampRawRpcOperationRouterCallback caller, WampException ex) + { + mLogger.ErrorFormat(ex, "An error occurred while calling {ProcedureUri}", this.Procedure); + IWampErrorCallback callback = new WampRpcErrorCallback(caller); + callback.Error(ex); + } + + protected void HandleException(IWampRawRpcOperationRouterCallback caller, Exception ex) + { + WampException wampException = ConvertExceptionToRuntimeException(ex); + IWampErrorCallback callback = new WampRpcErrorCallback(caller); + callback.Error(wampException); + } + + protected virtual IWampCancellableInvocation OnResult(IWampRawRpcOperationRouterCallback caller, object result, IDictionary outputs) + { + CallResult(caller, result, outputs); + return null; + } + protected void CallResult(IWampRawRpcOperationRouterCallback caller, object result, IDictionary outputs, YieldOptions yieldOptions = null) { yieldOptions = yieldOptions ?? new YieldOptions(); object[] resultArguments = GetResultArguments(result); - IDictionary argumentKeywords = + IDictionary argumentKeywords = GetResultArgumentKeywords(result, outputs); CallResult(caller, @@ -69,4 +85,4 @@ protected virtual IDictionary GetResultArgumentKeywords(object r protected abstract object InvokeSync (IWampRawRpcOperationRouterCallback caller, IWampFormatter formatter, InvocationDetails details, TMessage[] arguments, IDictionary argumentsKeywords, out IDictionary outputs); } -} \ No newline at end of file +} From 9c5f91440f4e2ab9dc8016983e397d7e0349b43b Mon Sep 17 00:00:00 2001 From: Johan 't Hart Date: Fri, 5 Mar 2021 23:15:04 +0100 Subject: [PATCH 3/3] Make sure Observable method is declared progressive As discussed in the review, this extra declaration is a requirement to make sure that people really want the method to be progressive and not return a IObservable derived type by accident. #238 --- .../CalleeProxyInterceptorFactory.cs | 2 ++ .../Callee/Reflection/MethodInfoValidation.cs | 20 ++++++++++++++++--- .../Callee/Reflection/OperationExtractor.cs | 1 + 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptorFactory.cs b/src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptorFactory.cs index d35699cd8..0e9626671 100644 --- a/src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptorFactory.cs +++ b/src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptorFactory.cs @@ -31,6 +31,8 @@ private static Type GetRelevantInterceptorType(MethodInfo method) if (returnType.IsGenericType && returnType.GetGenericTypeDefinition() == typeof(IObservable<>)) { + MethodInfoValidation.ValidateProgressiveObservableMethod(method); + genericArgument = returnType.GetGenericArguments()[0]; interceptorType = typeof(ObservableCalleeProxyInterceptor<>); } diff --git a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/MethodInfoValidation.cs b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/MethodInfoValidation.cs index 2b095aa60..05e39ff9c 100644 --- a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/MethodInfoValidation.cs +++ b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/MethodInfoValidation.cs @@ -32,7 +32,7 @@ public static void ValidateTupleReturnType(MethodInfo method) IList transformNames = attribute.TransformNames; - List tupleNames = + List tupleNames = transformNames.Take(tupleLength).ToList(); ValidateTupleReturnType(method, tupleNames); @@ -47,7 +47,7 @@ private static void ValidateTupleReturnTypeWithOutRefParameters(MethodInfo metho method.GetParameters().Where(x => x.IsOut || x.ParameterType.IsByRef) .Select(x => x.Name); - ICollection intersection = + ICollection intersection = tupleNames.Intersect(outOrRefNames).ToList(); if (intersection.Count > 0) @@ -74,6 +74,14 @@ public static void ValidateSyncMethod(MethodInfo method) ValidateTupleReturnType(method); } + internal static void ValidateProgressiveObservableMethod(MethodInfo method) + { + if (!method.IsDefined(typeof(WampProgressiveResultProcedureAttribute))) + { + ThrowHelper.ObservableMethodNotDeclaredProgressive(method); + } + } + public static void ValidateAsyncMethod(MethodInfo method) { ParameterInfo[] parameters = method.GetParameters(); @@ -162,6 +170,12 @@ public static void ProgressiveParameterTypeMismatch(MethodInfo method, Type retu ($"Method {method.Name} of type {method.DeclaringType.FullName} is declared as a progressive WAMP procedure, but its last (or second to last) parameter is not a IProgress of its return type. Expected: IProgress<{returnType.FullName}>"); } + public static void ObservableMethodNotDeclaredProgressive(MethodInfo method) + { + throw new ArgumentException + ($"Method {method.Name} of type {method.DeclaringType.FullName} is returning an IObservable and therefore is required to be declared as a progressive WAMP procedure, but it is not. Please use the [WampProgressiveResultProcedure] attribute."); + } + public static void ProgressiveParameterTupleMismatch(MethodInfo method) { throw new ArgumentException @@ -199,4 +213,4 @@ public static void CancellationTokenMustBeLastParameter(MethodInfo method) } } } -} \ No newline at end of file +} diff --git a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/OperationExtractor.cs b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/OperationExtractor.cs index 7713f2af6..5f5f6c7f7 100644 --- a/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/OperationExtractor.cs +++ b/src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/OperationExtractor.cs @@ -59,6 +59,7 @@ protected IWampRpcOperation CreateRpcMethod(Func instanceProvider, ICall if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(IObservable<>)) { + MethodInfoValidation.ValidateProgressiveObservableMethod(method); return CreateProgressiveObservableOperation(instanceProvider, method, procedureUri); } else if (!typeof (Task).IsAssignableFrom(method.ReturnType))