Skip to content

Commit

Permalink
feat(server): split worker tags table (#5228)
Browse files Browse the repository at this point in the history
* feat(server): split worker tags table
  • Loading branch information
irenarindos authored Nov 8, 2024
1 parent 49d86a8 commit 12d8898
Show file tree
Hide file tree
Showing 13 changed files with 599 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ alter table server_worker
on update cascade;

drop view server_worker_aggregate;
-- Updated in 99/01_split_worker_tag_tables.up.sql
-- Replaces view created in 52/01_worker_operational_state.up.sql to add the worker local storage state
create view server_worker_aggregate as
with worker_config_tags(worker_id, source, tags) as (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
-- Copyright (c) HashiCorp, Inc.
-- SPDX-License-Identifier: BUSL-1.1

begin;

-- Create new tables for worker config and worker api tags
create table server_worker_config_tag(
worker_id wt_public_id
constraint server_worker_fkey
references server_worker (public_id)
on delete cascade
on update cascade,
key wt_tagpair,
value wt_tagpair,
primary key (worker_id, key, value)
);
comment on table server_worker_config_tag is
'server_worker_config_tag is a table where each row represents a worker config tag.';

create table server_worker_api_tag(
worker_id wt_public_id
constraint server_worker_fkey
references server_worker (public_id)
on delete cascade
on update cascade,
key wt_tagpair,
value wt_tagpair,
primary key (worker_id, key, value)
);
comment on table server_worker_api_tag is
'server_worker_api_tag is a table where each row represents a worker api tag.';

-- Migrate from server_worker_tag to the new tables
insert into server_worker_config_tag
(worker_id, key, value)
select worker_id, key, value
from server_worker_tag
where source = 'configuration';

insert into server_worker_api_tag
(worker_id, key, value)
select worker_id, key, value
from server_worker_tag
where source = 'api';


drop view server_worker_aggregate;
-- Replaces view created in 86/01_server_worker_local_storage_state.up.sql to use the disparate worker tag tables
-- View also switches to using json_agg to build the tags for consumption
-- TODO this view will be completely dropped in future PRs on this LLB in favor of sql in query.go
create view server_worker_aggregate as
with connection_count (worker_id, count) as (
select worker_id,
count(1) as count
from session_connection
where closed_reason is null
group by worker_id
)
select w.public_id,
w.scope_id,
w.description,
w.name,
w.address,
w.create_time,
w.update_time,
w.version,
w.last_status_time,
w.type,
w.release_version,
w.operational_state,
w.local_storage_state,
cc.count as active_connection_count,
wt.tags as api_tags,
ct.tags as worker_config_tags
from server_worker w
left join (select worker_id, json_agg(json_build_object('key', key, 'value', value)) as tags from server_worker_api_tag group by worker_id) wt
on w.public_id = wt.worker_id
left join (select worker_id, json_agg(json_build_object('key', key, 'value', value)) as tags from server_worker_config_tag group by worker_id) ct
on w.public_id = ct.worker_id
left join connection_count as cc
on w.public_id = cc.worker_id;
comment on view server_worker_aggregate is
'server_worker_aggregate contains the worker resource with its worker provided config values and its configuration and api provided tags.';

-- Drop the old tables
drop table server_worker_tag;
drop table server_worker_tag_enm;

commit;
126 changes: 126 additions & 0 deletions internal/db/schema/migrations/oss/postgres_99_01_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package oss_test

import (
"context"
"testing"

"github.com/hashicorp/boundary/internal/db/common"
"github.com/hashicorp/boundary/internal/db/schema"
"github.com/hashicorp/boundary/testing/dbtest"
"github.com/stretchr/testify/require"
)

const (
makeWorkerQuery = "insert into server_worker (public_id, scope_id, type) values ($1, 'global', 'pki')"
insertWorkerTagsQuery = "insert into server_worker_tag (worker_id, key, value, source) values ($1, $2, $3, $4)"
selectWorkerApiTagsQuery = "select key, value from server_worker_api_tag where worker_id = $1"
selectWorkerConfigTagsQuery = "select key, value from server_worker_config_tag where worker_id = $1"
)

type testWorkerTags struct {
Key string
Value string
}

func Test_WorkerTagTableSplit(t *testing.T) {
t.Parallel()
require := require.New(t)
const priorMigration = 92001
const serverEnumMigration = 99001
dialect := dbtest.Postgres
ctx := context.Background()

c, u, _, err := dbtest.StartUsingTemplate(dialect, dbtest.WithTemplate(dbtest.Template1))
require.NoError(err)
t.Cleanup(func() {
require.NoError(c())
})
d, err := common.SqlOpen(dialect, u)
require.NoError(err)

// migration to the prior migration (before the one we want to test)
m, err := schema.NewManager(ctx, schema.Dialect(dialect), d, schema.WithEditions(
schema.TestCreatePartialEditions(schema.Dialect(dialect), schema.PartialEditions{"oss": priorMigration}),
))
require.NoError(err)

_, err = m.ApplyMigrations(ctx)
require.NoError(err)
state, err := m.CurrentState(ctx)
require.NoError(err)
want := &schema.State{
Initialized: true,
Editions: []schema.EditionState{
{
Name: "oss",
BinarySchemaVersion: priorMigration,
DatabaseSchemaVersion: priorMigration,
DatabaseSchemaState: schema.Equal,
},
},
}
require.Equal(want, state)

// Seed the data
execResult, err := d.ExecContext(ctx, makeWorkerQuery, "test-worker")
require.NoError(err)
rowsAffected, err := execResult.RowsAffected()
require.NoError(err)
require.Equal(int64(1), rowsAffected)
execResult, err = d.ExecContext(ctx, insertWorkerTagsQuery, "test-worker", "key1", "value1", "api")
require.NoError(err)
rowsAffected, err = execResult.RowsAffected()
require.NoError(err)
require.Equal(int64(1), rowsAffected)
execResult, err = d.ExecContext(ctx, insertWorkerTagsQuery, "test-worker", "key2", "value2", "configuration")
require.NoError(err)
rowsAffected, err = execResult.RowsAffected()
require.NoError(err)
require.Equal(int64(1), rowsAffected)

// now we're ready for the migration we want to test.
m, err = schema.NewManager(ctx, schema.Dialect(dialect), d, schema.WithEditions(
schema.TestCreatePartialEditions(schema.Dialect(dialect), schema.PartialEditions{"oss": serverEnumMigration}),
))
require.NoError(err)

_, err = m.ApplyMigrations(ctx)
require.NoError(err)
state, err = m.CurrentState(ctx)
require.NoError(err)

want = &schema.State{
Initialized: true,
Editions: []schema.EditionState{
{
Name: "oss",
BinarySchemaVersion: serverEnumMigration,
DatabaseSchemaVersion: serverEnumMigration,
DatabaseSchemaState: schema.Equal,
},
},
}
require.Equal(want, state)

// Check that the tags have been moved to the new tables
apiTags := new(testWorkerTags)
row := d.QueryRowContext(ctx, selectWorkerApiTagsQuery, "test-worker")
require.NoError(row.Scan(
&apiTags.Key,
&apiTags.Value,
))
require.Equal("key1", apiTags.Key)
require.Equal("value1", apiTags.Value)

configTags := new(testWorkerTags)
row = d.QueryRowContext(ctx, selectWorkerConfigTagsQuery, "test-worker")
require.NoError(row.Scan(
&configTags.Key,
&configTags.Value,
))
require.Equal("key2", configTags.Key)
require.Equal("value2", configTags.Value)
}
35 changes: 35 additions & 0 deletions internal/proto/controller/storage/servers/store/v1/worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ message Worker {

// WorkerTag is a tag for a worker. The primary key is comprised of the
// worker_id, key, value, and source.
// WorkerTag is deprecated- use ApiTag and ConfigTag instead.
message WorkerTag {
option deprecated = true;

// worker_id is the public key that key of the worker this tag is for.
// @inject_tag: `gorm:"primary_key"`
string worker_id = 10;
Expand All @@ -98,6 +101,38 @@ message WorkerTag {
string source = 40;
}

// ApiTag is an API tag for a worker. The primary key is comprised of the
// worker_id, key, and value
message ApiTag {
// worker_id is the public key that key of the worker this tag is for.
// @inject_tag: `gorm:"primary_key"`
string worker_id = 10;

// key is the key of the tag. This must be set.
// @inject_tag: `gorm:"primary_key"`
string key = 20;

// value is the value
// @inject_tag: `gorm:"primary_key"`
string value = 30;
}

// ConfigTag is a configuration tag for a worker. The primary key is comprised of the
// worker_id, key, and value
message ConfigTag {
// worker_id is the public key that key of the worker this tag is for.
// @inject_tag: `gorm:"primary_key"`
string worker_id = 10;

// key is the key of the tag. This must be set.
// @inject_tag: `gorm:"primary_key"`
string key = 20;

// value is the value
// @inject_tag: `gorm:"primary_key"`
string value = 30;
}

// WorkerStorageBucketCredentialState is a state for a storage bucket credential for a worker.
// The primary key is comprised of the worker_id, storage_bucket_credential_id, permission_type.
message WorkerStorageBucketCredentialState {
Expand Down
16 changes: 10 additions & 6 deletions internal/server/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ const (

deleteWhereCreateTimeSql = `create_time < ?`

deleteTagsByWorkerIdSql = `
deleteApiTagsByWorkerIdSql = `
delete
from server_worker_tag
where
source = ?
and
worker_id = ?`
from server_worker_api_tag
where worker_id = ?
`

deleteConfigTagsByWorkerIdSql = `
delete
from server_worker_config_tag
where worker_id = ?
`

deleteWorkerAuthQuery = `
delete from worker_auth_authorized
Expand Down
Loading

0 comments on commit 12d8898

Please sign in to comment.