Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: fix concurrent same-row mutations #11

Merged
merged 1 commit into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions bttest/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func (s *server) ReadRows(req *btpb.ReadRowsRequest, stream btpb.Bigtable_ReadRo
// Read all rows
tbl.rows.Ascend(addRow)
}
gcRules := tbl.gcRules()
gcRules := tbl.gcRulesNoLock()
tbl.mu.RUnlock()

rows := make([]*row, 0, len(rowSet))
Expand Down Expand Up @@ -883,12 +883,15 @@ func (s *server) MutateRow(ctx context.Context, req *btpb.MutateRowRequest) (*bt
return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName)
}
fs := tbl.columnFamilies()

tbl.mu.Lock()
defer tbl.mu.Unlock()
r := tbl.mutableRow(string(req.RowKey))
if err := applyMutations(tbl, r, req.Mutations, fs); err != nil {
return nil, err
}
// JIT per-row GC
r.gc(tbl.gcRules())
r.gc(tbl.gcRulesNoLock())
// JIT family deletion
for f, _ := range r.families {
if _, ok := fs[f]; !ok {
Expand Down Expand Up @@ -920,7 +923,8 @@ func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_Mu
res := &btpb.MutateRowsResponse{Entries: make([]*btpb.MutateRowsResponse_Entry, len(req.Entries))}

cfs := tbl.columnFamilies()

tbl.mu.Lock()
defer tbl.mu.Unlock()
for i, entry := range req.Entries {
r := tbl.mutableRow(string(entry.RowKey))
code, msg := int32(codes.OK), ""
Expand All @@ -932,7 +936,7 @@ func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_Mu
Index: int64(i),
Status: &statpb.Status{Code: code, Message: msg},
}
r.gc(tbl.gcRules())
r.gc(tbl.gcRulesNoLock())
// JIT family deletion; could be skipped if mutableRow doesn't return an existing row
for f, _ := range r.families {
if _, ok := cfs[f]; !ok {
Expand All @@ -954,7 +958,8 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate
res := &btpb.CheckAndMutateRowResponse{}

cfs := tbl.columnFamilies()

tbl.mu.Lock()
defer tbl.mu.Unlock()
r := tbl.mutableRow(string(req.RowKey))

// Figure out which mutation to apply.
Expand Down Expand Up @@ -982,7 +987,7 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate
if err := applyMutations(tbl, r, muts, cfs); err != nil {
return nil, err
}
r.gc(tbl.gcRules())
r.gc(tbl.gcRulesNoLock())
// JIT family deletion; could be skipped if mutableRow doesn't return an existing row
for f, _ := range r.families {
if _, ok := cfs[f]; !ok {
Expand Down Expand Up @@ -1119,7 +1124,8 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri
}

cfs := tbl.columnFamilies()

tbl.mu.Lock()
defer tbl.mu.Unlock()
rowKey := string(req.RowKey)
r := tbl.mutableRow(rowKey)
resultRow := newRow(rowKey) // copy of updated cells
Expand Down Expand Up @@ -1177,7 +1183,7 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri
resultFamily.cellsByColumn(col) // create the column
resultFamily.Cells[col] = []cell{newCell} // overwrite the cells
}
r.gc(tbl.gcRules())
r.gc(tbl.gcRulesNoLock())
// JIT family deletion; could be skipped if mutableRow doesn't return an existing row
for f, _ := range r.families {
if _, ok := cfs[f]; !ok {
Expand Down Expand Up @@ -1318,7 +1324,10 @@ func (t *table) gcRules() map[string]*btapb.GcRule {
// This method doesn't add or remove rows, so we only need a read lock for the table.
t.mu.RLock()
defer t.mu.RUnlock()
return t.gcRulesNoLock()
}

func (t *table) gcRulesNoLock() map[string]*btapb.GcRule {
// Gather GC rules we'll apply.
rules := make(map[string]*btapb.GcRule) // keyed by "fam"
for fam, cf := range t.families {
Expand Down
98 changes: 98 additions & 0 deletions bttest/inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,104 @@ func TestConcurrentMutationsReadModifyAndGC(t *testing.T) {
}
}

func TestConcurrentMutations(t *testing.T) {
// 50 concurrent mutations of different cells on the same row
// expect all 50 values after
s := newTestServer(t)
ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
defer cancel()
if _, err := s.CreateTable(
ctx,
&btapb.CreateTableRequest{Parent: "c", TableId: "t"}); err != nil {
t.Fatal(err)
}
const name = `c/tables/t`
req := &btapb.ModifyColumnFamiliesRequest{
Name: name,
Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
Id: "cf",
Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}},
}},
}
_, err := s.ModifyColumnFamilies(ctx, req)
if err != nil {
t.Fatal(err)
}
var wg sync.WaitGroup
ms := func(i int) []*btpb.Mutation {
return []*btpb.Mutation{{
Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{
FamilyName: "cf",
ColumnQualifier: []byte(fmt.Sprintf("%d", i)),
Value: []byte(fmt.Sprintf("%d", i)),
TimestampMicros: 1000,
}},
}}
}

rowKey := []byte("rowkey")
start := make(chan bool)
for i := 0; i < 50; i++ {
i := i
wg.Add(1)
go func(i int) {
defer wg.Done()
<-start
for ctx.Err() == nil {
req := &btpb.MutateRowRequest{
TableName: name,
RowKey: rowKey,
Mutations: ms(i),
}
if _, err := s.MutateRow(ctx, req); err != nil {
panic(err) // can't use t.Fatal in goroutine
}
}
}(i)
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
close(start)
select {
case <-done:
case <-time.After(1 * time.Second):
t.Error("Concurrent mutations haven't completed after 1s")
}

// verify
mock := &MockReadRowsServer{}
rreq := &btpb.ReadRowsRequest{TableName: name}
if err = s.ReadRows(rreq, mock); err != nil {
t.Fatalf("ReadRows error: %v", err)
}
if len(mock.responses) != 1 {
t.Fatal("Response count: got 0, want 1")
}
if len(mock.responses[0].Chunks) != 50 {
t.Errorf("Chunk count: got %d, want 50", len(mock.responses[0].Chunks))
}

var gotChunks []*btpb.ReadRowsResponse_CellChunk
for _, res := range mock.responses {
gotChunks = append(gotChunks, res.Chunks...)
}
var seen []string
for i, c := range gotChunks {
if !bytes.Equal(c.RowKey, rowKey) {
t.Fatalf("expected row %q got %q", c.RowKey, rowKey)
}
if !bytes.Equal(c.Qualifier.Value, c.Value) {
t.Fatalf("[%d] expected equal got %q %q", i, c.Qualifier.Value, c.Value)
}
seen = append(seen, string(c.Qualifier.Value))
}
sort.Strings(seen)
t.Logf("seen %#v", seen)
}

func TestCreateTableResponse(t *testing.T) {
// We need to ensure that invoking CreateTable returns
// the ColumnFamilies as well as Granularity.
Expand Down
Loading