Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aether Scaling #64

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions drsm/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,19 @@ type DrsmInterface interface {
DeletePod(string)
}

// func InitDRSM(sharedPoolName string, myid PodId, db DbInfo, opt *Options, punchLivenessTime int) (DrsmInterface, error) {
func InitDRSM(sharedPoolName string, myid PodId, db DbInfo, opt *Options) (DrsmInterface, error) {
logger.DrsmLog.Debugln("client id:", myid)

d := &Drsm{sharedPoolName: sharedPoolName,
clientId: myid,
db: db,
mode: ResourceClient}

// if punchLivenessTime == 0 {
d.punchLivenessTime = 5000
// } else {
// d.punchLivenessTime = punchLivenessTime
// }
d.ConstuctDrsm(opt)

return d, nil
Expand Down Expand Up @@ -110,8 +115,8 @@ func (d *Drsm) ReleaseInt32ID(id int32) error {
}

func (d *Drsm) FindOwnerInt32ID(id int32) (*PodId, error) {
d.globalChunkTblMutex.Lock()
defer d.globalChunkTblMutex.Unlock()
d.globalChunkTblMutex.RLock()
defer d.globalChunkTblMutex.RUnlock()
chunkId := id >> 10
chunk, found := d.globalChunkTbl[chunkId]
if found {
Expand Down
4 changes: 2 additions & 2 deletions drsm/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ func (d *Drsm) GetNewChunk() (*chunk, error) {
for {
for {
cn = rand.Int31n(d.chunkIdRange)
d.globalChunkTblMutex.Lock()
d.globalChunkTblMutex.RLock()
_, found := d.globalChunkTbl[cn]
d.globalChunkTblMutex.Unlock()
d.globalChunkTblMutex.RUnlock()
if found {
continue
}
Expand Down
4 changes: 2 additions & 2 deletions drsm/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ func (d *Drsm) podDownDetected() {
// Given Pod find out current Chunks owned by this POD
pd := d.podMap[p]
for k := range pd.podChunks {
d.globalChunkTblMutex.Lock()
d.globalChunkTblMutex.RLock()
c, found := d.globalChunkTbl[k]
d.globalChunkTblMutex.Unlock()
d.globalChunkTblMutex.RUnlock()
logger.DrsmLog.Debugf("found: %v chunk: %v", found, c)
if found {
go c.claimChunk(d)
Expand Down
9 changes: 7 additions & 2 deletions drsm/drsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type Drsm struct {
ipModule ipam.Ipamer
prefix map[string]*ipam.Prefix
mongo *MongoDBLibrary.MongoClient
globalChunkTblMutex sync.Mutex
globalChunkTblMutex sync.RWMutex
punchLivenessTime int
}

func (d *Drsm) DeletePod(podInstance string) {
Expand All @@ -84,13 +85,17 @@ func (d *Drsm) ConstuctDrsm(opt *Options) {
d.podMap = make(map[string]*podData)
d.podDown = make(chan string, 10)
d.scanChunks = make(map[int32]*chunk)
d.globalChunkTblMutex = sync.Mutex{}
d.globalChunkTblMutex = sync.RWMutex{}
d.initIpam(opt)

//connect to DB
d.mongo, _ = MongoDBLibrary.NewMongoClient(d.db.Url, d.db.Name)
logger.DrsmLog.Debugln("mongoClient is created", d.db.Name)
_, err := d.mongo.CreateIndex(d.sharedPoolName, "_id")

if err != nil {
logger.AppLog.Infof("Failed to create id index: %v", err)
}
go d.handleDbUpdates()
go d.punchLiveness()
go d.podDownDetected()
Expand Down
7 changes: 5 additions & 2 deletions drsm/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan
// looks like chunk owner getting change
owner := s.Update.UpdFields.PodId
c := getChunIdFromDocId(s.DId.Id)
d.globalChunkTblMutex.Lock()
d.globalChunkTblMutex.RLock()
cp := d.globalChunkTbl[c]
d.globalChunkTblMutex.Unlock()
d.globalChunkTblMutex.RUnlock()
// TODO update IP address as well.
cp.Owner.PodName = owner
cp.Owner.PodIp = s.Update.UpdFields.PodIp
Expand All @@ -184,6 +184,8 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan

func (d *Drsm) punchLiveness() {
// write to DB - signature every 5 second
// or make it parameterized
// ticker := time.NewTicker(time.Duration(d.punchLivenessTime) * time.Millisecond)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

Expand All @@ -199,6 +201,7 @@ func (d *Drsm) punchLiveness() {
//logger.DrsmLog.Debugln("update keepalive time")
filter := bson.M{"_id": d.clientId.PodName}

// timein := time.Now().Local().Add(time.Millisecond * time.Duration(d.punchLivenessTime+500))
timein := time.Now().Local().Add(20 * time.Second)

update := bson.D{
Expand Down