Skip to content

Commit

Permalink
Merge pull request #356 from sylwiaszunejko/metadata-refactor
Browse files Browse the repository at this point in the history
Make `schemaDescriber` solely owner of tablets metadata
  • Loading branch information
dkropachev authored Nov 29, 2024
2 parents 5be33d8 + 6b25456 commit 3e0a01a
Show file tree
Hide file tree
Showing 10 changed files with 331 additions and 348 deletions.
2 changes: 1 addition & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,7 +1522,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
tablet.keyspaceName = qry.routingInfo.keyspace
tablet.tableName = qry.routingInfo.table

c.session.addTablet(&tablet)
c.session.metadataDescriber.addTablet(&tablet)
}
}

Expand Down
14 changes: 7 additions & 7 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,32 +110,32 @@ func (s *Session) handleSchemaEvent(frames []frame) {
for _, frame := range frames {
switch f := frame.(type) {
case *schemaChangeKeyspace:
s.schemaDescriber.clearSchema(f.keyspace)
s.metadataDescriber.clearSchema(f.keyspace)
s.handleKeyspaceChange(f.keyspace, f.change)
case *schemaChangeTable:
s.schemaDescriber.clearSchema(f.keyspace)
s.metadataDescriber.clearSchema(f.keyspace)
s.handleTableChange(f.keyspace, f.object, f.change)
case *schemaChangeAggregate:
s.schemaDescriber.clearSchema(f.keyspace)
s.metadataDescriber.clearSchema(f.keyspace)
case *schemaChangeFunction:
s.schemaDescriber.clearSchema(f.keyspace)
s.metadataDescriber.clearSchema(f.keyspace)
case *schemaChangeType:
s.schemaDescriber.clearSchema(f.keyspace)
s.metadataDescriber.clearSchema(f.keyspace)
}
}
}

func (s *Session) handleKeyspaceChange(keyspace, change string) {
s.control.awaitSchemaAgreement()
if change == "DROPPED" || change == "UPDATED" {
s.removeTabletsWithKeyspace(keyspace)
s.metadataDescriber.removeTabletsWithKeyspace(keyspace)
}
s.policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace, Change: change})
}

func (s *Session) handleTableChange(keyspace, table, change string) {
if change == "DROPPED" || change == "UPDATED" {
s.removeTabletsWithTable(keyspace, table)
s.metadataDescriber.removeTabletsWithTable(keyspace, table)
}
}

Expand Down
224 changes: 1 addition & 223 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,180 +504,6 @@ func (h *HostInfo) ScyllaShardAwarePortTLS() uint16 {
return h.scyllaShardAwarePortTLS
}

type ReplicaInfo struct {
hostId UUID
shardId int
}

type TabletInfo struct {
keyspaceName string
tableName string
firstToken int64
lastToken int64
replicas []ReplicaInfo
}

func (t *TabletInfo) KeyspaceName() string {
return t.keyspaceName
}

func (t *TabletInfo) FirstToken() int64 {
return t.firstToken
}

func (t *TabletInfo) LastToken() int64 {
return t.lastToken
}

func (t *TabletInfo) TableName() string {
return t.tableName
}

func (t *TabletInfo) Replicas() []ReplicaInfo {
return t.replicas
}

type TabletInfoList []*TabletInfo

// Search for place in tablets table with specific Keyspace and Table name
func (t TabletInfoList) findTablets(keyspace string, table string) (int, int) {
l := -1
r := -1
for i, tablet := range t {
if tablet.KeyspaceName() == keyspace && tablet.TableName() == table {
if l == -1 {
l = i
}
r = i
} else if l != -1 {
break
}
}

return l, r
}

