Skip to content

Commit

Permalink
Add tests for Commit/Rollback (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
victor-sushko committed Aug 2, 2023
1 parent f801f6d commit ecf5418
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 28 deletions.
157 changes: 157 additions & 0 deletions src/Octonica.ClickHouseClient.Tests/ClickHouseColumnWriterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
#endregion

using Octonica.ClickHouseClient.Exceptions;
using Octonica.ClickHouseClient.Utils;
using System;
using System.Collections.Generic;
using System.Data;
Expand Down Expand Up @@ -1339,6 +1341,161 @@ async Task Test(ClickHouseConnection cn, string tableName)
}
}

[Fact]
public Task TransactionModeBlock()
{
return WithTemporaryTable("tran_block", "id Int32", Test);

async Task Test(ClickHouseConnection connection, string tableName, CancellationToken ct)
{
var list = MappedReadOnlyList<int, int>.Map(Enumerable.Range(0, 48).ToList(), i => i < 47 ? i : throw new IndexOutOfRangeException("Too long!"));
await using (var writer = await connection.CreateColumnWriterAsync($"INSERT INTO {tableName}(id) VALUES", ct))
{
// There will be three blocks in the list. The last block will produce an error, but first two blocks must be commited.
writer.MaxBlockSize = 16;

var ex = await Assert.ThrowsAnyAsync<ClickHouseHandledException>(() => writer.WriteTableAsync(new[] { list }, list.Count, ClickHouseTransactionMode.Block, ct));
Assert.NotNull(ex.InnerException);
Assert.IsType<IndexOutOfRangeException>(ex.InnerException);
}

var cmd = connection.CreateCommand($"SELECT * FROM {tableName} ORDER BY id");
await using var reader = await cmd.ExecuteReaderAsync();
int expected = 0;

while (await reader.ReadAsync(ct))
{
var id = reader.GetInt32(0);
Assert.Equal(expected++, id);
}

Assert.Equal(32, expected);
}
}

[Fact]
public Task TransactionModeManual()
{
return WithTemporaryTable("tran_manual", "id Int32", Test);

static async Task Test(ClickHouseConnection connection, string tableName, CancellationToken ct)
{
await using (var writer = await connection.CreateColumnWriterAsync($"INSERT INTO {tableName}(id) VALUES", ct))
{
await writer.WriteTableAsync(new[] { Enumerable.Range(0, 10) }, 10, ClickHouseTransactionMode.Manual, ct);
await writer.RollbackAsync(ct);
await writer.WriteTableAsync(new[] { Enumerable.Range(10, 10) }, 10, ClickHouseTransactionMode.Manual, ct);
await writer.CommitAsync(ct);
await writer.WriteTableAsync(new[] { Enumerable.Range(20, 10) }, 10, ClickHouseTransactionMode.Manual, ct);
}

await using (var writer = await connection.CreateColumnWriterAsync($"INSERT INTO {tableName}(id) VALUES", ct))
{
await writer.WriteRowAsync(new object?[] { 18 }, false, ct);
await writer.WriteRowAsync(new object?[] { 19 }, false, ct);
await writer.RollbackAsync(ct);
await writer.WriteRowAsync(new object?[] { 20 }, false, ct);
await writer.WriteRowAsync(new object?[] { 21 }, true, ct);
await writer.CommitAsync(ct);
await writer.WriteRowAsync(new object?[] { 22 }, false, ct);
await writer.WriteRowAsync(new object?[] { 23 }, false, ct);
}

var cmd = connection.CreateCommand($"SELECT * FROM {tableName} ORDER BY id");
await using var reader = await cmd.ExecuteReaderAsync();
int expected = 10;

while (await reader.ReadAsync(ct))
{
var id = reader.GetInt32(0);
Assert.Equal(expected++, id);
}

Assert.Equal(22, expected);
}
}

