diff --git a/bttest/inmem.go b/bttest/inmem.go index fa7051b..d3ad160 100644 --- a/bttest/inmem.go +++ b/bttest/inmem.go @@ -455,7 +455,7 @@ func (s *server) ReadRows(req *btpb.ReadRowsRequest, stream btpb.Bigtable_ReadRo // Read all rows tbl.rows.Ascend(addRow) } - gcRules := tbl.gcRules() + gcRules := tbl.gcRulesNoLock() tbl.mu.RUnlock() rows := make([]*row, 0, len(rowSet)) @@ -883,12 +883,15 @@ func (s *server) MutateRow(ctx context.Context, req *btpb.MutateRowRequest) (*bt return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName) } fs := tbl.columnFamilies() + + tbl.mu.Lock() + defer tbl.mu.Unlock() r := tbl.mutableRow(string(req.RowKey)) if err := applyMutations(tbl, r, req.Mutations, fs); err != nil { return nil, err } // JIT per-row GC - r.gc(tbl.gcRules()) + r.gc(tbl.gcRulesNoLock()) // JIT family deletion for f, _ := range r.families { if _, ok := fs[f]; !ok { @@ -920,7 +923,8 @@ func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_Mu res := &btpb.MutateRowsResponse{Entries: make([]*btpb.MutateRowsResponse_Entry, len(req.Entries))} cfs := tbl.columnFamilies() - + tbl.mu.Lock() + defer tbl.mu.Unlock() for i, entry := range req.Entries { r := tbl.mutableRow(string(entry.RowKey)) code, msg := int32(codes.OK), "" @@ -932,7 +936,7 @@ func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_Mu Index: int64(i), Status: &statpb.Status{Code: code, Message: msg}, } - r.gc(tbl.gcRules()) + r.gc(tbl.gcRulesNoLock()) // JIT family deletion; could be skipped if mutableRow doesn't return an existing row for f, _ := range r.families { if _, ok := cfs[f]; !ok { @@ -954,7 +958,8 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate res := &btpb.CheckAndMutateRowResponse{} cfs := tbl.columnFamilies() - + tbl.mu.Lock() + defer tbl.mu.Unlock() r := tbl.mutableRow(string(req.RowKey)) // Figure out which mutation to apply. @@ -982,7 +987,7 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate if err := applyMutations(tbl, r, muts, cfs); err != nil { return nil, err } - r.gc(tbl.gcRules()) + r.gc(tbl.gcRulesNoLock()) // JIT family deletion; could be skipped if mutableRow doesn't return an existing row for f, _ := range r.families { if _, ok := cfs[f]; !ok { @@ -1119,7 +1124,8 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri } cfs := tbl.columnFamilies() - + tbl.mu.Lock() + defer tbl.mu.Unlock() rowKey := string(req.RowKey) r := tbl.mutableRow(rowKey) resultRow := newRow(rowKey) // copy of updated cells @@ -1177,7 +1183,7 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri resultFamily.cellsByColumn(col) // create the column resultFamily.Cells[col] = []cell{newCell} // overwrite the cells } - r.gc(tbl.gcRules()) + r.gc(tbl.gcRulesNoLock()) // JIT family deletion; could be skipped if mutableRow doesn't return an existing row for f, _ := range r.families { if _, ok := cfs[f]; !ok { @@ -1318,7 +1324,10 @@ func (t *table) gcRules() map[string]*btapb.GcRule { // This method doesn't add or remove rows, so we only need a read lock for the table. t.mu.RLock() defer t.mu.RUnlock() + return t.gcRulesNoLock() +} +func (t *table) gcRulesNoLock() map[string]*btapb.GcRule { // Gather GC rules we'll apply. rules := make(map[string]*btapb.GcRule) // keyed by "fam" for fam, cf := range t.families { diff --git a/bttest/inmem_test.go b/bttest/inmem_test.go index a2ad436..683a858 100644 --- a/bttest/inmem_test.go +++ b/bttest/inmem_test.go @@ -173,6 +173,104 @@ func TestConcurrentMutationsReadModifyAndGC(t *testing.T) { } } +func TestConcurrentMutations(t *testing.T) { + // 50 concurrent mutations of different cells on the same row + // expect all 50 values after + s := newTestServer(t) + ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond) + defer cancel() + if _, err := s.CreateTable( + ctx, + &btapb.CreateTableRequest{Parent: "c", TableId: "t"}); err != nil { + t.Fatal(err) + } + const name = `c/tables/t` + req := &btapb.ModifyColumnFamiliesRequest{ + Name: name, + Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ + Id: "cf", + Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}}, + }}, + } + _, err := s.ModifyColumnFamilies(ctx, req) + if err != nil { + t.Fatal(err) + } + var wg sync.WaitGroup + ms := func(i int) []*btpb.Mutation { + return []*btpb.Mutation{{ + Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ + FamilyName: "cf", + ColumnQualifier: []byte(fmt.Sprintf("%d", i)), + Value: []byte(fmt.Sprintf("%d", i)), + TimestampMicros: 1000, + }}, + }} + } + + rowKey := []byte("rowkey") + start := make(chan bool) + for i := 0; i < 50; i++ { + i := i + wg.Add(1) + go func(i int) { + defer wg.Done() + <-start + for ctx.Err() == nil { + req := &btpb.MutateRowRequest{ + TableName: name, + RowKey: rowKey, + Mutations: ms(i), + } + if _, err := s.MutateRow(ctx, req); err != nil { + panic(err) // can't use t.Fatal in goroutine + } + } + }(i) + } + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + close(start) + select { + case <-done: + case <-time.After(1 * time.Second): + t.Error("Concurrent mutations haven't completed after 1s") + } + + // verify + mock := &MockReadRowsServer{} + rreq := &btpb.ReadRowsRequest{TableName: name} + if err = s.ReadRows(rreq, mock); err != nil { + t.Fatalf("ReadRows error: %v", err) + } + if len(mock.responses) != 1 { + t.Fatal("Response count: got 0, want 1") + } + if len(mock.responses[0].Chunks) != 50 { + t.Errorf("Chunk count: got %d, want 50", len(mock.responses[0].Chunks)) + } + + var gotChunks []*btpb.ReadRowsResponse_CellChunk + for _, res := range mock.responses { + gotChunks = append(gotChunks, res.Chunks...) + } + var seen []string + for i, c := range gotChunks { + if !bytes.Equal(c.RowKey, rowKey) { + t.Fatalf("expected row %q got %q", c.RowKey, rowKey) + } + if !bytes.Equal(c.Qualifier.Value, c.Value) { + t.Fatalf("[%d] expected equal got %q %q", i, c.Qualifier.Value, c.Value) + } + seen = append(seen, string(c.Qualifier.Value)) + } + sort.Strings(seen) + t.Logf("seen %#v", seen) +} + func TestCreateTableResponse(t *testing.T) { // We need to ensure that invoking CreateTable returns // the ColumnFamilies as well as Granularity.