diff --git a/server/ControlPlane/Buffers/BufferManager.cs b/server/ControlPlane/Buffers/BufferManager.cs index 64e1dce6..79d75553 100644 --- a/server/ControlPlane/Buffers/BufferManager.cs +++ b/server/ControlPlane/Buffers/BufferManager.cs @@ -11,6 +11,7 @@ namespace Tyger.ControlPlane.Buffers; public sealed partial class BufferManager { + private const int MaxTags = 100; private readonly Repository _repository; private readonly IBufferProvider _bufferProvider; private readonly IEphemeralBufferProvider _ephemeralBufferProvider; @@ -28,26 +29,23 @@ public async Task CreateBuffer(Buffer newBuffer, CancellationToken cance { if (newBuffer.Tags != null) { - string keyPattern = @"^[a-zA-Z0-9-_.]{1,128}$"; - string valuePattern = @"^[a-zA-Z0-9-_.]{0,256}$"; + if (newBuffer.Tags.Count > MaxTags) + { + throw new ValidationException($"No more than {MaxTags} tags can be set on a buffer"); + } foreach (var tag in newBuffer.Tags) { - if (!Regex.IsMatch(tag.Key, keyPattern)) + if (!TagKeyRegex().IsMatch(tag.Key)) { throw new ValidationException("Tag keys must contain up to 128 letters (a-z, A-Z), numbers (0-9) and underscores (_)"); } - if (!Regex.IsMatch(tag.Value, valuePattern)) + if (!TagValueRegex().IsMatch(tag.Value)) { throw new ValidationException("Tag values can contain up to 256 letters (a-z, A-Z), numbers (0-9) and underscores (_)"); } } - - if (newBuffer.Tags.Count > 10) - { - throw new ValidationException("Only 10 tags can be set on a buffer"); - } } string id = UniqueId.Create(); @@ -151,4 +149,10 @@ public async Task ImportBuffers(CancellationToken cancellationToken) [GeneratedRegex(@"^(?(run-(?\d+)-)?temp-)?(?\w+)$")] private static partial Regex BufferIdRegex(); + + [GeneratedRegex(@"^[a-zA-Z0-9-_.]{1,128}$")] + private static partial Regex TagKeyRegex(); + + [GeneratedRegex(@"^[a-zA-Z0-9-_.]{0,256}$")] + private static partial Regex TagValueRegex(); } diff --git a/server/ControlPlane/Database/Repository.cs b/server/ControlPlane/Database/Repository.cs index 6e528d10..4db18fcc 100644 --- a/server/ControlPlane/Database/Repository.cs +++ b/server/ControlPlane/Database/Repository.cs @@ -1227,66 +1227,67 @@ UPDATE buffers await reader.ReadAsync(cancellationToken); } - // Delete old tags - using var deleteCommand = new NpgsqlCommand - { - Connection = connection, - Transaction = tx, - CommandText = """ - DELETE FROM buffer_tags WHERE - id = $1 AND created_at = $2 - """, - Parameters = - { - new() { Value = id, NpgsqlDbType = NpgsqlDbType.Text }, - new() { Value = createdAt, NpgsqlDbType = NpgsqlDbType.TimestampTz }, - } - }; - - await deleteCommand.PrepareAsync(cancellationToken); - await deleteCommand.ExecuteNonQueryAsync(cancellationToken); - - if (tags != null) - { - // Add the new tags - foreach (var tag in tags) - { - await InsertTag(tx, id, createdAt, tag, cancellationToken); - } - } + await UpsertTags(tx, id, createdAt, true, tags); await tx.CommitAsync(cancellationToken); return new Buffer() { Id = id, ETag = newETag, CreatedAt = createdAt, Tags = tags }; }, cancellationToken); } - private static async Task InsertTag(NpgsqlTransaction tx, string id, DateTimeOffset createdAt, KeyValuePair tag, CancellationToken cancellationToken) + private static async Task UpsertTags(NpgsqlTransaction tx, string bufferId, DateTimeOffset createdAt, bool update, IDictionary? tags) { - using var insertTagCommand = new NpgsqlCommand + var batch = new NpgsqlBatch(connection: tx.Connection, transaction: tx); + if (update) { - Connection = tx.Connection, - Transaction = tx, - CommandText = """ - WITH INS AS (INSERT INTO tag_keys (name) VALUES ($4) ON CONFLICT DO NOTHING RETURNING id) - INSERT INTO buffer_tags (id, created_at, key, value) - (SELECT $1, $2, id, $3 FROM INS UNION - SELECT $1, $2, tag_keys.id, $3 FROM tag_keys WHERE name = $4) - """, + batch.BatchCommands.Add(new($""" + DELETE FROM buffer_tags + WHERE id = $1 and created_at = $2 + """) + { + Parameters = + { + new() { Value = bufferId, NpgsqlDbType = NpgsqlDbType.Text }, + new() { Value = createdAt, NpgsqlDbType = NpgsqlDbType.TimestampTz } + }, + }); + } + + if (tags is null || tags.Count == 0) + { + return; + } + batch.BatchCommands.Add(new($""" + WITH temp_tags AS ( + SELECT * FROM UNNEST($1::text[], $2::text[]) AS t(key, value) + ), + inserted_keys AS ( + INSERT INTO tag_keys (name) + SELECT DISTINCT key FROM temp_tags + ON CONFLICT (name) DO NOTHING + RETURNING id, name + ), + all_keys AS ( + SELECT id, name FROM inserted_keys + UNION + SELECT id, name FROM tag_keys WHERE name IN (SELECT key FROM temp_tags) + ) + INSERT INTO buffer_tags (id, created_at, key, value) + SELECT $3, $4, all_keys.id, temp_tags.value + FROM temp_tags + JOIN all_keys ON all_keys.name = temp_tags.key; + """) + { Parameters = { - new() { Value = id, NpgsqlDbType = NpgsqlDbType.Text }, + new() { Value = tags.Keys.ToArray(), NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Text }, + new() { Value = tags.Values.ToArray(), NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Text }, + new() { Value = bufferId, NpgsqlDbType = NpgsqlDbType.Text }, new() { Value = createdAt, NpgsqlDbType = NpgsqlDbType.TimestampTz }, - new() { Value = tag.Value, NpgsqlDbType = NpgsqlDbType.Text }, - new() { Value = tag.Key, NpgsqlDbType = NpgsqlDbType.Text }, } - }; + }); - await insertTagCommand.PrepareAsync(cancellationToken); - if (await insertTagCommand.ExecuteNonQueryAsync(cancellationToken) != 1) - { - throw new InvalidOperationException("Failed to insert tag: incorrect number of rows inserted"); - } + await batch.ExecuteNonQueryAsync(); } public async Task CreateRunWithIdempotencyKeyGuard(Run newRun, string idempotencyKey, Func> createRun, CancellationToken cancellationToken) @@ -1374,13 +1375,7 @@ RETURNING created_at buffer = buffer with { CreatedAt = reader.GetDateTime(0), ETag = eTag }; } - if (buffer.Tags != null) - { - foreach (var tag in buffer.Tags) - { - await InsertTag(tx, buffer.Id, buffer.CreatedAt, tag, cancellationToken); - } - } + await UpsertTags(tx, newBuffer.Id, buffer.CreatedAt, false, newBuffer.Tags); await tx.CommitAsync(cancellationToken); return buffer;