Skip to content

Commit

Permalink
Add CancellationToken support
Browse files Browse the repository at this point in the history
  • Loading branch information
arthrp committed Nov 4, 2024
1 parent 867c5ad commit 5b68b09
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 60 deletions.
6 changes: 4 additions & 2 deletions RqliteDotnet.Test/RqliteClientTests.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using RqliteDotnet.Dto;
Expand Down Expand Up @@ -41,7 +42,8 @@ public async Task ParametrizedQueryWithGenerics_Works()
var client = HttpClientMock.GetParamQueryMock();

var rqClient = new RqliteOrmClient("http://localhost:6000", client);
var queryresults = await rqClient.QueryParams<NamedQueryParameter, FooResultDto>("select * from foo where Name = :name",
var queryresults = await rqClient.QueryParams<NamedQueryParameter, FooResultDto>("select * from foo where Name = :name",
default(CancellationToken),
new NamedQueryParameter()
{
Name = "name",
Expand Down Expand Up @@ -92,7 +94,7 @@ public async Task BasicQueryParam_Works()
var client = HttpClientMock.GetParamQueryMock();

var rqClient = new RqliteClient("http://localhost:6000", client);
var result = await rqClient.QueryParams<QueryParameter>("select * from foo where name = ?", new QueryParameter()
var result = await rqClient.QueryParams<QueryParameter>("select * from foo where name = ?", default(CancellationToken), new QueryParameter()
{
ParamType = QueryParamType.String, Value = "john"
});
Expand Down
6 changes: 3 additions & 3 deletions RqliteDotnet/HttpClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ namespace RqliteDotnet;

public static class HttpClientExtensions
{
public static async Task<T> SendTyped<T>(this HttpClient client, HttpRequestMessage request)
public static async Task<T> SendTyped<T>(this HttpClient client, HttpRequestMessage request, CancellationToken cancellationToken = default)
{
var response = await client.SendAsync(request);
var content = await response.Content.ReadAsStringAsync();
var response = await client.SendAsync(request, cancellationToken);
var content = await response.Content.ReadAsStringAsync(cancellationToken);

response.EnsureSuccessStatusCode();

Expand Down
22 changes: 15 additions & 7 deletions RqliteDotnet/IRqliteClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,49 @@ public interface IRqliteClient
/// Ping Rqlite instance
/// </summary>
/// <returns>String containining Rqlite version</returns>
Task<string> Ping();
Task<string> Ping(CancellationToken cancellationToken);

/// <summary>
/// Query DB and return result
/// </summary>
/// <param name="query"></param>
Task<QueryResults> Query(string query);
/// <param name="query">Query to run</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<QueryResults> Query(string query, CancellationToken cancellationToken);

/// <summary>
/// Execute command and return result
/// </summary>
Task<ExecuteResults> Execute(string command);
/// <param name="command">Command to execute</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<ExecuteResults> Execute(string command, CancellationToken cancellationToken);

/// <summary>
/// Execute one or several commands and return result
/// </summary>
/// <param name="commands">Commands to execute</param>
/// <param name="flags">Command flags, e.g. whether to use transaction</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<ExecuteResults> Execute(IEnumerable<string> commands, DbFlag? flags);
Task<ExecuteResults> Execute(IEnumerable<string> commands, DbFlag? flags, CancellationToken cancellationToken);

/// <summary>
/// Execute one or several commands and return result
/// </summary>
/// <param name="commands">Commands to execute</param>
/// <param name="flags">Command flags, e.g. whether to use transaction</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<ExecuteResults> ExecuteParams<T>(IEnumerable<(string, T[])> commands, DbFlag? flags) where T : QueryParameter;
Task<ExecuteResults> ExecuteParams<T>(IEnumerable<(string, T[])> commands, DbFlag? flags, CancellationToken cancellationToken) where T : QueryParameter;

/// <summary>
/// Query DB using parametrized statement
/// </summary>
/// <param name="query"></param>
/// <param name="cancellationToken"></param>
/// <param name="qps"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
Task<QueryResults> QueryParams<T>(string query, params T[] qps) where T: QueryParameter;
Task<QueryResults> QueryParams<T>(string query, CancellationToken cancellationToken, params T[] qps) where T: QueryParameter;
}
66 changes: 21 additions & 45 deletions RqliteDotnet/RqliteClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,96 +23,72 @@ public RqliteClient(HttpClient client)
{
_httpClient = client ?? throw new ArgumentNullException(nameof(client));
}

/// <summary>
/// Ping Rqlite instance
/// </summary>
/// <returns>String containining Rqlite version</returns>
public async Task<string> Ping()

/// <inheritdoc />
public async Task<string> Ping(CancellationToken cancellationToken = default)
{
var x = await _httpClient.GetAsync("/status");
var x = await _httpClient.GetAsync("/status", cancellationToken);

return x.Headers.GetValues("X-Rqlite-Version").FirstOrDefault()!;
}

/// <summary>
/// Query DB and return result
/// </summary>
/// <param name="query"></param>
public async Task<QueryResults> Query(string query)

/// <inheritdoc />
public async Task<QueryResults> Query(string query, CancellationToken cancellationToken = default)
{
var data = "&q=" + Uri.EscapeDataString(query);
var baseUrl = "/db/query?timings";

var r = await _httpClient.GetAsync($"{baseUrl}&{data}");
var str = await r.Content.ReadAsStringAsync();
var r = await _httpClient.GetAsync($"{baseUrl}&{data}", cancellationToken);
var str = await r.Content.ReadAsStringAsync(cancellationToken);

var result = JsonSerializer.Deserialize<QueryResults>(str, new JsonSerializerOptions() { PropertyNameCaseInsensitive = true });
return result;

Check warning on line 45 in RqliteDotnet/RqliteClient.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference return.

Check warning on line 45 in RqliteDotnet/RqliteClient.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference return.
}

/// <summary>
/// Execute command and return result
/// </summary>
public async Task<ExecuteResults> Execute(string command)
/// <inheritdoc />
public async Task<ExecuteResults> Execute(string command, CancellationToken cancellationToken = default)
{
var request = new HttpRequestMessage(HttpMethod.Post, "/db/execute?timings");
request.Content = new StringContent($"[\"{command}\"]", Encoding.UTF8, "application/json");

var result = await _httpClient.SendTyped<ExecuteResults>(request);
var result = await _httpClient.SendTyped<ExecuteResults>(request, cancellationToken);
return result;
}

/// <summary>
/// Execute one or several commands and return result
/// </summary>
/// <param name="commands">Commands to execute</param>
/// <param name="flags">Command flags, e.g. whether to use transaction</param>
/// <returns></returns>
public async Task<ExecuteResults> Execute(IEnumerable<string> commands, DbFlag? flags)
/// <inheritdoc />
public async Task<ExecuteResults> Execute(IEnumerable<string> commands, DbFlag? flags, CancellationToken cancellationToken = default)
{
var parameters = GetParameters(flags);
var request = new HttpRequestMessage(HttpMethod.Post, $"/db/execute{parameters}");
commands = commands.Select(c => $"\"{c}\"");
var s = string.Join(",", commands);

request.Content = new StringContent($"[{s}]", Encoding.UTF8, "application/json");
var result = await _httpClient.SendTyped<ExecuteResults>(request);
var result = await _httpClient.SendTyped<ExecuteResults>(request, cancellationToken);
return result;
}

/// <summary>
/// Execute one or several commands and return result
/// </summary>
/// <param name="commands">Commands to execute</param>
/// <param name="flags">Command flags, e.g. whether to use transaction</param>
/// <returns></returns>
public async Task<ExecuteResults> ExecuteParams<T>(IEnumerable<(string, T[])> commands, DbFlag? flags) where T : QueryParameter
/// <inheritdoc />
public async Task<ExecuteResults> ExecuteParams<T>(IEnumerable<(string, T[])> commands, DbFlag? flags, CancellationToken cancellationToken = default) where T : QueryParameter
{
var parameters = GetParameters(flags);
var request = new HttpRequestMessage(HttpMethod.Post, $"/db/execute{parameters}");
var compiled = commands.Select(c => $"{BuildQuery(c.Item1, c.Item2)}");
var s = string.Join(",", compiled);

request.Content = new StringContent($"[{s}]", Encoding.UTF8, "application/json");
var result = await _httpClient.SendTyped<ExecuteResults>(request);
var result = await _httpClient.SendTyped<ExecuteResults>(request, cancellationToken);
return result;
}

/// <summary>
/// Query DB using parametrized statement
/// </summary>
/// <param name="query"></param>
/// <param name="qps"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public async Task<QueryResults> QueryParams<T>(string query, params T[] qps) where T : QueryParameter
/// <inheritdoc />
public async Task<QueryResults> QueryParams<T>(string query, CancellationToken cancellationToken = default, params T[] qps) where T : QueryParameter
{
var request = new HttpRequestMessage(HttpMethod.Post, "/db/query?timings");
var q = BuildQuery(query, qps);

request.Content = new StringContent($"[{q}]", Encoding.UTF8, "application/json");
var result = await _httpClient.SendTyped<QueryResults>(request);
var result = await _httpClient.SendTyped<QueryResults>(request, cancellationToken);

return result;
}
Expand Down
6 changes: 3 additions & 3 deletions RqliteDotnet/RqliteOrmClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface IRqliteOrmClient : IRqliteClient
/// <returns></returns>
Task<List<T>> Query<T>(string query) where T: new();

Task<List<U>> QueryParams<T, U>(string query, params T[] qps)
Task<List<U>> QueryParams<T, U>(string query, CancellationToken cancellationToken, params T[] qps)
where T: QueryParameter
where U : new();
}
Expand Down Expand Up @@ -59,11 +59,11 @@ public RqliteOrmClient(string uri, HttpClient? client = null) : base(uri, client
return list;
}

public async Task<List<U>> QueryParams<T, U>(string query, params T[] qps)
public async Task<List<U>> QueryParams<T, U>(string query, CancellationToken cancellationToken, params T[] qps)
where T : QueryParameter
where U : new()
{
var response = await QueryParams(query, qps);
var response = await QueryParams(query, cancellationToken, qps);
if (response.Results!.Count > 1)
throw new DataException("Query returned more than 1 result. At the moment only 1 result supported");
var res = response.Results[0];
Expand Down

0 comments on commit 5b68b09

Please sign in to comment.