Skip to content

Commit

Permalink
fix(connections): fixed issues where connections were not being closed
Browse files Browse the repository at this point in the history
  • Loading branch information
roehlerw committed Feb 15, 2021
1 parent 37b9423 commit fd7bf8b
Show file tree
Hide file tree
Showing 15 changed files with 426 additions and 332 deletions.
84 changes: 45 additions & 39 deletions PluginMySQL/API/Discover/GetAllSchemas.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,58 +39,64 @@ WHERE t.TABLE_SCHEMA NOT IN ('mysql', 'information_schema', 'performance_schema'
public static async IAsyncEnumerable<Schema> GetAllSchemas(IConnectionFactory connFactory, int sampleSize = 5)
{
var conn = connFactory.GetConnection();
await conn.OpenAsync();

var cmd = connFactory.GetCommand(GetAllTablesAndColumnsQuery, conn);
var reader = await cmd.ExecuteReaderAsync();

Schema schema = null;
var currentSchemaId = "";
while (await reader.ReadAsync())
try
{
var schemaId =
$"{Utility.Utility.GetSafeName(reader.GetValueById(TableSchema).ToString(), '`')}.{Utility.Utility.GetSafeName(reader.GetValueById(TableName).ToString(), '`')}";
if (schemaId != currentSchemaId)
await conn.OpenAsync();

var cmd = connFactory.GetCommand(GetAllTablesAndColumnsQuery, conn);
var reader = await cmd.ExecuteReaderAsync();

Schema schema = null;
var currentSchemaId = "";
while (await reader.ReadAsync())
{
// return previous schema
if (schema != null)
var schemaId =
$"{Utility.Utility.GetSafeName(reader.GetValueById(TableSchema).ToString(), '`')}.{Utility.Utility.GetSafeName(reader.GetValueById(TableName).ToString(), '`')}";
if (schemaId != currentSchemaId)
{
// get sample and count
yield return await AddSampleAndCount(connFactory, schema, sampleSize);
// return previous schema
if (schema != null)
{
// get sample and count
yield return await AddSampleAndCount(connFactory, schema, sampleSize);
}

// start new schema
currentSchemaId = schemaId;
var parts = DecomposeSafeName(currentSchemaId).TrimEscape();
schema = new Schema
{
Id = currentSchemaId,
Name = $"{parts.Schema}.{parts.Table}",
Properties = { },
DataFlowDirection = Schema.Types.DataFlowDirection.Read
};
}

// start new schema
currentSchemaId = schemaId;
var parts = DecomposeSafeName(currentSchemaId).TrimEscape();
schema = new Schema
// add column to schema
var property = new Property
{
Id = currentSchemaId,
Name = $"{parts.Schema}.{parts.Table}",
Properties = { },
DataFlowDirection = Schema.Types.DataFlowDirection.Read
Id = $"`{reader.GetValueById(ColumnName)}`",
Name = reader.GetValueById(ColumnName).ToString(),
IsKey = reader.GetValueById(ColumnKey).ToString() == "PRI",
IsNullable = reader.GetValueById(IsNullable).ToString() == "YES",
Type = GetType(reader.GetValueById(DataType).ToString()),
TypeAtSource = GetTypeAtSource(reader.GetValueById(DataType).ToString(),
reader.GetValueById(CharacterMaxLength))
};
schema?.Properties.Add(property);
}

// add column to schema
var property = new Property
if (schema != null)
{
Id = $"`{reader.GetValueById(ColumnName)}`",
Name = reader.GetValueById(ColumnName).ToString(),
IsKey = reader.GetValueById(ColumnKey).ToString() == "PRI",
IsNullable = reader.GetValueById(IsNullable).ToString() == "YES",
Type = GetType(reader.GetValueById(DataType).ToString()),
TypeAtSource = GetTypeAtSource(reader.GetValueById(DataType).ToString(),
reader.GetValueById(CharacterMaxLength))
};
schema?.Properties.Add(property);
// get sample and count
yield return await AddSampleAndCount(connFactory, schema, sampleSize);
}
}

await conn.CloseAsync();

if (schema != null)
finally
{
// get sample and count
yield return await AddSampleAndCount(connFactory, schema, sampleSize);
await conn.CloseAsync();
}
}

