diff --git a/config/config.go b/config/config.go index 6ec62ad..39a32ea 100644 --- a/config/config.go +++ b/config/config.go @@ -11,21 +11,22 @@ import ( // SuperKeyWorkerConfig is the struct for storing runtime configuration type SuperKeyWorkerConfig struct { - Hostname string - KafkaBrokerConfig []clowder.BrokerConfig - KafkaTopics map[string]string - KafkaGroupID string - MetricsPort int - LogLevel string - LogGroup string - LogHandler string - AwsRegion string - AwsAccessKeyID string - AwsSecretAccessKey string - SourcesHost string - SourcesScheme string - SourcesPort int - SourcesPSK string + Hostname string + KafkaBrokerConfig []clowder.BrokerConfig + KafkaTopics map[string]string + KafkaGroupID string + MetricsPort int + LogLevel string + LogGroup string + LogHandler string + AwsRegion string + AwsAccessKeyID string + AwsSecretAccessKey string + SourcesHost string + SourcesScheme string + SourcesPort int + SourcesPSK string + SourcesRequestsMaxAttempts int } // Get - returns the config parsed from runtime vars @@ -81,6 +82,20 @@ func Get() *SuperKeyWorkerConfig { options.SetDefault("SourcesPort", os.Getenv("SOURCES_PORT")) options.SetDefault("SourcesPSK", os.Getenv("SOURCES_PSK")) + // Get the number of maximum request attempts we want to make to the Sources' API. + sourcesRequestsMaxAttempts, err := strconv.Atoi(os.Getenv("SOURCES_MAX_ATTEMPTS")) + if err != nil { + log.Printf(`Warning: the provided max attempts value \"%s\" is not an integer. Setting default value of 1.`, os.Getenv("SOURCES_MAX_ATTEMPTS")) + sourcesRequestsMaxAttempts = 1 + } + + if sourcesRequestsMaxAttempts < 1 { + log.Printf(`Warning: the provided max attempts value \"%s\" is lower than 1, and we need to at least make one attempt when calling Sources. Setting default value of 1.`, os.Getenv("SOURCES_MAX_ATTEMPTS")) + sourcesRequestsMaxAttempts = 1 + } + + options.SetDefault("SourcesRequestsMaxAttempts", sourcesRequestsMaxAttempts) + hostname, _ := os.Hostname() options.SetDefault("Hostname", hostname) @@ -94,21 +109,22 @@ func Get() *SuperKeyWorkerConfig { options.AutomaticEnv() return &SuperKeyWorkerConfig{ - Hostname: options.GetString("Hostname"), - KafkaBrokerConfig: brokerConfig, - KafkaTopics: options.GetStringMapString("KafkaTopics"), - KafkaGroupID: options.GetString("KafkaGroupID"), - MetricsPort: options.GetInt("MetricsPort"), - LogLevel: options.GetString("LogLevel"), - LogHandler: options.GetString("LogHandler"), - LogGroup: options.GetString("LogGroup"), - AwsRegion: options.GetString("AwsRegion"), - AwsAccessKeyID: options.GetString("AwsAccessKeyID"), - AwsSecretAccessKey: options.GetString("AwsSecretAccessKey"), - SourcesHost: options.GetString("SourcesHost"), - SourcesScheme: options.GetString("SourcesScheme"), - SourcesPort: options.GetInt("SourcesPort"), - SourcesPSK: options.GetString("SourcesPSK"), + Hostname: options.GetString("Hostname"), + KafkaBrokerConfig: brokerConfig, + KafkaTopics: options.GetStringMapString("KafkaTopics"), + KafkaGroupID: options.GetString("KafkaGroupID"), + MetricsPort: options.GetInt("MetricsPort"), + LogLevel: options.GetString("LogLevel"), + LogHandler: options.GetString("LogHandler"), + LogGroup: options.GetString("LogGroup"), + AwsRegion: options.GetString("AwsRegion"), + AwsAccessKeyID: options.GetString("AwsAccessKeyID"), + AwsSecretAccessKey: options.GetString("AwsSecretAccessKey"), + SourcesHost: options.GetString("SourcesHost"), + SourcesScheme: options.GetString("SourcesScheme"), + SourcesPort: options.GetInt("SourcesPort"), + SourcesPSK: options.GetString("SourcesPSK"), + SourcesRequestsMaxAttempts: options.GetInt("SourcesRequestsMaxAttempts"), } } diff --git a/provider/forge.go b/provider/forge.go index 50fddca..d4ec774 100644 --- a/provider/forge.go +++ b/provider/forge.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/redhatinsights/sources-superkey-worker/amazon" + "github.com/redhatinsights/sources-superkey-worker/config" "github.com/redhatinsights/sources-superkey-worker/sources" "github.com/redhatinsights/sources-superkey-worker/superkey" ) @@ -45,8 +46,14 @@ func TearDown(ctx context.Context, f *superkey.ForgedApplication) []error { // getProvider returns a provider based on create request's provider + credentials func getProvider(ctx context.Context, request *superkey.CreateRequest) (superkey.Provider, error) { - client := sources.SourcesClient{AccountNumber: request.TenantID, IdentityHeader: request.IdentityHeader, OrgId: request.OrgIdHeader} - auth, err := client.GetInternalAuthentication(ctx, request.SuperKey) + sourcesRestClient := sources.NewSourcesClient(config.Get()) + + authData := sources.AuthenticationData{ + IdentityHeader: request.IdentityHeader, + OrgId: request.OrgIdHeader, + } + + auth, err := sourcesRestClient.GetInternalAuthentication(ctx, &authData, request.SuperKey) if err != nil { return nil, fmt.Errorf(`error while fetching internal authentication "%s" from Sources: %w`, request.SuperKey, err) } diff --git a/sources/api_client.go b/sources/api_client.go index e5315e9..a99b27a 100644 --- a/sources/api_client.go +++ b/sources/api_client.go @@ -16,269 +16,225 @@ import ( "github.com/sirupsen/logrus" ) -var conf = config.Get() +// sourcesClient holds the required information to be able to send requests back to the Sources API. +type sourcesClient struct { + baseV31URL *url.URL + baseV20InternalUrl *url.URL + config *config.SuperKeyWorkerConfig +} -type SourcesClient struct { +// AuthenticationData +type AuthenticationData struct { IdentityHeader string OrgId string AccountNumber string } -func (sc *SourcesClient) CheckAvailability(ctx context.Context, sourceId string) error { - reqURL, _ := url.Parse(fmt.Sprintf( - "%v://%v:%v/api/sources/v3.1/sources/%v/check_availability", conf.SourcesScheme, conf.SourcesHost, conf.SourcesPort, sourceId, - )) - - req := &http.Request{ - Method: http.MethodPost, - URL: reqURL, - Header: sc.headers(), - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("unable to send request: %w", err) - } - - l.LogWithContext(ctx).WithField("request_url", reqURL).Debugf("Requesting an availability check") +// PatchApplicationRequest represents the fields that we might want to update when updating the application's details. +// +// The AvailabilityStatus field represents the current application's availability status. +// The AvailabilityStatusError field gives information about why the status might not be "available". +// The Extra field allows adding extra fields to the application, such as the Superkey key. +type PatchApplicationRequest struct { + AvailabilityStatus *string `json:"availability_status"` + AvailabilityStatusError *string `json:"availability_status_error"` + Extra map[string]interface{} `json:"extra"` +} - defer resp.Body.Close() +// PatchSourceRequest represents the availability status field that we might want to update in a Source. +// +// The AvailabilityStatus field represents the current sources' availability status. +type PatchSourceRequest struct { + AvailabilityStatus *string `json:"availability_status"` +} - if resp.StatusCode != 202 { - return fmt.Errorf(`expecting a 202 status code, got "%d"`, resp.StatusCode) +// NewSourcesClient initializes a new SourcesClient to be able to communicate with the Sources API. +func NewSourcesClient(config *config.SuperKeyWorkerConfig) *sourcesClient { + return &sourcesClient{ + baseV20InternalUrl: &url.URL{ + Host: fmt.Sprintf("%s:%d", config.SourcesHost, config.SourcesPort), + Path: "/internal/v2.0/", + Scheme: config.SourcesScheme, + }, + baseV31URL: &url.URL{ + Host: fmt.Sprintf("%s:%d", config.SourcesHost, config.SourcesPort), + Path: "/api/sources/v3.1", + Scheme: config.SourcesScheme, + }, + config: config, } - - return nil } -func (sc *SourcesClient) CreateAuthentication(ctx context.Context, auth *model.AuthenticationCreateRequest) error { - reqURL, _ := url.Parse(fmt.Sprintf( - "%v://%v:%v/api/sources/v3.1/authentications", conf.SourcesScheme, conf.SourcesHost, conf.SourcesPort, - )) - - body, err := json.Marshal(auth) - if err != nil { - return fmt.Errorf("failed to marshal request body: %w", err) - } +func (sc *sourcesClient) TriggerSourceAvailabilityCheck(ctx context.Context, authData *AuthenticationData, sourceId string) error { + checkAvailabilityUrl := sc.baseV31URL.JoinPath("/sources/", url.PathEscape(sourceId), "/check_availability") - l.LogWithContext(ctx).WithFields(logrus.Fields{"request_url": reqURL, "body": string(body)}).Debugf("Creating authentication in Sources") + return sc.sendRequest(ctx, http.MethodPost, checkAvailabilityUrl, authData, nil, nil) +} - req := &http.Request{ - Method: http.MethodPost, - URL: reqURL, - Header: sc.headers(), - Body: io.NopCloser(bytes.NewBuffer(body)), - } +func (sc *sourcesClient) CreateAuthentication(ctx context.Context, authData *AuthenticationData, sourcesAuthentication *model.AuthenticationCreateRequest) (*model.AuthenticationResponse, error) { + createAuthenticationUrl := sc.baseV31URL.JoinPath("/authentications") - resp, err := http.DefaultClient.Do(req) + var createdAuthentication model.AuthenticationResponse + err := sc.sendRequest(ctx, http.MethodPost, createAuthenticationUrl, authData, sourcesAuthentication, createdAuthentication) if err != nil { - return fmt.Errorf("unable to send request: %w", err) + return nil, fmt.Errorf("error while creating authentication: %w", err) } - defer resp.Body.Close() - - if resp.StatusCode > 299 { - b, _ := io.ReadAll(resp.Body) - return fmt.Errorf(`expecting a 200 status code, got "%d" with body "%s"`, resp.StatusCode, string(b)) - } + return &createdAuthentication, nil +} - bytes, _ := io.ReadAll(resp.Body) - var createdAuth model.AuthenticationResponse - err = json.Unmarshal(bytes, &createdAuth) - if err != nil { - return fmt.Errorf("unable to unmarshal authentication creation response from Sources: %w", err) - } +func (sc *sourcesClient) CreateApplicationAuthentication(ctx context.Context, authData *AuthenticationData, appAuthCreateRequest *model.ApplicationAuthenticationCreateRequest) error { + createApplicationAuthenticationUrl := sc.baseV31URL.JoinPath("/application_authentications") - err = sc.createApplicationAuthentication(ctx, &model.ApplicationAuthenticationCreateRequest{ - ApplicationIDRaw: auth.ResourceIDRaw, - AuthenticationIDRaw: createdAuth.ID, - }) + err := sc.sendRequest(ctx, http.MethodPost, createApplicationAuthenticationUrl, authData, appAuthCreateRequest, nil) if err != nil { - return err + return fmt.Errorf("error while creating the application authentication: %w", err) } return nil } -func (sc *SourcesClient) PatchApplication(ctx context.Context, appID string, payload map[string]interface{}) error { - reqURL, _ := url.Parse(fmt.Sprintf( - "%v://%v:%v/api/sources/v3.1/applications/%v", conf.SourcesScheme, conf.SourcesHost, conf.SourcesPort, appID, - )) +func (sc *sourcesClient) PatchApplication(ctx context.Context, authData *AuthenticationData, appId string, patchApplicationRequest *PatchApplicationRequest) error { + patchApplicationUrl := sc.baseV31URL.JoinPath("/applications/", url.PathEscape(appId)) - body, err := json.Marshal(payload) - if err != nil { - return fmt.Errorf("failed to marshal request body: %w", err) - } - - l.LogWithContext(ctx).WithFields(logrus.Fields{"request_url": reqURL, "body": string(body)}).Debugf("Patching application in Sources") + return sc.sendRequest(ctx, http.MethodPatch, patchApplicationUrl, authData, patchApplicationRequest, nil) +} - req := &http.Request{ - Method: http.MethodPatch, - URL: reqURL, - Header: sc.headers(), - Body: io.NopCloser(bytes.NewBuffer(body)), - } +func (sc *sourcesClient) PatchSource(ctx context.Context, authData *AuthenticationData, sourceId string, patchSourceRequest *PatchSourceRequest) error { + patchSourceUrl := sc.baseV31URL.JoinPath("/sources/" + url.PathEscape(sourceId)) - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("unable to send request: %w", err) - } + return sc.sendRequest(ctx, http.MethodPatch, patchSourceUrl, authData, patchSourceRequest, nil) +} - defer resp.Body.Close() +func (sc *sourcesClient) GetInternalAuthentication(ctx context.Context, authData *AuthenticationData, authId string) (*model.AuthenticationInternalResponse, error) { + getInternalAuthUrl := sc.baseV20InternalUrl.JoinPath("/authentications/", url.PathEscape(authId), "/?expose_encrypted_attribute[]=password") - if resp.StatusCode > 299 { - b, _ := io.ReadAll(resp.Body) - return fmt.Errorf(`expecting a 200 status code, got "%d" with body "%s"`, resp.StatusCode, string(b)) - } + var authInternalResponse *model.AuthenticationInternalResponse = nil + err := sc.sendRequest(ctx, http.MethodGet, getInternalAuthUrl, authData, nil, &authInternalResponse) - return nil + return authInternalResponse, err } -func (sc *SourcesClient) PatchSource(ctx context.Context, sourceId string, payload map[string]interface{}) error { - reqURL, _ := url.Parse(fmt.Sprintf( - "%v://%v:%v/api/sources/v3.1/sources/%v", conf.SourcesScheme, conf.SourcesHost, conf.SourcesPort, sourceId, - )) - - body, err := json.Marshal(payload) - if err != nil { - return err - } +// sendRequest sends a request with the provided method and body to the given url, performing a maximum number of +// attempts and marshaling the incoming response's body. You can leave the body and the marshalTarget arguments empty +// if you do not require them. +func (sc *sourcesClient) sendRequest(ctx context.Context, httpMethod string, url *url.URL, authData *AuthenticationData, body interface{}, marshalTarget interface{}) error { + // Set up a timeout so that the requests don't hang up forever. + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // When a body is specified, attempt to marshal it as JSON. + var requestBody *bytes.Buffer = nil + if body != nil { + tmp, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("failed to marshal request body: %w", err) + } - req := &http.Request{ - Method: http.MethodPatch, - URL: reqURL, - Header: sc.headers(), - Body: io.NopCloser(bytes.NewBuffer(body)), + requestBody = bytes.NewBuffer(tmp) } - l.LogWithContext(ctx).WithFields(logrus.Fields{"request_url": reqURL, "body": string(body)}).Debugf("Patching source in Sources") - - resp, err := http.DefaultClient.Do(req) + // Create the request. + request, err := http.NewRequestWithContext(ctx, httpMethod, url.String(), requestBody) if err != nil { - return fmt.Errorf("unable to send request: %w", err) + return fmt.Errorf(`failed to create request: %w`, err) } - defer resp.Body.Close() + // Include the headers in the request. + sc.addAuthenticationHeaders(request, authData) - if resp.StatusCode > 299 { - b, _ := io.ReadAll(resp.Body) - return fmt.Errorf(`expecting a 200 status code, got "%d" with body "%s"`, resp.StatusCode, string(b)) - } + // Perform the actual request. + var response *http.Response + for attempt := 0; attempt < sc.config.SourcesRequestsMaxAttempts; attempt++ { + response, err = http.DefaultClient.Do(request) - return nil -} + // The "err" check is to avoid nil dereference errors, since if we attempt checking for the status code + // directly when an error has occurred, the "response" struct might be nil. + if err == nil && sc.isStatusCodeFamilyOf2xx(response.StatusCode) { + break + } -// GetInternalAuthentication requests an authentication via the internal sources api -// that way we can expose the password. -// returns: populated sources api Authentication object, error -func (sc *SourcesClient) GetInternalAuthentication(ctx context.Context, authID string) (*model.AuthenticationInternalResponse, error) { - reqURL, _ := url.Parse(fmt.Sprintf( - "%v://%v:%v/internal/v2.0/authentications/%v?expose_encrypted_attribute[]=password", conf.SourcesScheme, conf.SourcesHost, conf.SourcesPort, authID, - )) - - req := &http.Request{ - Method: http.MethodGet, - URL: reqURL, - Header: sc.headers(), - } + // When there are no errors but the status code is not the expected one, we attempt to drain the body so that + // the default client can reuse the connection, and then we close the body to avoid memory leaks. + if err == nil && !sc.isStatusCodeFamilyOf2xx(response.StatusCode) { + _, drainErr := io.Copy(io.Discard, response.Body) - var res *http.Response - var err error - for retry := 0; retry < 5; retry++ { - l.LogWithContext(ctx).WithFields(logrus.Fields{"request_url": reqURL, "authentication_id": authID}).Debugf("Getting internal authentication from Sources") + if drainErr != nil { + l.Log.WithFields(logrus.Fields{}).Warnf("Unable to drain response body. The connection will not be reused by the default HTTP client: %s", drainErr) + } - res, err = http.DefaultClient.Do(req) + if closeErr := response.Body.Close(); closeErr != nil { + l.Log.WithFields(logrus.Fields{}).Errorf("Failed to close incoming response's body: %s", closeErr) + } - if err != nil || res.StatusCode == 200 { - defer res.Body.Close() - break - } else { - l.LogWithContext(ctx).WithField("authentication_id", authID).Warn("Unable to fetch internal authentication. Retrying...") - time.Sleep(3 * time.Second) + l.Log.WithFields(logrus.Fields{}).Debugf(`Unexpected status code received. Want "2xx", got "%d"`, response.StatusCode) + continue + } + + if err != nil { + l.Log.WithFields(logrus.Fields{}).Warn("Failed to send request. Retrying...") + l.Log.WithFields(logrus.Fields{}).Debugf("Failed to send request. Retrying... Cause: %s", err) } } - if err != nil || res.StatusCode != 200 { - return nil, fmt.Errorf(`unable to fetch internal authentication "%s" after 5 retries: %w`, authID, err) + // In the case in which we deplete all the attempts, we have to return the error and stop the execution here. + if err != nil || response == nil { + return fmt.Errorf("failed to send request: %w", err) } - data, _ := io.ReadAll(res.Body) - auth := model.AuthenticationInternalResponse{} + // Always read the response body, in case we need to return it in an error or marshal it to a struct. + responseBody, err := io.ReadAll(response.Body) + if err != nil { + return fmt.Errorf(`failed to read response body: %w`, err) + } - // unmarshaling the data from the request, the id comes back as a string which fills `err` - // we can safely ignore that as long as username/pass are there. - err = json.Unmarshal(data, &auth) - if err != nil && (auth.Username == "" || auth.Password == "") { - return nil, fmt.Errorf(`internal authentication "%s"'s username or password are empty'`, authID) + // Make sure that the status code is a "2xx" one. + if !sc.isStatusCodeFamilyOf2xx(response.StatusCode) { + return fmt.Errorf(`unexpected status code received. Want "2xx", got "%d". Response body: %s`, response.StatusCode, string(responseBody)) } - return &auth, nil + // We might need to marshal the incoming response in the specified struct. + if marshalTarget != nil { + err = json.Unmarshal(responseBody, &marshalTarget) + if err != nil { + return fmt.Errorf(`failed to unmarshal response body: %w`, err) + } + } + + return nil } -func (sc *SourcesClient) headers() map[string][]string { - var headers = make(map[string][]string) +// isStatusCodeFamilyOf2xx returns true if the given status code is a 2xx status code. +func (sc *sourcesClient) isStatusCodeFamilyOf2xx(statusCode int) bool { + return statusCode >= 200 && statusCode < 300 +} - headers["Content-Type"] = []string{"application/json"} +func (sc *sourcesClient) addAuthenticationHeaders(request *http.Request, authData *AuthenticationData) { + request.Header.Add("Content-Type", "application/json") - if conf.SourcesPSK == "" { + if sc.config.SourcesPSK == "" { var xRhId string - if sc.IdentityHeader == "" { - xRhId = encodeIdentity(sc.AccountNumber, sc.OrgId) + if authData.IdentityHeader == "" { + xRhId = encodeIdentity(authData.AccountNumber, authData.OrgId) } else { - xRhId = sc.IdentityHeader + xRhId = authData.IdentityHeader } - headers["x-rh-identity"] = []string{xRhId} + request.Header.Add("x-rh-identity", xRhId) } else { - headers["x-rh-sources-psk"] = []string{conf.SourcesPSK} + request.Header.Add("x-rh-sources-psk", sc.config.SourcesPSK) - if sc.AccountNumber != "" { - headers["x-rh-sources-account-number"] = []string{sc.AccountNumber} + if authData.AccountNumber != "" { + request.Header.Add("x-rh-sources-account-number", authData.AccountNumber) } - if sc.IdentityHeader != "" { - headers["x-rh-identity"] = []string{sc.IdentityHeader} + if authData.IdentityHeader != "" { + request.Header.Add("x-rh-identity", authData.IdentityHeader) } - if sc.OrgId != "" { - headers["x-rh-org-id"] = []string{sc.OrgId} + if authData.OrgId != "" { + request.Header.Add("x-rh-org-id", authData.OrgId) } } - - return headers -} - -func (sc *SourcesClient) createApplicationAuthentication(ctx context.Context, appAuth *model.ApplicationAuthenticationCreateRequest) error { - reqURL, _ := url.Parse(fmt.Sprintf( - "%v://%v:%v/api/sources/v3.1/application_authentications", conf.SourcesScheme, conf.SourcesHost, conf.SourcesPort, - )) - - body, err := json.Marshal(appAuth) - if err != nil { - return err - } - - req := &http.Request{ - Method: http.MethodPost, - URL: reqURL, - Header: sc.headers(), - Body: io.NopCloser(bytes.NewBuffer(body)), - } - - l.LogWithContext(ctx).WithFields(logrus.Fields{"request_url": reqURL, "body": string(body)}).Debugf("Creating application authentication in Sources") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("unable to send request: %w", err) - } - - defer resp.Body.Close() - - if resp.StatusCode > 299 { - b, _ := io.ReadAll(resp.Body) - return fmt.Errorf(`expecting a 200 status code, got "%d" with body "%s"`, resp.StatusCode, string(b)) - } - - return nil } diff --git a/sources/api_client_interface.go b/sources/api_client_interface.go new file mode 100644 index 0000000..17a1728 --- /dev/null +++ b/sources/api_client_interface.go @@ -0,0 +1,24 @@ +package sources + +import ( + "context" + + "github.com/RedHatInsights/sources-api-go/model" +) + +// RestClient represents the Sources' endpoints that are required for the Superkey to be able to talk to the Sources' +// API. +type RestClient interface { + // TriggerSourceAvailabilityCheck triggers an availability status check in the Sources API for the given source. + TriggerSourceAvailabilityCheck(ctx context.Context, authData *AuthenticationData, sourceId string) error + // CreateAuthentication creates an authentication in Sources. + CreateAuthentication(ctx context.Context, authData *AuthenticationData, sourcesAuthentication *model.AuthenticationCreateRequest) (*model.AuthenticationResponse, error) + // CreateApplicationAuthentication links the created authentication with an application in Sources. + CreateApplicationAuthentication(ctx context.Context, authData *AuthenticationData, appAuthCreateRequest *model.ApplicationAuthenticationCreateRequest) error + // PatchApplication modifies an application in Sources. + PatchApplication(ctx context.Context, authData *AuthenticationData, appId string, patchApplicationRequest *PatchApplicationRequest) error + // PatchSource modifies an application in Sources. + PatchSource(ctx context.Context, authData *AuthenticationData, sourceId string, patchSourceRequest *PatchSourceRequest) error + // GetInternalAuthentication fetches an authentication using the internal Sources' endpoint, which ensure that the authentication will have the password as well. + GetInternalAuthentication(ctx context.Context, authData *AuthenticationData, authId string) (*model.AuthenticationInternalResponse, error) +} diff --git a/superkey/create_request.go b/superkey/create_request.go index c3854b7..7fa727c 100644 --- a/superkey/create_request.go +++ b/superkey/create_request.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/redhatinsights/sources-superkey-worker/config" l "github.com/redhatinsights/sources-superkey-worker/logger" "github.com/redhatinsights/sources-superkey-worker/sources" ) @@ -26,24 +27,28 @@ func (req *CreateRequest) MarkSourceUnavailable(ctx context.Context, incomingErr newApplication = &ForgedApplication{} } - if newApplication.SourcesClient == nil { - newApplication.SourcesClient = &sources.SourcesClient{IdentityHeader: req.IdentityHeader, OrgId: req.OrgIdHeader, AccountNumber: req.TenantID} + sourcesClient := sources.NewSourcesClient(config.Get()) + + authData := &sources.AuthenticationData{ + IdentityHeader: req.IdentityHeader, + OrgId: req.OrgIdHeader, + } + + patchAppRequestBody := &sources.PatchApplicationRequest{ + AvailabilityStatus: &availabilityStatus, + AvailabilityStatusError: &availabilityStatusError, + Extra: extra, } - err := newApplication.SourcesClient.PatchApplication(ctx, req.ApplicationID, map[string]interface{}{ - "availability_status": availabilityStatus, - "availability_status_error": availabilityStatusError, - "extra": extra, - }) + err := sourcesClient.PatchApplication(ctx, authData, req.ApplicationID, patchAppRequestBody) if err != nil { return fmt.Errorf("error while updating the application: %w", err) } l.LogWithContext(ctx).Info(`Application marked as "unavailable"`) - err = newApplication.SourcesClient.PatchSource(ctx, req.SourceID, map[string]interface{}{ - "availability_status": availabilityStatus, - }) + patchSourceRequestBody := &sources.PatchSourceRequest{AvailabilityStatus: &availabilityStatus} + err = sourcesClient.PatchSource(ctx, authData, req.SourceID, patchSourceRequestBody) if err != nil { return fmt.Errorf("error while updating the source: %w", err) } diff --git a/superkey/forged_application.go b/superkey/forged_application.go index 00c0869..6fcc1bc 100644 --- a/superkey/forged_application.go +++ b/superkey/forged_application.go @@ -8,6 +8,7 @@ import ( "time" "github.com/RedHatInsights/sources-api-go/model" + "github.com/redhatinsights/sources-superkey-worker/config" l "github.com/redhatinsights/sources-superkey-worker/logger" "github.com/redhatinsights/sources-superkey-worker/sources" ) @@ -40,27 +41,24 @@ func (f *ForgedApplication) CreateInSourcesAPI(ctx context.Context) error { // before it's ready. time.Sleep(waitTime() * time.Second) - // create a sources client for our identity + account number - if f.SourcesClient == nil { - f.SourcesClient = &sources.SourcesClient{IdentityHeader: f.Request.IdentityHeader, OrgId: f.Request.OrgIdHeader, AccountNumber: f.Request.TenantID} - } + sourcesClient := sources.NewSourcesClient(config.Get()) l.LogWithContext(ctx).Debugf("Posting resources back to Sources API: %v", f) - err := f.storeSuperKeyData(ctx) + err := f.storeSuperKeyData(ctx, sourcesClient) if err != nil { return fmt.Errorf("error while storing the superkey data in Sources: %w", err) } l.LogWithContext(ctx).Info("Superkey data stored in Sources") - err = f.createAuthentications(ctx) + err = f.createAuthentications(ctx, sourcesClient) if err != nil { return fmt.Errorf("error while creating the authentications in Sources: %w", err) } l.LogWithContext(ctx).Info("Authentications created in Sources") - err = f.checkAvailability(ctx) + err = f.checkAvailability(ctx, sourcesClient) if err != nil { return fmt.Errorf("error while triggering an availability check in Sources: %w", err) } @@ -71,13 +69,18 @@ func (f *ForgedApplication) CreateInSourcesAPI(ctx context.Context) error { return nil } -func (f *ForgedApplication) createAuthentications(ctx context.Context) error { +func (f *ForgedApplication) createAuthentications(ctx context.Context, sourcesRestClient sources.RestClient) error { extra := map[string]interface{}{} externalID, ok := f.Request.Extra["external_id"] if ok { extra["external_id"] = externalID } + authData := sources.AuthenticationData{ + IdentityHeader: f.Request.IdentityHeader, + OrgId: f.Request.OrgIdHeader, + } + auth := model.AuthenticationCreateRequest{ AuthType: f.Product.AuthPayload.AuthType, Username: f.Product.AuthPayload.Username, @@ -86,19 +89,31 @@ func (f *ForgedApplication) createAuthentications(ctx context.Context) error { Extra: extra, } - err := f.SourcesClient.CreateAuthentication(ctx, &auth) + createdAuthentication, err := sourcesRestClient.CreateAuthentication(ctx, &authData, &auth) if err != nil { return fmt.Errorf("error while creating the authentication in Sources: %w", err) } + appAuthBody := model.ApplicationAuthenticationCreateRequest{ + ApplicationIDRaw: f.Request.ApplicationID, + AuthenticationIDRaw: createdAuthentication.ID, + } + + err = sourcesRestClient.CreateApplicationAuthentication(ctx, &authData, &appAuthBody) + if err != nil { + return fmt.Errorf("error while associating the authentication with an application in Sources: %w", err) + } + return nil } -func (f *ForgedApplication) storeSuperKeyData(ctx context.Context) error { - err := f.SourcesClient.PatchApplication(ctx, f.Request.ApplicationID, map[string]interface{}{ - "extra": f.Product.Extra, - }) +func (f *ForgedApplication) storeSuperKeyData(ctx context.Context, sourcesRestClient sources.RestClient) error { + authData := sources.AuthenticationData{ + IdentityHeader: f.Request.IdentityHeader, + OrgId: f.Request.OrgIdHeader, + } + err := sourcesRestClient.PatchApplication(ctx, &authData, f.Request.ApplicationID, &sources.PatchApplicationRequest{Extra: f.Product.Extra}) if err != nil { return fmt.Errorf("failed to update application with superkey data: %w", err) } @@ -106,8 +121,13 @@ func (f *ForgedApplication) storeSuperKeyData(ctx context.Context) error { return nil } -func (f *ForgedApplication) checkAvailability(ctx context.Context) error { - err := f.SourcesClient.CheckAvailability(ctx, f.Product.SourceID) +func (f *ForgedApplication) checkAvailability(ctx context.Context, sourcesRestClient sources.RestClient) error { + authData := sources.AuthenticationData{ + IdentityHeader: f.Request.IdentityHeader, + OrgId: f.Request.OrgIdHeader, + } + + err := sourcesRestClient.TriggerSourceAvailabilityCheck(ctx, &authData, f.Product.SourceID) if err != nil { return err } diff --git a/superkey/types.go b/superkey/types.go index e96b46a..53cebf7 100644 --- a/superkey/types.go +++ b/superkey/types.go @@ -4,7 +4,6 @@ import ( "context" "github.com/RedHatInsights/sources-api-go/model" - "github.com/redhatinsights/sources-superkey-worker/sources" ) // CreateRequest - struct representing a request for a superkey @@ -56,7 +55,6 @@ type ForgedApplication struct { Request *CreateRequest Client Provider GUID string - SourcesClient *sources.SourcesClient } // Provider the interface for all of the superkey providers currently just a