From bd68dbaa0cd73eb67bcaf05232ee2ec2029ffc83 Mon Sep 17 00:00:00 2001 From: victor-sushko Date: Sun, 27 Mar 2022 22:22:27 +0500 Subject: [PATCH] Add support for passing parameters to the query as constant literals (#49) --- src/ConnectionSettingsHelper.cs | 14 +- .../ClickHouseCommandTests.cs | 6 +- .../ClickHouseConnectionStringBuilderTests.cs | 11 +- .../ClickHouseParameterTests.cs | 5 +- .../ClickHouseTestsBase.cs | 14 +- .../ClickHouseCommand.cs | 53 +- .../ClickHouseConnection.cs | 2042 +++++++++-------- .../ClickHouseConnectionSettings.cs | 8 +- .../ClickHouseConnectionStringBuilder.cs | 23 +- .../ClickHouseParameter.cs | 18 +- .../ClickHouseParameterMode.cs | 56 + 11 files changed, 1201 insertions(+), 1049 deletions(-) create mode 100644 src/Octonica.ClickHouseClient/ClickHouseParameterMode.cs diff --git a/src/ConnectionSettingsHelper.cs b/src/ConnectionSettingsHelper.cs index 2195f40..43ff203 100644 --- a/src/ConnectionSettingsHelper.cs +++ b/src/ConnectionSettingsHelper.cs @@ -1,5 +1,5 @@ #region License Apache 2.0 -/* Copyright 2020-2021 Octonica +/* Copyright 2020-2022 Octonica * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,17 +22,17 @@ namespace Octonica.ClickHouseClient { internal static class ConnectionSettingsHelper { - public static ClickHouseConnectionSettings GetConnectionSettings() + public static ClickHouseConnectionSettings GetConnectionSettings(Action? updateSettings = null) { - return GetConnectionSettingsInternal().settings; + return GetConnectionSettingsInternal(updateSettings).settings; } - public static string GetConnectionString() + public static string GetConnectionString(Action? updateSettings = null) { - return GetConnectionSettingsInternal().connectionString; + return GetConnectionSettingsInternal(updateSettings).connectionString; } - private static (ClickHouseConnectionSettings settings, string connectionString) GetConnectionSettingsInternal() + private static (ClickHouseConnectionSettings settings, string connectionString) GetConnectionSettingsInternal(Action? updateSettings) { const string envVariableName = "CLICKHOUSE_TEST_CONNECTION"; const string configFileName = "clickHouse.dbconfig"; @@ -44,6 +44,7 @@ private static (ClickHouseConnectionSettings settings, string connectionString) try { var builder = new ClickHouseConnectionStringBuilder(configTextFromEnvVar); + updateSettings?.Invoke(builder); return (builder.BuildSettings(), configTextFromEnvVar); } catch (Exception ex) @@ -65,6 +66,7 @@ private static (ClickHouseConnectionSettings settings, string connectionString) try { var builder = new ClickHouseConnectionStringBuilder(configText); + updateSettings?.Invoke(builder); return (builder.BuildSettings(), configText); } catch (Exception ex) diff --git a/src/Octonica.ClickHouseClient.Tests/ClickHouseCommandTests.cs b/src/Octonica.ClickHouseClient.Tests/ClickHouseCommandTests.cs index ca0be09..70127e2 100644 --- a/src/Octonica.ClickHouseClient.Tests/ClickHouseCommandTests.cs +++ b/src/Octonica.ClickHouseClient.Tests/ClickHouseCommandTests.cs @@ -471,7 +471,7 @@ public async Task InsertWithParameters() { try { - await using var connection = await OpenConnectionAsync(); + await using var connection = await OpenConnectionAsync(builder => builder.ParametersMode = ClickHouseParameterMode.Interpolate); var cmd = connection.CreateCommand("DROP TABLE IF EXISTS insert_with_parameters_test"); await cmd.ExecuteNonQueryAsync(); @@ -505,7 +505,7 @@ public async Task DeleteWithParameters() { try { - await using var connection = await OpenConnectionAsync(); + await using var connection = await OpenConnectionAsync(builder => builder.ParametersMode = ClickHouseParameterMode.Interpolate); var cmd = connection.CreateCommand("DROP TABLE IF EXISTS delete_with_parameters_test"); await cmd.ExecuteNonQueryAsync(); @@ -548,6 +548,7 @@ public async Task UpdateWithParameters() p.ParameterName = "str_param"; var insertedValue = "IZyy8d\\'\"\n\t\v\b\rLsVeTtdfk6MjJl"; p.Value = insertedValue; + ((ClickHouseParameter)p).ParameterMode = ClickHouseParameterMode.Interpolate; cmd.Parameters.Add(p); await cmd.ExecuteNonQueryAsync(); // Assert: Not Throws @@ -566,6 +567,7 @@ public async Task SelectWithOffsetLimitParameters() await using var connection = await OpenConnectionAsync(); await using var cmd = connection.CreateCommand("select cast(42 as UInt64) limit @Limit offset @Offset"); + cmd.ParametersMode = ClickHouseParameterMode.Interpolate; var p_limit = cmd.CreateParameter(); var p_offset = cmd.CreateParameter(); p_limit.ParameterName = "Limit"; diff --git a/src/Octonica.ClickHouseClient.Tests/ClickHouseConnectionStringBuilderTests.cs b/src/Octonica.ClickHouseClient.Tests/ClickHouseConnectionStringBuilderTests.cs index cdfa9ab..0014952 100644 --- a/src/Octonica.ClickHouseClient.Tests/ClickHouseConnectionStringBuilderTests.cs +++ b/src/Octonica.ClickHouseClient.Tests/ClickHouseConnectionStringBuilderTests.cs @@ -1,5 +1,5 @@ #region License Apache 2.0 -/* Copyright 2019-2021 Octonica +/* Copyright 2019-2022 Octonica * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -131,6 +131,9 @@ public void Default() Assert.True(settings.ServerCertificateHash.IsEmpty); ++checkedPropertiesCount; + Assert.Equal(ClickHouseConnectionStringBuilder.DefaultParametersMode, settings.ParametersMode); + ++checkedPropertiesCount; + Assert.Equal(checkedPropertiesCount, settings.GetType().GetProperties().Length); } @@ -151,7 +154,8 @@ public void Clone() "CommandTimeout=123;" + "TLSMode=rEqUIrE;" + "RootCertificate=/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.pem;" + - "ServerCertificateHash=1234-5678 9abc-def0"); + "ServerCertificateHash=1234-5678 9abc-def0;" + + "ParametersMode=Interpolate"); var settings = builder.BuildSettings(); @@ -202,6 +206,9 @@ public void Clone() Assert.Equal(new byte[] { 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0 }, settings.ServerCertificateHash.ToArray()); ++checkedPropertiesCount; + Assert.Equal(ClickHouseParameterMode.Interpolate, settings.ParametersMode); + ++checkedPropertiesCount; + Assert.Equal(checkedPropertiesCount, settings.GetType().GetProperties().Length); } diff --git a/src/Octonica.ClickHouseClient.Tests/ClickHouseParameterTests.cs b/src/Octonica.ClickHouseClient.Tests/ClickHouseParameterTests.cs index acd5323..09282ae 100644 --- a/src/Octonica.ClickHouseClient.Tests/ClickHouseParameterTests.cs +++ b/src/Octonica.ClickHouseClient.Tests/ClickHouseParameterTests.cs @@ -1,5 +1,5 @@ #region License Apache 2.0 -/* Copyright 2021 Octonica +/* Copyright 2021-2022 Octonica * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,7 +50,7 @@ static ClickHouseParameterTests() [Fact] public void Clone() { - var p1 = new ClickHouseParameter("p1") { Value = new[] { 42 }, ClickHouseDbType = ClickHouseDbType.VarNumeric, Precision = 19, Scale = 7 }; + var p1 = new ClickHouseParameter("p1") { Value = new[] { 42 }, ClickHouseDbType = ClickHouseDbType.VarNumeric, Precision = 19, Scale = 7, ParameterMode = ClickHouseParameterMode.Interpolate }; var p2 = p1.Clone(); @@ -61,6 +61,7 @@ public void Clone() Assert.Equal(ClickHouseDbType.VarNumeric, p2.ClickHouseDbType); Assert.Equal(19, p2.Precision); Assert.Equal(7, p2.Scale); + Assert.Equal(ClickHouseParameterMode.Interpolate, p2.ParameterMode); p2.TimeZone = TimeZoneInfo.Local; p2.Size = 35; diff --git a/src/Octonica.ClickHouseClient.Tests/ClickHouseTestsBase.cs b/src/Octonica.ClickHouseClient.Tests/ClickHouseTestsBase.cs index ab9010b..fbfb6e4 100644 --- a/src/Octonica.ClickHouseClient.Tests/ClickHouseTestsBase.cs +++ b/src/Octonica.ClickHouseClient.Tests/ClickHouseTestsBase.cs @@ -1,5 +1,5 @@ #region License Apache 2.0 -/* Copyright 2019-2021 Octonica +/* Copyright 2019-2022 Octonica * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,14 +25,14 @@ public abstract class ClickHouseTestsBase { private ClickHouseConnectionSettings? _settings; - public ClickHouseConnectionSettings GetDefaultConnectionSettings() + public ClickHouseConnectionSettings GetDefaultConnectionSettings(Action? updateSettings = null) { if (_settings != null) { return _settings; } - _settings = ConnectionSettingsHelper.GetConnectionSettings(); + _settings = ConnectionSettingsHelper.GetConnectionSettings(updateSettings); return _settings; } @@ -44,14 +44,14 @@ public async Task OpenConnectionAsync(ClickHouseConnection return connection; } - public async Task OpenConnectionAsync() + public async Task OpenConnectionAsync(Action? updateSettings = null) { - return await OpenConnectionAsync(GetDefaultConnectionSettings(), CancellationToken.None); + return await OpenConnectionAsync(GetDefaultConnectionSettings(updateSettings), CancellationToken.None); } - public ClickHouseConnection OpenConnection() + public ClickHouseConnection OpenConnection(Action? updateSettings = null) { - ClickHouseConnection connection = new ClickHouseConnection(GetDefaultConnectionSettings()); + ClickHouseConnection connection = new ClickHouseConnection(GetDefaultConnectionSettings(updateSettings)); connection.Open(); return connection; diff --git a/src/Octonica.ClickHouseClient/ClickHouseCommand.cs b/src/Octonica.ClickHouseClient/ClickHouseCommand.cs index ff2b5b3..4bef407 100644 --- a/src/Octonica.ClickHouseClient/ClickHouseCommand.cs +++ b/src/Octonica.ClickHouseClient/ClickHouseCommand.cs @@ -1,5 +1,5 @@ #region License Apache 2.0 -/* Copyright 2019-2021 Octonica +/* Copyright 2019-2022 Octonica * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -153,6 +153,12 @@ protected override DbTransaction? DbTransaction /// if the data reader should return profile events as a recordset. The default value is . public bool IgnoreProfileEvents { get; set; } = true; + /// + /// Gets or sets the mode of passing parameters to the query. The value of this property overrides . + /// + /// The mode of passing parameters to the query. The default value is . + public ClickHouseParameterMode ParametersMode { get; set; } = ClickHouseParameterMode.Inherit; + /// /// Creates a new instance of . /// @@ -818,6 +824,15 @@ private TimeSpan GetCommandTimeout(ClickHouseConnection? connection) return _commandTimeout ?? connection?.CommandTimeSpan ?? TimeSpan.FromSeconds(ClickHouseConnectionStringBuilder.DefaultCommandTimeout); } + private ClickHouseParameterMode GetParametersMode(ClickHouseConnection? connection) + { + var mode = ParametersMode; + if (mode != ClickHouseParameterMode.Inherit) + return mode; + + return connection?.ParametersMode ?? ClickHouseParameterMode.Default; + } + private IClickHouseTableWriter CreateParameterTableWriter(IClickHouseTypeInfoProvider typeInfoProvider, string tableName) { return new ClickHouseTableWriter(tableName, 1, Parameters.Select(p => p.CreateParameterColumnWriter(typeInfoProvider))); @@ -856,6 +871,7 @@ private string PrepareCommandText(IClickHouseTypeInfoProvider typeInfoProvider, } parameters = null; + var inheritParameterMode = GetParametersMode(Connection); var queryStringBuilder = new StringBuilder(query.Length); for (int i = 0; i < parameterPositions.Count; i++) { @@ -868,21 +884,34 @@ private string PrepareCommandText(IClickHouseTypeInfoProvider typeInfoProvider, if (!Parameters.TryGetValue(parameterName, out var parameter)) throw new ClickHouseException(ClickHouseErrorCodes.QueryParameterNotFound, $"Parameter \"{parameterName}\" not found."); - var specifiedType = typeSeparatorIdx >= 0 ? query.AsMemory().Slice(offset + typeSeparatorIdx + 1, length - typeSeparatorIdx - 2) : ReadOnlyMemory.Empty; - parameter.OutputParameterValue(queryStringBuilder, specifiedType, typeInfoProvider); + var parameterMode = parameter.GetParameterMode(inheritParameterMode); + switch (parameterMode) + { + case ClickHouseParameterMode.Interpolate: + var specifiedType = typeSeparatorIdx >= 0 ? query.AsMemory().Slice(offset + typeSeparatorIdx + 1, length - typeSeparatorIdx - 2) : ReadOnlyMemory.Empty; + parameter.OutputParameterValue(queryStringBuilder, specifiedType, typeInfoProvider); + break; - /* TODO: add parameter inline mode - if (!parameters.Contains(parameter.ParameterName)) - parameters.Add(parameter.ParameterName); + case ClickHouseParameterMode.Default: + case ClickHouseParameterMode.Binary: + if (parameters == null) + parameters = new HashSet(); - if (typeSeparatorIdx >= 0) - queryStringBuilder.Append("(CAST("); + if (!parameters.Contains(parameter.ParameterName)) + parameters.Add(parameter.ParameterName); - queryStringBuilder.Append("(SELECT ").Append(parametersTable).Append('.').Append(parameter.Id).Append(" FROM ").Append(parametersTable).Append(')'); + if (typeSeparatorIdx >= 0) + queryStringBuilder.Append("(CAST("); - if (typeSeparatorIdx >= 0) - queryStringBuilder.Append(" AS ").Append(query, offset + typeSeparatorIdx + 1, length - typeSeparatorIdx - 2).Append("))"); - */ + queryStringBuilder.Append("(SELECT ").Append(parametersTable).Append('.').Append(parameter.Id).Append(" FROM ").Append(parametersTable).Append(')'); + + if (typeSeparatorIdx >= 0) + queryStringBuilder.Append(" AS ").Append(query, offset + typeSeparatorIdx + 1, length - typeSeparatorIdx - 2).Append("))"); + break; + + default: + throw new ClickHouseException(ClickHouseErrorCodes.InternalError, $"Internal error. Unexpected parameter mode: {parameterMode}."); + } } var lastPartStart = parameterPositions[^1].offset + parameterPositions[^1].length; diff --git a/src/Octonica.ClickHouseClient/ClickHouseConnection.cs b/src/Octonica.ClickHouseClient/ClickHouseConnection.cs index b56bc66..57f3605 100644 --- a/src/Octonica.ClickHouseClient/ClickHouseConnection.cs +++ b/src/Octonica.ClickHouseClient/ClickHouseConnection.cs @@ -1,1013 +1,1029 @@ -#region License Apache 2.0 -/* Copyright 2019-2021 Octonica - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#endregion - -using System; -using System.Data; -using System.Data.Common; -using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; -using System.IO; -using System.Net.Security; -using System.Net.Sockets; -using System.Security.Authentication; -using System.Security.Cryptography.X509Certificates; -using System.Threading; -using System.Threading.Tasks; -using System.Transactions; -using Octonica.ClickHouseClient.Exceptions; -using Octonica.ClickHouseClient.Protocol; -using Octonica.ClickHouseClient.Types; -using Octonica.ClickHouseClient.Utils; - -namespace Octonica.ClickHouseClient -{ - /// - /// Represents a connection to a ClickHouse database. This class cannot be inherited. - /// - public sealed class ClickHouseConnection : DbConnection - { - private const int MinBufferSize = 32; - - private readonly IClickHouseTypeInfoProvider? _typeInfoProvider; - - private ClickHouseConnectionState _connectionState; - - /// - /// Gets or sets the string used to open a connection to a ClickHouse database server. - /// - [AllowNull] - public override string ConnectionString - { - get - { - var state = _connectionState; - if (state.Settings == null) - return string.Empty; - - return new ClickHouseConnectionStringBuilder(state.Settings).ConnectionString; - } - set - { - var newSettings = value == null ? null : new ClickHouseConnectionStringBuilder(value).BuildSettings(); - var state = _connectionState; - while (true) - { - if (ReferenceEquals(state.Settings, newSettings)) - break; - - if (state.State != ConnectionState.Closed && state.State != ConnectionState.Broken) - throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The connection string can not be modified because the connection is active."); - - var newState = new ClickHouseConnectionState(state.State, state.TcpClient, newSettings, unchecked(state.Counter + 1)); - if (TryChangeConnectionState(state, newState, out state)) - break; - } - } - } - - /// - /// The connection doesn't support a timeout. This property always returns . - /// - /// . - public override int ConnectionTimeout => Timeout.Infinite; - - /// - /// Gets the name of the database specified in the connection settings. - /// - /// The name of the database specified in the connection settings. The default value is an empty string. - public override string Database => _connectionState.Settings?.Database ?? string.Empty; - - /// - /// Gets the name of the database server specified in the connection settings. - /// The name of the server contains the hostname and the port. If the port is equal to the default ClickHouse server port (9000) the - /// name of the server will conatin only hostname. - /// - /// The name of the database server specified in the connection settings. The default value is an empty string. - public override string DataSource - { - get - { - var state = _connectionState; - if (state.Settings == null) - return string.Empty; - - return state.Settings.Host + (state.Settings.Port != ClickHouseConnectionStringBuilder.DefaultPort ? ":" + state.Settings.Port : string.Empty); - } - } - - /// - /// When the connection is open gets the version of the ClickHouse database server. - /// - /// The version of the ClickHouse database server. The default value is an empty string. - public override string ServerVersion => _connectionState.TcpClient?.ServerInfo.Version.ToString() ?? string.Empty; - - /// - /// Gets the state of the connection. - /// - /// The state of the connection. - public override ConnectionState State => _connectionState.State; - - /// - /// Gets or sets the callback for custom validation of the server's certificate. When the callback is set - /// other TLS certificate validation options are ignored. - /// - public RemoteCertificateValidationCallback? RemoteCertificateValidationCallback { get; set; } - - internal TimeSpan? CommandTimeSpan - { - get - { - var commandTimeout = _connectionState.Settings?.CommandTimeout; - if (commandTimeout == null) - return null; - - return TimeSpan.FromSeconds(commandTimeout.Value); - } - } - - /// - /// Initializes a new instance of class. - /// - public ClickHouseConnection() - { - _connectionState = new ClickHouseConnectionState(); - } - - /// - /// Initializes a new instance of with the settings. - /// - /// The connection string. - /// Optional parameter. The provider of types for the connection. If the value is not specified the default type provider () will be used. - public ClickHouseConnection(string connectionString, IClickHouseTypeInfoProvider? typeInfoProvider = null) - : this(new ClickHouseConnectionStringBuilder(connectionString), typeInfoProvider) - { - } - - /// - /// Initializes a new instance of with the settings. - /// - /// The connection string builder which will be used for building the connection settings. - /// Optional parameter. The provider of types for the connection. If the value is not specified the default type provider () will be used. - public ClickHouseConnection(ClickHouseConnectionStringBuilder stringBuilder, IClickHouseTypeInfoProvider? typeInfoProvider = null) - { - if (stringBuilder == null) - throw new ArgumentNullException(nameof(stringBuilder)); - - var connectionSettings = stringBuilder.BuildSettings(); - - _connectionState = new ClickHouseConnectionState(ConnectionState.Closed, null, connectionSettings, 0); - _typeInfoProvider = typeInfoProvider; - } - - /// - /// Initializes a new instance of with the settings. - /// - /// The connection settings. - /// Optional parameter. The provider of types for the connection. If the value is not specified the default type provider () will be used. - public ClickHouseConnection(ClickHouseConnectionSettings connectionSettings, IClickHouseTypeInfoProvider? typeInfoProvider = null) - { - if (connectionSettings == null) - throw new ArgumentNullException(nameof(connectionSettings)); - - _connectionState = new ClickHouseConnectionState(ConnectionState.Closed, null, connectionSettings, 0); - _typeInfoProvider = typeInfoProvider; - } - - /// - /// Not supported. The database cannot be changed while the connection is open. - /// - /// Always throws . - public override void ChangeDatabase(string databaseName) - { - throw new NotSupportedException(); - } - - /// - public override Task ChangeDatabaseAsync(string databaseName, CancellationToken cancellationToken = default) - { - throw new NotSupportedException(); - } - - /// - /// Closes the connection to the database. - /// - public override void Close() - { - TaskHelper.WaitNonAsyncTask(Close(false)); - } - - /// - public override async Task CloseAsync() - { - await Close(true); - } - - /// - /// Not supported. Transactions are not supported by the ClickHouse server. - /// - /// Always throws . - public override void EnlistTransaction(Transaction? transaction) - { - throw new NotSupportedException(); - } - - /// - /// Not supported. Schema information is not implemented. - /// - /// Always throws . - public override DataTable GetSchema() - { - throw new NotImplementedException(); - } - - /// - public override DataTable GetSchema(string collectionName) - { - throw new NotImplementedException(); - } - - /// - public override DataTable GetSchema(string collectionName, string?[] restrictionValues) - { - throw new NotImplementedException(); - } - - /// - /// Opens a database connection. - /// - public override void Open() - { - TaskHelper.WaitNonAsyncTask(Open(false, CancellationToken.None)); - } - - /// - /// Opens a database connection asyncronously. - /// - /// The cancellation instruction. - /// A representing asyncronous operation. - public override async Task OpenAsync(CancellationToken cancellationToken) - { - await Open(true, cancellationToken); - } - - /// - /// Not supported. Transactions are not supported by the ClickHouse server. - /// - /// Always throws . - protected override DbTransaction BeginDbTransaction(System.Data.IsolationLevel isolationLevel) - { - throw new NotSupportedException(); - } - - /// - /// Not supported. Transactions are not supported by the ClickHouse server. - /// - /// Always throws . - protected override ValueTask BeginDbTransactionAsync(System.Data.IsolationLevel isolationLevel, CancellationToken cancellationToken) - { - throw new NotSupportedException(); - } - - /// - /// Creates and returns a object associated with the connection. - /// - /// A object. - public new ClickHouseCommand CreateCommand() - { - return new ClickHouseCommand(this); - } - - /// - /// Creates and returns a object associated with the connection. - /// - /// The text for a new command. - /// A object. - public ClickHouseCommand CreateCommand(string commandText) - { - return new ClickHouseCommand(this) {CommandText = commandText}; - } - - /// - protected override DbCommand CreateDbCommand() - { - return CreateCommand(); - } - - /// - /// Creates and returns a object. - /// - /// The INSERT statement. - /// A object. - /// - /// The command () must be a valid INSERT statement ending with VALUES. For example, - /// INSERT INTO table(field1, ... fieldN) VALUES - /// - public ClickHouseColumnWriter CreateColumnWriter(string insertFormatCommand) - { - return TaskHelper.WaitNonAsyncTask(CreateColumnWriter(insertFormatCommand, false, CancellationToken.None)); - } - - /// - /// Asyncronously creates and returns a object. - /// - /// The INSERT statement. - /// The cancellation instruction. - /// A representing asyncronous operation. - /// - /// The command () must be a valid INSERT statement ending with VALUES. For example, - /// INSERT INTO table(field1, ... fieldN) VALUES - /// - public async Task CreateColumnWriterAsync(string insertFormatCommand, CancellationToken cancellationToken) - { - return await CreateColumnWriter(insertFormatCommand, true, cancellationToken); - } - - private async ValueTask CreateColumnWriter(string insertFormatCommand, bool async, CancellationToken cancellationToken) - { - var connectionState = _connectionState; - if (connectionState.TcpClient == null) - { - Debug.Assert(connectionState.State != ConnectionState.Open); - throw new ClickHouseException(ClickHouseErrorCodes.ConnectionClosed, "The connection is closed."); - } - - ClickHouseTcpClient.Session? session = null; - bool cancelOnFailure = false; - try - { - session = await connectionState.TcpClient.OpenSession(async, null, CancellationToken.None, cancellationToken); - - var messageBuilder = new ClientQueryMessage.Builder {QueryKind = QueryKind.InitialQuery, Query = insertFormatCommand}; - await session.SendQuery(messageBuilder, null, async, cancellationToken); - - cancelOnFailure = true; - var msg = await session.ReadMessage(async, cancellationToken); - switch (msg.MessageCode) - { - case ServerMessageCode.Error: - throw ((ServerErrorMessage) msg).Exception.CopyWithQuery(insertFormatCommand); - - case ServerMessageCode.TableColumns: - break; - - default: - throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message. Received the message of type {msg.MessageCode}."); - } - - msg = await session.ReadMessage(async, cancellationToken); - ClickHouseTable data; - switch (msg.MessageCode) - { - case ServerMessageCode.Error: - throw ((ServerErrorMessage) msg).Exception.CopyWithQuery(insertFormatCommand); - - case ServerMessageCode.Data: - data = await session.ReadTable((ServerDataMessage) msg, null, async, cancellationToken); - break; - - default: - throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message. Received the message of type {msg.MessageCode}."); - } - - return new ClickHouseColumnWriter(session, data.Header.Columns); - } - catch (ClickHouseServerException) - { - if (session != null) - await session.Dispose(async); - - throw; - } - catch(ClickHouseHandledException) - { - if (session != null) - await session.Dispose(async); - - throw; - } - catch(Exception ex) - { - if (session != null) - { - var aggrEx = await session.SetFailed(ex, cancelOnFailure, async); - if (aggrEx != null) - throw aggrEx; - } - - throw; - } - } - - /// - /// For an open connection gets the default timezone of the ClickHouse server. - /// - /// The default timezone of the ClickHouse server. - /// Throws if the connection is not open. - public TimeZoneInfo GetServerTimeZone() - { - var connectionState = _connectionState; - var serverInfo = connectionState.TcpClient?.ServerInfo; - if (serverInfo == null || connectionState.State != ConnectionState.Open) - throw new ClickHouseException(ClickHouseErrorCodes.ConnectionClosed, "The connection is closed."); - - return TimeZoneHelper.GetTimeZoneInfo(serverInfo.Timezone); - } - - /// - /// Closes the connection and releases resources associated with it. - /// - protected override void Dispose(bool disposing) - { - var connectionState = _connectionState; - var counter = connectionState.Counter; - while (connectionState.Counter == counter) - { - var targetState = connectionState.State == ConnectionState.Closed ? ConnectionState.Closed : ConnectionState.Broken; - if (connectionState.State == targetState && connectionState.TcpClient == null) - break; - - var tcpClient = connectionState.TcpClient; - if (!TryChangeConnectionState(connectionState, targetState, null, out connectionState, out _)) - continue; - - tcpClient?.Dispose(); - break; - } - - base.Dispose(disposing); - } - - private async ValueTask Open(bool async, CancellationToken cancellationToken) - { - if (!BitConverter.IsLittleEndian) - { - throw new NotSupportedException( - "An architecture of the processor is not supported. Only little-endian architectures are supported." + Environment.NewLine + - "Please, report an issue if you see this message (https://github.com/Octonica/ClickHouseClient/issues)."); - } - - var connectionState = _connectionState; - switch (connectionState.State) - { - case ConnectionState.Closed: - break; - case ConnectionState.Open: - return; // Re-entrance is allowed - case ConnectionState.Connecting: - throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The connection is already opening."); - case ConnectionState.Broken: - throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The connection is broken."); - default: - throw new NotSupportedException($"Internal error. The state {_connectionState} is not supported."); - } - - if (!TryChangeConnectionState(connectionState, ConnectionState.Connecting, out connectionState, out var onStateChanged)) - throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The state of the connection was modified."); - - var stateChangeEx = onStateChanged(this); - var connectionSettings = connectionState.Settings; - if (stateChangeEx != null || connectionSettings == null) - { - var initialEx = stateChangeEx ?? new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The connection is not initialized."); - if (!TryChangeConnectionState(connectionState, ConnectionState.Closed, out _, out onStateChanged)) - throw new AggregateException(initialEx, new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The state of the connection was modified.")); - - var stateChangeEx2 = onStateChanged(this); - if (stateChangeEx2 != null) - throw new AggregateException(initialEx, stateChangeEx2); - - if (stateChangeEx != null) - throw new ClickHouseException(ClickHouseErrorCodes.CallbackError, "External callback error. See the inner exception for details.", stateChangeEx); - - throw initialEx; - } - - const int defaultHttpPort = 8123; - TcpClient? client = null; - SslStream? sslStream = null; - ClickHouseBinaryProtocolWriter? writer = null; - ClickHouseBinaryProtocolReader? reader = null; - - try - { - try - { - client = new TcpClient {SendTimeout = connectionSettings.ReadWriteTimeout, ReceiveTimeout = connectionSettings.ReadWriteTimeout}; - - if (async) - await client.ConnectAsync(connectionSettings.Host, connectionSettings.Port); - else - client.Connect(connectionSettings.Host, connectionSettings.Port); - } - catch - { - client?.Client?.Close(0); - client?.Dispose(); - client = null; - throw; - } - - if (connectionSettings.TlsMode == ClickHouseTlsMode.Require) - { - var certValidationCallback = RemoteCertificateValidationCallback; - if (certValidationCallback == null && (connectionSettings.RootCertificate != null || !connectionSettings.ServerCertificateHash.IsEmpty)) - certValidationCallback = (_, cert, chain, errors) => ValidateServerCertificate(connectionSettings, cert, chain, errors); - - sslStream = new SslStream(client.GetStream(), true, certValidationCallback); - - try - { - if (async) - await sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions { TargetHost = connectionSettings.Host }, cancellationToken); - else - sslStream.AuthenticateAsClient(connectionSettings.Host); - } - catch(AuthenticationException authEx) - { - throw new ClickHouseException(ClickHouseErrorCodes.TlsError, $"TLS handshake error.", authEx); - } - } - - var stream = sslStream ?? (Stream)client.GetStream(); - writer = new ClickHouseBinaryProtocolWriter(stream, Math.Max(connectionSettings.BufferSize, MinBufferSize)); - - var clientHello = new ClientHelloMessage.Builder - { - ClientName = connectionSettings.ClientName, - ClientVersion = connectionSettings.ClientVersion, - User = connectionSettings.User, - Database = connectionSettings.Database, - Password = connectionSettings.Password, - ProtocolRevision = ClickHouseProtocolRevisions.CurrentRevision - }.Build(); - - clientHello.Write(writer); - - await writer.Flush(async, cancellationToken); - - reader = new ClickHouseBinaryProtocolReader(stream, Math.Max(connectionSettings.BufferSize, MinBufferSize)); - var message = await reader.ReadMessage(false, async, cancellationToken); - - switch (message.MessageCode) - { - case ServerMessageCode.Hello: - var helloMessage = (ServerHelloMessage) message; - - bool hasExtraByte = reader.TryPeekByte(out var extraByte); - if (!hasExtraByte && client.Available > 0) - { - hasExtraByte = true; - extraByte = await reader.ReadByte(async, cancellationToken); - } - - if (hasExtraByte) - { - throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Expected the end of the data. Unexpected byte (0x{extraByte:X}) received from the server."); - } - - var serverInfo = helloMessage.ServerInfo; - var configuredTypeInfoProvider = (_typeInfoProvider ?? DefaultTypeInfoProvider.Instance).Configure(serverInfo); - var tcpClient = new ClickHouseTcpClient(client, reader, writer, connectionSettings, serverInfo, configuredTypeInfoProvider, sslStream); - - if (!TryChangeConnectionState(connectionState, ConnectionState.Open, tcpClient, out _, out onStateChanged)) - throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The state of the connection was modified."); - - break; - - case ServerMessageCode.Error: - throw ((ServerErrorMessage) message).Exception; - - default: - if ((int) message.MessageCode == 'H') - { - // It looks like HTTP - string httpDetectedMessage; - if (connectionSettings.Port == defaultHttpPort) - { - // It's definitely HTTP - httpDetectedMessage = $"Detected an attempt to connect by HTTP protocol with the default port {defaultHttpPort}. "; - } - else - { - httpDetectedMessage = - $"Internal error. Unexpected message code (0x{message.MessageCode:X}) received from the server. " + - "This error may by caused by an attempt to connect with HTTP protocol. "; - } - - httpDetectedMessage += - $"{ClickHouseConnectionStringBuilder.DefaultClientName} supports only ClickHouse native protocol. " + - $"The default port for the native protocol is {ClickHouseConnectionStringBuilder.DefaultPort}."; - - throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, httpDetectedMessage); - } - - if ((int) message.MessageCode == 0x15) - { - // 0x15 stands for TLS alert message - var sslAlertMessage = - $"Unexpected message code (0x{message.MessageCode:X}) received from the server. " + - "This code may indicate that the server requires establishing a connection over TLS."; - - throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, sslAlertMessage); - } - - throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Internal error. Unexpected message code (0x{message.MessageCode:X}) received from the server."); - } - } - catch (Exception ex) - { - reader?.Dispose(); - writer?.Dispose(); - sslStream?.Dispose(); - client?.Client?.Close(0); - client?.Dispose(); - - if (TryChangeConnectionState(connectionState, ConnectionState.Closed, out _, out onStateChanged)) - stateChangeEx = onStateChanged(this); - - if (connectionSettings.Port == defaultHttpPort && ex is IOException) - { - var extraMessage = - $"{ex.Message} This error may be caused by an attempt to connect to the default HTTP port ({defaultHttpPort}). " + - $"{ClickHouseConnectionStringBuilder.DefaultClientName} supports only ClickHouse native protocol. " + - $"The default port for the native protocol is {ClickHouseConnectionStringBuilder.DefaultPort}."; - - var extraEx = new IOException(extraMessage, ex); - if (stateChangeEx != null) - throw new AggregateException(extraEx, stateChangeEx); - - throw extraEx; - } - - if (stateChangeEx != null) - throw new AggregateException(ex, stateChangeEx); - - throw; - } - - stateChangeEx = onStateChanged.Invoke(this); - if (stateChangeEx != null) - throw new ClickHouseException(ClickHouseErrorCodes.CallbackError, "External callback error. See the inner exception for details.", stateChangeEx); - } - - /// - /// Send ping to the server and wait for response. - /// - /// - /// Returns true if ping was successful. - /// Returns false if the connection is busy with a command execution. - /// - public bool TryPing() - { - return TaskHelper.WaitNonAsyncTask(TryPing(false, CancellationToken.None)); - } - - /// - public Task TryPingAsync() - { - return TryPingAsync(CancellationToken.None); - } - - /// - public async Task TryPingAsync(CancellationToken cancellationToken) - { - return await TryPing(true, cancellationToken); - } - - private async ValueTask TryPing(bool async, CancellationToken cancellationToken) - { - if (_connectionState.TcpClient?.State == ClickHouseTcpClientState.Active) - return false; - - ClickHouseTcpClient.Session? session = null; - try - { - using (var ts = new CancellationTokenSource(TimeSpan.FromMilliseconds(5))) - { - try - { - session = await OpenSession(async, null, CancellationToken.None, ts.Token); - } - catch (OperationCanceledException ex) - { - if (ex.CancellationToken == ts.Token) - return false; - - throw; - } - } - - await session.SendPing(async, cancellationToken); - var responseMsg = await session.ReadMessage(async, cancellationToken); - - switch (responseMsg.MessageCode) - { - case ServerMessageCode.Pong: - return true; - - case ServerMessageCode.Error: - // Something else caused this error. Keep it in InnerException for debug. - throw new ClickHouseException( - ClickHouseErrorCodes.ProtocolUnexpectedResponse, - $"Internal error. Unexpected message code (0x{responseMsg.MessageCode:X}) received from the server as a response to ping.", - ((ServerErrorMessage)responseMsg).Exception); - - default: - throw new ClickHouseException( - ClickHouseErrorCodes.ProtocolUnexpectedResponse, - $"Internal error. Unexpected message code (0x{responseMsg.MessageCode:X}) received from the server as a response to ping."); - } - } - catch (ClickHouseHandledException) - { - throw; - } - catch (Exception ex) - { - if (session != null) - { - await session.SetFailed(ex, false, async); - session = null; - } - - throw; - } - finally - { - if (session != null) - { - if (async) - await session.DisposeAsync(); - else - session.Dispose(); - } - } - } - - internal ValueTask OpenSession(bool async, IClickHouseSessionExternalResources? externalResources, CancellationToken sessionCancellationToken, CancellationToken cancellationToken) - { - var connectionSession = new ConnectionSession(this, externalResources); - return connectionSession.OpenSession(async, sessionCancellationToken, cancellationToken); - } - - internal async ValueTask Close(bool async) - { - var connectionState = _connectionState; - var counter = connectionState.Counter; - while (connectionState.Counter == counter) - { - var tcpClient = connectionState.TcpClient; - Func? onStateChanged; - Exception? stateChangedEx; - switch (connectionState.State) - { - case ConnectionState.Closed: - return; // Re-entrance is allowed - - case ConnectionState.Open: - ClickHouseTcpClient.Session? session = null; - try - { - // Acquire session for preventing access to the communication object - var sessionTask = tcpClient?.OpenSession(async, null, CancellationToken.None, CancellationToken.None); - if (sessionTask != null) - session = await sessionTask.Value; - } - catch (ObjectDisposedException) - { - if (!TryChangeConnectionState(connectionState, ConnectionState.Closed, null, out connectionState, out onStateChanged)) - continue; - - stateChangedEx = onStateChanged(this); - if (stateChangedEx != null) - throw new ClickHouseException(ClickHouseErrorCodes.CallbackError, "External callback error. See the inner exception for details.", stateChangedEx); - - return; - } - catch - { - if (session != null) - await session.Dispose(async); - - throw; - } - - if (!TryChangeConnectionState(connectionState, ConnectionState.Closed, null, out connectionState, out onStateChanged)) - { - if (session != null) - await session.Dispose(async); - - continue; - } - - tcpClient?.Dispose(); - - stateChangedEx = onStateChanged(this); - if (stateChangedEx != null) - throw new ClickHouseException(ClickHouseErrorCodes.CallbackError, "External callback error. See the inner exception for details.", stateChangedEx); - - return; - - case ConnectionState.Broken: - if (!TryChangeConnectionState(connectionState, ConnectionState.Closed, null, out connectionState, out onStateChanged)) - continue; - - tcpClient?.Dispose(); - - stateChangedEx = onStateChanged(this); - if (stateChangedEx != null) - throw new ClickHouseException(ClickHouseErrorCodes.CallbackError, "External callback error. See the inner exception for details.", stateChangedEx); - - break; - - case ConnectionState.Connecting: - throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The connection is opening. It can't be closed."); - - default: - throw new NotSupportedException($"Internal error. The state {_connectionState} is not supported."); - } - } - } - - private bool TryChangeConnectionState(ClickHouseConnectionState from, ClickHouseConnectionState to, out ClickHouseConnectionState actualState) - { - actualState = Interlocked.CompareExchange(ref _connectionState, to, from); - if (ReferenceEquals(actualState, from)) - { - actualState = to; - return true; - } - - return false; - } - - private bool TryChangeConnectionState( - ClickHouseConnectionState state, - ConnectionState newState, - ClickHouseTcpClient? client, - out ClickHouseConnectionState actualState, - [NotNullWhen(true)] out Func? onStateChanged) - { - var counter = state.State != ConnectionState.Connecting && newState == ConnectionState.Connecting ? unchecked(state.Counter + 1) : state.Counter; - var nextState = new ClickHouseConnectionState(newState, client, state.Settings, counter); - if (TryChangeConnectionState(state, nextState, out actualState)) - { - onStateChanged = CreateConnectionStateChangedCallback(state.State, actualState.State); - return true; - } - - onStateChanged = null; - return false; - } - - private bool TryChangeConnectionState( - ClickHouseConnectionState state, - ConnectionState newState, - out ClickHouseConnectionState actualState, - [NotNullWhen(true)] out Func? onStateChanged) - { - return TryChangeConnectionState(state, newState, state.TcpClient, out actualState, out onStateChanged); - } - - private static Func CreateConnectionStateChangedCallback(ConnectionState originalState, ConnectionState currentState) - { - if (originalState == currentState) - return _ => null; - - return FireEvent; - - Exception? FireEvent(ClickHouseConnection connection) - { - try - { - connection.OnStateChange(new StateChangeEventArgs(originalState, currentState)); - } - catch (Exception ex) - { - return ex; - } - - return null; - } - } - - private static bool ValidateServerCertificate(ClickHouseConnectionSettings connectionSettings, X509Certificate? cert, X509Chain? chain, SslPolicyErrors errors) - { - if (errors == SslPolicyErrors.None) - return true; - - if (cert == null) - return false; - - if (!connectionSettings.ServerCertificateHash.IsEmpty) - { - var certHash = cert.GetCertHash(); - if (connectionSettings.ServerCertificateHash.Span.SequenceEqual(certHash)) - return true; - } - - if (chain != null && connectionSettings.RootCertificate != null) - { - if ((errors & ~SslPolicyErrors.RemoteCertificateChainErrors) != SslPolicyErrors.None) - return false; - - var collection = CertificateHelper.LoadFromFile(connectionSettings.RootCertificate); -#if NET5_0_OR_GREATER - chain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust; - chain.ChainPolicy.CustomTrustStore.AddRange(collection); - var isValid = chain.Build(cert as X509Certificate2 ?? new X509Certificate2(cert)); - return isValid; -#else - foreach (var chainElement in chain.ChainElements) - { - if (chainElement.ChainElementStatus.Length != 0) - { - bool ignoreError = true; - foreach (var status in chainElement.ChainElementStatus) - { - if (status.Status == X509ChainStatusFlags.UntrustedRoot) - continue; - - ignoreError = false; - break; - } - - if (!ignoreError) - break; - } - - if (collection.Contains(chainElement.Certificate)) - return true; - } -#endif - } - - return false; - } - - private class ConnectionSession : IClickHouseSessionExternalResources - { - private readonly ClickHouseConnection _connection; - private readonly ClickHouseConnectionState _state; - private readonly IClickHouseSessionExternalResources? _externalResources; - - public ConnectionSession(ClickHouseConnection connection, IClickHouseSessionExternalResources? externalResources) - { - _connection = connection; - _state = _connection._connectionState; - _externalResources = externalResources; - - var tcpClient = _state.TcpClient; - if (tcpClient == null) - { - Debug.Assert(_state.State != ConnectionState.Open); - throw new ClickHouseException(ClickHouseErrorCodes.ConnectionClosed, "The connection is closed."); - } - - if (_state.State != ConnectionState.Open) - throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The connection is closed."); - } - - public ValueTask OpenSession(bool async, CancellationToken sessionCancellationToken, CancellationToken cancellationToken) - { - Debug.Assert(_state.TcpClient != null); - return _state.TcpClient.OpenSession(async, this, sessionCancellationToken, cancellationToken); - } - - public ValueTask Release(bool async) - { - return _externalResources?.Release(async) ?? default; - } - - public async ValueTask ReleaseOnFailure(Exception? exception, bool async) - { - Exception? ex = null; - if (_connection.TryChangeConnectionState(_state, ConnectionState.Broken, null, out _, out var onStateChanged)) - ex = onStateChanged(_connection); - - Exception? externalEx = null; - if (_externalResources != null) - externalEx = await _externalResources.ReleaseOnFailure(exception, async); - - if (ex != null && externalEx != null) - return new AggregateException(ex, externalEx); - - return externalEx ?? ex; - } - } - } -} +#region License Apache 2.0 +/* Copyright 2019-2022 Octonica + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#endregion + +using System; +using System.Data; +using System.Data.Common; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.IO; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; +using System.Transactions; +using Octonica.ClickHouseClient.Exceptions; +using Octonica.ClickHouseClient.Protocol; +using Octonica.ClickHouseClient.Types; +using Octonica.ClickHouseClient.Utils; + +namespace Octonica.ClickHouseClient +{ + /// + /// Represents a connection to a ClickHouse database. This class cannot be inherited. + /// + public sealed class ClickHouseConnection : DbConnection + { + private const int MinBufferSize = 32; + + private readonly IClickHouseTypeInfoProvider? _typeInfoProvider; + + private ClickHouseConnectionState _connectionState; + + /// + /// Gets or sets the string used to open a connection to a ClickHouse database server. + /// + [AllowNull] + public override string ConnectionString + { + get + { + var state = _connectionState; + if (state.Settings == null) + return string.Empty; + + return new ClickHouseConnectionStringBuilder(state.Settings).ConnectionString; + } + set + { + var newSettings = value == null ? null : new ClickHouseConnectionStringBuilder(value).BuildSettings(); + var state = _connectionState; + while (true) + { + if (ReferenceEquals(state.Settings, newSettings)) + break; + + if (state.State != ConnectionState.Closed && state.State != ConnectionState.Broken) + throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The connection string can not be modified because the connection is active."); + + var newState = new ClickHouseConnectionState(state.State, state.TcpClient, newSettings, unchecked(state.Counter + 1)); + if (TryChangeConnectionState(state, newState, out state)) + break; + } + } + } + + /// + /// The connection doesn't support a timeout. This property always returns . + /// + /// . + public override int ConnectionTimeout => Timeout.Infinite; + + /// + /// Gets the name of the database specified in the connection settings. + /// + /// The name of the database specified in the connection settings. The default value is an empty string. + public override string Database => _connectionState.Settings?.Database ?? string.Empty; + + /// + /// Gets the name of the database server specified in the connection settings. + /// The name of the server contains the hostname and the port. If the port is equal to the default ClickHouse server port (9000) the + /// name of the server will conatin only hostname. + /// + /// The name of the database server specified in the connection settings. The default value is an empty string. + public override string DataSource + { + get + { + var state = _connectionState; + if (state.Settings == null) + return string.Empty; + + return state.Settings.Host + (state.Settings.Port != ClickHouseConnectionStringBuilder.DefaultPort ? ":" + state.Settings.Port : string.Empty); + } + } + + /// + /// When the connection is open gets the version of the ClickHouse database server. + /// + /// The version of the ClickHouse database server. The default value is an empty string. + public override string ServerVersion => _connectionState.TcpClient?.ServerInfo.Version.ToString() ?? string.Empty; + + /// + /// Gets the state of the connection. + /// + /// The state of the connection. + public override ConnectionState State => _connectionState.State; + + /// + /// Gets or sets the callback for custom validation of the server's certificate. When the callback is set + /// other TLS certificate validation options are ignored. + /// + public RemoteCertificateValidationCallback? RemoteCertificateValidationCallback { get; set; } + + internal TimeSpan? CommandTimeSpan + { + get + { + var commandTimeout = _connectionState.Settings?.CommandTimeout; + if (commandTimeout == null) + return null; + + return TimeSpan.FromSeconds(commandTimeout.Value); + } + } + + /// + /// Gets the default mode of passing parameters to the query for the connection. + /// + /// The default mode of passing parameters to the query for the connection. The default value is . + public ClickHouseParameterMode ParametersMode + { + get + { + var mode = _connectionState.Settings?.ParametersMode; + if (mode == null || mode.Value == ClickHouseParameterMode.Inherit) + return ClickHouseParameterMode.Default; + + return mode.Value; + } + } + + /// + /// Initializes a new instance of class. + /// + public ClickHouseConnection() + { + _connectionState = new ClickHouseConnectionState(); + } + + /// + /// Initializes a new instance of with the settings. + /// + /// The connection string. + /// Optional parameter. The provider of types for the connection. If the value is not specified the default type provider () will be used. + public ClickHouseConnection(string connectionString, IClickHouseTypeInfoProvider? typeInfoProvider = null) + : this(new ClickHouseConnectionStringBuilder(connectionString), typeInfoProvider) + { + } + + /// + /// Initializes a new instance of with the settings. + /// + /// The connection string builder which will be used for building the connection settings. + /// Optional parameter. The provider of types for the connection. If the value is not specified the default type provider () will be used. + public ClickHouseConnection(ClickHouseConnectionStringBuilder stringBuilder, IClickHouseTypeInfoProvider? typeInfoProvider = null) + { + if (stringBuilder == null) + throw new ArgumentNullException(nameof(stringBuilder)); + + var connectionSettings = stringBuilder.BuildSettings(); + + _connectionState = new ClickHouseConnectionState(ConnectionState.Closed, null, connectionSettings, 0); + _typeInfoProvider = typeInfoProvider; + } + + /// + /// Initializes a new instance of with the settings. + /// + /// The connection settings. + /// Optional parameter. The provider of types for the connection. If the value is not specified the default type provider () will be used. + public ClickHouseConnection(ClickHouseConnectionSettings connectionSettings, IClickHouseTypeInfoProvider? typeInfoProvider = null) + { + if (connectionSettings == null) + throw new ArgumentNullException(nameof(connectionSettings)); + + _connectionState = new ClickHouseConnectionState(ConnectionState.Closed, null, connectionSettings, 0); + _typeInfoProvider = typeInfoProvider; + } + + /// + /// Not supported. The database cannot be changed while the connection is open. + /// + /// Always throws . + public override void ChangeDatabase(string databaseName) + { + throw new NotSupportedException(); + } + + /// + public override Task ChangeDatabaseAsync(string databaseName, CancellationToken cancellationToken = default) + { + throw new NotSupportedException(); + } + + /// + /// Closes the connection to the database. + /// + public override void Close() + { + TaskHelper.WaitNonAsyncTask(Close(false)); + } + + /// + public override async Task CloseAsync() + { + await Close(true); + } + + /// + /// Not supported. Transactions are not supported by the ClickHouse server. + /// + /// Always throws . + public override void EnlistTransaction(Transaction? transaction) + { + throw new NotSupportedException(); + } + + /// + /// Not supported. Schema information is not implemented. + /// + /// Always throws . + public override DataTable GetSchema() + { + throw new NotImplementedException(); + } + + /// + public override DataTable GetSchema(string collectionName) + { + throw new NotImplementedException(); + } + + /// + public override DataTable GetSchema(string collectionName, string?[] restrictionValues) + { + throw new NotImplementedException(); + } + + /// + /// Opens a database connection. + /// + public override void Open() + { + TaskHelper.WaitNonAsyncTask(Open(false, CancellationToken.None)); + } + + /// + /// Opens a database connection asyncronously. + /// + /// The cancellation instruction. + /// A representing asyncronous operation. + public override async Task OpenAsync(CancellationToken cancellationToken) + { + await Open(true, cancellationToken); + } + + /// + /// Not supported. Transactions are not supported by the ClickHouse server. + /// + /// Always throws . + protected override DbTransaction BeginDbTransaction(System.Data.IsolationLevel isolationLevel) + { + throw new NotSupportedException(); + } + + /// + /// Not supported. Transactions are not supported by the ClickHouse server. + /// + /// Always throws . + protected override ValueTask BeginDbTransactionAsync(System.Data.IsolationLevel isolationLevel, CancellationToken cancellationToken) + { + throw new NotSupportedException(); + } + + /// + /// Creates and returns a object associated with the connection. + /// + /// A object. + public new ClickHouseCommand CreateCommand() + { + return new ClickHouseCommand(this); + } + + /// + /// Creates and returns a object associated with the connection. + /// + /// The text for a new command. + /// A object. + public ClickHouseCommand CreateCommand(string commandText) + { + return new ClickHouseCommand(this) {CommandText = commandText}; + } + + /// + protected override DbCommand CreateDbCommand() + { + return CreateCommand(); + } + + /// + /// Creates and returns a object. + /// + /// The INSERT statement. + /// A object. + /// + /// The command () must be a valid INSERT statement ending with VALUES. For example, + /// INSERT INTO table(field1, ... fieldN) VALUES + /// + public ClickHouseColumnWriter CreateColumnWriter(string insertFormatCommand) + { + return TaskHelper.WaitNonAsyncTask(CreateColumnWriter(insertFormatCommand, false, CancellationToken.None)); + } + + /// + /// Asyncronously creates and returns a object. + /// + /// The INSERT statement. + /// The cancellation instruction. + /// A representing asyncronous operation. + /// + /// The command () must be a valid INSERT statement ending with VALUES. For example, + /// INSERT INTO table(field1, ... fieldN) VALUES + /// + public async Task CreateColumnWriterAsync(string insertFormatCommand, CancellationToken cancellationToken) + { + return await CreateColumnWriter(insertFormatCommand, true, cancellationToken); + } + + private async ValueTask CreateColumnWriter(string insertFormatCommand, bool async, CancellationToken cancellationToken) + { + var connectionState = _connectionState; + if (connectionState.TcpClient == null) + { + Debug.Assert(connectionState.State != ConnectionState.Open); + throw new ClickHouseException(ClickHouseErrorCodes.ConnectionClosed, "The connection is closed."); + } + + ClickHouseTcpClient.Session? session = null; + bool cancelOnFailure = false; + try + { + session = await connectionState.TcpClient.OpenSession(async, null, CancellationToken.None, cancellationToken); + + var messageBuilder = new ClientQueryMessage.Builder {QueryKind = QueryKind.InitialQuery, Query = insertFormatCommand}; + await session.SendQuery(messageBuilder, null, async, cancellationToken); + + cancelOnFailure = true; + var msg = await session.ReadMessage(async, cancellationToken); + switch (msg.MessageCode) + { + case ServerMessageCode.Error: + throw ((ServerErrorMessage) msg).Exception.CopyWithQuery(insertFormatCommand); + + case ServerMessageCode.TableColumns: + break; + + default: + throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message. Received the message of type {msg.MessageCode}."); + } + + msg = await session.ReadMessage(async, cancellationToken); + ClickHouseTable data; + switch (msg.MessageCode) + { + case ServerMessageCode.Error: + throw ((ServerErrorMessage) msg).Exception.CopyWithQuery(insertFormatCommand); + + case ServerMessageCode.Data: + data = await session.ReadTable((ServerDataMessage) msg, null, async, cancellationToken); + break; + + default: + throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Unexpected server message. Received the message of type {msg.MessageCode}."); + } + + return new ClickHouseColumnWriter(session, data.Header.Columns); + } + catch (ClickHouseServerException) + { + if (session != null) + await session.Dispose(async); + + throw; + } + catch(ClickHouseHandledException) + { + if (session != null) + await session.Dispose(async); + + throw; + } + catch(Exception ex) + { + if (session != null) + { + var aggrEx = await session.SetFailed(ex, cancelOnFailure, async); + if (aggrEx != null) + throw aggrEx; + } + + throw; + } + } + + /// + /// For an open connection gets the default timezone of the ClickHouse server. + /// + /// The default timezone of the ClickHouse server. + /// Throws if the connection is not open. + public TimeZoneInfo GetServerTimeZone() + { + var connectionState = _connectionState; + var serverInfo = connectionState.TcpClient?.ServerInfo; + if (serverInfo == null || connectionState.State != ConnectionState.Open) + throw new ClickHouseException(ClickHouseErrorCodes.ConnectionClosed, "The connection is closed."); + + return TimeZoneHelper.GetTimeZoneInfo(serverInfo.Timezone); + } + + /// + /// Closes the connection and releases resources associated with it. + /// + protected override void Dispose(bool disposing) + { + var connectionState = _connectionState; + var counter = connectionState.Counter; + while (connectionState.Counter == counter) + { + var targetState = connectionState.State == ConnectionState.Closed ? ConnectionState.Closed : ConnectionState.Broken; + if (connectionState.State == targetState && connectionState.TcpClient == null) + break; + + var tcpClient = connectionState.TcpClient; + if (!TryChangeConnectionState(connectionState, targetState, null, out connectionState, out _)) + continue; + + tcpClient?.Dispose(); + break; + } + + base.Dispose(disposing); + } + + private async ValueTask Open(bool async, CancellationToken cancellationToken) + { + if (!BitConverter.IsLittleEndian) + { + throw new NotSupportedException( + "An architecture of the processor is not supported. Only little-endian architectures are supported." + Environment.NewLine + + "Please, report an issue if you see this message (https://github.com/Octonica/ClickHouseClient/issues)."); + } + + var connectionState = _connectionState; + switch (connectionState.State) + { + case ConnectionState.Closed: + break; + case ConnectionState.Open: + return; // Re-entrance is allowed + case ConnectionState.Connecting: + throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The connection is already opening."); + case ConnectionState.Broken: + throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The connection is broken."); + default: + throw new NotSupportedException($"Internal error. The state {_connectionState} is not supported."); + } + + if (!TryChangeConnectionState(connectionState, ConnectionState.Connecting, out connectionState, out var onStateChanged)) + throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The state of the connection was modified."); + + var stateChangeEx = onStateChanged(this); + var connectionSettings = connectionState.Settings; + if (stateChangeEx != null || connectionSettings == null) + { + var initialEx = stateChangeEx ?? new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The connection is not initialized."); + if (!TryChangeConnectionState(connectionState, ConnectionState.Closed, out _, out onStateChanged)) + throw new AggregateException(initialEx, new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The state of the connection was modified.")); + + var stateChangeEx2 = onStateChanged(this); + if (stateChangeEx2 != null) + throw new AggregateException(initialEx, stateChangeEx2); + + if (stateChangeEx != null) + throw new ClickHouseException(ClickHouseErrorCodes.CallbackError, "External callback error. See the inner exception for details.", stateChangeEx); + + throw initialEx; + } + + const int defaultHttpPort = 8123; + TcpClient? client = null; + SslStream? sslStream = null; + ClickHouseBinaryProtocolWriter? writer = null; + ClickHouseBinaryProtocolReader? reader = null; + + try + { + try + { + client = new TcpClient {SendTimeout = connectionSettings.ReadWriteTimeout, ReceiveTimeout = connectionSettings.ReadWriteTimeout}; + + if (async) + await client.ConnectAsync(connectionSettings.Host, connectionSettings.Port); + else + client.Connect(connectionSettings.Host, connectionSettings.Port); + } + catch + { + client?.Client?.Close(0); + client?.Dispose(); + client = null; + throw; + } + + if (connectionSettings.TlsMode == ClickHouseTlsMode.Require) + { + var certValidationCallback = RemoteCertificateValidationCallback; + if (certValidationCallback == null && (connectionSettings.RootCertificate != null || !connectionSettings.ServerCertificateHash.IsEmpty)) + certValidationCallback = (_, cert, chain, errors) => ValidateServerCertificate(connectionSettings, cert, chain, errors); + + sslStream = new SslStream(client.GetStream(), true, certValidationCallback); + + try + { + if (async) + await sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions { TargetHost = connectionSettings.Host }, cancellationToken); + else + sslStream.AuthenticateAsClient(connectionSettings.Host); + } + catch(AuthenticationException authEx) + { + throw new ClickHouseException(ClickHouseErrorCodes.TlsError, $"TLS handshake error.", authEx); + } + } + + var stream = sslStream ?? (Stream)client.GetStream(); + writer = new ClickHouseBinaryProtocolWriter(stream, Math.Max(connectionSettings.BufferSize, MinBufferSize)); + + var clientHello = new ClientHelloMessage.Builder + { + ClientName = connectionSettings.ClientName, + ClientVersion = connectionSettings.ClientVersion, + User = connectionSettings.User, + Database = connectionSettings.Database, + Password = connectionSettings.Password, + ProtocolRevision = ClickHouseProtocolRevisions.CurrentRevision + }.Build(); + + clientHello.Write(writer); + + await writer.Flush(async, cancellationToken); + + reader = new ClickHouseBinaryProtocolReader(stream, Math.Max(connectionSettings.BufferSize, MinBufferSize)); + var message = await reader.ReadMessage(false, async, cancellationToken); + + switch (message.MessageCode) + { + case ServerMessageCode.Hello: + var helloMessage = (ServerHelloMessage) message; + + bool hasExtraByte = reader.TryPeekByte(out var extraByte); + if (!hasExtraByte && client.Available > 0) + { + hasExtraByte = true; + extraByte = await reader.ReadByte(async, cancellationToken); + } + + if (hasExtraByte) + { + throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Expected the end of the data. Unexpected byte (0x{extraByte:X}) received from the server."); + } + + var serverInfo = helloMessage.ServerInfo; + var configuredTypeInfoProvider = (_typeInfoProvider ?? DefaultTypeInfoProvider.Instance).Configure(serverInfo); + var tcpClient = new ClickHouseTcpClient(client, reader, writer, connectionSettings, serverInfo, configuredTypeInfoProvider, sslStream); + + if (!TryChangeConnectionState(connectionState, ConnectionState.Open, tcpClient, out _, out onStateChanged)) + throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The state of the connection was modified."); + + break; + + case ServerMessageCode.Error: + throw ((ServerErrorMessage) message).Exception; + + default: + if ((int) message.MessageCode == 'H') + { + // It looks like HTTP + string httpDetectedMessage; + if (connectionSettings.Port == defaultHttpPort) + { + // It's definitely HTTP + httpDetectedMessage = $"Detected an attempt to connect by HTTP protocol with the default port {defaultHttpPort}. "; + } + else + { + httpDetectedMessage = + $"Internal error. Unexpected message code (0x{message.MessageCode:X}) received from the server. " + + "This error may by caused by an attempt to connect with HTTP protocol. "; + } + + httpDetectedMessage += + $"{ClickHouseConnectionStringBuilder.DefaultClientName} supports only ClickHouse native protocol. " + + $"The default port for the native protocol is {ClickHouseConnectionStringBuilder.DefaultPort}."; + + throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, httpDetectedMessage); + } + + if ((int) message.MessageCode == 0x15) + { + // 0x15 stands for TLS alert message + var sslAlertMessage = + $"Unexpected message code (0x{message.MessageCode:X}) received from the server. " + + "This code may indicate that the server requires establishing a connection over TLS."; + + throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, sslAlertMessage); + } + + throw new ClickHouseException(ClickHouseErrorCodes.ProtocolUnexpectedResponse, $"Internal error. Unexpected message code (0x{message.MessageCode:X}) received from the server."); + } + } + catch (Exception ex) + { + reader?.Dispose(); + writer?.Dispose(); + sslStream?.Dispose(); + client?.Client?.Close(0); + client?.Dispose(); + + if (TryChangeConnectionState(connectionState, ConnectionState.Closed, out _, out onStateChanged)) + stateChangeEx = onStateChanged(this); + + if (connectionSettings.Port == defaultHttpPort && ex is IOException) + { + var extraMessage = + $"{ex.Message} This error may be caused by an attempt to connect to the default HTTP port ({defaultHttpPort}). " + + $"{ClickHouseConnectionStringBuilder.DefaultClientName} supports only ClickHouse native protocol. " + + $"The default port for the native protocol is {ClickHouseConnectionStringBuilder.DefaultPort}."; + + var extraEx = new IOException(extraMessage, ex); + if (stateChangeEx != null) + throw new AggregateException(extraEx, stateChangeEx); + + throw extraEx; + } + + if (stateChangeEx != null) + throw new AggregateException(ex, stateChangeEx); + + throw; + } + + stateChangeEx = onStateChanged.Invoke(this); + if (stateChangeEx != null) + throw new ClickHouseException(ClickHouseErrorCodes.CallbackError, "External callback error. See the inner exception for details.", stateChangeEx); + } + + /// + /// Send ping to the server and wait for response. + /// + /// + /// Returns true if ping was successful. + /// Returns false if the connection is busy with a command execution. + /// + public bool TryPing() + { + return TaskHelper.WaitNonAsyncTask(TryPing(false, CancellationToken.None)); + } + + /// + public Task TryPingAsync() + { + return TryPingAsync(CancellationToken.None); + } + + /// + public async Task TryPingAsync(CancellationToken cancellationToken) + { + return await TryPing(true, cancellationToken); + } + + private async ValueTask TryPing(bool async, CancellationToken cancellationToken) + { + if (_connectionState.TcpClient?.State == ClickHouseTcpClientState.Active) + return false; + + ClickHouseTcpClient.Session? session = null; + try + { + using (var ts = new CancellationTokenSource(TimeSpan.FromMilliseconds(5))) + { + try + { + session = await OpenSession(async, null, CancellationToken.None, ts.Token); + } + catch (OperationCanceledException ex) + { + if (ex.CancellationToken == ts.Token) + return false; + + throw; + } + } + + await session.SendPing(async, cancellationToken); + var responseMsg = await session.ReadMessage(async, cancellationToken); + + switch (responseMsg.MessageCode) + { + case ServerMessageCode.Pong: + return true; + + case ServerMessageCode.Error: + // Something else caused this error. Keep it in InnerException for debug. + throw new ClickHouseException( + ClickHouseErrorCodes.ProtocolUnexpectedResponse, + $"Internal error. Unexpected message code (0x{responseMsg.MessageCode:X}) received from the server as a response to ping.", + ((ServerErrorMessage)responseMsg).Exception); + + default: + throw new ClickHouseException( + ClickHouseErrorCodes.ProtocolUnexpectedResponse, + $"Internal error. Unexpected message code (0x{responseMsg.MessageCode:X}) received from the server as a response to ping."); + } + } + catch (ClickHouseHandledException) + { + throw; + } + catch (Exception ex) + { + if (session != null) + { + await session.SetFailed(ex, false, async); + session = null; + } + + throw; + } + finally + { + if (session != null) + { + if (async) + await session.DisposeAsync(); + else + session.Dispose(); + } + } + } + + internal ValueTask OpenSession(bool async, IClickHouseSessionExternalResources? externalResources, CancellationToken sessionCancellationToken, CancellationToken cancellationToken) + { + var connectionSession = new ConnectionSession(this, externalResources); + return connectionSession.OpenSession(async, sessionCancellationToken, cancellationToken); + } + + internal async ValueTask Close(bool async) + { + var connectionState = _connectionState; + var counter = connectionState.Counter; + while (connectionState.Counter == counter) + { + var tcpClient = connectionState.TcpClient; + Func? onStateChanged; + Exception? stateChangedEx; + switch (connectionState.State) + { + case ConnectionState.Closed: + return; // Re-entrance is allowed + + case ConnectionState.Open: + ClickHouseTcpClient.Session? session = null; + try + { + // Acquire session for preventing access to the communication object + var sessionTask = tcpClient?.OpenSession(async, null, CancellationToken.None, CancellationToken.None); + if (sessionTask != null) + session = await sessionTask.Value; + } + catch (ObjectDisposedException) + { + if (!TryChangeConnectionState(connectionState, ConnectionState.Closed, null, out connectionState, out onStateChanged)) + continue; + + stateChangedEx = onStateChanged(this); + if (stateChangedEx != null) + throw new ClickHouseException(ClickHouseErrorCodes.CallbackError, "External callback error. See the inner exception for details.", stateChangedEx); + + return; + } + catch + { + if (session != null) + await session.Dispose(async); + + throw; + } + + if (!TryChangeConnectionState(connectionState, ConnectionState.Closed, null, out connectionState, out onStateChanged)) + { + if (session != null) + await session.Dispose(async); + + continue; + } + + tcpClient?.Dispose(); + + stateChangedEx = onStateChanged(this); + if (stateChangedEx != null) + throw new ClickHouseException(ClickHouseErrorCodes.CallbackError, "External callback error. See the inner exception for details.", stateChangedEx); + + return; + + case ConnectionState.Broken: + if (!TryChangeConnectionState(connectionState, ConnectionState.Closed, null, out connectionState, out onStateChanged)) + continue; + + tcpClient?.Dispose(); + + stateChangedEx = onStateChanged(this); + if (stateChangedEx != null) + throw new ClickHouseException(ClickHouseErrorCodes.CallbackError, "External callback error. See the inner exception for details.", stateChangedEx); + + break; + + case ConnectionState.Connecting: + throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The connection is opening. It can't be closed."); + + default: + throw new NotSupportedException($"Internal error. The state {_connectionState} is not supported."); + } + } + } + + private bool TryChangeConnectionState(ClickHouseConnectionState from, ClickHouseConnectionState to, out ClickHouseConnectionState actualState) + { + actualState = Interlocked.CompareExchange(ref _connectionState, to, from); + if (ReferenceEquals(actualState, from)) + { + actualState = to; + return true; + } + + return false; + } + + private bool TryChangeConnectionState( + ClickHouseConnectionState state, + ConnectionState newState, + ClickHouseTcpClient? client, + out ClickHouseConnectionState actualState, + [NotNullWhen(true)] out Func? onStateChanged) + { + var counter = state.State != ConnectionState.Connecting && newState == ConnectionState.Connecting ? unchecked(state.Counter + 1) : state.Counter; + var nextState = new ClickHouseConnectionState(newState, client, state.Settings, counter); + if (TryChangeConnectionState(state, nextState, out actualState)) + { + onStateChanged = CreateConnectionStateChangedCallback(state.State, actualState.State); + return true; + } + + onStateChanged = null; + return false; + } + + private bool TryChangeConnectionState( + ClickHouseConnectionState state, + ConnectionState newState, + out ClickHouseConnectionState actualState, + [NotNullWhen(true)] out Func? onStateChanged) + { + return TryChangeConnectionState(state, newState, state.TcpClient, out actualState, out onStateChanged); + } + + private static Func CreateConnectionStateChangedCallback(ConnectionState originalState, ConnectionState currentState) + { + if (originalState == currentState) + return _ => null; + + return FireEvent; + + Exception? FireEvent(ClickHouseConnection connection) + { + try + { + connection.OnStateChange(new StateChangeEventArgs(originalState, currentState)); + } + catch (Exception ex) + { + return ex; + } + + return null; + } + } + + private static bool ValidateServerCertificate(ClickHouseConnectionSettings connectionSettings, X509Certificate? cert, X509Chain? chain, SslPolicyErrors errors) + { + if (errors == SslPolicyErrors.None) + return true; + + if (cert == null) + return false; + + if (!connectionSettings.ServerCertificateHash.IsEmpty) + { + var certHash = cert.GetCertHash(); + if (connectionSettings.ServerCertificateHash.Span.SequenceEqual(certHash)) + return true; + } + + if (chain != null && connectionSettings.RootCertificate != null) + { + if ((errors & ~SslPolicyErrors.RemoteCertificateChainErrors) != SslPolicyErrors.None) + return false; + + var collection = CertificateHelper.LoadFromFile(connectionSettings.RootCertificate); +#if NET5_0_OR_GREATER + chain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust; + chain.ChainPolicy.CustomTrustStore.AddRange(collection); + var isValid = chain.Build(cert as X509Certificate2 ?? new X509Certificate2(cert)); + return isValid; +#else + foreach (var chainElement in chain.ChainElements) + { + if (chainElement.ChainElementStatus.Length != 0) + { + bool ignoreError = true; + foreach (var status in chainElement.ChainElementStatus) + { + if (status.Status == X509ChainStatusFlags.UntrustedRoot) + continue; + + ignoreError = false; + break; + } + + if (!ignoreError) + break; + } + + if (collection.Contains(chainElement.Certificate)) + return true; + } +#endif + } + + return false; + } + + private class ConnectionSession : IClickHouseSessionExternalResources + { + private readonly ClickHouseConnection _connection; + private readonly ClickHouseConnectionState _state; + private readonly IClickHouseSessionExternalResources? _externalResources; + + public ConnectionSession(ClickHouseConnection connection, IClickHouseSessionExternalResources? externalResources) + { + _connection = connection; + _state = _connection._connectionState; + _externalResources = externalResources; + + var tcpClient = _state.TcpClient; + if (tcpClient == null) + { + Debug.Assert(_state.State != ConnectionState.Open); + throw new ClickHouseException(ClickHouseErrorCodes.ConnectionClosed, "The connection is closed."); + } + + if (_state.State != ConnectionState.Open) + throw new ClickHouseException(ClickHouseErrorCodes.InvalidConnectionState, "The connection is closed."); + } + + public ValueTask OpenSession(bool async, CancellationToken sessionCancellationToken, CancellationToken cancellationToken) + { + Debug.Assert(_state.TcpClient != null); + return _state.TcpClient.OpenSession(async, this, sessionCancellationToken, cancellationToken); + } + + public ValueTask Release(bool async) + { + return _externalResources?.Release(async) ?? default; + } + + public async ValueTask ReleaseOnFailure(Exception? exception, bool async) + { + Exception? ex = null; + if (_connection.TryChangeConnectionState(_state, ConnectionState.Broken, null, out _, out var onStateChanged)) + ex = onStateChanged(_connection); + + Exception? externalEx = null; + if (_externalResources != null) + externalEx = await _externalResources.ReleaseOnFailure(exception, async); + + if (ex != null && externalEx != null) + return new AggregateException(ex, externalEx); + + return externalEx ?? ex; + } + } + } +} diff --git a/src/Octonica.ClickHouseClient/ClickHouseConnectionSettings.cs b/src/Octonica.ClickHouseClient/ClickHouseConnectionSettings.cs index dcb3c71..9a55e0c 100644 --- a/src/Octonica.ClickHouseClient/ClickHouseConnectionSettings.cs +++ b/src/Octonica.ClickHouseClient/ClickHouseConnectionSettings.cs @@ -1,5 +1,5 @@ #region License Apache 2.0 -/* Copyright 2019-2021 Octonica +/* Copyright 2019-2022 Octonica * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -98,6 +98,11 @@ public sealed class ClickHouseConnectionSettings /// public ReadOnlyMemory ServerCertificateHash { get; } + /// + /// Gets the default mode of passing parameters to the query for the connection. + /// + public ClickHouseParameterMode ParametersMode { get; } + internal readonly int CompressionBlockSize = 1024 * 8; // Maybe it should be configurable internal ClickHouseConnectionSettings(ClickHouseConnectionStringBuilder builder) @@ -122,6 +127,7 @@ internal ClickHouseConnectionSettings(ClickHouseConnectionStringBuilder builder) TlsMode = builder.TlsMode; RootCertificate = builder.RootCertificate; ServerCertificateHash = ParseHashString(builder.ServerCertificateHash); + ParametersMode = builder.ParametersMode; } private static byte[]? ParseHashString(string? hashString) diff --git a/src/Octonica.ClickHouseClient/ClickHouseConnectionStringBuilder.cs b/src/Octonica.ClickHouseClient/ClickHouseConnectionStringBuilder.cs index a4be12a..220783e 100644 --- a/src/Octonica.ClickHouseClient/ClickHouseConnectionStringBuilder.cs +++ b/src/Octonica.ClickHouseClient/ClickHouseConnectionStringBuilder.cs @@ -1,5 +1,5 @@ #region License Apache 2.0 -/* Copyright 2019-2021 Octonica +/* Copyright 2019-2022 Octonica * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -73,7 +73,12 @@ public class ClickHouseConnectionStringBuilder : DbConnectionStringBuilder /// /// The default value for the TLS mode is . /// - public static readonly ClickHouseTlsMode DefaultTlsMode = ClickHouseTlsMode.Disable; + public const ClickHouseTlsMode DefaultTlsMode = ClickHouseTlsMode.Disable; + + /// + /// The default value for the mode of passing parameters to the query is . + /// + public const ClickHouseParameterMode DefaultParametersMode = ClickHouseParameterMode.Default; /// /// Gets or sets the name or the IP address of the host. @@ -241,6 +246,16 @@ public string? ServerCertificateHash set => this[nameof(ServerCertificateHash)] = value; } + /// + /// Gets the default mode of passing parameters to the query for the connection. + /// + /// The default mode of passing parameters to the query for the connection. The default value is . + public ClickHouseParameterMode ParametersMode + { + get => GetEnumOrDefault(nameof(ParametersMode), DefaultParametersMode); + set => this[nameof(ParametersMode)] = value == DefaultParametersMode ? null : value.ToString("G"); + } + static ClickHouseConnectionStringBuilder() { var asm = typeof(ClickHouseConnectionStringBuilder).Assembly; @@ -262,7 +277,8 @@ static ClickHouseConnectionStringBuilder() nameof(User), nameof(TlsMode), nameof(RootCertificate), - nameof(ServerCertificateHash) + nameof(ServerCertificateHash), + nameof(ParametersMode) }; } @@ -303,6 +319,7 @@ public ClickHouseConnectionStringBuilder(ClickHouseConnectionSettings settings) TlsMode = settings.TlsMode; RootCertificate = settings.RootCertificate; ServerCertificateHash = HashToString(settings.ServerCertificateHash); + ParametersMode = settings.ParametersMode; if (settings.ClientName != DefaultClientName) ClientName = settings.ClientName; diff --git a/src/Octonica.ClickHouseClient/ClickHouseParameter.cs b/src/Octonica.ClickHouseClient/ClickHouseParameter.cs index a40b070..a8ad0b3 100644 --- a/src/Octonica.ClickHouseClient/ClickHouseParameter.cs +++ b/src/Octonica.ClickHouseClient/ClickHouseParameter.cs @@ -1,5 +1,5 @@ #region License Apache 2.0 -/* Copyright 2019-2021 Octonica +/* Copyright 2019-2022 Octonica * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -283,6 +283,12 @@ public int ArrayRank } } + /// + /// Gets or sets the mode of passing this parameter to the query. The value of this property overrides . + /// + /// The mode of passing this parameter to the query. The default value is . + public ClickHouseParameterMode ParameterMode { get; set; } = ClickHouseParameterMode.Inherit; + /// /// Initializes a new instance of with the default name. /// @@ -358,6 +364,7 @@ public void CopyTo(ClickHouseParameter parameter) parameter.SourceVersion = SourceVersion; parameter._valueTypeInfo = null; + parameter.ParameterMode = ParameterMode; } internal IClickHouseColumnWriter CreateParameterColumnWriter(IClickHouseTypeInfoProvider typeInfoProvider) @@ -520,6 +527,15 @@ private static bool ValidateParameterName(string? parameterName, [MaybeNullWhen( return ParameterNameRegex.IsMatch(id); } + internal ClickHouseParameterMode GetParameterMode(ClickHouseParameterMode inheritParameterMode) + { + var mode = ParameterMode; + if (mode == ClickHouseParameterMode.Inherit) + return inheritParameterMode; + + return mode; + } + internal static string TrimParameterName(string parameterName) { if (parameterName.Length > 0) diff --git a/src/Octonica.ClickHouseClient/ClickHouseParameterMode.cs b/src/Octonica.ClickHouseClient/ClickHouseParameterMode.cs new file mode 100644 index 0000000..6d1cc82 --- /dev/null +++ b/src/Octonica.ClickHouseClient/ClickHouseParameterMode.cs @@ -0,0 +1,56 @@ +#region License Apache 2.0 +/* Copyright 2022 Octonica + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#endregion + +namespace Octonica.ClickHouseClient +{ + /// + /// Specifies the list of available modes of passing parameters to the query. + /// + public enum ClickHouseParameterMode + { + /// + /// The default mode. Currently the default mode is . + /// + Default = 0, + + /// + /// This value indicates that the mode should be inherited. + /// Parameters inherit the mode from a command. + /// A command inherits the mode from a connection. + /// For a connection this value is equivalent to . + /// + Inherit = 1, + + /// + /// This value indicates that parameters should be passed to the query in the binary format. This is the default mode. + /// + /// + /// In this mode parameters will be passed to the query as a table with the single row. Each parameter will be replaced by SELECT subquery. + /// This mode doesn't allow to pass parameters in parts of the query where scalar subqueries are not allowed. + /// + Binary = 2, + + /// + /// This value indicates that parameters should be passed to the query as constant literals. + /// + /// + /// In this mode parameters' values will be interpolated to the query string as constant literals. + /// This mode allows to use parameters in any part of the query where a constant is allowed. + /// + Interpolate = 3 + } +}