From ecf5418dbe9884f29ec63181742575b88a169a15 Mon Sep 17 00:00:00 2001 From: Victor Sushko Date: Wed, 2 Aug 2023 19:34:00 +0500 Subject: [PATCH] Add tests for Commit/Rollback (#77) --- .../ClickHouseColumnWriterTests.cs | 157 ++++++++++++++++++ .../ClickHouseTestsBase.cs | 11 +- .../ClickHouseColumnWriter.cs | 105 +++++++++--- 3 files changed, 245 insertions(+), 28 deletions(-) diff --git a/src/Octonica.ClickHouseClient.Tests/ClickHouseColumnWriterTests.cs b/src/Octonica.ClickHouseClient.Tests/ClickHouseColumnWriterTests.cs index 13081b4..e3ebc16 100644 --- a/src/Octonica.ClickHouseClient.Tests/ClickHouseColumnWriterTests.cs +++ b/src/Octonica.ClickHouseClient.Tests/ClickHouseColumnWriterTests.cs @@ -15,6 +15,8 @@ */ #endregion +using Octonica.ClickHouseClient.Exceptions; +using Octonica.ClickHouseClient.Utils; using System; using System.Collections.Generic; using System.Data; @@ -1339,6 +1341,161 @@ async Task Test(ClickHouseConnection cn, string tableName) } } + [Fact] + public Task TransactionModeBlock() + { + return WithTemporaryTable("tran_block", "id Int32", Test); + + async Task Test(ClickHouseConnection connection, string tableName, CancellationToken ct) + { + var list = MappedReadOnlyList.Map(Enumerable.Range(0, 48).ToList(), i => i < 47 ? i : throw new IndexOutOfRangeException("Too long!")); + await using (var writer = await connection.CreateColumnWriterAsync($"INSERT INTO {tableName}(id) VALUES", ct)) + { + // There will be three blocks in the list. The last block will produce an error, but first two blocks must be commited. + writer.MaxBlockSize = 16; + + var ex = await Assert.ThrowsAnyAsync(() => writer.WriteTableAsync(new[] { list }, list.Count, ClickHouseTransactionMode.Block, ct)); + Assert.NotNull(ex.InnerException); + Assert.IsType(ex.InnerException); + } + + var cmd = connection.CreateCommand($"SELECT * FROM {tableName} ORDER BY id"); + await using var reader = await cmd.ExecuteReaderAsync(); + int expected = 0; + + while (await reader.ReadAsync(ct)) + { + var id = reader.GetInt32(0); + Assert.Equal(expected++, id); + } + + Assert.Equal(32, expected); + } + } + + [Fact] + public Task TransactionModeManual() + { + return WithTemporaryTable("tran_manual", "id Int32", Test); + + static async Task Test(ClickHouseConnection connection, string tableName, CancellationToken ct) + { + await using (var writer = await connection.CreateColumnWriterAsync($"INSERT INTO {tableName}(id) VALUES", ct)) + { + await writer.WriteTableAsync(new[] { Enumerable.Range(0, 10) }, 10, ClickHouseTransactionMode.Manual, ct); + await writer.RollbackAsync(ct); + await writer.WriteTableAsync(new[] { Enumerable.Range(10, 10) }, 10, ClickHouseTransactionMode.Manual, ct); + await writer.CommitAsync(ct); + await writer.WriteTableAsync(new[] { Enumerable.Range(20, 10) }, 10, ClickHouseTransactionMode.Manual, ct); + } + + await using (var writer = await connection.CreateColumnWriterAsync($"INSERT INTO {tableName}(id) VALUES", ct)) + { + await writer.WriteRowAsync(new object?[] { 18 }, false, ct); + await writer.WriteRowAsync(new object?[] { 19 }, false, ct); + await writer.RollbackAsync(ct); + await writer.WriteRowAsync(new object?[] { 20 }, false, ct); + await writer.WriteRowAsync(new object?[] { 21 }, true, ct); + await writer.CommitAsync(ct); + await writer.WriteRowAsync(new object?[] { 22 }, false, ct); + await writer.WriteRowAsync(new object?[] { 23 }, false, ct); + } + + var cmd = connection.CreateCommand($"SELECT * FROM {tableName} ORDER BY id"); + await using var reader = await cmd.ExecuteReaderAsync(); + int expected = 10; + + while (await reader.ReadAsync(ct)) + { + var id = reader.GetInt32(0); + Assert.Equal(expected++, id); + } + + Assert.Equal(22, expected); + } + } + + [Theory] + [InlineData(ClickHouseTransactionMode.Default)] + [InlineData(ClickHouseTransactionMode.Auto)] + public Task TransactionModeAuto(ClickHouseTransactionMode mode) + { + return WithTemporaryTable("tran_auto", "id Int32", Test); + + async Task Test(ClickHouseConnection connection, string tableName, CancellationToken ct) + { + await using (var writer = await connection.CreateColumnWriterAsync($"INSERT INTO {tableName}(id) VALUES", ct)) + { + await writer.WriteTableAsync(new[] { Enumerable.Range(0, 10) }, 10, mode, ct); + await writer.RollbackAsync(ct); + await writer.WriteTableAsync(new[] { Enumerable.Range(10, 10) }, 10, mode, ct); + await writer.CommitAsync(ct); + await writer.WriteTableAsync(new[] { Enumerable.Range(20, 10) }, 10, mode, ct); + } + + await using (var writer = await connection.CreateColumnWriterAsync($"INSERT INTO {tableName}(id) VALUES", ct)) + { + await writer.WriteRowAsync(new object?[] { 30 }, true, ct); + await writer.RollbackAsync(ct); + await writer.WriteRowAsync(new object?[] { 31 }, true, ct); + await writer.CommitAsync(ct); + await writer.WriteRowAsync(new object?[] { 32 }, true, ct); + } + + var cmd = connection.CreateCommand($"SELECT * FROM {tableName} ORDER BY id"); + await using var reader = await cmd.ExecuteReaderAsync(); + int expected = 0; + + while (await reader.ReadAsync(ct)) + { + var id = reader.GetInt32(0); + Assert.Equal(expected++, id); + } + + Assert.Equal(33, expected); + } + } + + [Fact] + public Task TransactionModeAutoBackwardCompatibility() + { + // Check that the default transaction mode is 'Auto' when not specified + return WithTemporaryTable("tran_auto_bc", "id Int32", Test); + + async Task Test(ClickHouseConnection connection, string tableName, CancellationToken ct) + { + await using (var writer = await connection.CreateColumnWriterAsync($"INSERT INTO {tableName}(id) VALUES", ct)) + { + await writer.WriteTableAsync(new[] { Enumerable.Range(0, 10) }, 10, ct); + await writer.RollbackAsync(ct); + await writer.WriteTableAsync(new[] { Enumerable.Range(10, 10) }, 10, ct); + await writer.CommitAsync(ct); + await writer.WriteTableAsync(new[] { Enumerable.Range(20, 10) }, 10, ct); + } + + await using (var writer = await connection.CreateColumnWriterAsync($"INSERT INTO {tableName}(id) VALUES", ct)) + { + await writer.WriteRowAsync(new object?[] { 30 }, ct); + await writer.RollbackAsync(ct); + await writer.WriteRowAsync(new object?[] { 31 }, ct); + await writer.CommitAsync(ct); + await writer.WriteRowAsync(new object?[] { 32 }, ct); + } + + var cmd = connection.CreateCommand($"SELECT * FROM {tableName} ORDER BY id"); + await using var reader = await cmd.ExecuteReaderAsync(); + int expected = 0; + + while (await reader.ReadAsync(ct)) + { + var id = reader.GetInt32(0); + Assert.Equal(expected++, id); + } + + Assert.Equal(33, expected); + } + } + protected override string GetTempTableName(string tableNameSuffix) { return $"{TestTableName}_{tableNameSuffix}"; diff --git a/src/Octonica.ClickHouseClient.Tests/ClickHouseTestsBase.cs b/src/Octonica.ClickHouseClient.Tests/ClickHouseTestsBase.cs index fbfb6e4..6c49eb3 100644 --- a/src/Octonica.ClickHouseClient.Tests/ClickHouseTestsBase.cs +++ b/src/Octonica.ClickHouseClient.Tests/ClickHouseTestsBase.cs @@ -1,5 +1,5 @@ #region License Apache 2.0 -/* Copyright 2019-2022 Octonica +/* Copyright 2019-2023 Octonica * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,7 +57,12 @@ public ClickHouseConnection OpenConnection(Action runTest) + protected Task WithTemporaryTable(string tableNameSuffix, string columns, Func runTest) + { + return WithTemporaryTable(tableNameSuffix, columns, (cn, tableName, _) => runTest(cn, tableName)); + } + + protected async Task WithTemporaryTable(string tableNameSuffix, string columns, Func runTest, CancellationToken ct = default) { var tableName = GetTempTableName(tableNameSuffix); try @@ -70,7 +75,7 @@ protected async Task WithTemporaryTable(string tableNameSuffix, string columns, cmd = connection.CreateCommand($"CREATE TABLE {tableName}({columns}) ENGINE=Memory"); await cmd.ExecuteNonQueryAsync(); - await runTest(connection, tableName); + await runTest(connection, tableName, ct); } finally { diff --git a/src/Octonica.ClickHouseClient/ClickHouseColumnWriter.cs b/src/Octonica.ClickHouseClient/ClickHouseColumnWriter.cs index 5ddabfa..05faa62 100644 --- a/src/Octonica.ClickHouseClient/ClickHouseColumnWriter.cs +++ b/src/Octonica.ClickHouseClient/ClickHouseColumnWriter.cs @@ -193,7 +193,7 @@ public int GetOrdinal(string name) /// Please note that the method always commits a transaction. No subsequent call of is required. public void WriteRow(params object?[] values) { - TaskHelper.WaitNonAsyncTask(WriteRow(values, commit: false, async: false, CancellationToken.None)); + TaskHelper.WaitNonAsyncTask(WriteRow(values, commit: true, async: false, CancellationToken.None)); } /// @@ -201,10 +201,10 @@ public void WriteRow(params object?[] values) /// /// The list of column values. /// A representing asyncronous operation. - /// Please note that the method is always commits a transaction. No subsequent call of is required. + /// Please note that the method always commits a transaction. No subsequent call of is required. public void WriteRow(IReadOnlyCollection values) { - TaskHelper.WaitNonAsyncTask(WriteRow(values, commit: false, async: false, CancellationToken.None)); + TaskHelper.WaitNonAsyncTask(WriteRow(values, commit: true, async: false, CancellationToken.None)); } /// @@ -226,10 +226,10 @@ public void WriteRow(IReadOnlyCollection values, bool commit) /// /// The list of column values. /// A representing asyncronous operation. - /// Please note that the method is always commits a transaction. No subsequent call of is required. + /// Please note that the method always commits a transaction. No subsequent call of is required. public async Task WriteRowAsync(IReadOnlyCollection values) { - await WriteRow(values, commit: false, async: true, CancellationToken.None); + await WriteRow(values, commit: true, async: true, CancellationToken.None); } @@ -253,10 +253,10 @@ public async Task WriteRowAsync(IReadOnlyCollection values, bool commit /// The list of column values. /// The cancellation instruction. /// A representing asyncronous operation. - /// Please note that the method is always commits a transaction. No subsequent call of is required. + /// Please note that the method always commits a transaction. No subsequent call of is required. public async Task WriteRowAsync(IReadOnlyCollection values, CancellationToken cancellationToken) { - await WriteRow(values, commit: false, async: true, cancellationToken); + await WriteRow(values, commit: true, async: true, cancellationToken); } /// @@ -571,7 +571,7 @@ private async ValueTask SendTable(ClickHouseTableWriter table, bool commit, bool await _session.SendTable(table, async, cancellationToken); if (commit) - await EndWrite(disposing: false, closeSession: false, async, cancellationToken); + await EndWrite(TerminationMode.Confirm, closeSession: false, async, cancellationToken); } catch (ClickHouseHandledException) { @@ -638,11 +638,11 @@ private async ValueTask RepeatQuery(bool async, CancellationToken cancellationTo { try { - await EndWrite(disposing: true, closeSession: true, async, cancellationToken); + await EndWrite(TerminationMode.Cancel, closeSession: true, async, cancellationToken); } - catch (Exception disposingEx) + catch (Exception cancellationEx) { - throw new AggregateException(ex, disposingEx); + throw new AggregateException(ex, cancellationEx); } var hEx = ClickHouseHandledException.Wrap(ex); @@ -660,7 +660,7 @@ private async ValueTask RepeatQuery(bool async, CancellationToken cancellationTo /// A subsequent writing operation will send a new INSERT query to the server. public void Commit() { - TaskHelper.WaitNonAsyncTask(EndWrite(disposing: false, closeSession: false, async: false, CancellationToken.None)); + TaskHelper.WaitNonAsyncTask(EndWrite(TerminationMode.Confirm, closeSession: false, async: false, CancellationToken.None)); } /// @@ -672,7 +672,7 @@ public void Commit() /// A subsequent writing operation will send a new INSERT query to the server. public async Task CommitAsync(CancellationToken cancellationToken) { - await EndWrite(disposing: false, closeSession: false, async: true, cancellationToken); + await EndWrite(TerminationMode.Confirm, closeSession: false, async: true, cancellationToken); } /// @@ -680,7 +680,28 @@ public async Task CommitAsync(CancellationToken cancellationToken) /// public void EndWrite() { - TaskHelper.WaitNonAsyncTask(EndWrite(disposing: false, closeSession: true, false, CancellationToken.None)); + TaskHelper.WaitNonAsyncTask(EndWrite(TerminationMode.Confirm, closeSession: true, false, CancellationToken.None)); + } + + /// + /// Notifies the server that non-commited rows shoud be discarded. This method takes an effect + /// only if the pervious operation was made in the mode. + /// + public void Rollback() + { + TaskHelper.WaitNonAsyncTask(EndWrite(TerminationMode.Cancel, closeSession: false, async: false, CancellationToken.None)); + } + + /// + /// Asyncronously notifies the server that non-commited rows shoud be discarded. This method takes an effect + /// only if the pervious operation was made in the mode. + /// + /// The cancellation instruction. + /// A representing asyncronous operation. + /// A subsequent writing operation will send a new INSERT query to the server. + public async Task RollbackAsync(CancellationToken cancellationToken) + { + await EndWrite(TerminationMode.Cancel, closeSession: false, async: true, cancellationToken); } /// @@ -690,13 +711,13 @@ public void EndWrite() /// A representing asyncronous operation. public async Task EndWriteAsync(CancellationToken cancellationToken) { - await EndWrite(disposing: false, closeSession: true, true, cancellationToken); + await EndWrite(TerminationMode.Confirm, closeSession: true, true, cancellationToken); } - private async ValueTask EndWrite(bool disposing, bool closeSession, bool async, CancellationToken cancellationToken) + private async ValueTask EndWrite(TerminationMode mode, bool closeSession, bool async, CancellationToken cancellationToken) { - // The session should not be in the open state after disposing - Debug.Assert(closeSession || !disposing); + // If the writer is dispesed the session should also be disposed + Debug.Assert(closeSession || mode != TerminationMode.Dispose); if (IsClosed) return; @@ -711,10 +732,21 @@ private async ValueTask EndWrite(bool disposing, bool closeSession, bool async, try { - if (disposing) - await _session.SendCancel(async); - else - await _session.SendTable(ClickHouseEmptyTableWriter.Instance, async, cancellationToken); + switch (mode) + { + case TerminationMode.None: + break; + case TerminationMode.Cancel: + case TerminationMode.Dispose: + await _session.SendCancel(async); + break; + case TerminationMode.Confirm: + await _session.SendTable(ClickHouseEmptyTableWriter.Instance, async, cancellationToken); + break; + default: + Debug.Fail($"Unexpected termination mode: {mode}."); + break; + } bool isProfileEvents; do @@ -734,7 +766,7 @@ private async ValueTask EndWrite(bool disposing, bool closeSession, bool async, // Connection state can't be resotred if the server raised an exception. // This error is probably caused by the wrong formatted data. var exception = ((ServerErrorMessage)message).Exception; - if (disposing) + if (mode == TerminationMode.Dispose) { await _session.SetFailed(exception, false, async); break; @@ -755,7 +787,7 @@ private async ValueTask EndWrite(bool disposing, bool closeSession, bool async, } catch (ClickHouseHandledException ex) { - if (!disposing) + if (mode != TerminationMode.Dispose) throw; // Connection state can't be restored @@ -790,7 +822,7 @@ public ValueTask DisposeAsync() private async ValueTask Dispose(bool async) { - await EndWrite(disposing: true, closeSession: true, async, CancellationToken.None); + await EndWrite(TerminationMode.Dispose, closeSession: true, async, CancellationToken.None); } internal static async ValueTask CreateColumnWriterFactory(ColumnInfo columnInfo, object? column, int columnIndex, int rowCount, ClickHouseColumnSettings? settings, bool async, CancellationToken cancellationToken) @@ -1397,5 +1429,28 @@ public IClickHouseColumnWriter Dispatch() return _columnInfo.TypeInfo.CreateColumnWriter(_columnInfo.Name, rows, _columnSettings); } } + + private enum TerminationMode + { + /// + /// Send nothing and wait for EndOfStream + /// + None = 0, + + /// + /// Send Cancel and wait for EndOfStream + /// + Cancel = 1, + + /// + /// Send confirmation message (completely empty talbe) and wait for EndOfStream + /// + Confirm = 2, + + /// + /// Send Cancel and then release all resources associated with the writer + /// + Dispose = 3 + } } }