Skip to content

Commit 448eea6

Browse files
craig[bot]stevendannayuzefovich
committed
154436: rangefeed: defer unregistration until stream is removed from manager r=wenyihu6 a=stevendanna Unregistering the registration from the processor has the side-effect of closing the underlying memory budget. We don't want that to happen until all of the events have been cleared from the queue. Here, we pass the unregistration callback up via the Disconnector and then the stream manager calls it when removing the stream. Epic: none Release note: None 156112: sql/rowexec: enable tests under secondary tenants r=yuzefovich a=yuzefovich Given the modifications in recent commits, this mostly required plumbing the right codec into `eval.Context`. Additionally, a few tests needed some extra adjustments for the tenant prefix present in the spans. Only `TestWriteResumeSpan` is still skipped since it wasn't immediately clear what's wrong there. Epic: CRDB-48945 Release note: None 156309: *: miscellaneous test-tenant-related improvements r=yuzefovich a=yuzefovich Fixes: #142797. Fixes: #142801. Fixes: #142802. Fixes: #142804. Fixes: #142805. Co-authored-by: Steven Danna <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]>
4 parents 529ffb8 + abc2b62 + 0418e87 + 0a475f8 commit 448eea6

40 files changed

+351
-203
lines changed

pkg/ccl/multiregionccl/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ go_test(
105105
"//pkg/testutils/skip",
106106
"//pkg/testutils/sqlutils",
107107
"//pkg/testutils/testcluster",
108-
"//pkg/util",
109108
"//pkg/util/ctxgroup",
110109
"//pkg/util/encoding",
111110
"//pkg/util/leaktest",

pkg/ccl/multiregionccl/cold_start_latency_test.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"github.com/cockroachdb/cockroach/pkg/rpc"
2121
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
2222
"github.com/cockroachdb/cockroach/pkg/server"
23-
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2423
"github.com/cockroachdb/cockroach/pkg/spanconfig"
2524
"github.com/cockroachdb/cockroach/pkg/testutils"
2625
"github.com/cockroachdb/cockroach/pkg/testutils/pgurlutils"
@@ -117,13 +116,11 @@ func TestColdStartLatency(t *testing.T) {
117116
args.Knobs.Server = serverKnobs
118117
perServerArgs[i] = args
119118
}
120-
cs := cluster.MakeTestingClusterSettings()
121119
tc := testcluster.NewTestCluster(t, numNodes, base.TestClusterArgs{
122120
ParallelStart: true,
123121
ServerArgsPerNode: perServerArgs,
124122
ServerArgs: base.TestServerArgs{
125-
DefaultTestTenant: base.TODOTestTenantDisabled,
126-
Settings: cs,
123+
DefaultTestTenant: base.TestControlsTenantsExplicitly,
127124
},
128125
})
129126
go func() {
@@ -261,7 +258,6 @@ COMMIT;`}
261258
const password = "asdf"
262259
{
263260
tenant, tenantDB := serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{
264-
Settings: cs,
265261
TenantID: serverutils.TestTenantID(),
266262
TestingKnobs: base.TestingKnobs{
267263
Server: tenantServerKnobs(0),
@@ -310,7 +306,6 @@ COMMIT;`}
310306
start := timeutil.Now()
311307
sn := tenantServerKnobs(i)
312308
tenant, err := tc.Server(i).TenantController().StartTenant(ctx, base.TestTenantArgs{
313-
Settings: cs,
314309
TenantID: serverutils.TestTenantID(),
315310
DisableCreateTenant: true,
316311
SkipTenantCheck: true,

pkg/ccl/multiregionccl/datadriven_test.go

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -208,20 +208,12 @@ func testMultiRegionDataDriven(t *testing.T, testPath string) {
208208
tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{
209209
ServerArgsPerNode: serverArgs,
210210
ServerArgs: base.TestServerArgs{
211-
// We need to disable the default test tenant here
212-
// because it appears as though operations like
213-
// "wait-for-zone-config-changes" only work correctly
214-
// when called from the system tenant. More
215-
// investigation is required (tracked with #76378).
216-
DefaultTestTenant: base.TODOTestTenantDisabled,
211+
DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(156305),
217212
},
218213
})
219214
ds.tc = tc
220215

221-
sqlConn, err := ds.getSQLConn(0)
222-
if err != nil {
223-
return err.Error()
224-
}
216+
sysConn := tc.SystemLayer(0).SQLConn(t)
225217
// Speed up closing of timestamps, in order to sleep less below before
226218
// we can use follower_read_timestamp(). follower_read_timestamp() uses
227219
// sum of the following settings. Also, disable all kvserver lease
@@ -237,7 +229,7 @@ SET CLUSTER SETTING kv.allocator.load_based_lease_rebalancing.enabled = false;
237229
SET CLUSTER SETTING kv.allocator.min_lease_transfer_interval = '5m'
238230
`,
239231
";") {
240-
_, err = sqlConn.Exec(stmt)
232+
_, err := sysConn.Exec(stmt)
241233
if err != nil {
242234
return err.Error()
243235
}
@@ -341,8 +333,9 @@ SET CLUSTER SETTING kv.allocator.min_lease_transfer_interval = '5m'
341333
if err != nil {
342334
return err.Error()
343335
}
344-
cache := ds.tc.Server(idx).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
345-
tablePrefix := keys.MustAddr(keys.SystemSQLCodec.TablePrefix(tableID))
336+
s := ds.tc.ApplicationLayer(idx)
337+
cache := s.DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
338+
tablePrefix := keys.MustAddr(s.Codec().TablePrefix(tableID))
346339
entry, err := cache.TestingGetCached(ctx, tablePrefix, false, roachpb.LAG_BY_CLUSTER_SETTING)
347340
if err != nil {
348341
return err.Error()
@@ -738,22 +731,22 @@ func (r *replicaPlacement) satisfiesExpectedPlacement(expected *replicaPlacement
738731
continue
739732
} else {
740733
return errors.Newf(
741-
"expected node %s to not be present but had replica type %s",
734+
"expected node %d to not be present but had replica type %s",
742735
node,
743736
actualRt.String())
744737
}
745738
}
746739

747740
if !found {
748741
return errors.Newf(
749-
"expected node %s to have replica type %s but was not found",
742+
"expected node %d to have replica type %s but was not found",
750743
node,
751744
expectedRt.String())
752745
}
753746

754747
if expectedRt != actualRt {
755748
return errors.Newf(
756-
"expected node %s to have replica type %s but was %s",
749+
"expected node %d to have replica type %s but was %s",
757750
node,
758751
expectedRt.String(),
759752
actualRt.String())
@@ -793,15 +786,16 @@ func getRangeKeyForInput(
793786
var db string
794787
d.ScanArgs(t, dbName, &db)
795788

796-
execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig)
789+
s := tc.ApplicationLayer(0)
790+
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
797791

798792
tableDesc, err := lookupTable(&execCfg, db, tbName)
799793
if err != nil {
800794
return nil, err
801795
}
802796

803797
if !d.HasArg(partitionName) {
804-
return tableDesc.TableSpan(keys.SystemSQLCodec).Key, nil
798+
return tableDesc.TableSpan(s.Codec()).Key, nil
805799
}
806800

807801
var partition string
@@ -831,7 +825,7 @@ func getRangeKeyForInput(
831825

832826
_, keyPrefix, err := rowenc.DecodePartitionTuple(
833827
&tree.DatumAlloc{},
834-
keys.SystemSQLCodec,
828+
s.Codec(),
835829
tableDesc,
836830
primaryInd,
837831
part,

pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@ func TestingCreateMultiRegionClusterWithRegionList(
135135
// is used to create the MR cluster for all test cases. For
136136
// bonus points, the code to re-enable this should also provide more
137137
// flexibility in disabling the default test tenant by callers of this
138-
// function. Re-enablement is tracked with #76378.
139-
DefaultTestTenant: base.TODOTestTenantDisabled,
138+
// function.
139+
DefaultTestTenant: base.TestDoesNotWorkWithSecondaryTenantsButWeDontKnowWhyYet(156308),
140140
},
141141
})
142142

pkg/ccl/multiregionccl/region_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,6 @@ import (
2828
"github.com/stretchr/testify/require"
2929
)
3030

31-
// startTestCluster starts a 3 node cluster.
32-
//
33-
// Note, if a testfeed depends on particular testing knobs, those may
34-
// need to be applied to each of the servers in the test cluster
35-
// returned from this function.
3631
func TestMultiRegionDatabaseStats(t *testing.T) {
3732
defer leaktest.AfterTest(t)()
3833
defer log.Scope(t).Close(t)

pkg/ccl/multiregionccl/regional_by_row_test.go

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import (
3434
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
3535
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
3636
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
37-
"github.com/cockroachdb/cockroach/pkg/util"
3837
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3938
"github.com/cockroachdb/cockroach/pkg/util/log"
4039
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -173,16 +172,11 @@ func TestAlterTableLocalityRegionalByRowCorrectZoneConfigBeforeBackfill(t *testi
173172
func TestAlterTableLocalityRegionalByRowError(t *testing.T) {
174173
defer leaktest.AfterTest(t)()
175174
defer log.Scope(t).Close(t)
176-
skip.UnderRace(t, "takes >400s under race")
177-
skip.UnderDeadlock(t, "skipping as per issue #146428")
175+
176+
skip.UnderDuress(t, "too slow")
178177

179178
var chunkSize int64 = 100
180179
var maxValue = 4000
181-
if util.RaceEnabled {
182-
// Race builds are a lot slower, so use a smaller number of rows.
183-
maxValue = 200
184-
chunkSize = 5
185-
}
186180
// BulkInsertIntoTable adds testCase 0 to maxValue inclusive, so
187181
// we round (maxValue + 1) / chunkSize to the nearest int.
188182
// To round up x / y using integers, we do (x + y - 1) / y.
@@ -350,11 +344,6 @@ func TestAlterTableLocalityRegionalByRowError(t *testing.T) {
350344
params.Locality.Tiers = []roachpb.Tier{
351345
{Key: "region", Value: "ajstorm-1"},
352346
}
353-
// Need to disable the test tenant here because
354-
// when running inside a tenant, for some reason
355-
// this test doesn't error when expected. More
356-
// investigation is required. Tracked with #76378.
357-
params.DefaultTestTenant = base.TODOTestTenantDisabled
358347
var sqlDB *gosql.DB
359348
params.Knobs = base.TestingKnobs{
360349
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
@@ -378,11 +367,12 @@ func TestAlterTableLocalityRegionalByRowError(t *testing.T) {
378367
// Decrease the adopt loop interval so that retries happen quickly.
379368
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
380369
}
381-
var s serverutils.TestServerInterface
370+
var srv serverutils.TestServerInterface
382371
var kvDB *kv.DB
383-
s, sqlDB, kvDB = serverutils.StartServer(t, params)
372+
srv, sqlDB, kvDB = serverutils.StartServer(t, params)
384373
db = sqlDB
385-
defer s.Stopper().Stop(ctx)
374+
defer srv.Stopper().Stop(ctx)
375+
s := srv.ApplicationLayer()
386376

387377
// Disable strict GC TTL enforcement because we're going to shove a zero-value
388378
// TTL into the system with AddImmediateGCZoneConfig.
@@ -437,7 +427,7 @@ USE t;
437427
// that the job did not succeed even though it was canceled.
438428
testutils.SucceedsSoon(t, func() error {
439429
tableDesc := desctestutils.TestingGetPublicTableDescriptor(
440-
kvDB, keys.SystemSQLCodec, "t", "test",
430+
kvDB, s.Codec(), "t", "test",
441431
)
442432
if len(tableDesc.AllMutations()) != 0 {
443433
return errors.Errorf(
@@ -493,13 +483,13 @@ USE t;
493483
return nil
494484
})
495485

496-
tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
486+
tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, s.Codec(), "t", "test")
497487
if _, err := sqltestutils.AddImmediateGCZoneConfig(db, tableDesc.GetID()); err != nil {
498488
t.Fatal(err)
499489
}
500490
// Ensure that the writes from the partial new indexes are cleaned up.
501491
testutils.SucceedsSoon(t, func() error {
502-
return sqltestutils.CheckTableKeyCount(ctx, kvDB, keys.SystemSQLCodec, 1, maxValue)
492+
return sqltestutils.CheckTableKeyCount(ctx, kvDB, s.Codec(), 1, maxValue)
503493
})
504494
})
505495
}

pkg/ccl/multiregionccl/unique_test.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"testing"
1111

1212
"github.com/cockroachdb/cockroach/pkg/base"
13-
"github.com/cockroachdb/cockroach/pkg/keys"
1413
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
1514
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
1615
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
@@ -28,10 +27,10 @@ import (
2827
func TestValidateUniqueConstraints(t *testing.T) {
2928
defer leaktest.AfterTest(t)()
3029
defer log.Scope(t).Close(t)
31-
// This test fails when run within a tenant. More investigation is
32-
// required. Tracked with #76378.
33-
s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{DefaultTestTenant: base.TODOTestTenantDisabled})
34-
defer s.Stopper().Stop(context.Background())
30+
31+
srv, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
32+
defer srv.Stopper().Stop(context.Background())
33+
s := srv.ApplicationLayer()
3534
r := sqlutils.MakeSQLRunner(db)
3635

3736
// Create two tables and a view.
@@ -57,7 +56,7 @@ CREATE TABLE u (
5756
insertValuesT := func(values []tree.Datum) {
5857
// Get the table descriptor and primary index of t.
5958
tableDesc := desctestutils.TestingGetTableDescriptor(
60-
kvDB, keys.SystemSQLCodec, "test", "public", "t",
59+
kvDB, s.Codec(), "test", "public", "t",
6160
)
6261
primaryIndex := tableDesc.GetPrimaryIndex()
6362

@@ -67,7 +66,7 @@ CREATE TABLE u (
6766

6867
// Construct the primary index entry to insert.
6968
primaryIndexEntry, err := rowenc.EncodePrimaryIndex(
70-
keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, values, true, /* includeEmpty */
69+
s.Codec(), tableDesc, primaryIndex, colIDtoRowIndex, values, true, /* includeEmpty */
7170
)
7271
require.NoError(t, err)
7372
require.Equal(t, 1, len(primaryIndexEntry))
@@ -82,7 +81,7 @@ CREATE TABLE u (
8281
insertValuesU := func(values []tree.Datum) {
8382
// Get the table descriptor and indexes of u.
8483
tableDesc := desctestutils.TestingGetTableDescriptor(
85-
kvDB, keys.SystemSQLCodec, "test", "public", "u",
84+
kvDB, s.Codec(), "test", "public", "u",
8685
)
8786
primaryIndex := tableDesc.GetPrimaryIndex()
8887
secondaryIndex := tableDesc.PublicNonPrimaryIndexes()[0]
@@ -94,12 +93,12 @@ CREATE TABLE u (
9493

9594
// Construct the primary and secondary index entries to insert.
9695
primaryIndexEntry, err := rowenc.EncodePrimaryIndex(
97-
keys.SystemSQLCodec, tableDesc, primaryIndex, colIDtoRowIndex, values, true, /* includeEmpty */
96+
s.Codec(), tableDesc, primaryIndex, colIDtoRowIndex, values, true, /* includeEmpty */
9897
)
9998
require.NoError(t, err)
10099
require.Equal(t, 1, len(primaryIndexEntry))
101100
secondaryIndexEntry, err := rowenc.EncodeSecondaryIndex(
102-
context.Background(), keys.SystemSQLCodec, tableDesc, secondaryIndex,
101+
context.Background(), s.Codec(), tableDesc, secondaryIndex,
103102
colIDtoRowIndex, values, rowenc.EmptyVectorIndexEncodingHelper, true, /* includeEmpty */
104103
)
105104
require.NoError(t, err)

pkg/ccl/partitionccl/drop_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ func TestDropIndexWithZoneConfigCCL(t *testing.T) {
4949
asyncNotification := make(chan struct{})
5050

5151
var params base.TestServerArgs
52-
params.DefaultTestTenant = base.TODOTestTenantDisabled
5352
params.Knobs = base.TestingKnobs{
5453
GCJob: &sql.GCJobTestingKnobs{
5554
RunBeforeResume: func(_ jobspb.JobID) error {
@@ -202,8 +201,7 @@ SELECT job_id
202201
ctx := context.Background()
203202
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
204203
ServerArgs: base.TestServerArgs{
205-
DefaultTestTenant: base.TODOTestTenantDisabled,
206-
Knobs: knobs,
204+
Knobs: knobs,
207205
},
208206
})
209207
defer tc.Stopper().Stop(ctx)
@@ -248,8 +246,7 @@ range_max_bytes = 654321000`)
248246
ctx := context.Background()
249247
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
250248
ServerArgs: base.TestServerArgs{
251-
DefaultTestTenant: base.TODOTestTenantDisabled,
252-
Knobs: knobs,
249+
Knobs: knobs,
253250
},
254251
})
255252
defer tc.Stopper().Stop(ctx)

pkg/ccl/partitionccl/partition_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,10 @@ func TestRemovePartitioningExpiredLicense(t *testing.T) {
3131
defer utilccl.TestingEnableEnterprise()()
3232

3333
ctx := context.Background()
34-
s, sqlDBRaw, _ := serverutils.StartServer(t, base.TestServerArgs{
35-
UseDatabase: "d",
36-
DefaultTestTenant: base.TODOTestTenantDisabled,
34+
srv, sqlDBRaw, _ := serverutils.StartServer(t, base.TestServerArgs{
35+
UseDatabase: "d",
3736
})
38-
defer s.Stopper().Stop(ctx)
37+
defer srv.Stopper().Stop(ctx)
3938

4039
// Create a partitioned table and index.
4140
sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw)

pkg/ccl/workloadccl/allccl/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ go_test(
6060
"//pkg/testutils/skip",
6161
"//pkg/testutils/sqlutils",
6262
"//pkg/testutils/testcluster",
63-
"//pkg/util",
6463
"//pkg/util/bufalloc",
6564
"//pkg/util/leaktest",
6665
"//pkg/util/log",

0 commit comments

Comments
 (0)