Skip to content

Commit

Permalink
Load string parameters column sizes to reduce SqlServer query plans c…
Browse files Browse the repository at this point in the history
…ount (#197)

* Load string parameters column sizes to reduce SqlServer query plans count

* Updated to include column size loading logic

* Added constant size support for query executors

* Added HOCON configuration and documentation for that

* Fixed typo

* Updated method name according to plugin

Co-authored-by: Aaron Stannard <[email protected]>
  • Loading branch information
IgorFedchenko and Aaronontheweb authored May 12, 2021
1 parent ee613a9 commit fd501c9
Show file tree
Hide file tree
Showing 10 changed files with 332 additions and 5 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,5 @@ resetdev.bat

# FAKE build folder
.fake/

.idea/
87 changes: 87 additions & 0 deletions src/Akka.Persistence.SqlServer/Helpers/ColumnSizeLoader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// //-----------------------------------------------------------------------
// // <copyright file="ColumnSizeLoader.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Linq;
using Akka.Persistence.Sql.Common.Journal;

namespace Akka.Persistence.SqlServer.Helpers
{
/// <summary>
/// Helper class that can load column sizes from SqlServer table
/// </summary>
internal static class ColumnSizeLoader
{
/// <summary>
/// Loads column sizes for Journal
/// </summary>
/// <returns></returns>
public static JournalColumnSizesInfo LoadJournalColumnSizes(QueryConfiguration conventions, DbConnection connection)
{
using (var command = connection.CreateCommand())
{
connection.Open();
command.CommandText = $"SELECT * FROM {conventions.FullJournalTableName}";

// start reading - no need to load the table's content
using (var reader = command.ExecuteReader())
{
// load columns metadata
var results = LoadSchemaTableInfo(reader);

return new JournalColumnSizesInfo(
persistenceIdColumnSize: (int)results.First(r => r["ColumnName"].ToString() == conventions.PersistenceIdColumnName)["ColumnSize"],
tagsColumnSize: (int)results.First(r => r["ColumnName"].ToString() == conventions.TagsColumnName)["ColumnSize"],
manifestColumnSize: (int)results.First(r => r["ColumnName"].ToString() == conventions.ManifestColumnName)["ColumnSize"]
);
}
}
}

public static SnapshotColumnSizesInfo LoadSnapshotColumnSizes(Sql.Common.Snapshot.QueryConfiguration conventions, DbConnection connection)
{
using (var command = connection.CreateCommand())
{
connection.Open();
command.CommandText = $"SELECT * FROM {conventions.FullSnapshotTableName}";

// start reading - no need to load the table's content
using (var reader = command.ExecuteReader())
{
// load columns metadata
var results = LoadSchemaTableInfo(reader);

return new SnapshotColumnSizesInfo(
persistenceIdColumnSize: (int)results.First(r => r["ColumnName"].ToString() == conventions.PersistenceIdColumnName)["ColumnSize"],
manifestColumnSize: (int)results.First(r => r["ColumnName"].ToString() == conventions.ManifestColumnName)["ColumnSize"]
);
}
}
}

private static List<Dictionary<string, object>> LoadSchemaTableInfo(DbDataReader reader)
{
var results = new List<Dictionary<string, object>>();

// iterate through the table schema and extract metadata
DataTable schemaTable = reader.GetSchemaTable();
foreach (DataRow row in schemaTable.Rows)
{
var dict = new Dictionary<string, object>();
foreach (DataColumn col in schemaTable.Columns)
{
dict.Add(col.ColumnName, row[col.Ordinal]);
}
results.Add(dict);
}

return results;
}
}
}
35 changes: 35 additions & 0 deletions src/Akka.Persistence.SqlServer/Helpers/JournalColumnSizesInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// //-----------------------------------------------------------------------
// // <copyright file="ColumnSizesInfo.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

namespace Akka.Persistence.SqlServer.Helpers
{
/// <summary>
/// Represents information about SQL Journal Column sizes
/// </summary>
internal class JournalColumnSizesInfo
{
public JournalColumnSizesInfo(int persistenceIdColumnSize, int tagsColumnSize, int manifestColumnSize)
{
PersistenceIdColumnSize = persistenceIdColumnSize;
TagsColumnSize = tagsColumnSize;
ManifestColumnSize = manifestColumnSize;
}

/// <summary>
/// Size of PersistenceId column
/// </summary>
public int PersistenceIdColumnSize { get; }
/// <summary>
/// Size of Tags column
/// </summary>
public int TagsColumnSize { get; }
/// <summary>
/// Size of manifest column
/// </summary>
public int ManifestColumnSize { get; }
}
}
30 changes: 30 additions & 0 deletions src/Akka.Persistence.SqlServer/Helpers/SnapshotColumnSizesInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// //-----------------------------------------------------------------------
// // <copyright file="SnapshotColumnSizesInfo.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

namespace Akka.Persistence.SqlServer.Helpers
{
/// <summary>
/// Represents information about SQL SnapshotStore Column sizes
/// </summary>
internal class SnapshotColumnSizesInfo
{
public SnapshotColumnSizesInfo(int persistenceIdColumnSize, int manifestColumnSize)
{
PersistenceIdColumnSize = persistenceIdColumnSize;
ManifestColumnSize = manifestColumnSize;
}

/// <summary>
/// Size of PersistenceId column
/// </summary>
public int PersistenceIdColumnSize { get; }
/// <summary>
/// Size of manifest column
/// </summary>
public int ManifestColumnSize { get; }
}
}
57 changes: 55 additions & 2 deletions src/Akka.Persistence.SqlServer/Journal/BatchingSqlServerJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,21 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Data;
using System.Data.Common;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.Persistence.Sql.Common.Journal;
using Akka.Persistence.SqlServer.Helpers;
using Akka.Util;

namespace Akka.Persistence.SqlServer.Journal
{
public sealed class BatchingSqlServerJournalSetup : BatchingSqlJournalSetup
{
public bool UseConstantParameterSize { get; }

public BatchingSqlServerJournalSetup(Config config) : base(
config,
new QueryConfiguration(
Expand All @@ -35,6 +42,7 @@ public BatchingSqlServerJournalSetup(Config config) : base(
config.GetString("serializer", null),
config.GetBoolean("sequential-access", false)))
{
UseConstantParameterSize = config.GetBoolean("use-constant-parameter-size", false);
}

public BatchingSqlServerJournalSetup(
Expand All @@ -48,7 +56,8 @@ public BatchingSqlServerJournalSetup(
CircuitBreakerSettings circuitBreakerSettings,
ReplayFilterSettings replayFilterSettings,
QueryConfiguration namingConventions,
string defaultSerialzier)
string defaultSerializer,
bool useConstantParameterSize)
: base(
connectionString: connectionString,
maxConcurrentOperations: maxConcurrentOperations,
Expand All @@ -60,18 +69,24 @@ public BatchingSqlServerJournalSetup(
circuitBreakerSettings: circuitBreakerSettings,
replayFilterSettings: replayFilterSettings,
namingConventions: namingConventions,
defaultSerializer: defaultSerialzier)
defaultSerializer: defaultSerializer)
{
UseConstantParameterSize = useConstantParameterSize;
}
}

public class BatchingSqlServerJournal : BatchingSqlJournal<SqlConnection, SqlCommand>
{
private Option<JournalColumnSizesInfo> _columnSizes = Option<JournalColumnSizesInfo>.None;
private readonly bool _useConstantParameterSize;

public BatchingSqlServerJournal(Config config) : this(new BatchingSqlServerJournalSetup(config))
{ }

public BatchingSqlServerJournal(BatchingSqlServerJournalSetup setup) : base(setup)
{
_useConstantParameterSize = setup.UseConstantParameterSize;

var connectionTimeoutSeconds =
new SqlConnectionStringBuilder(setup.ConnectionString).ConnectTimeout;
var commandTimeout = setup.ConnectionTimeout;
Expand Down Expand Up @@ -169,5 +184,43 @@ protected override SqlConnection CreateConnection(string connectionString)
{
return new SqlConnection(connectionString);
}

/// <inheritdoc />
protected override void PreStart()
{
base.PreStart();

// if need to use constant parameters, prepare column sizes for parameter generation
if (_useConstantParameterSize)
{
using (var connection = CreateConnection(Setup.ConnectionString))
{
_columnSizes = ColumnSizeLoader.LoadJournalColumnSizes(Setup.NamingConventions, connection);
}
}
}

/// <inheritdoc />
protected override void PreAddParameterToCommand(SqlCommand command, DbParameter param)
{
if (!_columnSizes.HasValue)
return;

// if column sizes are loaded, use them to define constant parameter size values
switch (param.ParameterName)
{
case "@PersistenceId":
param.Size = _columnSizes.Value.PersistenceIdColumnSize;
break;

case "@Tag":
param.Size = _columnSizes.Value.TagsColumnSize;
break;

case "@Manifest":
param.Size = _columnSizes.Value.ManifestColumnSize;
break;
}
}
}
}
23 changes: 22 additions & 1 deletion src/Akka.Persistence.SqlServer/Journal/SqlServerJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,24 @@
using System;
using System.Data.Common;
using System.Data.SqlClient;
using Akka.Actor.Setup;
using Akka.Configuration;
using Akka.Persistence.Sql.Common.Journal;
using Akka.Persistence.SqlServer.Helpers;

namespace Akka.Persistence.SqlServer.Journal
{
public class SqlServerJournal : SqlJournal
{
private readonly bool _useConstantParameterSize;
public static readonly SqlServerPersistence Extension = SqlServerPersistence.Get(Context.System);

public SqlServerJournal(Config journalConfig) : base(journalConfig)
{

var config = journalConfig.WithFallback(Extension.DefaultJournalConfig);

_useConstantParameterSize = config.GetBoolean("use-constant-parameter-size", false);

var connectionTimeoutSeconds =
new SqlConnectionStringBuilder(
config.GetString("connection-string")).ConnectTimeout;
Expand Down Expand Up @@ -64,5 +69,21 @@ protected override DbConnection CreateDbConnection(string connectionString)
{
return new SqlConnection(connectionString);
}

/// <inheritdoc />
protected override void PreStart()
{
base.PreStart();

// if constant parameter sizes required, provide column sizes to query executor
if (_useConstantParameterSize)
{
using (var connection = CreateDbConnection(GetConnectionString()))
{
var columnSizes = ColumnSizeLoader.LoadJournalColumnSizes(QueryExecutor.Configuration, connection);
(QueryExecutor as SqlServerQueryExecutor)?.SetColumnSizes(columnSizes);
}
}
}
}
}
41 changes: 39 additions & 2 deletions src/Akka.Persistence.SqlServer/Journal/SqlServerQueryExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,18 @@
using System.Data.Common;
using System.Data.SqlClient;
using Akka.Persistence.Sql.Common.Journal;
using Akka.Persistence.SqlServer.Helpers;
using Akka.Util;

namespace Akka.Persistence.SqlServer.Journal
{
public class SqlServerQueryExecutor : AbstractQueryExecutor
{
public SqlServerQueryExecutor(QueryConfiguration configuration, Akka.Serialization.Serialization serialization,
private Option<JournalColumnSizesInfo> _columnSizes = Option<JournalColumnSizesInfo>.None;

public SqlServerQueryExecutor(
QueryConfiguration configuration,
Akka.Serialization.Serialization serialization,
ITimestampProvider timestampProvider)
: base(configuration, serialization, timestampProvider)
{
Expand Down Expand Up @@ -95,7 +101,38 @@ IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{

protected override DbCommand CreateCommand(DbConnection connection)
{
return new SqlCommand {Connection = (SqlConnection) connection};
return new SqlCommand { Connection = (SqlConnection) connection };
}

/// <summary>
/// Sets column sizes loaded from db schema, so that constant parameter sizes could be set during parameter generation
/// </summary>
internal void SetColumnSizes(JournalColumnSizesInfo columnSizesInfo)
{
_columnSizes = columnSizesInfo;
}

/// <inheritdoc />
protected override void PreAddParameterToCommand(DbCommand command, DbParameter param)
{
if (!_columnSizes.HasValue)
return;

// if column sizes are loaded, use them to define constant parameter size values
switch (param.ParameterName)
{
case "@PersistenceId":
param.Size = _columnSizes.Value.PersistenceIdColumnSize;
break;

case "@Tag":
param.Size = _columnSizes.Value.TagsColumnSize;
break;

case "@Manifest":
param.Size = _columnSizes.Value.ManifestColumnSize;
break;
}
}
}
}
Loading

0 comments on commit fd501c9

Please sign in to comment.