[Theory]
[InlineData(ClickHouseTransactionMode.Default)]
[InlineData(ClickHouseTransactionMode.Auto)]
public Task TransactionModeAuto(ClickHouseTransactionMode mode)
{
return WithTemporaryTable("tran_auto", "id Int32", Test);

async Task Test(ClickHouseConnection connection, string tableName, CancellationToken ct)
{
await using (var writer = await connection.CreateColumnWriterAsync($"INSERT INTO {tableName}(id) VALUES", ct))
{
await writer.WriteTableAsync(new[] { Enumerable.Range(0, 10) }, 10, mode, ct);
await writer.RollbackAsync(ct);
await writer.WriteTableAsync(new[] { Enumerable.Range(10, 10) }, 10, mode, ct);
await writer.CommitAsync(ct);
await writer.WriteTableAsync(new[] { Enumerable.Range(20, 10) }, 10, mode, ct);
}

await using (var writer = await connection.CreateColumnWriterAsync($"INSERT INTO {tableName}(id) VALUES", ct))
{
await writer.WriteRowAsync(new object?[] { 30 }, true, ct);
await writer.RollbackAsync(ct);
await writer.WriteRowAsync(new object?[] { 31 }, true, ct);
await writer.CommitAsync(ct);
await writer.WriteRowAsync(new object?[] { 32 }, true, ct);
}

var cmd = connection.CreateCommand($"SELECT * FROM {tableName} ORDER BY id");
await using var reader = await cmd.ExecuteReaderAsync();
int expected = 0;

while (await reader.ReadAsync(ct))
{
var id = reader.GetInt32(0);
Assert.Equal(expected++, id);
}

Assert.Equal(33, expected);
}
}

[Fact]
public Task TransactionModeAutoBackwardCompatibility()
{
// Check that the default transaction mode is 'Auto' when not specified
return WithTemporaryTable("tran_auto_bc", "id Int32", Test);

async Task Test(ClickHouseConnection connection, string tableName, CancellationToken ct)
{
await using (var writer = await connection.CreateColumnWriterAsync($"INSERT INTO {tableName}(id) VALUES", ct))
{
await writer.WriteTableAsync(new[] { Enumerable.Range(0, 10) }, 10, ct);
await writer.RollbackAsync(ct);
await writer.WriteTableAsync(new[] { Enumerable.Range(10, 10) }, 10, ct);
await writer.CommitAsync(ct);
await writer.WriteTableAsync(new[] { Enumerable.Range(20, 10) }, 10, ct);
}

await using (var writer = await connection.CreateColumnWriterAsync($"INSERT INTO {tableName}(id) VALUES", ct))
{
await writer.WriteRowAsync(new object?[] { 30 }, ct);
await writer.RollbackAsync(ct);
await writer.WriteRowAsync(new object?[] { 31 }, ct);
await writer.CommitAsync(ct);
await writer.WriteRowAsync(new object?[] { 32 }, ct);
}

var cmd = connection.CreateCommand($"SELECT * FROM {tableName} ORDER BY id");
await using var reader = await cmd.ExecuteReaderAsync();
int expected = 0;

while (await reader.ReadAsync(ct))
{
var id = reader.GetInt32(0);
Assert.Equal(expected++, id);
}

Assert.Equal(33, expected);
}
}

protected override string GetTempTableName(string tableNameSuffix)
{
return $"{TestTableName}_{tableNameSuffix}";
Expand Down
11 changes: 8 additions & 3 deletions src/Octonica.ClickHouseClient.Tests/ClickHouseTestsBase.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#region License Apache 2.0
/* Copyright 2019-2022 Octonica
/* Copyright 2019-2023 Octonica
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,7 +57,12 @@ public ClickHouseConnection OpenConnection(Action<ClickHouseConnectionStringBuil
return connection;
}

protected async Task WithTemporaryTable(string tableNameSuffix, string columns, Func<ClickHouseConnection, string, Task> runTest)
protected Task WithTemporaryTable(string tableNameSuffix, string columns, Func<ClickHouseConnection, string, Task> runTest)
{
return WithTemporaryTable(tableNameSuffix, columns, (cn, tableName, _) => runTest(cn, tableName));
}

protected async Task WithTemporaryTable(string tableNameSuffix, string columns, Func<ClickHouseConnection, string, CancellationToken, Task> runTest, CancellationToken ct = default)
{
var tableName = GetTempTableName(tableNameSuffix);
try
Expand All @@ -70,7 +75,7 @@ protected async Task WithTemporaryTable(string tableNameSuffix, string columns,
cmd = connection.CreateCommand($"CREATE TABLE {tableName}({columns}) ENGINE=Memory");
await cmd.ExecuteNonQueryAsync();

await runTest(connection, tableName);
await runTest(connection, tableName, ct);
}
finally
{
Expand Down
Loading

0 comments on commit ecf5418

Please sign in to comment.