Skip to content

Commit

Permalink
lock table for read/modify operations
Browse files Browse the repository at this point in the history
  • Loading branch information
jehiah committed Feb 14, 2024
1 parent 13f985b commit 967f764
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions bttest/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func (s *server) ReadRows(req *btpb.ReadRowsRequest, stream btpb.Bigtable_ReadRo
// Read all rows
tbl.rows.Ascend(addRow)
}
gcRules := tbl.gcRules()
gcRules := tbl.gcRulesNoLock()
tbl.mu.RUnlock()

rows := make([]*row, 0, len(rowSet))
Expand Down Expand Up @@ -883,12 +883,15 @@ func (s *server) MutateRow(ctx context.Context, req *btpb.MutateRowRequest) (*bt
return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName)
}
fs := tbl.columnFamilies()

tbl.mu.Lock()
defer tbl.mu.Unlock()
r := tbl.mutableRow(string(req.RowKey))
if err := applyMutations(tbl, r, req.Mutations, fs); err != nil {
return nil, err
}
// JIT per-row GC
r.gc(tbl.gcRules())
r.gc(tbl.gcRulesNoLock())
// JIT family deletion
for f, _ := range r.families {
if _, ok := fs[f]; !ok {
Expand Down Expand Up @@ -920,7 +923,8 @@ func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_Mu
res := &btpb.MutateRowsResponse{Entries: make([]*btpb.MutateRowsResponse_Entry, len(req.Entries))}

cfs := tbl.columnFamilies()

tbl.mu.Lock()
defer tbl.mu.Unlock()
for i, entry := range req.Entries {
r := tbl.mutableRow(string(entry.RowKey))
code, msg := int32(codes.OK), ""
Expand All @@ -932,7 +936,7 @@ func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_Mu
Index: int64(i),
Status: &statpb.Status{Code: code, Message: msg},
}
r.gc(tbl.gcRules())
r.gc(tbl.gcRulesNoLock())
// JIT family deletion; could be skipped if mutableRow doesn't return an existing row
for f, _ := range r.families {
if _, ok := cfs[f]; !ok {
Expand All @@ -954,7 +958,8 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate
res := &btpb.CheckAndMutateRowResponse{}

cfs := tbl.columnFamilies()

tbl.mu.Lock()
defer tbl.mu.Unlock()
r := tbl.mutableRow(string(req.RowKey))

// Figure out which mutation to apply.
Expand Down Expand Up @@ -982,7 +987,7 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate
if err := applyMutations(tbl, r, muts, cfs); err != nil {
return nil, err
}
r.gc(tbl.gcRules())
r.gc(tbl.gcRulesNoLock())
// JIT family deletion; could be skipped if mutableRow doesn't return an existing row
for f, _ := range r.families {
if _, ok := cfs[f]; !ok {
Expand Down Expand Up @@ -1119,7 +1124,8 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri
}

cfs := tbl.columnFamilies()

tbl.mu.Lock()
defer tbl.mu.Unlock()
rowKey := string(req.RowKey)
r := tbl.mutableRow(rowKey)
resultRow := newRow(rowKey) // copy of updated cells
Expand Down Expand Up @@ -1177,7 +1183,7 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri
resultFamily.cellsByColumn(col) // create the column
resultFamily.Cells[col] = []cell{newCell} // overwrite the cells
}
r.gc(tbl.gcRules())
r.gc(tbl.gcRulesNoLock())
// JIT family deletion; could be skipped if mutableRow doesn't return an existing row
for f, _ := range r.families {
if _, ok := cfs[f]; !ok {
Expand Down Expand Up @@ -1318,7 +1324,10 @@ func (t *table) gcRules() map[string]*btapb.GcRule {
// This method doesn't add or remove rows, so we only need a read lock for the table.
t.mu.RLock()
defer t.mu.RUnlock()
return t.gcRulesNoLock()
}

func (t *table) gcRulesNoLock() map[string]*btapb.GcRule {
// Gather GC rules we'll apply.
rules := make(map[string]*btapb.GcRule) // keyed by "fam"
for fam, cf := range t.families {
Expand Down

0 comments on commit 967f764

Please sign in to comment.