Skip to content

Add possibility for caller proxy to return an observable #330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;
using NUnit.Framework;
using WampSharp.Core.Serialization;
Expand Down Expand Up @@ -39,6 +41,57 @@ public async Task ProgressiveCallsCallerProgress()
Assert.That(callback.Task.Result, Is.EqualTo(10));
}

[Test]
public async Task ProgressiveCallsCallerProgressObservable()
{
WampPlayground playground = new WampPlayground();

CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
IWampChannel calleeChannel = dualChannel.CalleeChannel;
IWampChannel callerChannel = dualChannel.CallerChannel;

var service = new LongOpObsService();
await calleeChannel.RealmProxy.Services.RegisterCallee(service);

MyCallback callback = new MyCallback();

callerChannel.RealmProxy.RpcCatalog.Invoke
(callback,
new CallOptions() { ReceiveProgress = true },
"com.myapp.longop",
new object[] { 10, false });

Assert.That(service.State, Is.EqualTo(LongOpObsService.EState.Called));
Assert.That(callback.Task.Result, Is.EqualTo(-1));
CollectionAssert.AreEquivalent(Enumerable.Range(0, 10), callback.ProgressiveResults);
Assert.That(service.State, Is.EqualTo(LongOpObsService.EState.Completed));
}

[Test]
public async Task ProgressiveCallsCallerProgressCancelObservable()
{
WampPlayground playground = new WampPlayground();

CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
IWampChannel calleeChannel = dualChannel.CalleeChannel;
IWampChannel callerChannel = dualChannel.CallerChannel;

var service = new LongOpObsService();
await calleeChannel.RealmProxy.Services.RegisterCallee(service);

MyCallback callback = new MyCallback();

var invocation = callerChannel.RealmProxy.RpcCatalog.Invoke
(callback,
new CallOptions() { ReceiveProgress = true },
"com.myapp.longop",
new object[] { 10, false });

Assert.That(service.State, Is.EqualTo(LongOpObsService.EState.Called));
invocation.Cancel(new CancelOptions());
Assert.That(service.State, Is.EqualTo(LongOpObsService.EState.Cancelled));
}

[Test]
public async Task ProgressiveCallsCalleeProxyProgress()
{
Expand All @@ -64,6 +117,42 @@ public async Task ProgressiveCallsCalleeProxyProgress()
Assert.That(result.Result, Is.EqualTo(10));
}

[Test]
public async Task ProgressiveCallsCalleeProxyObservable()
{
WampPlayground playground = new WampPlayground();

CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
IWampChannel calleeChannel = dualChannel.CalleeChannel;
IWampChannel callerChannel = dualChannel.CallerChannel;

MyOperation myOperation = new MyOperation();

await calleeChannel.RealmProxy.RpcCatalog.Register(myOperation, new RegisterOptions());
ILongOpObsService proxy = callerChannel.RealmProxy.Services.GetCalleeProxy<ILongOpObsService>();

IEnumerable<int> results = proxy.LongOp(9, false).ToEnumerable(); // it will emit one more than asked

CollectionAssert.AreEquivalent(Enumerable.Range(0, 10), results);
}

[Test]
public async Task ProgressiveCallsCalleeProxyObservableError()
{
WampPlayground playground = new WampPlayground();

CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
IWampChannel calleeChannel = dualChannel.CalleeChannel;
IWampChannel callerChannel = dualChannel.CallerChannel;

MyOperation myOperation = new MyOperation();

await calleeChannel.RealmProxy.RpcCatalog.Register(myOperation, new RegisterOptions());
ILongOpObsService proxy = callerChannel.RealmProxy.Services.GetCalleeProxy<ILongOpObsService>();

Assert.Throws(typeof(WampException), () => proxy.LongOp(9, true).ToEnumerable().Count());
}

public class MyOperation : IWampRpcOperation
{
public string Procedure => "com.myapp.longop";
Expand All @@ -80,16 +169,27 @@ public IWampCancellableInvocation Invoke<TMessage>(IWampRawRpcOperationRouterCal
TMessage number = arguments[0];
int n = formatter.Deserialize<int>(number);

bool endWithError = arguments.Length > 1 && formatter.Deserialize<bool>(arguments[1]);

for (int i = 0; i < n; i++)
{
caller.Result(WampObjectFormatter.Value,
new YieldOptions {Progress = true},
new object[] {i});
}

caller.Result(WampObjectFormatter.Value,
new YieldOptions(),
new object[] {n});
if (endWithError)
{
caller.Error(WampObjectFormatter.Value,
new Dictionary<string, string>(),
"Something bad happened");
}
else
{
caller.Result(WampObjectFormatter.Value,
new YieldOptions(),
new object[] { n });
}

return null;
}
Expand Down Expand Up @@ -122,6 +222,47 @@ public async Task<int> LongOp(int n, IProgress<int> progress)
}
}

