Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using Rebus.Logging;
using Rebus.Oracle.Outbox;
using Rebus.Pipeline;
using Rebus.Retry.Simple;
using Rebus.Threading;
using Rebus.Transport;
using System;

namespace Rebus.Config.Outbox;

/// <summary>
/// Configuration extensions for the experimental outbox support
/// </summary>
public static class SqlServerOutboxConfigurationExtensions
{
/// <summary>
/// Configures Rebus to use an outbox.
/// This will store a (message ID, source queue) tuple for all processed messages, and under this tuple any messages sent/published will
/// also be stored, thus enabling truly idempotent message processing.
/// </summary>
public static RebusConfigurer Outbox(this RebusConfigurer configurer, Action<StandardConfigurer<IOutboxStorage>> configure)
{
if (configurer == null) throw new ArgumentNullException(nameof(configurer));
if (configure == null) throw new ArgumentNullException(nameof(configure));

configurer.Options(o =>
{
configure(StandardConfigurer<IOutboxStorage>.GetConfigurerFrom(o));

// if no outbox storage was registered, no further calls must have been made... that's ok, so we just bail out here
if (!o.Has<IOutboxStorage>()) return;

o.Decorate<ITransport>(c => new OutboxClientTransportDecorator(c.Get<ITransport>(), c.Get<IOutboxStorage>()));

o.Register(c =>
{
var asyncTaskFactory = c.Get<IAsyncTaskFactory>();
var rebusLoggerFactory = c.Get<IRebusLoggerFactory>();
var outboxStorage = c.Get<IOutboxStorage>();
var transport = c.Get<ITransport>();
return new OutboxForwarder(asyncTaskFactory, rebusLoggerFactory, outboxStorage, transport);
});

o.Decorate(c =>
{
_ = c.Get<OutboxForwarder>();
return c.Get<Options>();
});

o.Decorate<IPipeline>(c =>
{
var pipeline = c.Get<IPipeline>();
var outboxConnectionProvider = c.Get<IOutboxConnectionProvider>();
var step = new OutboxIncomingStep(outboxConnectionProvider);
return new PipelineStepInjector(pipeline)
.OnReceive(step, PipelineRelativePosition.After, typeof(DefaultRetryStep));
});
});

return configurer;
}
}
29 changes: 29 additions & 0 deletions Rebus.Oracle/Config/Outbox/OutboxConnectionProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System;
using Oracle.ManagedDataAccess.Client;
using Rebus.Oracle.Outbox;

namespace Rebus.Config.Outbox;

public class OutboxConnectionProvider(string connectionString) : IOutboxConnectionProvider
{
readonly string _connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));

public OutboxConnection GetDbConnection()
{
var connection = new OracleConnection(_connectionString);

try
{
connection.Open();

var transaction = connection.BeginTransaction();

return new OutboxConnection(connection, transaction);
}
catch
{
connection.Dispose();
throw;
}
}
}
85 changes: 85 additions & 0 deletions Rebus.Oracle/Config/Outbox/OutboxExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using Oracle.ManagedDataAccess.Client;
using Rebus.Oracle;
using Rebus.Oracle.Outbox;
using Rebus.Transport;
using System;

// ReSharper disable ArgumentsStyleLiteral

namespace Rebus.Config.Outbox;