Expand Down
42 changes: 24 additions & 18 deletions PluginMySQL/API/Discover/GetCountOfRecords.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,35 @@ public static async Task<Count> GetCountOfRecords(IConnectionFactory connFactory
}

var conn = connFactory.GetConnection();
await conn.OpenAsync();

var cmd = connFactory.GetCommand($"SELECT COUNT(*) as count FROM ({query}) as q", conn);
var reader = await cmd.ExecuteReaderAsync();

var count = -1;
while (await reader.ReadAsync())

try
{
count = Convert.ToInt32(reader.GetValueById("count"));
}
await conn.OpenAsync();

await conn.CloseAsync();
var cmd = connFactory.GetCommand($"SELECT COUNT(*) as count FROM ({query}) as q", conn);
var reader = await cmd.ExecuteReaderAsync();

return count == -1
? new Count
var count = -1;
while (await reader.ReadAsync())
{
Kind = Count.Types.Kind.Unavailable,
count = Convert.ToInt32(reader.GetValueById("count"));
}
: new Count
{
Kind = Count.Types.Kind.Exact,
Value = count
};

return count == -1
? new Count
{
Kind = Count.Types.Kind.Unavailable,
}
: new Count
{
Kind = Count.Types.Kind.Exact,
Value = count
};
}
finally
{
await conn.CloseAsync();
}
}
}
}
61 changes: 34 additions & 27 deletions PluginMySQL/API/Discover/GetRefreshSchemaForTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,37 +35,44 @@ public static async Task<Schema> GetRefreshSchemaForTable(IConnectionFactory con
? connFactory.GetConnection()
: connFactory.GetConnection(decomposed.Database);

await conn.OpenAsync();
try
{
await conn.OpenAsync();

var cmd = connFactory.GetCommand(
string.Format(GetTableAndColumnsQuery, decomposed.Schema, decomposed.Table), conn);
var reader = await cmd.ExecuteReaderAsync();
var refreshProperties = new List<Property>();
var cmd = connFactory.GetCommand(
string.Format(GetTableAndColumnsQuery, decomposed.Schema, decomposed.Table), conn);
var reader = await cmd.ExecuteReaderAsync();
var refreshProperties = new List<Property>();

while (await reader.ReadAsync())
{
// add column to refreshProperties
var property = new Property
while (await reader.ReadAsync())
{
Id = Utility.Utility.GetSafeName(reader.GetValueById(ColumnName).ToString(), '`'),
Name = reader.GetValueById(ColumnName).ToString(),
IsKey = reader.GetValueById(ColumnKey).ToString() == "PRI",
IsNullable = reader.GetValueById(IsNullable).ToString() == "YES",
Type = GetType(reader.GetValueById(DataType).ToString()),
TypeAtSource = GetTypeAtSource(reader.GetValueById(DataType).ToString(),
reader.GetValueById(CharacterMaxLength))
};
refreshProperties.Add(property);
// add column to refreshProperties
var property = new Property
{
Id = Utility.Utility.GetSafeName(reader.GetValueById(ColumnName).ToString(), '`'),
Name = reader.GetValueById(ColumnName).ToString(),
IsKey = reader.GetValueById(ColumnKey).ToString() == "PRI",
IsNullable = reader.GetValueById(IsNullable).ToString() == "YES",
Type = GetType(reader.GetValueById(DataType).ToString()),
TypeAtSource = GetTypeAtSource(reader.GetValueById(DataType).ToString(),
reader.GetValueById(CharacterMaxLength))
};
refreshProperties.Add(property);
}

// add properties
schema.Properties.Clear();
schema.Properties.AddRange(refreshProperties);



// get sample and count
return await AddSampleAndCount(connFactory, schema, sampleSize);
}
finally
{
await conn.CloseAsync();
}

// add properties
schema.Properties.Clear();
schema.Properties.AddRange(refreshProperties);

await conn.CloseAsync();

// get sample and count
return await AddSampleAndCount(connFactory, schema, sampleSize);
}

