From be1d4975256a856437a2237c26aa7d2e22eff6b4 Mon Sep 17 00:00:00 2001 From: David Gubler Date: Mon, 29 Jan 2024 13:03:57 +0100 Subject: [PATCH] fix: Parallelize user fetching to make it faster --- pkg/keycloakClient.go | 96 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 90 insertions(+), 6 deletions(-) diff --git a/pkg/keycloakClient.go b/pkg/keycloakClient.go index d8cc1e6..8e57e3b 100644 --- a/pkg/keycloakClient.go +++ b/pkg/keycloakClient.go @@ -149,8 +149,36 @@ func (this *KeycloakClient) GetToken() (string, error) { return fmt.Sprintf("%s", accessToken), nil } -func (this *KeycloakClient) GetUsers(token string) ([]*KeycloakUser, error) { - req, err := http.NewRequest("GET", fmt.Sprintf("%s/auth/admin/realms/%s/users?max=100000", this.baseURL.String(), this.realm), nil) +func (this *KeycloakClient) getUsersCount(token string) (uint32, error) { + req, err := http.NewRequest("GET", fmt.Sprintf("%s/auth/admin/realms/%s/users/count", this.baseURL.String(), this.realm), nil) + if err != nil { + return 0, err + } + + req.Header["Authorization"] = []string{"Bearer " + token} + req.Header["cache-control"] = []string{"no-cache"} + + r, err := this.client.Do(req) + if err != nil { + return 0, err + } + defer r.Body.Close() + + body, err := io.ReadAll(r.Body) + if err != nil { + return 0, err + } + + var count uint32 = 0 + err = json.Unmarshal(body, &count) + if err != nil { + return 0, err + } + return count, nil +} + +func (this *KeycloakClient) getUsers(token string, batchSize uint32, first uint32) ([]*KeycloakUser, error) { + req, err := http.NewRequest("GET", fmt.Sprintf("%s/auth/admin/realms/%s/users?max=%d&first=%d", this.baseURL.String(), this.realm, batchSize, first), nil) if err != nil { return nil, err } @@ -177,6 +205,62 @@ func (this *KeycloakClient) GetUsers(token string) ([]*KeycloakUser, error) { return keycloakUsers, nil } +func (this *KeycloakClient) usersWorker(token string, batchSize uint32, firstChan chan uint32, results *sync.Map, errorCount *uint64, wg *sync.WaitGroup) { + defer wg.Done() + + for first := range firstChan { + groups, err := this.getUsers(token, batchSize, first) + if err != nil { + atomic.AddUint64(errorCount, 1) + klog.Error(err) + } + results.Store(first, groups) + } +} + +func (this *KeycloakClient) GetUsers(token string) ([]*KeycloakUser, error) { + // This could be a simple straight fetch of all users, but because of https://github.com/keycloak/keycloak/issues/10005 + // we need to do parallel fetches to keep the load times reasonable + count, err := this.getUsersCount(token) + if err != nil { + return nil, err + } + + results := sync.Map{} + var errorCount uint64 + var batchSize uint32 = 50 + + firstChan := make(chan uint32) + wg := new(sync.WaitGroup) + + // creating workers + for i := 0; i < 10; i++ { + wg.Add(1) + go this.usersWorker(token, batchSize, firstChan, &results, &errorCount, wg) + } + + // sending batches to workers + var i uint32 + for i = 0; i <= count/batchSize; i++ { + firstChan <- i * batchSize + } + + close(firstChan) + wg.Wait() + + if errorCount > 0 { + return nil, errors.New("Could not fetch all users") + } + + users := make([]*KeycloakUser, 0) + results.Range(func(k, v interface{}) bool { + users = append(users, v.([]*KeycloakUser)...) + return true + }) + + return users, nil +} + func (this *KeycloakClient) GetGroups(token string) ([]*KeycloakGroup, error) { req, err := http.NewRequest("GET", fmt.Sprintf("%s/auth/admin/realms/%s/groups?max=100000&briefRepresentation=false", this.baseURL.String(), this.realm), nil) if err != nil { @@ -235,7 +319,7 @@ func (this *KeycloakClient) findSubgroup(groups []*KeycloakGroup) { } } -func (this *KeycloakClient) GetGroupMembership(token string, user *KeycloakUser) ([]*KeycloakGroup, error) { +func (this *KeycloakClient) getGroupMembership(token string, user *KeycloakUser) ([]*KeycloakGroup, error) { req, err := http.NewRequest("GET", fmt.Sprintf("%s/auth/admin/realms/%s/users/%s/groups", this.baseURL.String(), this.realm, user.Id), nil) if err != nil { return nil, err @@ -263,11 +347,11 @@ func (this *KeycloakClient) GetGroupMembership(token string, user *KeycloakUser) return groups, nil } -func (this *KeycloakClient) worker(token string, userChan chan *KeycloakUser, results *sync.Map, errorCount *uint64, wg *sync.WaitGroup) { +func (this *KeycloakClient) groupMembershipWorker(token string, userChan chan *KeycloakUser, results *sync.Map, errorCount *uint64, wg *sync.WaitGroup) { defer wg.Done() for user := range userChan { - groups, err := this.GetGroupMembership(token, user) + groups, err := this.getGroupMembership(token, user) if err != nil { atomic.AddUint64(errorCount, 1) klog.Error(err) @@ -286,7 +370,7 @@ func (this *KeycloakClient) GetGroupMemberships(token string, users []*KeycloakU // creating workers for i := 0; i < 10; i++ { wg.Add(1) - go this.worker(token, userChan, &results, &errorCount, wg) + go this.groupMembershipWorker(token, userChan, &results, &errorCount, wg) } // sending users to workers