From e40254c8a0a70e782712325488057649071ea039 Mon Sep 17 00:00:00 2001 From: Nehan Pathan Date: Wed, 10 Sep 2025 16:17:06 +0530 Subject: [PATCH 1/5] Add IAsyncReplicator interface and async support in HttpReplicator (client-side async API, related to #XXXX) --- .../Http/HttpClientBase.cs | 132 ++++++++++ .../Http/HttpReplicator.cs | 95 ++++++- src/Lucene.Net.Replicator/IAsyncReplicator.cs | 35 +++ .../ReplicationClient.cs | 246 ++++++++++++++++++ 4 files changed, 507 insertions(+), 1 deletion(-) create mode 100644 src/Lucene.Net.Replicator/IAsyncReplicator.cs diff --git a/src/Lucene.Net.Replicator/Http/HttpClientBase.cs b/src/Lucene.Net.Replicator/Http/HttpClientBase.cs index b6e94fb49a..cfffe37545 100644 --- a/src/Lucene.Net.Replicator/Http/HttpClientBase.cs +++ b/src/Lucene.Net.Replicator/Http/HttpClientBase.cs @@ -200,6 +200,29 @@ protected virtual HttpResponseMessage ExecuteGet(string request, params string[] return Execute(req); } + /// + /// Execute a GET request asynchronously with an array of parameters. + /// + protected Task ExecuteGetAsync(string action, string[] parameters, CancellationToken cancellationToken) + { + var url = BuildUrl(action, parameters); + return Client.GetAsync(url, cancellationToken); + } + + /// + /// Execute a GET request asynchronously with up to 3 name/value parameters. + /// + protected Task ExecuteGetAsync( + string action, + string param1, string value1, + string param2 = null, string value2 = null, + string param3 = null, string value3 = null, + CancellationToken cancellationToken = default) + { + var url = BuildUrl(action, param1, value1, param2, value2, param3, value3); + return Client.GetAsync(url, cancellationToken); + } + private HttpResponseMessage Execute(HttpRequestMessage request) { //.NET Note: Bridging from Async to Sync, this is not ideal and we could consider changing the interface to be Async or provide Async overloads @@ -217,6 +240,31 @@ private string QueryString(string request, params string[] parameters) .Join("&", parameters.Select(WebUtility.UrlEncode).InPairs((key, val) => string.Format("{0}={1}", key, val)))); } + // Add this property so subclasses can access the HttpClient instance + protected HttpClient Client => httpc; + + // BuildUrl helpers (mirror the QueryString overloads) + protected virtual string BuildUrl(string action, string[] parameters) + { + // QueryString has signature: QueryString(string request, params string[] parameters) + return QueryString(action, parameters); + } + + protected virtual string BuildUrl( + string action, + string param1, string value1, + string param2 = null, string value2 = null, + string param3 = null, string value3 = null) + { + // Forward to QueryString which accepts params string[] + if (param2 == null && param3 == null) + { + return QueryString(action, param1, value1); + } + return QueryString(action, param1, value1, param2, value2, param3, value3); + } + + /// /// Internal utility: input stream of the provided response. /// @@ -262,6 +310,37 @@ public virtual Stream GetResponseStream(HttpResponseMessage response, bool consu return result; } + /// + /// Internal utility: input stream of the provided response asynchronously. + /// + /// + public virtual async Task GetResponseStreamAsync(HttpResponseMessage response, CancellationToken cancellationToken = default) + { + #if NET8_0_OR_GREATER + Stream result = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); + #else + Stream result = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); + #endif + return result; + } + + /// + /// Internal utility: input stream of the provided response asynchronously, which optionally + /// consumes the response's resources when the input stream is exhausted. + /// + /// + public virtual async Task GetResponseStreamAsync(HttpResponseMessage response, bool consume, CancellationToken cancellationToken = default) + { + #if NET8_0_OR_GREATER + Stream result = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); + #else + Stream result = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); + #endif + if (consume) + result = new ConsumingStream(result); + return result; + } + /// /// Returns true if this instance was ed, otherwise /// returns false. Note that if you override , you must call @@ -319,6 +398,59 @@ protected virtual T DoAction(HttpResponseMessage response, bool consume, Func return default; // silly, if we're here, IOUtils.reThrow always throws an exception } + /// + /// Do a specific async action and validate after the action that the status is still OK, + /// and if not, attempt to extract the actual server side exception. Optionally + /// release the response at exit, depending on parameter. + /// + protected virtual async Task DoActionAsync(HttpResponseMessage response, bool consume, Func> call) + { + Exception th = null; + try + { + VerifyStatus(response); + return await call().ConfigureAwait(false); + } + catch (Exception t) when (t.IsThrowable()) + { + th = t; + } + finally + { + try + { + VerifyStatus(response); + } + finally + { + if (consume) + { + try + { + ConsumeQuietly(response); + } + catch + { + // ignore on purpose + } + } + } + } + + if (Debugging.AssertsEnabled) Debugging.Assert(th != null); + Util.IOUtils.ReThrow(th); + return default!; // never reached, rethrow above always throws + } + + /// + /// Calls the overload passing true to consume. + /// + protected virtual Task DoActionAsync(HttpResponseMessage response, Func> call) + { + return DoActionAsync(response, true, call); + } + + /// /// Disposes this . /// When called with true, this disposes the underlying . diff --git a/src/Lucene.Net.Replicator/Http/HttpReplicator.cs b/src/Lucene.Net.Replicator/Http/HttpReplicator.cs index 5ae21ad06c..76b69f7bec 100644 --- a/src/Lucene.Net.Replicator/Http/HttpReplicator.cs +++ b/src/Lucene.Net.Replicator/Http/HttpReplicator.cs @@ -2,6 +2,9 @@ using System; using System.IO; using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; + namespace Lucene.Net.Replicator.Http { @@ -28,7 +31,7 @@ namespace Lucene.Net.Replicator.Http /// /// @lucene.experimental /// - public class HttpReplicator : HttpClientBase, IReplicator + public class HttpReplicator : HttpClientBase, IReplicator, IAsyncReplicator { /// /// Creates a new with the given host, port and path. @@ -106,5 +109,95 @@ public virtual void Release(string sessionId) // do not remove this call: as it is still validating for us! DoAction(response, () => null); } + + #region Async methods (IAsyncReplicator) + + /// + /// Checks for updates at the remote host asynchronously. + /// + /// The current index version. + /// Cancellation token. + /// + /// A if updates are available; otherwise, null. + /// + public async Task CheckForUpdateAsync(string currentVersion, CancellationToken cancellationToken = default) + { + string[] parameters = currentVersion != null + ? new[] { ReplicationService.REPLICATE_VERSION_PARAM, currentVersion } + : null; + + using var response = await ExecuteGetAsync( + ReplicationService.ReplicationAction.UPDATE.ToString(), + parameters, + cancellationToken: cancellationToken).ConfigureAwait(false); + + return await DoActionAsync(response, async () => + { + using var inputStream = new DataInputStream( + await GetResponseStreamAsync(response, cancellationToken).ConfigureAwait(false)); + + return inputStream.ReadByte() == 0 ? null : new SessionToken(inputStream); + }).ConfigureAwait(false); + } + + /// + /// Obtains the given file from the remote host asynchronously. + /// + /// The session ID. + /// The source of the file. + /// The file name. + /// Cancellation token. + /// A of the requested file. + public async Task ObtainFileAsync(string sessionId, string source, string fileName, CancellationToken cancellationToken = default) + { + using var response = await ExecuteGetAsync( + ReplicationService.ReplicationAction.OBTAIN.ToString(), + ReplicationService.REPLICATE_SESSION_ID_PARAM, sessionId, + ReplicationService.REPLICATE_SOURCE_PARAM, source, + ReplicationService.REPLICATE_FILENAME_PARAM, fileName, + cancellationToken: cancellationToken).ConfigureAwait(false); + + return await DoActionAsync(response, async () => + { + return await GetResponseStreamAsync(response, cancellationToken).ConfigureAwait(false); + }).ConfigureAwait(false); + } + + /// + /// Publishes a new asynchronously. + /// Not supported in this implementation. + /// + /// The revision to publish. + /// Cancellation token. + /// A representing the operation. + /// Always thrown. + public Task PublishAsync(IRevision revision, CancellationToken cancellationToken = default) + { + throw UnsupportedOperationException.Create( + "this replicator implementation does not support remote publishing of revisions"); + } + + /// + /// Releases the session at the remote host asynchronously. + /// + /// The session ID to release. + /// Cancellation token. + /// A representing the operation. + public async Task ReleaseAsync(string sessionId, CancellationToken cancellationToken = default) + { + using var response = await ExecuteGetAsync( + ReplicationService.ReplicationAction.RELEASE.ToString(), + ReplicationService.REPLICATE_SESSION_ID_PARAM, sessionId, + cancellationToken: cancellationToken).ConfigureAwait(false); + + await DoActionAsync(response, () => + { + // No actual response content needed — just verification + return Task.FromResult(null); + }).ConfigureAwait(false); + } + + #endregion + } } diff --git a/src/Lucene.Net.Replicator/IAsyncReplicator.cs b/src/Lucene.Net.Replicator/IAsyncReplicator.cs new file mode 100644 index 0000000000..d1e689e657 --- /dev/null +++ b/src/Lucene.Net.Replicator/IAsyncReplicator.cs @@ -0,0 +1,35 @@ +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Lucene.Net.Replicator +{ + /// + /// Async version of for non-blocking replication operations. + /// + public interface IAsyncReplicator + { + /// + /// Check whether the given version is up-to-date and returns a + /// which can be used for fetching the revision files. + /// + /// Current version of the index. + /// Cancellation token. + Task CheckForUpdateAsync(string currentVersion, CancellationToken cancellationToken = default); + + /// + /// Returns a stream for the requested file and source. + /// + Task ObtainFileAsync(string sessionId, string source, string fileName, CancellationToken cancellationToken = default); + + /// + /// Notify that the specified session is no longer needed. + /// + Task ReleaseAsync(string sessionId, CancellationToken cancellationToken = default); + + /// + /// Publishing revisions is not supported in HttpReplicator; throw if called. + /// + Task PublishAsync(IRevision revision, CancellationToken cancellationToken = default); + } +} diff --git a/src/Lucene.Net.Replicator/ReplicationClient.cs b/src/Lucene.Net.Replicator/ReplicationClient.cs index f8fba031e8..292118c490 100644 --- a/src/Lucene.Net.Replicator/ReplicationClient.cs +++ b/src/Lucene.Net.Replicator/ReplicationClient.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.IO; using System.Threading; +using System.Threading.Tasks; using Directory = Lucene.Net.Store.Directory; using JCG = J2N.Collections.Generic; @@ -127,6 +128,7 @@ public override void Run() public const string INFO_STREAM_COMPONENT = "ReplicationThread"; private readonly IReplicator replicator; + private readonly IAsyncReplicator asyncReplicator; private readonly IReplicationHandler handler; private readonly ISourceDirectoryFactory factory; @@ -135,6 +137,8 @@ public override void Run() private readonly object syncLock = new object(); // LUCENENET specific to avoid lock (this) private ReplicationThread updateThread; + private CancellationTokenSource asyncUpdateCts; + private Task asyncUpdateTask; private bool disposed = false; private InfoStream infoStream = InfoStream.Default; @@ -151,6 +155,20 @@ public ReplicationClient(IReplicator replicator, IReplicationHandler handler, IS this.factory = factory; } + /// + /// Constructor for async replicators. + /// + /// + /// + /// + /// + public ReplicationClient(IAsyncReplicator asyncReplicator, IReplicationHandler handler, ISourceDirectoryFactory factory) + { + this.asyncReplicator = asyncReplicator ?? throw new ArgumentNullException(nameof(asyncReplicator)); + this.handler = handler ?? throw new ArgumentNullException(nameof(handler)); + this.factory = factory ?? throw new ArgumentNullException(nameof(factory)); + } + /// private void CopyBytes(IndexOutput output, Stream input) { @@ -261,6 +279,101 @@ private void DoUpdate() } } + /// + /// Performs the async update logic, mirrors DoUpdate but uses IAsyncReplicator. + /// + private async Task DoUpdateAsync(CancellationToken cancellationToken) + { + if (asyncReplicator is null) + throw new InvalidOperationException("AsyncReplicator not initialized."); + + SessionToken? session = null; + var sourceDirectory = new Dictionary(); + var copiedFiles = new Dictionary>(); + bool notify = false; + + try + { + string version = handler.CurrentVersion; + session = await asyncReplicator.CheckForUpdateAsync(version, cancellationToken).ConfigureAwait(false); + + WriteToInfoStream($"DoUpdateAsync(): handlerVersion={version} session={session}"); + + if (session is null) + return; + + IDictionary> requiredFiles = RequiredFiles(session.SourceFiles); + WriteToInfoStream($"DoUpdateAsync(): handlerVersion={version} session={session}"); + + foreach (var pair in requiredFiles) + { + string source = pair.Key; + Directory directory = factory.GetDirectory(session.Id, source); + + sourceDirectory.Add(source, directory); + IList cpFiles = new J2N.Collections.Generic.List(); + copiedFiles.Add(source, cpFiles); + + foreach (var file in pair.Value) + { + cancellationToken.ThrowIfCancellationRequested(); + + Stream input = null; + IndexOutput output = null; + try + { + input = await asyncReplicator.ObtainFileAsync(session.Id, source, file.FileName, cancellationToken).ConfigureAwait(false); + output = directory.CreateOutput(file.FileName, IOContext.DEFAULT); + + int numBytes; + byte[] buffer = new byte[16384]; + while ((numBytes = await input.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) > 0) + { + output.WriteBytes(buffer, 0, numBytes); + } + + cpFiles.Add(file.FileName); + } + finally + { + IOUtils.Dispose(input, output); + } + } + } + + notify = true; + } + finally + { + if (session != null) + { + try + { + await asyncReplicator.ReleaseAsync(session.Id, cancellationToken).ConfigureAwait(false); + } + finally + { + if (!notify) + { + IOUtils.Dispose(sourceDirectory.Values); + factory.CleanupSession(session.Id); + } + } + } + } + + if (notify && !disposed) + { + handler.RevisionReady(session!.Version, session.SourceFiles, Collections.AsReadOnly(copiedFiles), sourceDirectory); + } + + IOUtils.Dispose(sourceDirectory.Values); + if (session != null) + { + factory.CleanupSession(session.Id); + } + } + /// Throws if the client has already been disposed. protected void EnsureOpen() { @@ -440,6 +553,120 @@ public virtual bool IsUpdateThreadAlive } } + /// + /// Start a periodic async update loop with the specified interval in milliseconds. + /// + /// Interval between updates. + /// Optional name for logging purposes. + public virtual void StartAsyncUpdateLoop(long intervalInMilliseconds, string threadName = null) + { + UninterruptableMonitor.Enter(syncLock); + try + { + EnsureOpen(); + + if (asyncUpdateTask != null && !asyncUpdateTask.IsCompleted) + throw IllegalStateException.Create("Async update loop is already running. Stop it first."); + + threadName ??= "ReplicationAsyncLoop"; + + asyncUpdateCts = new CancellationTokenSource(); + CancellationToken ct = asyncUpdateCts.Token; + + asyncUpdateTask = Task.Run(async () => + { + while (!ct.IsCancellationRequested) + { + try + { + updateLock.Lock(); + try + { + await DoUpdateAsync(ct).ConfigureAwait(false); + } + finally + { + updateLock.Unlock(); + } + } + catch (OperationCanceledException) + { + // Cancellation requested, exit loop gracefully + break; + } + catch (Exception ex) + { + HandleUpdateException(ex); + } + + try + { + await Task.Delay(TimeSpan.FromMilliseconds(intervalInMilliseconds), ct).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; // exit loop if cancelled during delay + } + } + }, ct); + } + finally + { + UninterruptableMonitor.Exit(syncLock); + } + } + + /// + /// Stops the async update loop, if it is running. + /// + public virtual async Task StopAsyncUpdateLoop() + { + UninterruptableMonitor.Enter(syncLock); + try + { + if (asyncUpdateCts != null) + { + asyncUpdateCts.Cancel(); + try + { + if (asyncUpdateTask != null) + await asyncUpdateTask.ConfigureAwait(false); + } + catch (OperationCanceledException) { } + finally + { + asyncUpdateTask = null; + asyncUpdateCts.Dispose(); + asyncUpdateCts = null; + } + } + } + finally + { + UninterruptableMonitor.Exit(syncLock); + } + } + + /// + /// Returns true if the async update loop is running. + /// + public virtual bool IsAsyncUpdateLoopAlive + { + get + { + UninterruptableMonitor.Enter(syncLock); + try + { + return asyncUpdateTask != null && !asyncUpdateTask.IsCompleted; + } + finally + { + UninterruptableMonitor.Exit(syncLock); + } + } + } + + public override string ToString() { if (updateThread is null) @@ -468,6 +695,25 @@ public virtual void UpdateNow() } } + /// + /// Executes the update operation asynchronously immediately, regardless if an update thread is running or not. + /// + public virtual async Task UpdateNowAsync(CancellationToken cancellationToken = default) + { + EnsureOpen(); + + // Acquire the same update lock to prevent concurrent updates + updateLock.Lock(); + try + { + await DoUpdateAsync(cancellationToken).ConfigureAwait(false); + } + finally + { + updateLock.Unlock(); + } + } + /// /// Gets or sets the to use for logging messages. /// From 298f041f6bca87a897ba1ed72535b234b4b602a8 Mon Sep 17 00:00:00 2001 From: Nehan Pathan Date: Thu, 11 Sep 2025 15:38:30 +0530 Subject: [PATCH 2/5] Replace ReentrantLock with SemaphoreSlim in ReplicationClient (async-safe); fix all warnings and nullable issues --- .../Http/HttpClientBase.cs | 120 ++++++++++-------- .../Http/HttpReplicator.cs | 28 ++-- src/Lucene.Net.Replicator/IAsyncReplicator.cs | 3 + .../ReplicationClient.cs | 82 ++++++------ 4 files changed, 127 insertions(+), 106 deletions(-) diff --git a/src/Lucene.Net.Replicator/Http/HttpClientBase.cs b/src/Lucene.Net.Replicator/Http/HttpClientBase.cs index cfffe37545..04f0a0fd82 100644 --- a/src/Lucene.Net.Replicator/Http/HttpClientBase.cs +++ b/src/Lucene.Net.Replicator/Http/HttpClientBase.cs @@ -1,3 +1,4 @@ +#nullable enable using Lucene.Net.Diagnostics; using Lucene.Net.Support; using Newtonsoft.Json; @@ -73,7 +74,7 @@ public abstract class HttpClientBase : IDisposable /// The port to be used to connect on. /// The path to the replicator on the host. /// Optional, The HTTP handler stack to use for sending requests, defaults to null. - protected HttpClientBase(string host, int port, string path, HttpMessageHandler messageHandler = null) + protected HttpClientBase(string host, int port, string path, HttpMessageHandler? messageHandler = null) : this(NormalizedUrl(host, port, path), messageHandler) { } @@ -88,7 +89,7 @@ protected HttpClientBase(string host, int port, string path, HttpMessageHandler /// The full url, including with host, port and path. /// Optional, The HTTP handler stack to use for sending requests. //Note: LUCENENET Specific - protected HttpClientBase(string url, HttpMessageHandler messageHandler = null) + protected HttpClientBase(string url, HttpMessageHandler? messageHandler = null) : this(url, new HttpClient(messageHandler ?? new HttpClientHandler()) { Timeout = DEFAULT_TIMEOUT }) { } @@ -171,27 +172,51 @@ protected virtual void ThrowKnownError(HttpResponseMessage response) } /// - /// Internal: Execute a request and return its result. + /// Internal: Execute a POST request with custom HttpContent and return its result. /// The argument is treated as: name1,value1,name2,value2,... /// - protected virtual HttpResponseMessage ExecutePost(string request, object entity, params string[] parameters) + protected virtual HttpResponseMessage ExecutePost(string request, HttpContent content, params string[]? parameters) { EnsureOpen(); - //.NET Note: No headers? No ContentType?... Bad use of Http? - HttpRequestMessage req = new HttpRequestMessage(HttpMethod.Post, QueryString(request, parameters)); + var req = new HttpRequestMessage(HttpMethod.Post, QueryString(request, parameters)) + { + Content = content + }; + + // Use SendAsync + GetAwaiter().GetResult() to bridge sync call + var resp = httpc.SendAsync(req, HttpCompletionOption.ResponseHeadersRead) + .ConfigureAwait(false) + .GetAwaiter() + .GetResult(); + VerifyStatus(resp); + return resp; + } - req.Content = new StringContent(JToken.FromObject(entity, JsonSerializer.Create()) - .ToString(Formatting.None), Encoding.UTF8, "application/json"); + /// + /// Internal: Execute a POST request asynchronously with custom HttpContent. + /// The argument is treated as: name1,value1,name2,value2,... + /// + protected virtual async Task ExecutePostAsync(string request, HttpContent content, params string[]? parameters) + { + EnsureOpen(); - return Execute(req); + var req = new HttpRequestMessage(HttpMethod.Post, QueryString(request, parameters)) + { + Content = content + }; + + var resp = await httpc.SendAsync(req).ConfigureAwait(false); // Async call + VerifyStatus(resp); + return resp; } + /// /// Internal: Execute a request and return its result. /// The argument is treated as: name1,value1,name2,value2,... /// - protected virtual HttpResponseMessage ExecuteGet(string request, params string[] parameters) + protected virtual HttpResponseMessage ExecuteGet(string request, params string[]? parameters) { EnsureOpen(); @@ -203,10 +228,11 @@ protected virtual HttpResponseMessage ExecuteGet(string request, params string[] /// /// Execute a GET request asynchronously with an array of parameters. /// - protected Task ExecuteGetAsync(string action, string[] parameters, CancellationToken cancellationToken) + protected Task ExecuteGetAsync(string action, string[]? parameters, CancellationToken cancellationToken) { - var url = BuildUrl(action, parameters); - return Client.GetAsync(url, cancellationToken); + EnsureOpen(); + var url = QueryString(action, parameters); + return httpc.GetAsync(url, cancellationToken); } /// @@ -215,12 +241,18 @@ protected Task ExecuteGetAsync(string action, string[] para protected Task ExecuteGetAsync( string action, string param1, string value1, - string param2 = null, string value2 = null, - string param3 = null, string value3 = null, + string? param2 = null, string? value2 = null, + string? param3 = null, string? value3 = null, CancellationToken cancellationToken = default) { - var url = BuildUrl(action, param1, value1, param2, value2, param3, value3); - return Client.GetAsync(url, cancellationToken); + EnsureOpen(); + var url = (param2 == null && param3 == null) + ? QueryString(action, param1, value1) + : QueryString(action, + param1, value1, + param2 ?? string.Empty, value2 ?? string.Empty, + param3 ?? string.Empty, value3 ?? string.Empty); + return httpc.GetAsync(url, cancellationToken); } private HttpResponseMessage Execute(HttpRequestMessage request) @@ -232,7 +264,7 @@ private HttpResponseMessage Execute(HttpRequestMessage request) return response; } - private string QueryString(string request, params string[] parameters) + private string QueryString(string request, params string[]? parameters) { return parameters is null ? string.Format("{0}/{1}", Url, request) @@ -240,31 +272,6 @@ private string QueryString(string request, params string[] parameters) .Join("&", parameters.Select(WebUtility.UrlEncode).InPairs((key, val) => string.Format("{0}={1}", key, val)))); } - // Add this property so subclasses can access the HttpClient instance - protected HttpClient Client => httpc; - - // BuildUrl helpers (mirror the QueryString overloads) - protected virtual string BuildUrl(string action, string[] parameters) - { - // QueryString has signature: QueryString(string request, params string[] parameters) - return QueryString(action, parameters); - } - - protected virtual string BuildUrl( - string action, - string param1, string value1, - string param2 = null, string value2 = null, - string param3 = null, string value3 = null) - { - // Forward to QueryString which accepts params string[] - if (param2 == null && param3 == null) - { - return QueryString(action, param1, value1); - } - return QueryString(action, param1, value1, param2, value2, param3, value3); - } - - /// /// Internal utility: input stream of the provided response. /// @@ -304,7 +311,12 @@ public virtual Stream GetResponseStream(HttpResponseMessage response) // LUCENEN /// public virtual Stream GetResponseStream(HttpResponseMessage response, bool consume) // LUCENENET: This was ResponseInputStream in Lucene { +#if FEATURE_HTTPCONTENT_READASSTREAM + Stream result = response.Content.ReadAsStream(); +#else Stream result = response.Content.ReadAsStreamAsync().ConfigureAwait(false).GetAwaiter().GetResult(); +#endif + if (consume) result = new ConsumingStream(result); return result; @@ -316,11 +328,11 @@ public virtual Stream GetResponseStream(HttpResponseMessage response, bool consu /// public virtual async Task GetResponseStreamAsync(HttpResponseMessage response, CancellationToken cancellationToken = default) { - #if NET8_0_OR_GREATER +#if FEATURE_HTTPCONTENT_READASSTREAM_CANCELLATIONTOKEN Stream result = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); - #else +#else Stream result = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); - #endif +#endif return result; } @@ -331,11 +343,11 @@ public virtual async Task GetResponseStreamAsync(HttpResponseMessage res /// public virtual async Task GetResponseStreamAsync(HttpResponseMessage response, bool consume, CancellationToken cancellationToken = default) { - #if NET8_0_OR_GREATER +#if FEATURE_HTTPCONTENT_READASSTREAM_CANCELLATIONTOKEN Stream result = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); - #else +#else Stream result = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); - #endif +#endif if (consume) result = new ConsumingStream(result); return result; @@ -363,7 +375,7 @@ protected virtual T DoAction(HttpResponseMessage response, Func call) /// protected virtual T DoAction(HttpResponseMessage response, bool consume, Func call) { - Exception th = null; + Exception? th = null; try { return call(); @@ -394,8 +406,8 @@ protected virtual T DoAction(HttpResponseMessage response, bool consume, Func } } if (Debugging.AssertsEnabled) Debugging.Assert(th != null); // extra safety - if we get here, it means the Func failed - Util.IOUtils.ReThrow(th); - return default; // silly, if we're here, IOUtils.reThrow always throws an exception + Util.IOUtils.ReThrow(th!); + return default!; // silly, if we're here, IOUtils.reThrow always throws an exception } /// @@ -405,7 +417,7 @@ protected virtual T DoAction(HttpResponseMessage response, bool consume, Func /// protected virtual async Task DoActionAsync(HttpResponseMessage response, bool consume, Func> call) { - Exception th = null; + Exception? th = null; try { VerifyStatus(response); @@ -438,7 +450,7 @@ protected virtual async Task DoActionAsync(HttpResponseMessage response, b } if (Debugging.AssertsEnabled) Debugging.Assert(th != null); - Util.IOUtils.ReThrow(th); + Util.IOUtils.ReThrow(th!); return default!; // never reached, rethrow above always throws } diff --git a/src/Lucene.Net.Replicator/Http/HttpReplicator.cs b/src/Lucene.Net.Replicator/Http/HttpReplicator.cs index 76b69f7bec..305f681fda 100644 --- a/src/Lucene.Net.Replicator/Http/HttpReplicator.cs +++ b/src/Lucene.Net.Replicator/Http/HttpReplicator.cs @@ -1,3 +1,4 @@ +#nullable enable using J2N.IO; using System; using System.IO; @@ -37,8 +38,8 @@ public class HttpReplicator : HttpClientBase, IReplicator, IAsyncReplicator /// Creates a new with the given host, port and path. /// for more details. /// - public HttpReplicator(string host, int port, string path, HttpMessageHandler messageHandler = null) - : base(host, port, path, messageHandler) + public HttpReplicator(string host, int port, string path, HttpMessageHandler? messageHandler = null) + : base(host, port, path, messageHandler ?? new HttpClientHandler()) { } @@ -47,7 +48,7 @@ public HttpReplicator(string host, int port, string path, HttpMessageHandler mes /// for more details. /// //Note: LUCENENET Specific - public HttpReplicator(string url, HttpMessageHandler messageHandler = null) + public HttpReplicator(string url, HttpMessageHandler? messageHandler = null) : this(url, new HttpClient(messageHandler ?? new HttpClientHandler()) { Timeout = DEFAULT_TIMEOUT }) { } @@ -65,13 +66,13 @@ public HttpReplicator(string url, HttpClient client) /// /// Checks for updates at the remote host. /// - public virtual SessionToken CheckForUpdate(string currentVersion) + public virtual SessionToken? CheckForUpdate(string? currentVersion) { - string[] parameters = null; - if (currentVersion != null) - parameters = new[] { ReplicationService.REPLICATE_VERSION_PARAM, currentVersion }; + string[]? parameters = null; + if (!string.IsNullOrEmpty(currentVersion)) + parameters = new[] { ReplicationService.REPLICATE_VERSION_PARAM, currentVersion! }; - HttpResponseMessage response = base.ExecuteGet(ReplicationService.ReplicationAction.UPDATE.ToString(), parameters); + HttpResponseMessage response = base.ExecuteGet(ReplicationService.ReplicationAction.UPDATE.ToString(), parameters ?? Array.Empty()); return DoAction(response, () => { using DataInputStream inputStream = new DataInputStream(GetResponseStream(response)); @@ -107,7 +108,7 @@ public virtual void Release(string sessionId) { HttpResponseMessage response = ExecuteGet(ReplicationService.ReplicationAction.RELEASE.ToString(), ReplicationService.REPLICATE_SESSION_ID_PARAM, sessionId); // do not remove this call: as it is still validating for us! - DoAction(response, () => null); + DoAction(response, () => null); } #region Async methods (IAsyncReplicator) @@ -120,15 +121,15 @@ public virtual void Release(string sessionId) /// /// A if updates are available; otherwise, null. /// - public async Task CheckForUpdateAsync(string currentVersion, CancellationToken cancellationToken = default) + public async Task CheckForUpdateAsync(string? currentVersion, CancellationToken cancellationToken = default) { - string[] parameters = currentVersion != null - ? new[] { ReplicationService.REPLICATE_VERSION_PARAM, currentVersion } + string[]? parameters = !string.IsNullOrEmpty(currentVersion) + ? new[] { ReplicationService.REPLICATE_VERSION_PARAM, currentVersion! } : null; using var response = await ExecuteGetAsync( ReplicationService.ReplicationAction.UPDATE.ToString(), - parameters, + parameters ?? Array.Empty(), cancellationToken: cancellationToken).ConfigureAwait(false); return await DoActionAsync(response, async () => @@ -201,3 +202,4 @@ await DoActionAsync(response, () => } } +#nullable restore diff --git a/src/Lucene.Net.Replicator/IAsyncReplicator.cs b/src/Lucene.Net.Replicator/IAsyncReplicator.cs index d1e689e657..af5ededa7b 100644 --- a/src/Lucene.Net.Replicator/IAsyncReplicator.cs +++ b/src/Lucene.Net.Replicator/IAsyncReplicator.cs @@ -1,3 +1,4 @@ +#nullable enable using System.IO; using System.Threading; using System.Threading.Tasks; @@ -15,6 +16,7 @@ public interface IAsyncReplicator /// /// Current version of the index. /// Cancellation token. + /// A if an update exists; otherwise, null. Task CheckForUpdateAsync(string currentVersion, CancellationToken cancellationToken = default); /// @@ -33,3 +35,4 @@ public interface IAsyncReplicator Task PublishAsync(IRevision revision, CancellationToken cancellationToken = default); } } +#nullable restore diff --git a/src/Lucene.Net.Replicator/ReplicationClient.cs b/src/Lucene.Net.Replicator/ReplicationClient.cs index 292118c490..99a1ef71dc 100644 --- a/src/Lucene.Net.Replicator/ReplicationClient.cs +++ b/src/Lucene.Net.Replicator/ReplicationClient.cs @@ -1,3 +1,4 @@ +#nullable enable using J2N; using J2N.Threading; using Lucene.Net.Diagnostics; @@ -49,7 +50,7 @@ public class ReplicationClient : IDisposable private class ReplicationThread : ThreadJob { private readonly long intervalMillis; - private readonly ReentrantLock updateLock; + private readonly SemaphoreSlim updateLock; private readonly Action doUpdate; private readonly Action handleUpdateException; @@ -64,7 +65,7 @@ private class ReplicationThread : ThreadJob /// A delegate to call to perform the update. /// A delegate to call to handle an exception. /// - public ReplicationThread(long intervalMillis, string threadName, Action doUpdate, Action handleUpdateException, ReentrantLock updateLock) + public ReplicationThread(long intervalMillis, string threadName, Action doUpdate, Action handleUpdateException, SemaphoreSlim updateLock) : base(threadName) { this.intervalMillis = intervalMillis; @@ -78,7 +79,7 @@ public override void Run() while (true) { long time = Time.NanoTime() / Time.MillisecondsPerNanosecond; - updateLock.Lock(); + updateLock.Wait(); try { doUpdate(); @@ -89,7 +90,7 @@ public override void Run() } finally { - updateLock.Unlock(); + updateLock.Release(); } time = Time.NanoTime() / Time.MillisecondsPerNanosecond - time; @@ -127,18 +128,18 @@ public override void Run() /// public const string INFO_STREAM_COMPONENT = "ReplicationThread"; - private readonly IReplicator replicator; - private readonly IAsyncReplicator asyncReplicator; + private readonly IReplicator? replicator; + private readonly IAsyncReplicator? asyncReplicator; private readonly IReplicationHandler handler; private readonly ISourceDirectoryFactory factory; private readonly byte[] copyBuffer = new byte[16384]; - private readonly ReentrantLock updateLock = new ReentrantLock(); + private readonly SemaphoreSlim updateLock = new SemaphoreSlim(1, 1); private readonly object syncLock = new object(); // LUCENENET specific to avoid lock (this) - private ReplicationThread updateThread; - private CancellationTokenSource asyncUpdateCts; - private Task asyncUpdateTask; + private ReplicationThread? updateThread; + private CancellationTokenSource? asyncUpdateCts; + private Task? asyncUpdateTask; private bool disposed = false; private InfoStream infoStream = InfoStream.Default; @@ -150,9 +151,9 @@ public override void Run() /// The for returning a for a given source and session public ReplicationClient(IReplicator replicator, IReplicationHandler handler, ISourceDirectoryFactory factory) { - this.replicator = replicator; - this.handler = handler; - this.factory = factory; + this.replicator = replicator ?? throw new ArgumentNullException(nameof(replicator)); + this.handler = handler ?? throw new ArgumentNullException(nameof(handler)); + this.factory = factory ?? throw new ArgumentNullException(nameof(factory)); } /// @@ -182,7 +183,9 @@ private void CopyBytes(IndexOutput output, Stream input) /// private void DoUpdate() { - SessionToken session = null; + if (replicator is null) throw new InvalidOperationException("Replicator not initialized."); + + SessionToken? session = null; Dictionary sourceDirectory = new Dictionary(); Dictionary> copiedFiles = new Dictionary>(); bool notify = false; @@ -202,7 +205,7 @@ private void DoUpdate() foreach (KeyValuePair> pair in requiredFiles) { string source = pair.Key; - Directory directory = factory.GetDirectory(session.Id, source); + Directory directory = factory.GetDirectory(session.Id!, source); sourceDirectory.Add(source, directory); IList cpFiles = new JCG.List(); @@ -216,11 +219,11 @@ private void DoUpdate() return; } - Stream input = null; - IndexOutput output = null; + Stream? input = null; + IndexOutput? output = null; try { - input = replicator.ObtainFile(session.Id, source, file.FileName); + input = replicator.ObtainFile(session.Id!, source, file.FileName); output = directory.CreateOutput(file.FileName, IOContext.DEFAULT); CopyBytes(output, input); @@ -243,7 +246,7 @@ private void DoUpdate() { try { - replicator.Release(session.Id); + replicator.Release(session.Id!); } finally { @@ -265,7 +268,7 @@ private void DoUpdate() if (notify && !disposed) { // no use to notify if we are closed already // LUCENENET specific - pass the copiedFiles as read only - handler.RevisionReady(session.Version, session.SourceFiles, Collections.AsReadOnly(copiedFiles), sourceDirectory); + handler.RevisionReady(session!.Version, session.SourceFiles, Collections.AsReadOnly(copiedFiles), sourceDirectory); } } finally @@ -308,7 +311,7 @@ private async Task DoUpdateAsync(CancellationToken cancellationToken) foreach (var pair in requiredFiles) { string source = pair.Key; - Directory directory = factory.GetDirectory(session.Id, source); + Directory directory = factory.GetDirectory(session.Id!, source); sourceDirectory.Add(source, directory); IList cpFiles = new J2N.Collections.Generic.List(); @@ -318,11 +321,11 @@ private async Task DoUpdateAsync(CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); - Stream input = null; - IndexOutput output = null; + Stream? input = null; + IndexOutput? output = null; try { - input = await asyncReplicator.ObtainFileAsync(session.Id, source, file.FileName, cancellationToken).ConfigureAwait(false); + input = await asyncReplicator.ObtainFileAsync(session.Id!, source, file.FileName, cancellationToken).ConfigureAwait(false); output = directory.CreateOutput(file.FileName, IOContext.DEFAULT); int numBytes; @@ -349,7 +352,7 @@ private async Task DoUpdateAsync(CancellationToken cancellationToken) { try { - await asyncReplicator.ReleaseAsync(session.Id, cancellationToken).ConfigureAwait(false); + await asyncReplicator.ReleaseAsync(session.Id!, cancellationToken).ConfigureAwait(false); } finally { @@ -476,20 +479,20 @@ public void Dispose() /// will be set. /// /// if the thread has already been started - public virtual void StartUpdateThread(long intervalInMilliseconds, string threadName) + public virtual void StartUpdateThread(long intervalInMilliseconds, string? threadName) { UninterruptableMonitor.Enter(syncLock); try { EnsureOpen(); - if (updateThread != null && updateThread.IsAlive) + if (updateThread is not null && updateThread.IsAlive) throw IllegalStateException.Create("cannot start an update thread when one is running, must first call 'stopUpdateThread()'"); threadName = threadName is null ? INFO_STREAM_COMPONENT : "ReplicationThread-" + threadName; updateThread = new ReplicationThread(intervalInMilliseconds, threadName, DoUpdate, HandleUpdateException, updateLock); updateThread.Start(); // we rely on isAlive to return true in isUpdateThreadAlive, assert to be on the safe side - if (Debugging.AssertsEnabled) Debugging.Assert(updateThread.IsAlive, "updateThread started but not alive?"); + if (Debugging.AssertsEnabled) Debugging.Assert(updateThread?.IsAlive == true, "updateThread started but not alive?"); } finally { @@ -506,7 +509,7 @@ public virtual void StopUpdateThread() UninterruptableMonitor.Enter(syncLock); try { - if (updateThread != null) + if (updateThread is not null) { // this will trigger the thread to terminate if it awaits the lock. // otherwise, if it's in the middle of replication, we wait for it to @@ -544,7 +547,7 @@ public virtual bool IsUpdateThreadAlive UninterruptableMonitor.Enter(syncLock); try { - return updateThread != null && updateThread.IsAlive; + return updateThread is not null && updateThread.IsAlive; } finally { @@ -558,7 +561,7 @@ public virtual bool IsUpdateThreadAlive /// /// Interval between updates. /// Optional name for logging purposes. - public virtual void StartAsyncUpdateLoop(long intervalInMilliseconds, string threadName = null) + public virtual void StartAsyncUpdateLoop(long intervalInMilliseconds, string? threadName = null) { UninterruptableMonitor.Enter(syncLock); try @@ -579,14 +582,14 @@ public virtual void StartAsyncUpdateLoop(long intervalInMilliseconds, string thr { try { - updateLock.Lock(); + await updateLock.WaitAsync(ct).ConfigureAwait(false); try { await DoUpdateAsync(ct).ConfigureAwait(false); } finally { - updateLock.Unlock(); + updateLock.Release(); } } catch (OperationCanceledException) @@ -684,14 +687,14 @@ public virtual void UpdateNow() EnsureOpen(); //NOTE: We don't have a worker running, so we just do the work. - updateLock.Lock(); + updateLock.Wait(); try { DoUpdate(); } finally { - updateLock.Unlock(); + updateLock.Release(); } } @@ -703,14 +706,14 @@ public virtual async Task UpdateNowAsync(CancellationToken cancellationToken = d EnsureOpen(); // Acquire the same update lock to prevent concurrent updates - updateLock.Lock(); + await updateLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { await DoUpdateAsync(cancellationToken).ConfigureAwait(false); } finally { - updateLock.Unlock(); + updateLock.Release(); } } @@ -763,12 +766,13 @@ public interface ISourceDirectoryFactory /// /// /// - Directory GetDirectory(string sessionId, string source); //throws IOException; + Directory GetDirectory(string? sessionId, string source); //throws IOException; /// /// Called to denote that the replication actions for this session were finished and the directory is no longer needed. /// /// - void CleanupSession(string sessionId); + void CleanupSession(string? sessionId); } } +#nullable restore From 8135524fa0f3453d4793256278bafcde760e6b65 Mon Sep 17 00:00:00 2001 From: Nehan Pathan Date: Thu, 11 Sep 2025 15:42:53 +0530 Subject: [PATCH 3/5] Add FEATURE_HTTPCONTENT_READASSTREAM and FEATURE_HTTPCONTENT_READASSTREAM_CANCELLATIONTOKEN defines for .NET 8+ and fix all warnings --- Directory.Build.targets | 133 +++++++++++++++++++++++++++------------- 1 file changed, 90 insertions(+), 43 deletions(-) diff --git a/Directory.Build.targets b/Directory.Build.targets index efbd8089f3..1b65179e25 100644 --- a/Directory.Build.targets +++ b/Directory.Build.targets @@ -27,7 +27,8 @@ - $(AlternatePublishRootDirectory)/$(TargetFramework)/$(MSBuildProjectName)/ + + $(AlternatePublishRootDirectory)/$(TargetFramework)/$(MSBuildProjectName)/ @@ -38,7 +39,8 @@ - + $(DefineConstants);FEATURE_ASPNETCORE_TESTHOST $(DefineConstants);FEATURE_UTF8_TOUTF16 @@ -46,7 +48,8 @@ - + $(DefineConstants);FEATURE_RANDOM_NEXTINT64_NEXTSINGLE $(DefineConstants);FEATURE_SPANFORMATTABLE @@ -55,22 +58,26 @@ - + $(DefineConstants);FEATURE_READONLYSET - + $(DefineConstants);FEATURE_PROCESS_KILL_ENTIREPROCESSTREE $(DefineConstants);FEATURE_STRING_CONCAT_READONLYSPAN - - + + $(DefineConstants);NETSTANDARD $(DefineConstants);FEATURE_CULTUREINFO_CURRENTCULTURE_SETTER @@ -78,9 +85,16 @@ portable - - - + + + $(DefineConstants);FEATURE_HTTPCONTENT_READASSTREAM + $(DefineConstants);FEATURE_HTTPCONTENT_READASSTREAM_CANCELLATIONTOKEN + + + $(DefineConstants);FEATURE_ARRAY_FILL $(DefineConstants);FEATURE_CONDITIONALWEAKTABLE_ENUMERATOR @@ -92,22 +106,28 @@ - - + + $(DefineConstants);FEATURE_STRING_CONTAINS_STRINGCOMPARISON - - + + $(DefineConstants);FEATURE_ICONFIGURATIONROOT_PROVIDERS - - + + $(DefineConstants);FEATURE_ASSEMBLY_GETCALLINGASSEMBLY $(DefineConstants);FEATURE_FILESTREAM_LOCK @@ -116,14 +136,17 @@ - - + + $(DefineConstants);FEATURE_SERIALIZABLE_EXCEPTIONS $(DefineConstants);FEATURE_SERIALIZABLE - + @@ -132,7 +155,8 @@ - + $(DefineConstants);FEATURE_ICONFIGURATIONROOT_PROVIDERS @@ -142,7 +166,8 @@ $(DefineConstants);NETFRAMEWORK - $(DefineConstants);FEATURE_CODE_ACCESS_SECURITY $(DefineConstants);FEATURE_MEMORYMAPPEDFILESECURITY @@ -152,9 +177,12 @@ full - - - + + + $(DefineConstants);FEATURE_OPENNLP @@ -163,34 +191,45 @@ + https://docs.microsoft.com/en-us/nuget/create-packages/symbol-packages-snupkg#nugetorg-symbol-package-constraints --> portable - + <_Parameter1>%(InternalsVisibleTo.Identity) - <_Parameter1 Condition=" '$(SignAssembly)' == 'true' And '$(PublicKey)' != '' ">%(InternalsVisibleTo.Identity), PublicKey=$(PublicKey) + <_Parameter1 Condition=" '$(SignAssembly)' == 'true' And '$(PublicKey)' != '' ">%(InternalsVisibleTo.Identity), + PublicKey=$(PublicKey) - + - true - $(TargetFramework) + + true + + $(TargetFramework) - $(TargetFrameworks) + + $(TargetFrameworks) none - + - @@ -201,7 +240,8 @@ - + @@ -209,19 +249,26 @@ - - + + - - + + - - + + - + \ No newline at end of file From 42e3d32b1c942c3e437b1cf8fcaf733c7c295712 Mon Sep 17 00:00:00 2001 From: Nehan Pathan Date: Mon, 6 Oct 2025 20:59:56 +0530 Subject: [PATCH 4/5] remove extra trailing --- src/Lucene.Net.Replicator/Http/package.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Lucene.Net.Replicator/Http/package.md b/src/Lucene.Net.Replicator/Http/package.md index fd9797b387..0fa6191975 100644 --- a/src/Lucene.Net.Replicator/Http/package.md +++ b/src/Lucene.Net.Replicator/Http/package.md @@ -4,7 +4,7 @@ summary: *content --- - -Provides index files replication capabilities. \ No newline at end of file +Provides index files replication capabilities.