private static DecomposeResponse DecomposeSafeName(string schemaId)
Expand Down
100 changes: 53 additions & 47 deletions PluginMySQL/API/Discover/GetRefreshSchemas.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,66 +14,72 @@ public static async IAsyncEnumerable<Schema> GetRefreshSchemas(IConnectionFactor
RepeatedField<Schema> refreshSchemas, int sampleSize = 5)
{
var conn = connFactory.GetConnection();
await conn.OpenAsync();

foreach (var schema in refreshSchemas)

try
{
if (string.IsNullOrWhiteSpace(schema.Query))
{
yield return await GetRefreshSchemaForTable(connFactory, schema, sampleSize);
continue;
}
await conn.OpenAsync();

var cmd = connFactory.GetCommand(schema.Query, conn);
foreach (var schema in refreshSchemas)
{
if (string.IsNullOrWhiteSpace(schema.Query))
{
yield return await GetRefreshSchemaForTable(connFactory, schema, sampleSize);
continue;
}

var reader = await cmd.ExecuteReaderAsync();
var schemaTable = reader.GetSchemaTable();
var cmd = connFactory.GetCommand(schema.Query, conn);

var properties = new List<Property>();
if (schemaTable != null)
{
var unnamedColIndex = 0;
var reader = await cmd.ExecuteReaderAsync();
var schemaTable = reader.GetSchemaTable();

// get each column and create a property for the column
foreach (DataRow row in schemaTable.Rows)
var properties = new List<Property>();
if (schemaTable != null)
{
// get the column name
var colName = row["ColumnName"].ToString();
if (string.IsNullOrWhiteSpace(colName))
{
colName = $"UNKNOWN_{unnamedColIndex}";
unnamedColIndex++;
}
var unnamedColIndex = 0;

// create property
var property = new Property
// get each column and create a property for the column
foreach (DataRow row in schemaTable.Rows)
{
Id = Utility.Utility.GetSafeName(colName, '`'),
Name = colName,
Description = "",
Type = GetPropertyType(row),
// TypeAtSource = row["DataType"].ToString(),
IsKey = Boolean.Parse(row["IsKey"].ToString()),
IsNullable = Boolean.Parse(row["AllowDBNull"].ToString()),
IsCreateCounter = false,
IsUpdateCounter = false,
PublisherMetaJson = ""
};
// get the column name
var colName = row["ColumnName"].ToString();
if (string.IsNullOrWhiteSpace(colName))
{
colName = $"UNKNOWN_{unnamedColIndex}";
unnamedColIndex++;
}

// create property
var property = new Property
{
Id = Utility.Utility.GetSafeName(colName, '`'),
Name = colName,
Description = "",
Type = GetPropertyType(row),
// TypeAtSource = row["DataType"].ToString(),
IsKey = Boolean.Parse(row["IsKey"].ToString()),
IsNullable = Boolean.Parse(row["AllowDBNull"].ToString()),
IsCreateCounter = false,
IsUpdateCounter = false,
PublisherMetaJson = ""
};

// add property to properties
properties.Add(property);
// add property to properties
properties.Add(property);
}
}
}

// add only discovered properties to schema
schema.Properties.Clear();
schema.Properties.AddRange(properties);
// add only discovered properties to schema
schema.Properties.Clear();
schema.Properties.AddRange(properties);

// get sample and count
yield return await AddSampleAndCount(connFactory, schema, sampleSize);
// get sample and count
yield return await AddSampleAndCount(connFactory, schema, sampleSize);
}
}
finally
{
await conn.CloseAsync();
}

await conn.CloseAsync();
}
}
}
Loading

0 comments on commit fd7bf8b

Please sign in to comment.