public interface ILongOpObsService
{
[WampProcedure("com.myapp.longop")]
[WampProgressiveResultProcedure]
IObservable<int> LongOp(int n, bool endWithError);
}

public class LongOpObsService : ILongOpObsService
{
public enum EState
{
Nothing,
Called,
Completed,
Cancelled
}

public EState State { get; set; } = EState.Nothing;

public IObservable<int> LongOp(int n, bool endWithError) => Observable.Create<int>(async (obs, ct) =>
{
State = EState.Called;
ct.Register(() =>
{
if (State == EState.Called)
State = EState.Cancelled;
});
for (int i = 0; i < n; i++)
{
obs.OnNext(i);
await Task.Delay(100, ct);
ct.ThrowIfCancellationRequested();
}
State = EState.Completed;
if (endWithError)
obs.OnError(new WampException("wamp.error", "Something bad happened"));
else
obs.OnCompleted();
});
}

public class MyCallback : IWampRawRpcOperationClientCallback
{
private readonly TaskCompletionSource<int> mTask = new TaskCompletionSource<int>();
Expand All @@ -132,7 +273,7 @@ public class MyCallback : IWampRawRpcOperationClientCallback

public void Result<TMessage>(IWampFormatter<TMessage> formatter, ResultDetails details)
{
throw new NotImplementedException();
mTask.SetResult(-1); // -1 indicates no final return value
}

public void Result<TMessage>(IWampFormatter<TMessage> formatter, ResultDetails details, TMessage[] arguments)
Expand Down Expand Up @@ -187,4 +328,4 @@ public void Report(T value)
mAction(value);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Reflection;
using WampSharp.V2.Core.Contracts;
using WampSharp.V2.Rpc;
Expand Down Expand Up @@ -37,7 +38,7 @@ public virtual CallOptions GetCallOptions(MethodInfo method)

public virtual string GetProcedureUri(MethodInfo method)
{
WampProcedureAttribute attribute =
WampProcedureAttribute attribute =
method.GetCustomAttribute<WampProcedureAttribute>();

if (attribute == null)
Expand All @@ -48,4 +49,4 @@ public virtual string GetProcedureUri(MethodInfo method)
return attribute.Procedure;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Reactive;
using System.Reflection;
using System.Threading.Tasks;
using WampSharp.V2.Rpc;
Expand Down Expand Up @@ -28,7 +29,14 @@ private static Type GetRelevantInterceptorType(MethodInfo method)
Type genericArgument;
Type interceptorType;

if (!typeof(Task).IsAssignableFrom(returnType))
if (returnType.IsGenericType && returnType.GetGenericTypeDefinition() == typeof(IObservable<>))
{
MethodInfoValidation.ValidateProgressiveObservableMethod(method);

genericArgument = returnType.GetGenericArguments()[0];
interceptorType = typeof(ObservableCalleeProxyInterceptor<>);
}
else if (!typeof(Task).IsAssignableFrom(returnType))
{
MethodInfoValidation.ValidateSyncMethod(method);

Expand All @@ -55,4 +63,4 @@ private static Type GetRelevantInterceptorType(MethodInfo method)
return closedGenericType;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System.Reactive;
using System.Reactive.Linq;
using System.Reflection;

namespace WampSharp.V2.CalleeProxy
{
internal class ObservableCalleeProxyInterceptor<T> : CalleeProxyInterceptorBase<T>
{
public ObservableCalleeProxyInterceptor(MethodInfo method, IWampCalleeProxyInvocationHandler handler, ICalleeProxyInterceptor interceptor) : base(method, handler, interceptor)
{
}

public override object Invoke(MethodInfo method, object[] arguments) => Observable.Create<T>(async (obs, cancellationToken) =>
{
var last = await Handler.InvokeProgressiveAsync
(Interceptor, method, Extractor, arguments, obs.ToProgress(), cancellationToken);
if (last != null)
obs.OnNext(last);
obs.OnCompleted();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected void CallResult(IWampRawRpcOperationRouterCallback caller, YieldOption
{
caller.Result(ObjectFormatter, options, arguments, argumentKeywords);
}
else if (!this.HasResult)
else if (!this.HasResult || arguments == null)
{
caller.Result(ObjectFormatter, options);
}
Expand All @@ -93,7 +93,7 @@ protected IEnumerable<object> UnpackParameters<TMessage>(IWampFormatter<TMessage
{
ArgumentUnpacker unpacker = new ArgumentUnpacker(Parameters);

IEnumerable<object> result =
IEnumerable<object> result =
unpacker.UnpackParameters(formatter, arguments, argumentsKeywords);

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static void ValidateTupleReturnType(MethodInfo method)

IList<string> transformNames = attribute.TransformNames;

List<string> tupleNames =
List<string> tupleNames =
transformNames.Take(tupleLength).ToList();

ValidateTupleReturnType(method, tupleNames);
Expand All @@ -47,7 +47,7 @@ private static void ValidateTupleReturnTypeWithOutRefParameters(MethodInfo metho
method.GetParameters().Where(x => x.IsOut || x.ParameterType.IsByRef)
.Select(x => x.Name);

ICollection<string> intersection =
ICollection<string> intersection =
tupleNames.Intersect(outOrRefNames).ToList();

if (intersection.Count > 0)
Expand All @@ -74,6 +74,14 @@ public static void ValidateSyncMethod(MethodInfo method)
ValidateTupleReturnType(method);
}

internal static void ValidateProgressiveObservableMethod(MethodInfo method)
{
if (!method.IsDefined(typeof(WampProgressiveResultProcedureAttribute)))
{
ThrowHelper.ObservableMethodNotDeclaredProgressive(method);
}
}

public static void ValidateAsyncMethod(MethodInfo method)
{
ParameterInfo[] parameters = method.GetParameters();
Expand Down Expand Up @@ -162,6 +170,12 @@ public static void ProgressiveParameterTypeMismatch(MethodInfo method, Type retu
($"Method {method.Name} of type {method.DeclaringType.FullName} is declared as a progressive WAMP procedure, but its last (or second to last) parameter is not a IProgress of its return type. Expected: IProgress<{returnType.FullName}>");
}

public static void ObservableMethodNotDeclaredProgressive(MethodInfo method)
{
throw new ArgumentException
($"Method {method.Name} of type {method.DeclaringType.FullName} is returning an IObservable and therefore is required to be declared as a progressive WAMP procedure, but it is not. Please use the [WampProgressiveResultProcedure] attribute.");
}

public static void ProgressiveParameterTupleMismatch(MethodInfo method)
{
throw new ArgumentException
Expand Down Expand Up @@ -199,4 +213,4 @@ public static void CancellationTokenMustBeLastParameter(MethodInfo method)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ protected IWampRpcOperation CreateRpcMethod(Func<object> instanceProvider, ICall
string procedureUri =
interceptor.GetProcedureUri(method);

if (!typeof (Task).IsAssignableFrom(method.ReturnType))
if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(IObservable<>))
{
MethodInfoValidation.ValidateProgressiveObservableMethod(method);
return CreateProgressiveObservableOperation(instanceProvider, method, procedureUri);
}
else if (!typeof (Task).IsAssignableFrom(method.ReturnType))
{
MethodInfoValidation.ValidateSyncMethod(method);
return new SyncMethodInfoRpcOperation(instanceProvider, method, procedureUri);
Expand Down Expand Up @@ -97,5 +102,25 @@ private static IWampRpcOperation CreateProgressiveOperation(Func<object> instanc

return operation;
}

private static IWampRpcOperation CreateProgressiveObservableOperation(Func<object> instanceProvider, MethodInfo method, string procedureUri)
{
//return new ProgressiveObservableMethodInfoRpcOperation<returnType>
// (instance, method, procedureUri);

Type returnType = method.ReturnType.GetGenericArguments()[0];

Type operationType =
typeof(ProgressiveObservableMethodInfoRpcOperation<>)
.MakeGenericType(returnType);

IWampRpcOperation operation =
(IWampRpcOperation)Activator.CreateInstance(operationType,
instanceProvider,
method,
procedureUri);

return operation;
}
}
}
}
Loading