Skip to content

Commit

Permalink
Add methods
Browse files Browse the repository at this point in the history
Signed-off-by: pritamdas99 <[email protected]>
  • Loading branch information
pritamdas99 committed Sep 20, 2024
1 parent a6f6733 commit 7c54afa
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 0 deletions.
11 changes: 11 additions & 0 deletions solr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,14 @@ type BalanceReplica struct {
WaitForFinalState bool `json:"waitForFinalState,omitempty" yaml:"waitForFinalState,omitempty"`
Async string `json:"async,omitempty" yaml:"async,omitempty"`
}

type CoreList struct {
coreName string
collection string
}

type UpdateList struct {
target string
replica string
collection string
}
161 changes: 161 additions & 0 deletions solr/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,164 @@ func (sc *Client) CleanupAsync(async string) error {
wg.Wait()
return errr
}

func (sc *Client) Balance() error {
var errr error
async := "balance-replica"
err := sc.CleanupAsync(async)
if err != nil {
klog.Error(fmt.Sprintf("Failed to clean asyncid******************************* %v\n", async))
time.Sleep(30 * time.Second)
errr = err
} else {
klog.Info(fmt.Sprintf("Cleanup async successful for %v", async))
time.Sleep(10 * time.Second)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
resp, err := sc.BalanceReplica(async)
if err != nil {
klog.Error(fmt.Errorf("failed to do balance request. err %v", err))
errr = err
}
responseBody, err := sc.DecodeResponse(resp)
if err != nil {
klog.Error(fmt.Errorf("failed to decode response for async %s, err %v", async, err))
errr = err
}
_, err = sc.GetResponseStatus(responseBody)
if err != nil {
klog.Error(fmt.Errorf("failed to decode response for async %s, err %v", async, err))
errr = err
}

err = sc.CheckupStatus(async)
if err != nil {
klog.Error("Error while checking status************ ", err)
errr = err
}
}()
wg.Wait()
return errr
}

func (sc *Client) Run(lst []UpdateList) error {
var errr error
var wg sync.WaitGroup
for _, x := range lst {
target := x.target
replica := x.replica
collection := x.collection
async := fmt.Sprintf("%s-%s-%s", replica, collection, target)
err := sc.CleanupAsync(async)
if err != nil {
klog.Error(fmt.Sprintf("Failed to clean asyncid******************************* %v\n", async))
time.Sleep(30 * time.Second)
errr = err
} else {
klog.Info(fmt.Sprintf("Cleanup async successful for %v", async))
time.Sleep(10 * time.Second)
}
wg.Add(1)
go func() {
defer wg.Done()
resp, err := sc.MoveReplica(target, replica, collection, async)
if err != nil {
klog.Error(fmt.Errorf("failed to do request for target %s, replica %s, collection %s, err %v", target, replica, collection, err))
errr = err
}
responseBody, err := sc.DecodeResponse(resp)
if err != nil {
klog.Error(fmt.Errorf("failed to decode response for target %s, replica %s, collection %s, err %v", target, replica, collection, err))
errr = err
}
_, err = sc.GetResponseStatus(responseBody)
if err != nil {
klog.Error(fmt.Errorf("failed to decode response for target %s, replica %s, collection %s, err %v", target, replica, collection, err))
errr = err
}

err = sc.CheckupStatus(async)
if err != nil {
errr = err
klog.Error("Error while checking status************ ", err)
}
}()
}
wg.Wait()
return errr
}

func (sc *Client) Down(nodeList []string, x int, mp map[string][]CoreList) error {
n := len(nodeList)
ls2 := nodeList[n-x:]
ls1 := nodeList[:n-x]
fmt.Println("ls1 ", ls1)
fmt.Println("ls2 ", ls2)
ar := make([]UpdateList, 0)
for _, node := range ls2 {
for _, core := range mp[node] {
id := -1
mx := 1000000000
for j, l1 := range ls1 {
if len(mp[l1]) < mx {
mx = len(mp[l1])
id = j
}
}
ar = append(ar, UpdateList{
target: ls1[id],
replica: core.coreName,
collection: core.collection,
})
mp[ls1[id]] = append(mp[ls1[id]], core)
fmt.Println(core.coreName, core.collection, ls1[id])
}
}
err := sc.Run(ar)
return err
}
func (sc *Client) Up(nodeList []string, mp map[string][]CoreList) error {
for _, x := range nodeList {
if _, ok := mp[x]; !ok {
mp[x] = make([]CoreList, 0)
}
}
ar := make([]UpdateList, 0)
for {
mn := 10000000000
minNode := ""
mx := -1
maxNode := ""
for x, y := range mp {
n := len(y)
if mx < n {
mx = n
maxNode = x
}

if mn > n {
mn = n
minNode = x
}
}
if maxNode == minNode || mx-mn <= 1 {
break
}
target := minNode
core := mp[maxNode][0].coreName
collection := mp[maxNode][0].collection
mp[minNode] = append(mp[minNode], mp[maxNode][0])
mp[maxNode] = mp[maxNode][1:]
ar = append(ar, UpdateList{
target: target,
replica: core,
collection: collection,
})
fmt.Println(target, core, collection)
}
err := sc.Run(ar)
return err
}

0 comments on commit 7c54afa

Please sign in to comment.