/// <summary>
/// Configuration extensions for Oracle-based outbox
/// </summary>
public static class OutboxExtensions
{
internal const string CurrentOutboxConnectionKey = "current-outbox-connection";

/// <summary>
/// Configures Oracle as the outbox storage
/// </summary>
public static void StoreInOracle(this StandardConfigurer<IOutboxStorage> configurer, string connectionString,
string tableName, bool automaticallyCreateTables = true, Action<OracleConnection> additionalConnectionSetup = null,
bool enlistInAmbientTransaction = false)
{
if (configurer == null) throw new ArgumentNullException(nameof(configurer));
if (connectionString == null) throw new ArgumentNullException(nameof(connectionString));
if (tableName == null) throw new ArgumentNullException(nameof(tableName));

configurer
.OtherService<IOutboxStorage>()
.Register(_ => new OracleServerOutboxStorage(ConnectionProvider, tableName));

configurer.OtherService<IOutboxConnectionProvider>()
.Register(_ => new OutboxConnectionProvider(connectionString));

return;

IDbConnection ConnectionProvider(ITransactionContext context)
{
// if we find a connection in the context, use that (and accept that its lifestyle is managed somewhere else):
if (context.Items.TryGetValue(CurrentOutboxConnectionKey, out var result) &&
result is OutboxConnection outboxConnection)
{
return new DbConnectionWrapper(outboxConnection.Connection, outboxConnection.Transaction,
managedExternally: true);
}

var connection = new OracleConnection(connectionString);

connection.Open();

try
{
var transaction = connection.BeginTransaction();

return new DbConnectionWrapper(connection, transaction, managedExternally: false);
}
catch
{
connection.Dispose();
throw;
}
}
}

/// <summary>
/// Enables the use of outbox on the <see cref="RebusTransactionScope"/>. Will enlist all outgoing message operations in the
/// <paramref name="connection"/>/<paramref name="transaction"/> passed to the method.
/// </summary>
public static void UseOutbox(this RebusTransactionScope rebusTransactionScope, OracleConnection connection,
OracleTransaction transaction)
{
if (rebusTransactionScope == null) throw new ArgumentNullException(nameof(rebusTransactionScope));
if (connection == null) throw new ArgumentNullException(nameof(connection));
if (transaction == null) throw new ArgumentNullException(nameof(transaction));

var context = rebusTransactionScope.TransactionContext;

if (!context.Items.TryAdd(CurrentOutboxConnectionKey, new OutboxConnection(connection, transaction)))
{
throw new InvalidOperationException(
"Cannot add the given connection/transaction to the current Rebus transaction, because a connection/transaction has already been added!");
}
}
}
8 changes: 8 additions & 0 deletions Rebus.Oracle/IsExternalInit.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// ReSharper disable CheckNamespace
// ReSharper disable UnusedMember.Global
#pragma warning disable CS1591
namespace System.Runtime.CompilerServices;

internal class IsExternalInit
{
}
115 changes: 115 additions & 0 deletions Rebus.Oracle/Oracle/AsyncHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
using System;
using System.Collections.Concurrent;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
// ReSharper disable AsyncVoidLambda

namespace Rebus.Oracle;

static class AsyncHelpers
{
/// <summary>
/// Executes a task synchronously on the calling thread by installing a temporary synchronization context that queues continuations
/// </summary>
public static T GetSync<T>(Func<Task<T>> task)
{
var result = default(T);
RunSync(async () => result = await task());
return result;
}

/// <summary>
/// Executes a task synchronously on the calling thread by installing a temporary synchronization context that queues continuations
/// </summary>
public static void RunSync(Func<Task> task)
{
var currentContext = SynchronizationContext.Current;
var customContext = new CustomSynchronizationContext(task);

try
{
SynchronizationContext.SetSynchronizationContext(customContext);

customContext.Run();
}
finally
{
SynchronizationContext.SetSynchronizationContext(currentContext);
}
}

/// <summary>
/// Synchronization context that can be "pumped" in order to have it execute continuations posted back to it
/// </summary>
class CustomSynchronizationContext : SynchronizationContext
{
readonly ConcurrentQueue<Tuple<SendOrPostCallback, object>> _items = new();
readonly AutoResetEvent _workItemsWaiting = new(false);
readonly Func<Task> _task;

ExceptionDispatchInfo _caughtException;

bool _done;

public CustomSynchronizationContext(Func<Task> task)
{
_task = task ?? throw new ArgumentNullException(nameof(task), "Please remember to pass a Task to be executed");
}

public override void Post(SendOrPostCallback function, object state)
{
_items.Enqueue(Tuple.Create(function, state));
_workItemsWaiting.Set();
}

/// <summary>
/// Enqueues the function to be executed and executes all resulting continuations until it is completely done
/// </summary>
public void Run()
{
Post(async _ =>
{
try
{
await _task().ConfigureAwait(false);
}
catch (Exception exception)
{
_caughtException = ExceptionDispatchInfo.Capture(exception);
throw;
}
finally
{
Post(_ => _done = true, null);
}
}, null);

while (!_done)
{
if (_items.TryDequeue(out var task))
{
task.Item1(task.Item2);

if (_caughtException == null) continue;

_caughtException.Throw();
}
else
{
_workItemsWaiting.WaitOne();
}
}
}

public override void Send(SendOrPostCallback d, object state)
{
throw new NotSupportedException("Cannot send to same thread");
}

public override SynchronizationContext CreateCopy()
{
return this;
}
}
}
Loading