From 792bfbbedd6d1234c62228411d88330050cc6b59 Mon Sep 17 00:00:00 2001 From: Jehiah Czebotar Date: Tue, 13 Feb 2024 21:58:35 -0500 Subject: [PATCH] lock table for read/modify operations --- bttest/inmem.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/bttest/inmem.go b/bttest/inmem.go index fa7051b..2d860a2 100644 --- a/bttest/inmem.go +++ b/bttest/inmem.go @@ -455,7 +455,7 @@ func (s *server) ReadRows(req *btpb.ReadRowsRequest, stream btpb.Bigtable_ReadRo // Read all rows tbl.rows.Ascend(addRow) } - gcRules := tbl.gcRules() + gcRules := tbl.gcRulesNoLock() tbl.mu.RUnlock() rows := make([]*row, 0, len(rowSet)) @@ -883,12 +883,15 @@ func (s *server) MutateRow(ctx context.Context, req *btpb.MutateRowRequest) (*bt return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName) } fs := tbl.columnFamilies() + + tbl.mu.Lock() + defer tbl.mu.Unlock() r := tbl.mutableRow(string(req.RowKey)) if err := applyMutations(tbl, r, req.Mutations, fs); err != nil { return nil, err } // JIT per-row GC - r.gc(tbl.gcRules()) + r.gc(tbl.gcRulesNoLock()) // JIT family deletion for f, _ := range r.families { if _, ok := fs[f]; !ok { @@ -920,7 +923,8 @@ func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_Mu res := &btpb.MutateRowsResponse{Entries: make([]*btpb.MutateRowsResponse_Entry, len(req.Entries))} cfs := tbl.columnFamilies() - + tbl.mu.Lock() + defer tbl.mu.Unlock() for i, entry := range req.Entries { r := tbl.mutableRow(string(entry.RowKey)) code, msg := int32(codes.OK), "" @@ -932,7 +936,7 @@ func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_Mu Index: int64(i), Status: &statpb.Status{Code: code, Message: msg}, } - r.gc(tbl.gcRules()) + r.gc(tbl.gcRulesNoLock()) // JIT family deletion; could be skipped if mutableRow doesn't return an existing row for f, _ := range r.families { if _, ok := cfs[f]; !ok { @@ -954,7 +958,8 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate res := &btpb.CheckAndMutateRowResponse{} cfs := tbl.columnFamilies() - + tbl.mu.Lock() + defer tbl.mu.Unlock() r := tbl.mutableRow(string(req.RowKey)) // Figure out which mutation to apply. @@ -982,7 +987,7 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate if err := applyMutations(tbl, r, muts, cfs); err != nil { return nil, err } - r.gc(tbl.gcRules()) + r.gc(tbl.gcRulesNoLock()) // JIT family deletion; could be skipped if mutableRow doesn't return an existing row for f, _ := range r.families { if _, ok := cfs[f]; !ok { @@ -1113,13 +1118,14 @@ func appendOrReplaceCell(cs []cell, newCell cell) []cell { func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWriteRowRequest) (*btpb.ReadModifyWriteRowResponse, error) { s.mu.Lock() tbl, ok := s.tables[req.TableName] - s.mu.Unlock() + defer s.mu.Unlock() if !ok { return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName) } cfs := tbl.columnFamilies() - + tbl.mu.Lock() + defer tbl.mu.Unlock() rowKey := string(req.RowKey) r := tbl.mutableRow(rowKey) resultRow := newRow(rowKey) // copy of updated cells @@ -1177,7 +1183,7 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri resultFamily.cellsByColumn(col) // create the column resultFamily.Cells[col] = []cell{newCell} // overwrite the cells } - r.gc(tbl.gcRules()) + r.gc(tbl.gcRulesNoLock()) // JIT family deletion; could be skipped if mutableRow doesn't return an existing row for f, _ := range r.families { if _, ok := cfs[f]; !ok { @@ -1318,7 +1324,10 @@ func (t *table) gcRules() map[string]*btapb.GcRule { // This method doesn't add or remove rows, so we only need a read lock for the table. t.mu.RLock() defer t.mu.RUnlock() + return t.gcRulesNoLock() +} +func (t *table) gcRulesNoLock() map[string]*btapb.GcRule { // Gather GC rules we'll apply. rules := make(map[string]*btapb.GcRule) // keyed by "fam" for fam, cf := range t.families {