Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add .NET client for dynamic pubsub subscriptions #1346

Closed
wants to merge 46 commits into from
Closed
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
86a6c1c
First pass implementing standalone pubsub client for Dapr (focusing o…
WhitWaldo Sep 4, 2024
849a828
Added missing copyright notice
WhitWaldo Sep 4, 2024
2d73e69
Fleshing out comments
WhitWaldo Sep 4, 2024
bed6e4d
Added Dapr.Protos project to solution as a central place to put the D…
WhitWaldo Sep 4, 2024
bdc5a03
Added Nuget properties to project
WhitWaldo Sep 4, 2024
1f43315
Restored missing using
WhitWaldo Sep 4, 2024
0bab52c
Updated to use the standard Dapr client builder, added DI registratio…
WhitWaldo Sep 4, 2024
90fe7b0
Minor tweaks to conform to solution style
WhitWaldo Sep 4, 2024
5e751db
Merge branch 'master' into stream-sub
WhitWaldo Sep 4, 2024
6d5ed77
Tweak to README to include additional package names
WhitWaldo Sep 4, 2024
32ce9be
Added test project for Dapr.Messaging
WhitWaldo Sep 4, 2024
0852650
Added missing copyright statement
WhitWaldo Sep 15, 2024
0cb3815
Updated naming in file to match solution convention. Fleshed out miss…
WhitWaldo Sep 15, 2024
7edb9c5
Conforming to naming conventions
WhitWaldo Sep 15, 2024
e3491b4
Merge branch 'master' into stream-sub
WhitWaldo Sep 15, 2024
219d458
Eliminated need for locks by using ConcurrentDictionary instead
WhitWaldo Sep 15, 2024
1847ca9
Simplified registration extensions to minimize repeated implementations
WhitWaldo Sep 15, 2024
848c490
Added public key to InternalsVisibleTo annotation
WhitWaldo Sep 16, 2024
b6810b0
Significantly simplified dynamic pubsub implementation
WhitWaldo Sep 16, 2024
04fa4d5
Handling cancellation token timeout with configured action
WhitWaldo Sep 16, 2024
d165b14
Updated name of method so it wouldn't be as confusing why the develop…
WhitWaldo Sep 16, 2024
5c3e1ce
Added another message handler to facilitate channel draining in case …
WhitWaldo Sep 16, 2024
85ef0f8
Handling potential race condition, corrected return type to be Publis…
WhitWaldo Sep 16, 2024
218f604
No longer limiting a single subscriber for each component/topic combi…
WhitWaldo Sep 16, 2024
a7c4d5d
Eliminated restriction for one subscription per component/topic. Adde…
WhitWaldo Sep 16, 2024
152c14e
Updated to latest protos
WhitWaldo Sep 16, 2024
3af961a
Updates to implementation to reflect latest protos
WhitWaldo Sep 16, 2024
6e17cd3
Removed unused method
WhitWaldo Sep 16, 2024
562298b
Removed unused usings
WhitWaldo Sep 16, 2024
448c89b
Added the only unit test I could identify that is within capabilities…
WhitWaldo Sep 16, 2024
e18e49c
Fixing accessibility problem
WhitWaldo Sep 16, 2024
033a27a
Added example demonstrating the streaming subscription functionality
WhitWaldo Sep 16, 2024
d8969d0
Retargeting to .NET 6 instead of .NET 8
WhitWaldo Sep 16, 2024
a9f645c
Added sample deserialization - could be improved with client support
WhitWaldo Sep 16, 2024
d0efaed
Removed singleton ConnectionManager as non-conforming to the spec. Ad…
WhitWaldo Sep 25, 2024
dd22e83
Added default cancellation token value
WhitWaldo Sep 25, 2024
521396f
Minor refactoring so cancellation exceptions can be handled in a sing…
WhitWaldo Sep 25, 2024
ef418c0
Refactoring to ensure that channels are drained successfully even if …
WhitWaldo Sep 25, 2024
3addf75
Cleaning up unnecessary iniitalization values, usings and null access…
WhitWaldo Sep 25, 2024
1cb6855
Modified message drain to only drain acknowledgements and even then, …
WhitWaldo Sep 25, 2024
462e9a1
Updated to eliminate the double call for 'Register.SubscribeAsync' as…
WhitWaldo Sep 25, 2024
f16effc
Removed unnecessary call to DisposeAsync
WhitWaldo Sep 25, 2024
67a637b
Updated to apply some performance improvements to the channels at ins…
WhitWaldo Sep 25, 2024
a67185d
Added cancellation token to acknowledgement channel writer
WhitWaldo Sep 26, 2024
ce5b24a
Merge branch 'master' into stream-sub
WhitWaldo Oct 14, 2024
6db55c4
Merge branch 'master' into stream-sub
WhitWaldo Oct 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ This repo builds the following packages:
- Dapr.AspNetCore
- Dapr.Actors
- Dapr.Actors.AspNetCore
- Dapr.Common
- Dapr.Extensions.Configuration
- Dapr.Messaging
- Dapr.Protos
- Dapr.Workflow

### Prerequisites
Expand Down
35 changes: 35 additions & 0 deletions all.sln
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Dapr.E2E.Test.Actors.Genera
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cryptography", "examples\Client\Cryptography\Cryptography.csproj", "{C74FBA78-13E8-407F-A173-4555AEE41FF3}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Dapr.Messaging", "src\Dapr.Messaging\Dapr.Messaging.csproj", "{250F0236-2014-4DD8-A688-CD25EE299FA3}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Dapr.Protos", "src\Dapr.Protos\Dapr.Protos.csproj", "{CE506C30-5701-47C9-A86E-39D796B8DF35}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Dapr.Common", "src\Dapr.Common\Dapr.Common.csproj", "{9AB7EB9D-82CB-4BC6-B895-4F52F8DC489F}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Dapr.Messaging.Test", "test\Dapr.Messaging.Test\Dapr.Messaging.Test.csproj", "{93C6ABAF-F4B7-4CA2-8734-565EF847668A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StreamingSubscriptionExample", "examples\Client\PublishSubscribe\StreamingSubscriptionExample\StreamingSubscriptionExample.csproj", "{E748C988-1F5F-4A34-9A5C-2EE2B6CD1BA2}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -290,6 +300,26 @@ Global
{C74FBA78-13E8-407F-A173-4555AEE41FF3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C74FBA78-13E8-407F-A173-4555AEE41FF3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C74FBA78-13E8-407F-A173-4555AEE41FF3}.Release|Any CPU.Build.0 = Release|Any CPU
{250F0236-2014-4DD8-A688-CD25EE299FA3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{250F0236-2014-4DD8-A688-CD25EE299FA3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{250F0236-2014-4DD8-A688-CD25EE299FA3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{250F0236-2014-4DD8-A688-CD25EE299FA3}.Release|Any CPU.Build.0 = Release|Any CPU
{CE506C30-5701-47C9-A86E-39D796B8DF35}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CE506C30-5701-47C9-A86E-39D796B8DF35}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CE506C30-5701-47C9-A86E-39D796B8DF35}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CE506C30-5701-47C9-A86E-39D796B8DF35}.Release|Any CPU.Build.0 = Release|Any CPU
{9AB7EB9D-82CB-4BC6-B895-4F52F8DC489F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9AB7EB9D-82CB-4BC6-B895-4F52F8DC489F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9AB7EB9D-82CB-4BC6-B895-4F52F8DC489F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9AB7EB9D-82CB-4BC6-B895-4F52F8DC489F}.Release|Any CPU.Build.0 = Release|Any CPU
{93C6ABAF-F4B7-4CA2-8734-565EF847668A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{93C6ABAF-F4B7-4CA2-8734-565EF847668A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{93C6ABAF-F4B7-4CA2-8734-565EF847668A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{93C6ABAF-F4B7-4CA2-8734-565EF847668A}.Release|Any CPU.Build.0 = Release|Any CPU
{E748C988-1F5F-4A34-9A5C-2EE2B6CD1BA2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E748C988-1F5F-4A34-9A5C-2EE2B6CD1BA2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E748C988-1F5F-4A34-9A5C-2EE2B6CD1BA2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E748C988-1F5F-4A34-9A5C-2EE2B6CD1BA2}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -343,6 +373,11 @@ Global
{AF89083D-4715-42E6-93E9-38497D12A8A6} = {DD020B34-460F-455F-8D17-CF4A949F100B}
{B5CDB0DC-B26D-48F1-B934-FE5C1C991940} = {DD020B34-460F-455F-8D17-CF4A949F100B}
{C74FBA78-13E8-407F-A173-4555AEE41FF3} = {A7F41094-8648-446B-AECD-DCC2CC871F73}
{250F0236-2014-4DD8-A688-CD25EE299FA3} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{CE506C30-5701-47C9-A86E-39D796B8DF35} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{9AB7EB9D-82CB-4BC6-B895-4F52F8DC489F} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{93C6ABAF-F4B7-4CA2-8734-565EF847668A} = {DD020B34-460F-455F-8D17-CF4A949F100B}
{E748C988-1F5F-4A34-9A5C-2EE2B6CD1BA2} = {0EF6EA64-D7C3-420D-9890-EAE8D54A57E6}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {65220BF2-EAE1-4CB2-AA58-EBE80768CB40}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System.Text;
using Dapr.Messaging.PublishSubscribe;

var daprMessagingClientBuilder = new DaprPublishSubscribeClientBuilder();
var daprMessagingClient = daprMessagingClientBuilder.Build();

//Processor for each of the messages returned from the subscription
async Task<TopicResponseAction> HandleMessage(TopicMessage message, CancellationToken cancellationToken = default)
{
try
{
//Do something with the message
Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
return await Task.FromResult(TopicResponseAction.Success);
}
catch
{
return await Task.FromResult(TopicResponseAction.Retry);
}
}

//Create a dynamic streaming subscription
var subscription = daprMessagingClient.Register("pubsub", "myTopic",
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(15), TopicResponseAction.Retry)),
HandleMessage, CancellationToken.None);

//Subscribe to messages on it with a timeout of 30 seconds
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await subscription.SubscribeAsync(cancellationTokenSource.Token);

await Task.Delay(TimeSpan.FromMinutes(1));

//When you're done with the subscription, simply dispose of it
await subscription.DisposeAsync();
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\..\src\Dapr.Messaging\Dapr.Messaging.csproj" />
</ItemGroup>

</Project>
2 changes: 1 addition & 1 deletion properties/IsExternalInit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ namespace System.Runtime.CompilerServices
internal static class IsExternalInit
{
}

}
18 changes: 18 additions & 0 deletions src/Dapr.Common/Dapr.Common.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\Shared\DaprDefaults.cs" />
<Compile Include="..\Shared\ArgumentVerifier.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Grpc.Net.Client" Version="2.52.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="3.1.32" />
</ItemGroup>

</Project>
55 changes: 55 additions & 0 deletions src/Dapr.Common/DaprException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System.Runtime.Serialization;

namespace Dapr
{
/// <summary>
/// The base type of exceptions thrown by the Dapr .NET SDK.
/// </summary>
[Serializable]
public class DaprException : Exception
{
/// <summary>
/// Initializes a new instance of <see cref="DaprException" /> with the provided <paramref name="message" />.
/// </summary>
/// <param name="message">The exception message.</param>
public DaprException(string message)
: base(message)
{
}
/// <summary>
/// Initializes a new instance of <see cref="DaprException" /> with the provided
/// <paramref name="message" /> and <paramref name="innerException" />.
/// </summary>
/// <param name="message">The exception message.</param>
/// <param name="innerException">The inner exception.</param>
public DaprException(string message, Exception innerException)
: base(message, innerException)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="DaprException"/> class with a specified context.
/// </summary>
/// <param name="info">The <see cref="SerializationInfo" /> object that contains serialized object data of the exception being thrown.</param>
/// <param name="context">The <see cref="StreamingContext" /> object that contains contextual information about the source or destination. The context parameter is reserved for future use and can be null.</param>
#if NET8_0_OR_GREATER
[Obsolete(DiagnosticId = "SYSLIB0051")] // add this attribute to GetObjectData
#endif
protected DaprException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
}
}
Loading
Loading