Skip to content

Commit fe62d3e

Browse files
author
Johan 't Hart
committed
Add possibility for caller proxy to return an observable
When subscribed to it will call a progressive RPC #238
1 parent 82a672e commit fe62d3e

File tree

4 files changed

+111
-8
lines changed

4 files changed

+111
-8
lines changed

src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs

+78-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Reactive.Disposables;
5+
using System.Reactive.Linq;
46
using System.Threading.Tasks;
57
using NUnit.Framework;
68
using WampSharp.Core.Serialization;
@@ -64,6 +66,42 @@ public async Task ProgressiveCallsCalleeProxyProgress()
6466
Assert.That(result.Result, Is.EqualTo(10));
6567
}
6668

69+
[Test]
70+
public async Task ProgressiveCallsCalleeProxyObservable()
71+
{
72+
WampPlayground playground = new WampPlayground();
73+
74+
CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
75+
IWampChannel calleeChannel = dualChannel.CalleeChannel;
76+
IWampChannel callerChannel = dualChannel.CallerChannel;
77+
78+
MyOperation myOperation = new MyOperation();
79+
80+
await calleeChannel.RealmProxy.RpcCatalog.Register(myOperation, new RegisterOptions());
81+
ILongOpObsService proxy = callerChannel.RealmProxy.Services.GetCalleeProxy<ILongOpObsService>();
82+
83+
IEnumerable<int> results = proxy.LongOp(9, false).ToEnumerable(); // it will emit one more than asked
84+
85+
CollectionAssert.AreEquivalent(Enumerable.Range(0, 10), results);
86+
}
87+
88+
[Test]
89+
public async Task ProgressiveCallsCalleeProxyObservableError()
90+
{
91+
WampPlayground playground = new WampPlayground();
92+
93+
CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
94+
IWampChannel calleeChannel = dualChannel.CalleeChannel;
95+
IWampChannel callerChannel = dualChannel.CallerChannel;
96+
97+
MyOperation myOperation = new MyOperation();
98+
99+
await calleeChannel.RealmProxy.RpcCatalog.Register(myOperation, new RegisterOptions());
100+
ILongOpObsService proxy = callerChannel.RealmProxy.Services.GetCalleeProxy<ILongOpObsService>();
101+
102+
Assert.Throws(typeof(WampException), () => proxy.LongOp(9, true).ToEnumerable().Count());
103+
}
104+
67105
public class MyOperation : IWampRpcOperation
68106
{
69107
public string Procedure => "com.myapp.longop";
@@ -80,16 +118,27 @@ public IWampCancellableInvocation Invoke<TMessage>(IWampRawRpcOperationRouterCal
80118
TMessage number = arguments[0];
81119
int n = formatter.Deserialize<int>(number);
82120

121+
bool endWithError = arguments.Length > 1 && formatter.Deserialize<bool>(arguments[1]);
122+
83123
for (int i = 0; i < n; i++)
84124
{
85125
caller.Result(WampObjectFormatter.Value,
86126
new YieldOptions {Progress = true},
87127
new object[] {i});
88128
}
89129

90-
caller.Result(WampObjectFormatter.Value,
91-
new YieldOptions(),
92-
new object[] {n});
130+
if (endWithError)
131+
{
132+
caller.Error(WampObjectFormatter.Value,
133+
new Dictionary<string, string>(),
134+
"Something bad happened");
135+
}
136+
else
137+
{
138+
caller.Result(WampObjectFormatter.Value,
139+
new YieldOptions(),
140+
new object[] { n });
141+
}
93142

94143
return null;
95144
}
@@ -122,6 +171,31 @@ public async Task<int> LongOp(int n, IProgress<int> progress)
122171
}
123172
}
124173

174+
public interface ILongOpObsService
175+
{
176+
[WampProcedure("com.myapp.longop")]
177+
[WampProgressiveResultProcedure]
178+
IObservable<int> LongOp(int n, bool endWithError);
179+
}
180+
181+
public class LongOpObsService : ILongOpObsService
182+
{
183+
public IObservable<int> LongOp(int n, bool endWithError) => Observable.Create<int>(async obs =>
184+
{
185+
for (int i = 0; i < n; i++)
186+
{
187+
obs.OnNext(i);
188+
await Task.Delay(100);
189+
}
190+
if (endWithError)
191+
obs.OnError(new WampException("wamp.error", "Something bad happened"));
192+
else
193+
obs.OnCompleted();
194+
195+
return Disposable.Empty;
196+
});
197+
}
198+
125199
public class MyCallback : IWampRawRpcOperationClientCallback
126200
{
127201
private readonly TaskCompletionSource<int> mTask = new TaskCompletionSource<int>();
@@ -187,4 +261,4 @@ public void Report(T value)
187261
mAction(value);
188262
}
189263
}
190-
}
264+
}

src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptor.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Reflection;
23
using WampSharp.V2.Core.Contracts;
34
using WampSharp.V2.Rpc;
@@ -37,7 +38,7 @@ public virtual CallOptions GetCallOptions(MethodInfo method)
3738

3839
public virtual string GetProcedureUri(MethodInfo method)
3940
{
40-
WampProcedureAttribute attribute =
41+
WampProcedureAttribute attribute =
4142
method.GetCustomAttribute<WampProcedureAttribute>();
4243

4344
if (attribute == null)
@@ -48,4 +49,4 @@ public virtual string GetProcedureUri(MethodInfo method)
4849
return attribute.Procedure;
4950
}
5051
}
51-
}
52+
}

src/netstandard/WampSharp/WAMP2/V2/Api/CalleeProxy/CalleeProxyInterceptorFactory.cs

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Reactive;
23
using System.Reflection;
34
using System.Threading.Tasks;
45
using WampSharp.V2.Rpc;
@@ -28,7 +29,12 @@ private static Type GetRelevantInterceptorType(MethodInfo method)
2829
Type genericArgument;
2930
Type interceptorType;
3031

31-
if (!typeof(Task).IsAssignableFrom(returnType))
32+
if (returnType.IsGenericType && returnType.GetGenericTypeDefinition() == typeof(IObservable<>))
33+
{
34+
genericArgument = returnType.GetGenericArguments()[0];
35+
interceptorType = typeof(ObservableCalleeProxyInterceptor<>);
36+
}
37+
else if (!typeof(Task).IsAssignableFrom(returnType))
3238
{
3339
MethodInfoValidation.ValidateSyncMethod(method);
3440

@@ -55,4 +61,4 @@ private static Type GetRelevantInterceptorType(MethodInfo method)
5561
return closedGenericType;
5662
}
5763
}
58-
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using System.Reactive;
2+
using System.Reactive.Linq;
3+
using System.Reflection;
4+
5+
namespace WampSharp.V2.CalleeProxy
6+
{
7+
internal class ObservableCalleeProxyInterceptor<T> : CalleeProxyInterceptorBase<T>
8+
{
9+
public ObservableCalleeProxyInterceptor(MethodInfo method, IWampCalleeProxyInvocationHandler handler, ICalleeProxyInterceptor interceptor) : base(method, handler, interceptor)
10+
{
11+
}
12+
13+
public override object Invoke(MethodInfo method, object[] arguments) => Observable.Create<T>(async (obs, cancellationToken) =>
14+
{
15+
var last = await Handler.InvokeProgressiveAsync
16+
(Interceptor, method, Extractor, arguments, obs.ToProgress(), cancellationToken);
17+
if (last != null)
18+
obs.OnNext(last);
19+
obs.OnCompleted();
20+
});
21+
}
22+
}

0 commit comments

Comments
 (0)