func (t TabletInfoList) addTabletToTabletsList(tablet *TabletInfo) TabletInfoList {
l, r := t.findTablets(tablet.keyspaceName, tablet.tableName)
if l == -1 && r == -1 {
l = 0
r = 0
} else {
r = r + 1
}

l1, r1 := l, r
l2, r2 := l1, r1

// find first overlaping range
for l1 < r1 {
mid := (l1 + r1) / 2
if t[mid].FirstToken() < tablet.FirstToken() {
l1 = mid + 1
} else {
r1 = mid
}
}
start := l1

if start > l && t[start-1].LastToken() > tablet.FirstToken() {
start = start - 1
}

// find last overlaping range
for l2 < r2 {
mid := (l2 + r2) / 2
if t[mid].LastToken() < tablet.LastToken() {
l2 = mid + 1
} else {
r2 = mid
}
}
end := l2
if end < r && t[end].FirstToken() >= tablet.LastToken() {
end = end - 1
}
if end == len(t) {
end = end - 1
}

updated_tablets := t
if start <= end {
// Delete elements from index start to end
updated_tablets = append(t[:start], t[end+1:]...)
}
// Insert tablet element at index start
t = append(updated_tablets[:start], append([]*TabletInfo{tablet}, updated_tablets[start:]...)...)
return t
}

// Remove all tablets that have given host as a replica
func (t TabletInfoList) removeTabletsWithHostFromTabletsList(host *HostInfo) TabletInfoList {
filteredTablets := make([]*TabletInfo, 0, len(t)) // Preallocate for efficiency

for _, tablet := range t {
// Check if any replica matches the given host ID
shouldExclude := false
for _, replica := range tablet.replicas {
if replica.hostId.String() == host.HostID() {
shouldExclude = true
break
}
}
if !shouldExclude {
filteredTablets = append(filteredTablets, tablet)
}
}

t = filteredTablets
return t
}

func (t TabletInfoList) removeTabletsWithKeyspaceFromTabletsList(keyspace string) TabletInfoList {
filteredTablets := make([]*TabletInfo, 0, len(t))

for _, tablet := range t {
if tablet.keyspaceName != keyspace {
filteredTablets = append(filteredTablets, tablet)
}
}

t = filteredTablets
return t
}

func (t TabletInfoList) removeTabletsWithTableFromTabletsList(keyspace string, table string) TabletInfoList {
filteredTablets := make([]*TabletInfo, 0, len(t))

for _, tablet := range t {
if !(tablet.keyspaceName == keyspace && tablet.tableName == table) {
filteredTablets = append(filteredTablets, tablet)
}
}

t = filteredTablets
return t
}

// Search for place in tablets table for token starting from index l to index r
func (t TabletInfoList) findTabletForToken(token Token, l int, r int) *TabletInfo {
for l < r {
var m int
if r*l > 0 {
m = l + (r-l)/2
} else {
m = (r + l) / 2
}
if int64Token(t[m].LastToken()).Less(token) {
l = m + 1
} else {
r = m
}
}

return t[l]
}

// Polls system.peers at a specific interval to find new hosts
type ringDescriber struct {
session *Session
Expand Down Expand Up @@ -1051,7 +877,7 @@ func refreshRing(r *ringDescriber) error {
}

for _, host := range prevHosts {
r.session.removeTabletsWithHost(host)
r.session.metadataDescriber.removeTabletsWithHost(host)
r.session.removeHost(host)
}

Expand All @@ -1061,54 +887,6 @@ func refreshRing(r *ringDescriber) error {
return nil
}

func (s *Session) addTablet(tablet *TabletInfo) error {
tablets := s.getTablets()
tablets = tablets.addTabletToTabletsList(tablet)

s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

s.schemaDescriber.refreshTabletsSchema()

return nil
}

func (s *Session) removeTabletsWithHost(host *HostInfo) error {
tablets := s.getTablets()
tablets = tablets.removeTabletsWithHostFromTabletsList(host)

s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

s.schemaDescriber.refreshTabletsSchema()

return nil
}

func (s *Session) removeTabletsWithKeyspace(keyspace string) error {
tablets := s.getTablets()
tablets = tablets.removeTabletsWithKeyspaceFromTabletsList(keyspace)

s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

s.schemaDescriber.refreshTabletsSchema()

return nil
}

func (s *Session) removeTabletsWithTable(keyspace string, table string) error {
tablets := s.getTablets()
tablets = tablets.removeTabletsWithTableFromTabletsList(keyspace, table)

s.ring.setTablets(tablets)
s.policy.SetTablets(tablets)

s.schemaDescriber.refreshTabletsSchema()

return nil
}

const (
ringRefreshDebounceTime = 1 * time.Second
)
Expand Down
Loading

0 comments on commit 3e0a01a

Please sign in to comment.