From 62b929927468bc574446ba686d8caf38bc75db9e Mon Sep 17 00:00:00 2001 From: Derek Horton Date: Fri, 12 Jul 2024 11:47:44 -0500 Subject: [PATCH] Configuration and cleanup --- .../inventory_stale_timestamp_updater.go | 28 +- internal/config/config.go | 506 +++++++++--------- .../sql_connection_registrar.go | 9 +- .../stale_connection_locator.go | 14 +- internal/controller/account_resolver.go | 5 +- 5 files changed, 289 insertions(+), 273 deletions(-) diff --git a/cmd/cloud-connector/inventory_stale_timestamp_updater.go b/cmd/cloud-connector/inventory_stale_timestamp_updater.go index 79227357..31600250 100644 --- a/cmd/cloud-connector/inventory_stale_timestamp_updater.go +++ b/cmd/cloud-connector/inventory_stale_timestamp_updater.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "time" "github.com/RedHatInsights/cloud-connector/internal/config" @@ -36,6 +37,11 @@ func startInventoryStaleTimestampUpdater() { logger.LogFatalError("Failed to connect to the database", err) } + connectionRegistrar, err := connection_repository.NewSqlConnectionRegistrar(cfg, databaseConn) + if err != nil { + logger.LogFatalError("Failed to create connection registrar", err) + } + accountResolver, err := controller.NewAccountIdResolver(cfg.ClientIdToAccountIdImpl, cfg) if err != nil { logger.LogFatalError("Failed to create Account ID Resolver", err) @@ -83,28 +89,32 @@ func startInventoryStaleTimestampUpdater() { log.Debug("Processing stale connection") - identity, _, _, err := accountResolver.MapClientIdToAccountId(ctx, rhcClient.ClientID) + if rhcClient.TenantLookupFailureCount >= cfg.PurgeConnectionOnFailedTenantLookupCount { + logger.LogErrorWithAccountAndClientId("Tenant lookup failed for connection too many times. Purging connection...", err, rhcClient.Account, rhcClient.OrgID, rhcClient.ClientID) + connectionRegistrar.Unregister(ctx, rhcClient.ClientID) + return fmt.Errorf("Tenant lookup failed to many times") + } + + identity, _, _, err := accountResolver.MapClientIdToAccountId(context.Background(), rhcClient.ClientID) if err != nil { - rhcClient.TenantLookupFailureCount++ + logger.LogErrorWithAccountAndClientId("Unable to retrieve identity for connection", err, rhcClient.Account, rhcClient.OrgID, rhcClient.ClientID) - connection_repository.RecordFailedTenantLookup(ctx, databaseConn, sqlTimeout, rhcClient) + dberr := connection_repository.RecordFailedTenantLookup(ctx, databaseConn, sqlTimeout, rhcClient) + if dberr != nil { + logger.LogErrorWithAccountAndClientId("Unable to record failed tenant lookup for connection", err, rhcClient.Account, rhcClient.OrgID, rhcClient.ClientID) + } - // FIXME: Send disconnect here?? Need to determine the type of failure! - logger.LogErrorWithAccountAndClientId("Unable to retrieve identity for connection", err, rhcClient.Account, rhcClient.OrgID, rhcClient.ClientID) return err } - // FIXME: reset the tenant lookup failure count - rhcClient.TenantLookupFailureCount = 0 - err = connectedClientRecorder.RecordConnectedClient(ctx, identity, rhcClient) if err != nil { logger.LogErrorWithAccountAndClientId("Unable to sent host info to inventory", err, rhcClient.Account, rhcClient.OrgID, rhcClient.ClientID) return err } - connection_repository.UpdateStaleTimestampInDB(ctx, databaseConn, sqlTimeout, rhcClient) + connection_repository.RecordUpdatedStaleTimestamp(ctx, databaseConn, sqlTimeout, rhcClient) return nil }) diff --git a/internal/config/config.go b/internal/config/config.go index 6262ba1e..cb2b53a1 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -12,177 +12,179 @@ import ( const ( ENV_PREFIX = "CLOUD_CONNECTOR" - URL_APP_NAME = "URL_App_Name" - URL_PATH_PREFIX = "URL_Path_Prefix" - URL_BASE_PATH = "URL_Base_Path" - OPENAPI_SPEC_FILE_PATH = "OpenAPI_Spec_File_Path" - HTTP_SHUTDOWN_TIMEOUT = "HTTP_Shutdown_Timeout" - SERVICE_TO_SERVICE_CREDENTIALS = "Service_To_Service_Credentials" - PROFILE = "Enable_Profile" - MQTT_BROKER_ADDRESS = "MQTT_Broker_Address" - MQTT_BROKER_ADDRESS_DEFAULT = "ssl://localhost:8883" - MQTT_CLIENT_ID = "MQTT_Client_Id" - MQTT_USE_HOSTNAME_AS_CLIENT_ID = "MQTT_Use_Hostname_As_Client_Id" - MQTT_CLEAN_SESSION = "MQTT_Clean_Session" - MQTT_RESUME_SUBS = "MQTT_Resume_Subs" - MQTT_BROKER_TLS_CERT_FILE = "MQTT_Broker_Tls_Cert_File" - MQTT_BROKER_TLS_KEY_FILE = "MQTT_Broker_Tls_Key_File" - MQTT_BROKER_TLS_CA_CERT_FILE = "MQTT_Broker_Tls_CA_Cert_File" - MQTT_BROKER_TLS_SKIP_VERIFY = "MQTT_Broker_Tls_Skip_Verify" - MQTT_BROKER_AUTH_TYPE = "MQTT_Broker_Auth_Type" - MQTT_BROKER_USERNAME = "MQTT_Broker_Username" - MQTT_BROKER_PASSWORD = "MQTT_Broker_Password" - MQTT_BROKER_JWT_GENERATOR_IMPL = "MQTT_Broker_JWT_Generator_Impl" - MQTT_BROKER_JWT_FILE = "MQTT_Broker_JWT_File" - MQTT_TOPIC_PREFIX = "MQTT_Topic_Prefix" - MQTT_CONTROL_SUBSCRIPTION_QOS = "MQTT_Control_Subscription_QoS" - MQTT_CONTROL_PUBLISH_QOS = "MQTT_Control_Publish_QoS" - MQTT_DATA_SUBSCRIPTION_QOS = "MQTT_Data_Subscription_QoS" - MQTT_DATA_PUBLISH_QOS = "MQTT_Data_Publish_QoS" - MQTT_DISCONNECT_QUIESCE_TIME = "MQTT_Disconnect_Quiesce_Time" - MQTT_PUBLISH_TIMEOUT = "MQTT_Publish_Timeout" - MQTT_CONSUMER_SHUTDOWN_SLEEP_TIME = "MQTT_Consumer_Shutdown_Sleep_Time" - SHUTDOWN_ON_MQTT_CONNECTION_LOST = "Shutdown_On_MQTT_Connection_Lost" - INVALID_HANDSHAKE_RECONNECT_DELAY = "Invalid_Handshake_Reconnect_Delay" - CLIENT_ID_TO_ACCOUNT_ID_IMPL = "Client_Id_To_Account_Id_Impl" - CLIENT_ID_TO_ACCOUNT_ID_CONFIG_FILE = "Client_Id_To_Account_Id_Config_File" - CLIENT_ID_TO_ACCOUNT_ID_DEFAULT_ACCOUNT_ID = "Client_Id_To_Account_Id_Default_Account_Id" - CLIENT_ID_TO_ACCOUNT_ID_DEFAULT_ORG_ID = "Client_Id_To_Account_Id_Default_Org_Id" - CLIENT_ID_TO_ACCOUNT_ID_CACHE_SIZE = "Client_Id_To_Account_Id_Cache_Size" - CLIENT_ID_TO_ACCOUNT_ID_CACHE_VALID_RESP_TTL = "Client_Id_To_Account_Id_Cache_Valid_Response_TTL" - CLIENT_ID_TO_ACCOUNT_ID_CACHE_ERROR_RESP_TTL = "Client_Id_To_Account_Id_Cache_Error_Response_TTL" - CONNECTION_DATABASE_IMPL = "Connection_Database_Impl" - CONNECTION_DATABASE_HOST = "Connection_Database_Host" - CONNECTION_DATABASE_PORT = "Connection_Database_Port" - CONNECTION_DATABASE_USER = "Connection_Database_User" - CONNECTION_DATABASE_PASSWORD = "Connection_Database_Password" - CONNECTION_DATABASE_NAME = "Connection_Database_Name" - CONNECTION_DATABASE_SSL_MODE = "Connection_Database_SSL_Mode" - CONNECTION_DATABASE_SSL_ROOT_CERT = "Connection_Database_SSL_Root_Cert" - CONNECTION_DATABASE_QUERY_TIMEOUT = "Connection_Database_Query_Timeout" - AUTH_GATEWAY_URL = "Auth_Gateway_Url" - AUTH_GATEWAY_HTTP_CLIENT_TIMEOUT = "Auth_Gateway_HTTP_Client_Timeout" - DEFAULT_KAFKA_BROKER_ADDRESS = "kafka:29092" - KAFKA_CA = "Kafka_CA" - KAFKA_USERNAME = "Kafka_Username" - KAFKA_PASSWORD = "Kafka_Password" - KAFKA_SASL_MECHANISM = "Kafka_SASL_Mechanism" - CONNECTED_CLIENT_RECORDER_IMPL = "Connected_Client_Recorder_Impl" - INVENTORY_KAFKA_BROKERS = "Inventory_Kafka_Brokers" - INVENTORY_KAFKA_TOPIC = "Inventory_Kafka_Topic" - INVENTORY_KAFKA_BATCH_SIZE = "Inventory_Kafka_Batch_Size" - INVENTORY_KAFKA_BATCH_BYTES = "Inventory_Kafka_Batch_Bytes" - INVENTORY_STALE_TIMESTAMP_OFFSET = "Inventory_Stale_Timestamp_Offset" - INVENTORY_STALE_TIMESTAMP_UPDATER_CHUNK_SIZE = "Inventory_Stale_Timestamp_Updater_Chunk_Size" - INVENTORY_REPORTER_NAME = "Inventory_Reporter_Name" - SOURCES_RECORDER_IMPL = "Sources_Recorder_Impl" - SOURCES_BASE_URL = "Sources_Base_Url" - SOURCES_HTTP_CLIENT_TIMEOUT = "Sources_HTTP_Client_Timeout" - JWT_TOKEN_EXPIRY = "JWT_Token_Expiry_Minutes" - JWT_PRIVATE_KEY_FILE = "JWT_Private_Key_File" - JWT_PUBLIC_KEY_FILE = "JWT_Public_Key_File" - RHC_MESSAGE_KAFKA_BROKERS = "RHC_Message_Kafka_Brokers" - RHC_MESSAGE_KAFKA_TOPIC = "RHC_Message_Kafka_Topic" - RHC_MESSAGE_KAFKA_TOPIC_DEFAULT = "platform.cloud-connector.rhc-message-ingress" - RHC_MESSAGE_KAFKA_BATCH_SIZE = "RHC_Message_Kafka_Batch_Size" - RHC_MESSAGE_KAFKA_BATCH_BYTES = "RHC_Message_Kafka_Batch_Bytes" - RHC_MESSAGE_KAFKA_CONSUMER_GROUP = "RHC_Message_Kafka_Consumer_Group" - PENDO_API_ENDPOINT = "Pendo_Api_Endpoint" - PENDO_REQUEST_TIMEOUT = "Pendo_Request_Timeout" - PENDO_INTEGRATION_KEY = "Pendo_Integration_Key" - PENDO_REQUEST_SIZE = "Pendo_Request_Size" - PROMETHEUS_PUSH_GATEWAY = "Prometheus_Push_Gateway" - API_SERVER_CONNECTION_LOOKUP_IMPL = "API_Server_Connection_Lookup_Impl" - TENANT_TRANSLATOR_IMPL = "Tenant_Translator_Impl" - TENANT_TRANSLATOR_MOCK_MAPPING = "Tenant_Translator_Mock_Mapping" - TENANT_TRANSLATOR_URL = "Tenant_Translator_URL" - TENANT_TRANSLATOR_TIMEOUT = "Tenant_Translator_Timeout" + URL_APP_NAME = "URL_App_Name" + URL_PATH_PREFIX = "URL_Path_Prefix" + URL_BASE_PATH = "URL_Base_Path" + OPENAPI_SPEC_FILE_PATH = "OpenAPI_Spec_File_Path" + HTTP_SHUTDOWN_TIMEOUT = "HTTP_Shutdown_Timeout" + SERVICE_TO_SERVICE_CREDENTIALS = "Service_To_Service_Credentials" + PROFILE = "Enable_Profile" + MQTT_BROKER_ADDRESS = "MQTT_Broker_Address" + MQTT_BROKER_ADDRESS_DEFAULT = "ssl://localhost:8883" + MQTT_CLIENT_ID = "MQTT_Client_Id" + MQTT_USE_HOSTNAME_AS_CLIENT_ID = "MQTT_Use_Hostname_As_Client_Id" + MQTT_CLEAN_SESSION = "MQTT_Clean_Session" + MQTT_RESUME_SUBS = "MQTT_Resume_Subs" + MQTT_BROKER_TLS_CERT_FILE = "MQTT_Broker_Tls_Cert_File" + MQTT_BROKER_TLS_KEY_FILE = "MQTT_Broker_Tls_Key_File" + MQTT_BROKER_TLS_CA_CERT_FILE = "MQTT_Broker_Tls_CA_Cert_File" + MQTT_BROKER_TLS_SKIP_VERIFY = "MQTT_Broker_Tls_Skip_Verify" + MQTT_BROKER_AUTH_TYPE = "MQTT_Broker_Auth_Type" + MQTT_BROKER_USERNAME = "MQTT_Broker_Username" + MQTT_BROKER_PASSWORD = "MQTT_Broker_Password" + MQTT_BROKER_JWT_GENERATOR_IMPL = "MQTT_Broker_JWT_Generator_Impl" + MQTT_BROKER_JWT_FILE = "MQTT_Broker_JWT_File" + MQTT_TOPIC_PREFIX = "MQTT_Topic_Prefix" + MQTT_CONTROL_SUBSCRIPTION_QOS = "MQTT_Control_Subscription_QoS" + MQTT_CONTROL_PUBLISH_QOS = "MQTT_Control_Publish_QoS" + MQTT_DATA_SUBSCRIPTION_QOS = "MQTT_Data_Subscription_QoS" + MQTT_DATA_PUBLISH_QOS = "MQTT_Data_Publish_QoS" + MQTT_DISCONNECT_QUIESCE_TIME = "MQTT_Disconnect_Quiesce_Time" + MQTT_PUBLISH_TIMEOUT = "MQTT_Publish_Timeout" + MQTT_CONSUMER_SHUTDOWN_SLEEP_TIME = "MQTT_Consumer_Shutdown_Sleep_Time" + SHUTDOWN_ON_MQTT_CONNECTION_LOST = "Shutdown_On_MQTT_Connection_Lost" + INVALID_HANDSHAKE_RECONNECT_DELAY = "Invalid_Handshake_Reconnect_Delay" + CLIENT_ID_TO_ACCOUNT_ID_IMPL = "Client_Id_To_Account_Id_Impl" + CLIENT_ID_TO_ACCOUNT_ID_CONFIG_FILE = "Client_Id_To_Account_Id_Config_File" + CLIENT_ID_TO_ACCOUNT_ID_DEFAULT_ACCOUNT_ID = "Client_Id_To_Account_Id_Default_Account_Id" + CLIENT_ID_TO_ACCOUNT_ID_DEFAULT_ORG_ID = "Client_Id_To_Account_Id_Default_Org_Id" + CLIENT_ID_TO_ACCOUNT_ID_CACHE_SIZE = "Client_Id_To_Account_Id_Cache_Size" + CLIENT_ID_TO_ACCOUNT_ID_CACHE_VALID_RESP_TTL = "Client_Id_To_Account_Id_Cache_Valid_Response_TTL" + CLIENT_ID_TO_ACCOUNT_ID_CACHE_ERROR_RESP_TTL = "Client_Id_To_Account_Id_Cache_Error_Response_TTL" + CONNECTION_DATABASE_IMPL = "Connection_Database_Impl" + CONNECTION_DATABASE_HOST = "Connection_Database_Host" + CONNECTION_DATABASE_PORT = "Connection_Database_Port" + CONNECTION_DATABASE_USER = "Connection_Database_User" + CONNECTION_DATABASE_PASSWORD = "Connection_Database_Password" + CONNECTION_DATABASE_NAME = "Connection_Database_Name" + CONNECTION_DATABASE_SSL_MODE = "Connection_Database_SSL_Mode" + CONNECTION_DATABASE_SSL_ROOT_CERT = "Connection_Database_SSL_Root_Cert" + CONNECTION_DATABASE_QUERY_TIMEOUT = "Connection_Database_Query_Timeout" + AUTH_GATEWAY_URL = "Auth_Gateway_Url" + AUTH_GATEWAY_HTTP_CLIENT_TIMEOUT = "Auth_Gateway_HTTP_Client_Timeout" + DEFAULT_KAFKA_BROKER_ADDRESS = "kafka:29092" + KAFKA_CA = "Kafka_CA" + KAFKA_USERNAME = "Kafka_Username" + KAFKA_PASSWORD = "Kafka_Password" + KAFKA_SASL_MECHANISM = "Kafka_SASL_Mechanism" + CONNECTED_CLIENT_RECORDER_IMPL = "Connected_Client_Recorder_Impl" + INVENTORY_KAFKA_BROKERS = "Inventory_Kafka_Brokers" + INVENTORY_KAFKA_TOPIC = "Inventory_Kafka_Topic" + INVENTORY_KAFKA_BATCH_SIZE = "Inventory_Kafka_Batch_Size" + INVENTORY_KAFKA_BATCH_BYTES = "Inventory_Kafka_Batch_Bytes" + INVENTORY_STALE_TIMESTAMP_OFFSET = "Inventory_Stale_Timestamp_Offset" + INVENTORY_STALE_TIMESTAMP_UPDATER_CHUNK_SIZE = "Inventory_Stale_Timestamp_Updater_Chunk_Size" + INVENTORY_REPORTER_NAME = "Inventory_Reporter_Name" + SOURCES_RECORDER_IMPL = "Sources_Recorder_Impl" + SOURCES_BASE_URL = "Sources_Base_Url" + SOURCES_HTTP_CLIENT_TIMEOUT = "Sources_HTTP_Client_Timeout" + JWT_TOKEN_EXPIRY = "JWT_Token_Expiry_Minutes" + JWT_PRIVATE_KEY_FILE = "JWT_Private_Key_File" + JWT_PUBLIC_KEY_FILE = "JWT_Public_Key_File" + RHC_MESSAGE_KAFKA_BROKERS = "RHC_Message_Kafka_Brokers" + RHC_MESSAGE_KAFKA_TOPIC = "RHC_Message_Kafka_Topic" + RHC_MESSAGE_KAFKA_TOPIC_DEFAULT = "platform.cloud-connector.rhc-message-ingress" + RHC_MESSAGE_KAFKA_BATCH_SIZE = "RHC_Message_Kafka_Batch_Size" + RHC_MESSAGE_KAFKA_BATCH_BYTES = "RHC_Message_Kafka_Batch_Bytes" + RHC_MESSAGE_KAFKA_CONSUMER_GROUP = "RHC_Message_Kafka_Consumer_Group" + PENDO_API_ENDPOINT = "Pendo_Api_Endpoint" + PENDO_REQUEST_TIMEOUT = "Pendo_Request_Timeout" + PENDO_INTEGRATION_KEY = "Pendo_Integration_Key" + PENDO_REQUEST_SIZE = "Pendo_Request_Size" + PROMETHEUS_PUSH_GATEWAY = "Prometheus_Push_Gateway" + API_SERVER_CONNECTION_LOOKUP_IMPL = "API_Server_Connection_Lookup_Impl" + TENANT_TRANSLATOR_IMPL = "Tenant_Translator_Impl" + TENANT_TRANSLATOR_MOCK_MAPPING = "Tenant_Translator_Mock_Mapping" + TENANT_TRANSLATOR_URL = "Tenant_Translator_URL" + TENANT_TRANSLATOR_TIMEOUT = "Tenant_Translator_Timeout" + PURGE_CONNECTION_ON_FAILED_TENANT_LOOKUP_COUNT = "Purge_Connection_On_Failed_Tenant_Lookup_Count" ) type Config struct { - UrlAppName string - UrlPathPrefix string - UrlBasePath string - OpenApiSpecFilePath string - HttpShutdownTimeout time.Duration - ServiceToServiceCredentials map[string]interface{} - Profile bool - MqttBrokerAddress string - MqttClientId string - MqttUseHostnameAsClientId bool - MqttCleanSession bool - MqttResumeSubs bool - MqttBrokerTlsCertFile string - MqttBrokerTlsKeyFile string - MqttBrokerTlsCACertFile string - MqttBrokerTlsSkipVerify bool - MqttBrokerAuthType string - MqttBrokerUsername string - MqttBrokerPassword string - MqttBrokerJwtGeneratorImpl string - MqttBrokerJwtFile string - MqttTopicPrefix string - MqttControlSubscriptionQoS byte - MqttControlPublishQoS byte - MqttDataSubscriptionQoS byte - MqttDataPublishQoS byte - MqttDisconnectQuiesceTime uint - MqttPublishTimeout time.Duration - MqttConsumerShutdownSleepTime time.Duration - ShutdownOnMqttConnectionLost bool - InvalidHandshakeReconnectDelay int - KafkaBrokers []string - KafkaCA string - KafkaUsername string - KafkaPassword string - KafkaSASLMechanism string - ClientIdToAccountIdImpl string - ClientIdToAccountIdConfigFile string - ClientIdToAccountIdDefaultAccountId string - ClientIdToAccountIdDefaultOrgId string - ClientIdToAccountIdCacheSize int - ClientIdToAccountIdCacheValidRespTTL time.Duration - ClientIdToAccountIdCacheErrorRespTTL time.Duration - ConnectionDatabaseImpl string - ConnectionDatabaseHost string - ConnectionDatabasePort int - ConnectionDatabaseUser string - ConnectionDatabasePassword string - ConnectionDatabaseName string - ConnectionDatabaseSslMode string - ConnectionDatabaseSslRootCert string - ConnectionDatabaseQueryTimeout time.Duration - AuthGatewayUrl string - AuthGatewayHttpClientTimeout time.Duration - ConnectedClientRecorderImpl string - InventoryKafkaBrokers []string - InventoryKafkaTopic string - InventoryKafkaBatchSize int - InventoryKafkaBatchBytes int - InventoryStaleTimestampOffset time.Duration - InventoryStaleTimestampUpdaterChunkSize int - InventoryReporterName string - SourcesRecorderImpl string - SourcesBaseUrl string - SourcesHttpClientTimeout time.Duration - JwtTokenExpiry int - JwtPrivateKeyFile string - JwtPublicKeyFile string - RhcMessageKafkaBrokers []string - RhcMessageKafkaTopic string - RhcMessageKafkaBatchSize int - RhcMessageKafkaBatchBytes int - RhcMessageKafkaConsumerGroup string - PendoApiEndpoint string - PendoRequestTimeout time.Duration - PendoIntegrationKey string - PendoRequestSize int - PrometheusPushGateway string - ApiServerConnectionLookupImpl string - TenantTranslatorImpl string - TenantTranslatorMockMapping map[string]interface{} - TenantTranslatorURL string - TenantTranslatorTimeout time.Duration + UrlAppName string + UrlPathPrefix string + UrlBasePath string + OpenApiSpecFilePath string + HttpShutdownTimeout time.Duration + ServiceToServiceCredentials map[string]interface{} + Profile bool + MqttBrokerAddress string + MqttClientId string + MqttUseHostnameAsClientId bool + MqttCleanSession bool + MqttResumeSubs bool + MqttBrokerTlsCertFile string + MqttBrokerTlsKeyFile string + MqttBrokerTlsCACertFile string + MqttBrokerTlsSkipVerify bool + MqttBrokerAuthType string + MqttBrokerUsername string + MqttBrokerPassword string + MqttBrokerJwtGeneratorImpl string + MqttBrokerJwtFile string + MqttTopicPrefix string + MqttControlSubscriptionQoS byte + MqttControlPublishQoS byte + MqttDataSubscriptionQoS byte + MqttDataPublishQoS byte + MqttDisconnectQuiesceTime uint + MqttPublishTimeout time.Duration + MqttConsumerShutdownSleepTime time.Duration + ShutdownOnMqttConnectionLost bool + InvalidHandshakeReconnectDelay int + KafkaBrokers []string + KafkaCA string + KafkaUsername string + KafkaPassword string + KafkaSASLMechanism string + ClientIdToAccountIdImpl string + ClientIdToAccountIdConfigFile string + ClientIdToAccountIdDefaultAccountId string + ClientIdToAccountIdDefaultOrgId string + ClientIdToAccountIdCacheSize int + ClientIdToAccountIdCacheValidRespTTL time.Duration + ClientIdToAccountIdCacheErrorRespTTL time.Duration + ConnectionDatabaseImpl string + ConnectionDatabaseHost string + ConnectionDatabasePort int + ConnectionDatabaseUser string + ConnectionDatabasePassword string + ConnectionDatabaseName string + ConnectionDatabaseSslMode string + ConnectionDatabaseSslRootCert string + ConnectionDatabaseQueryTimeout time.Duration + AuthGatewayUrl string + AuthGatewayHttpClientTimeout time.Duration + ConnectedClientRecorderImpl string + InventoryKafkaBrokers []string + InventoryKafkaTopic string + InventoryKafkaBatchSize int + InventoryKafkaBatchBytes int + InventoryStaleTimestampOffset time.Duration + InventoryStaleTimestampUpdaterChunkSize int + InventoryReporterName string + SourcesRecorderImpl string + SourcesBaseUrl string + SourcesHttpClientTimeout time.Duration + JwtTokenExpiry int + JwtPrivateKeyFile string + JwtPublicKeyFile string + RhcMessageKafkaBrokers []string + RhcMessageKafkaTopic string + RhcMessageKafkaBatchSize int + RhcMessageKafkaBatchBytes int + RhcMessageKafkaConsumerGroup string + PendoApiEndpoint string + PendoRequestTimeout time.Duration + PendoIntegrationKey string + PendoRequestSize int + PrometheusPushGateway string + ApiServerConnectionLookupImpl string + TenantTranslatorImpl string + TenantTranslatorMockMapping map[string]interface{} + TenantTranslatorURL string + TenantTranslatorTimeout time.Duration + PurgeConnectionOnFailedTenantLookupCount int } func (c Config) String() string { @@ -260,6 +262,8 @@ func (c Config) String() string { fmt.Fprintf(&b, "%s: %s\n", TENANT_TRANSLATOR_URL, c.TenantTranslatorURL) fmt.Fprintf(&b, "%s: %s\n", TENANT_TRANSLATOR_TIMEOUT, c.TenantTranslatorTimeout) fmt.Fprintf(&b, "%s: %s\n", PROMETHEUS_PUSH_GATEWAY, c.PrometheusPushGateway) + fmt.Fprintf(&b, "%s: %d\n", PURGE_CONNECTION_ON_FAILED_TENANT_LOOKUP_COUNT, c.PurgeConnectionOnFailedTenantLookupCount) + return b.String() } @@ -337,92 +341,94 @@ func GetConfig() *Config { options.SetDefault(TENANT_TRANSLATOR_URL, "http://gateway.3scale-dev.svc.cluster.local:8892") options.SetDefault(TENANT_TRANSLATOR_TIMEOUT, 5) options.SetDefault(PROMETHEUS_PUSH_GATEWAY, "prometheus-push.insights-push-stage.svc.cluster.local:9091") + options.SetDefault(PURGE_CONNECTION_ON_FAILED_TENANT_LOOKUP_COUNT, 6*24) // Check runs every 10min ...wait 24 hours before purging a bad connection options.SetEnvPrefix(ENV_PREFIX) options.AutomaticEnv() config := &Config{ - UrlPathPrefix: options.GetString(URL_PATH_PREFIX), - UrlAppName: options.GetString(URL_APP_NAME), - UrlBasePath: buildUrlBasePath(options.GetString(URL_PATH_PREFIX), options.GetString(URL_APP_NAME)), - OpenApiSpecFilePath: options.GetString(OPENAPI_SPEC_FILE_PATH), - HttpShutdownTimeout: options.GetDuration(HTTP_SHUTDOWN_TIMEOUT) * time.Second, - ServiceToServiceCredentials: options.GetStringMap(SERVICE_TO_SERVICE_CREDENTIALS), - Profile: options.GetBool(PROFILE), - MqttBrokerAddress: options.GetString(MQTT_BROKER_ADDRESS), - MqttClientId: options.GetString(MQTT_CLIENT_ID), - MqttUseHostnameAsClientId: options.GetBool(MQTT_USE_HOSTNAME_AS_CLIENT_ID), - MqttCleanSession: options.GetBool(MQTT_CLEAN_SESSION), - MqttResumeSubs: options.GetBool(MQTT_RESUME_SUBS), - MqttBrokerTlsCertFile: options.GetString(MQTT_BROKER_TLS_CERT_FILE), - MqttBrokerTlsKeyFile: options.GetString(MQTT_BROKER_TLS_KEY_FILE), - MqttBrokerTlsCACertFile: options.GetString(MQTT_BROKER_TLS_CA_CERT_FILE), - MqttBrokerTlsSkipVerify: options.GetBool(MQTT_BROKER_TLS_SKIP_VERIFY), - MqttBrokerAuthType: options.GetString(MQTT_BROKER_AUTH_TYPE), - MqttBrokerUsername: options.GetString(MQTT_BROKER_USERNAME), - MqttBrokerPassword: options.GetString(MQTT_BROKER_PASSWORD), - MqttBrokerJwtGeneratorImpl: options.GetString(MQTT_BROKER_JWT_GENERATOR_IMPL), - MqttBrokerJwtFile: options.GetString(MQTT_BROKER_JWT_FILE), - MqttTopicPrefix: options.GetString(MQTT_TOPIC_PREFIX), - MqttControlSubscriptionQoS: byte(options.GetInt(MQTT_CONTROL_SUBSCRIPTION_QOS)), - MqttControlPublishQoS: byte(options.GetInt(MQTT_CONTROL_PUBLISH_QOS)), - MqttDataSubscriptionQoS: byte(options.GetInt(MQTT_DATA_SUBSCRIPTION_QOS)), - MqttDataPublishQoS: byte(options.GetInt(MQTT_DATA_PUBLISH_QOS)), - MqttDisconnectQuiesceTime: options.GetUint(MQTT_DISCONNECT_QUIESCE_TIME), - MqttPublishTimeout: options.GetDuration(MQTT_PUBLISH_TIMEOUT) * time.Second, - MqttConsumerShutdownSleepTime: options.GetDuration(MQTT_CONSUMER_SHUTDOWN_SLEEP_TIME) * time.Second, - ShutdownOnMqttConnectionLost: options.GetBool(SHUTDOWN_ON_MQTT_CONNECTION_LOST), - InvalidHandshakeReconnectDelay: options.GetInt(INVALID_HANDSHAKE_RECONNECT_DELAY), - ClientIdToAccountIdImpl: options.GetString(CLIENT_ID_TO_ACCOUNT_ID_IMPL), - ClientIdToAccountIdConfigFile: options.GetString(CLIENT_ID_TO_ACCOUNT_ID_CONFIG_FILE), - ClientIdToAccountIdDefaultAccountId: options.GetString(CLIENT_ID_TO_ACCOUNT_ID_DEFAULT_ACCOUNT_ID), - ClientIdToAccountIdDefaultOrgId: options.GetString(CLIENT_ID_TO_ACCOUNT_ID_DEFAULT_ORG_ID), - ClientIdToAccountIdCacheSize: options.GetInt(CLIENT_ID_TO_ACCOUNT_ID_CACHE_SIZE), - ClientIdToAccountIdCacheValidRespTTL: options.GetDuration(CLIENT_ID_TO_ACCOUNT_ID_CACHE_VALID_RESP_TTL), - ClientIdToAccountIdCacheErrorRespTTL: options.GetDuration(CLIENT_ID_TO_ACCOUNT_ID_CACHE_ERROR_RESP_TTL), - ConnectionDatabaseImpl: options.GetString(CONNECTION_DATABASE_IMPL), - ConnectionDatabaseHost: options.GetString(CONNECTION_DATABASE_HOST), - ConnectionDatabasePort: options.GetInt(CONNECTION_DATABASE_PORT), - ConnectionDatabaseUser: options.GetString(CONNECTION_DATABASE_USER), - ConnectionDatabasePassword: options.GetString(CONNECTION_DATABASE_PASSWORD), - ConnectionDatabaseName: options.GetString(CONNECTION_DATABASE_NAME), - ConnectionDatabaseSslMode: options.GetString(CONNECTION_DATABASE_SSL_MODE), - ConnectionDatabaseSslRootCert: options.GetString(CONNECTION_DATABASE_SSL_ROOT_CERT), - ConnectionDatabaseQueryTimeout: options.GetDuration(CONNECTION_DATABASE_QUERY_TIMEOUT) * time.Second, - AuthGatewayUrl: options.GetString(AUTH_GATEWAY_URL), - AuthGatewayHttpClientTimeout: options.GetDuration(AUTH_GATEWAY_HTTP_CLIENT_TIMEOUT) * time.Second, - ConnectedClientRecorderImpl: options.GetString(CONNECTED_CLIENT_RECORDER_IMPL), - KafkaCA: options.GetString(KAFKA_CA), - KafkaUsername: options.GetString(KAFKA_USERNAME), - KafkaPassword: options.GetString(KAFKA_PASSWORD), - KafkaSASLMechanism: options.GetString(KAFKA_SASL_MECHANISM), - InventoryKafkaBrokers: options.GetStringSlice(INVENTORY_KAFKA_BROKERS), - InventoryKafkaTopic: options.GetString(INVENTORY_KAFKA_TOPIC), - InventoryKafkaBatchSize: options.GetInt(INVENTORY_KAFKA_BATCH_SIZE), - InventoryKafkaBatchBytes: options.GetInt(INVENTORY_KAFKA_BATCH_BYTES), - InventoryStaleTimestampOffset: options.GetDuration(INVENTORY_STALE_TIMESTAMP_OFFSET) * time.Hour, - InventoryStaleTimestampUpdaterChunkSize: options.GetInt(INVENTORY_STALE_TIMESTAMP_UPDATER_CHUNK_SIZE), - InventoryReporterName: options.GetString(INVENTORY_REPORTER_NAME), - SourcesRecorderImpl: options.GetString(SOURCES_RECORDER_IMPL), - SourcesBaseUrl: options.GetString(SOURCES_BASE_URL), - SourcesHttpClientTimeout: options.GetDuration(SOURCES_HTTP_CLIENT_TIMEOUT) * time.Second, - JwtTokenExpiry: options.GetInt(JWT_TOKEN_EXPIRY), - JwtPrivateKeyFile: options.GetString(JWT_PRIVATE_KEY_FILE), - JwtPublicKeyFile: options.GetString(JWT_PUBLIC_KEY_FILE), - RhcMessageKafkaBrokers: options.GetStringSlice(RHC_MESSAGE_KAFKA_BROKERS), - RhcMessageKafkaTopic: options.GetString(RHC_MESSAGE_KAFKA_TOPIC), - RhcMessageKafkaBatchSize: options.GetInt(RHC_MESSAGE_KAFKA_BATCH_SIZE), - RhcMessageKafkaBatchBytes: options.GetInt(RHC_MESSAGE_KAFKA_BATCH_BYTES), - RhcMessageKafkaConsumerGroup: options.GetString(RHC_MESSAGE_KAFKA_CONSUMER_GROUP), - PendoApiEndpoint: options.GetString(PENDO_API_ENDPOINT), - PendoRequestTimeout: options.GetDuration(PENDO_REQUEST_TIMEOUT) * time.Second, - PendoIntegrationKey: options.GetString(PENDO_INTEGRATION_KEY), - PendoRequestSize: options.GetInt(PENDO_REQUEST_SIZE), - PrometheusPushGateway: options.GetString(PROMETHEUS_PUSH_GATEWAY), - ApiServerConnectionLookupImpl: options.GetString(API_SERVER_CONNECTION_LOOKUP_IMPL), - TenantTranslatorImpl: options.GetString(TENANT_TRANSLATOR_IMPL), - TenantTranslatorMockMapping: options.GetStringMap(TENANT_TRANSLATOR_MOCK_MAPPING), - TenantTranslatorURL: options.GetString(TENANT_TRANSLATOR_URL), - TenantTranslatorTimeout: options.GetDuration(TENANT_TRANSLATOR_TIMEOUT) * time.Second, + UrlPathPrefix: options.GetString(URL_PATH_PREFIX), + UrlAppName: options.GetString(URL_APP_NAME), + UrlBasePath: buildUrlBasePath(options.GetString(URL_PATH_PREFIX), options.GetString(URL_APP_NAME)), + OpenApiSpecFilePath: options.GetString(OPENAPI_SPEC_FILE_PATH), + HttpShutdownTimeout: options.GetDuration(HTTP_SHUTDOWN_TIMEOUT) * time.Second, + ServiceToServiceCredentials: options.GetStringMap(SERVICE_TO_SERVICE_CREDENTIALS), + Profile: options.GetBool(PROFILE), + MqttBrokerAddress: options.GetString(MQTT_BROKER_ADDRESS), + MqttClientId: options.GetString(MQTT_CLIENT_ID), + MqttUseHostnameAsClientId: options.GetBool(MQTT_USE_HOSTNAME_AS_CLIENT_ID), + MqttCleanSession: options.GetBool(MQTT_CLEAN_SESSION), + MqttResumeSubs: options.GetBool(MQTT_RESUME_SUBS), + MqttBrokerTlsCertFile: options.GetString(MQTT_BROKER_TLS_CERT_FILE), + MqttBrokerTlsKeyFile: options.GetString(MQTT_BROKER_TLS_KEY_FILE), + MqttBrokerTlsCACertFile: options.GetString(MQTT_BROKER_TLS_CA_CERT_FILE), + MqttBrokerTlsSkipVerify: options.GetBool(MQTT_BROKER_TLS_SKIP_VERIFY), + MqttBrokerAuthType: options.GetString(MQTT_BROKER_AUTH_TYPE), + MqttBrokerUsername: options.GetString(MQTT_BROKER_USERNAME), + MqttBrokerPassword: options.GetString(MQTT_BROKER_PASSWORD), + MqttBrokerJwtGeneratorImpl: options.GetString(MQTT_BROKER_JWT_GENERATOR_IMPL), + MqttBrokerJwtFile: options.GetString(MQTT_BROKER_JWT_FILE), + MqttTopicPrefix: options.GetString(MQTT_TOPIC_PREFIX), + MqttControlSubscriptionQoS: byte(options.GetInt(MQTT_CONTROL_SUBSCRIPTION_QOS)), + MqttControlPublishQoS: byte(options.GetInt(MQTT_CONTROL_PUBLISH_QOS)), + MqttDataSubscriptionQoS: byte(options.GetInt(MQTT_DATA_SUBSCRIPTION_QOS)), + MqttDataPublishQoS: byte(options.GetInt(MQTT_DATA_PUBLISH_QOS)), + MqttDisconnectQuiesceTime: options.GetUint(MQTT_DISCONNECT_QUIESCE_TIME), + MqttPublishTimeout: options.GetDuration(MQTT_PUBLISH_TIMEOUT) * time.Second, + MqttConsumerShutdownSleepTime: options.GetDuration(MQTT_CONSUMER_SHUTDOWN_SLEEP_TIME) * time.Second, + ShutdownOnMqttConnectionLost: options.GetBool(SHUTDOWN_ON_MQTT_CONNECTION_LOST), + InvalidHandshakeReconnectDelay: options.GetInt(INVALID_HANDSHAKE_RECONNECT_DELAY), + ClientIdToAccountIdImpl: options.GetString(CLIENT_ID_TO_ACCOUNT_ID_IMPL), + ClientIdToAccountIdConfigFile: options.GetString(CLIENT_ID_TO_ACCOUNT_ID_CONFIG_FILE), + ClientIdToAccountIdDefaultAccountId: options.GetString(CLIENT_ID_TO_ACCOUNT_ID_DEFAULT_ACCOUNT_ID), + ClientIdToAccountIdDefaultOrgId: options.GetString(CLIENT_ID_TO_ACCOUNT_ID_DEFAULT_ORG_ID), + ClientIdToAccountIdCacheSize: options.GetInt(CLIENT_ID_TO_ACCOUNT_ID_CACHE_SIZE), + ClientIdToAccountIdCacheValidRespTTL: options.GetDuration(CLIENT_ID_TO_ACCOUNT_ID_CACHE_VALID_RESP_TTL), + ClientIdToAccountIdCacheErrorRespTTL: options.GetDuration(CLIENT_ID_TO_ACCOUNT_ID_CACHE_ERROR_RESP_TTL), + ConnectionDatabaseImpl: options.GetString(CONNECTION_DATABASE_IMPL), + ConnectionDatabaseHost: options.GetString(CONNECTION_DATABASE_HOST), + ConnectionDatabasePort: options.GetInt(CONNECTION_DATABASE_PORT), + ConnectionDatabaseUser: options.GetString(CONNECTION_DATABASE_USER), + ConnectionDatabasePassword: options.GetString(CONNECTION_DATABASE_PASSWORD), + ConnectionDatabaseName: options.GetString(CONNECTION_DATABASE_NAME), + ConnectionDatabaseSslMode: options.GetString(CONNECTION_DATABASE_SSL_MODE), + ConnectionDatabaseSslRootCert: options.GetString(CONNECTION_DATABASE_SSL_ROOT_CERT), + ConnectionDatabaseQueryTimeout: options.GetDuration(CONNECTION_DATABASE_QUERY_TIMEOUT) * time.Second, + AuthGatewayUrl: options.GetString(AUTH_GATEWAY_URL), + AuthGatewayHttpClientTimeout: options.GetDuration(AUTH_GATEWAY_HTTP_CLIENT_TIMEOUT) * time.Second, + ConnectedClientRecorderImpl: options.GetString(CONNECTED_CLIENT_RECORDER_IMPL), + KafkaCA: options.GetString(KAFKA_CA), + KafkaUsername: options.GetString(KAFKA_USERNAME), + KafkaPassword: options.GetString(KAFKA_PASSWORD), + KafkaSASLMechanism: options.GetString(KAFKA_SASL_MECHANISM), + InventoryKafkaBrokers: options.GetStringSlice(INVENTORY_KAFKA_BROKERS), + InventoryKafkaTopic: options.GetString(INVENTORY_KAFKA_TOPIC), + InventoryKafkaBatchSize: options.GetInt(INVENTORY_KAFKA_BATCH_SIZE), + InventoryKafkaBatchBytes: options.GetInt(INVENTORY_KAFKA_BATCH_BYTES), + InventoryStaleTimestampOffset: options.GetDuration(INVENTORY_STALE_TIMESTAMP_OFFSET) * time.Hour, + InventoryStaleTimestampUpdaterChunkSize: options.GetInt(INVENTORY_STALE_TIMESTAMP_UPDATER_CHUNK_SIZE), + InventoryReporterName: options.GetString(INVENTORY_REPORTER_NAME), + SourcesRecorderImpl: options.GetString(SOURCES_RECORDER_IMPL), + SourcesBaseUrl: options.GetString(SOURCES_BASE_URL), + SourcesHttpClientTimeout: options.GetDuration(SOURCES_HTTP_CLIENT_TIMEOUT) * time.Second, + JwtTokenExpiry: options.GetInt(JWT_TOKEN_EXPIRY), + JwtPrivateKeyFile: options.GetString(JWT_PRIVATE_KEY_FILE), + JwtPublicKeyFile: options.GetString(JWT_PUBLIC_KEY_FILE), + RhcMessageKafkaBrokers: options.GetStringSlice(RHC_MESSAGE_KAFKA_BROKERS), + RhcMessageKafkaTopic: options.GetString(RHC_MESSAGE_KAFKA_TOPIC), + RhcMessageKafkaBatchSize: options.GetInt(RHC_MESSAGE_KAFKA_BATCH_SIZE), + RhcMessageKafkaBatchBytes: options.GetInt(RHC_MESSAGE_KAFKA_BATCH_BYTES), + RhcMessageKafkaConsumerGroup: options.GetString(RHC_MESSAGE_KAFKA_CONSUMER_GROUP), + PendoApiEndpoint: options.GetString(PENDO_API_ENDPOINT), + PendoRequestTimeout: options.GetDuration(PENDO_REQUEST_TIMEOUT) * time.Second, + PendoIntegrationKey: options.GetString(PENDO_INTEGRATION_KEY), + PendoRequestSize: options.GetInt(PENDO_REQUEST_SIZE), + PrometheusPushGateway: options.GetString(PROMETHEUS_PUSH_GATEWAY), + ApiServerConnectionLookupImpl: options.GetString(API_SERVER_CONNECTION_LOOKUP_IMPL), + TenantTranslatorImpl: options.GetString(TENANT_TRANSLATOR_IMPL), + TenantTranslatorMockMapping: options.GetStringMap(TENANT_TRANSLATOR_MOCK_MAPPING), + TenantTranslatorURL: options.GetString(TENANT_TRANSLATOR_URL), + TenantTranslatorTimeout: options.GetDuration(TENANT_TRANSLATOR_TIMEOUT) * time.Second, + PurgeConnectionOnFailedTenantLookupCount: options.GetInt(PURGE_CONNECTION_ON_FAILED_TENANT_LOOKUP_COUNT), } if clowder.IsClowderEnabled() { diff --git a/internal/connection_repository/sql_connection_registrar.go b/internal/connection_repository/sql_connection_registrar.go index 1c0935f6..04fe8a5c 100644 --- a/internal/connection_repository/sql_connection_registrar.go +++ b/internal/connection_repository/sql_connection_registrar.go @@ -42,8 +42,13 @@ func (scm *SqlConnectionRegistrar) Register(ctx context.Context, rhcClient domai logger := logger.Log.WithFields(logrus.Fields{"account": account, "org_id": org_id, "client_id": client_id}) - update := "UPDATE connections SET dispatchers=$1, tags = $2, updated_at = NOW(), message_id = $3, message_sent = $4, tenant_lookup_failure_count = 0 WHERE account=$5 AND client_id=$6" - insert := "INSERT INTO connections (account, org_id, client_id, dispatchers, canonical_facts, tags, message_id, message_sent, tenant_lookup_failure_count) SELECT $7, $8, $9, $10, $11, $12, $13, $14, 0" + update := "UPDATE connections SET dispatchers=$1, tags = $2, updated_at = NOW(), message_id = $3, message_sent = $4 WHERE account=$5 AND client_id=$6" + insert := "INSERT INTO connections (account, org_id, client_id, dispatchers, canonical_facts, tags, message_id, message_sent) SELECT $7, $8, $9, $10, $11, $12, $13, $14" + + /* + update := "UPDATE connections SET dispatchers=$1, tags = $2, updated_at = NOW(), message_id = $3, message_sent = $4, tenant_lookup_failure_count = 0 WHERE account=$5 AND client_id=$6" + insert := "INSERT INTO connections (account, org_id, client_id, dispatchers, canonical_facts, tags, message_id, message_sent, tenant_lookup_failure_count) SELECT $7, $8, $9, $10, $11, $12, $13, $14, 0" + */ insertOrUpdate := fmt.Sprintf("WITH upsert AS (%s RETURNING *) %s WHERE NOT EXISTS (SELECT * FROM upsert)", update, insert) diff --git a/internal/connection_repository/stale_connection_locator.go b/internal/connection_repository/stale_connection_locator.go index 46589fe6..3bcd21bd 100644 --- a/internal/connection_repository/stale_connection_locator.go +++ b/internal/connection_repository/stale_connection_locator.go @@ -19,7 +19,7 @@ func ProcessStaleConnections(ctx context.Context, databaseConn *sql.DB, sqlTimeo defer cancel() statement, err := databaseConn.Prepare( - `SELECT account, org_id, client_id, canonical_facts, tags, dispatchers FROM connections + `SELECT account, org_id, client_id, canonical_facts, tags, dispatchers, failed_tenant_lookup_count FROM connections WHERE canonical_facts != '{}' AND ( dispatchers ? 'rhc-worker-playbook' OR dispatchers ? 'package-manager' ) AND stale_timestamp < $1 @@ -77,7 +77,7 @@ func ProcessStaleConnections(ctx context.Context, databaseConn *sql.DB, sqlTimeo return nil } -func UpdateStaleTimestampInDB(ctx context.Context, databaseConn *sql.DB, sqlTimeout time.Duration, rhcClient domain.ConnectorClientState) { +func RecordUpdatedStaleTimestamp(ctx context.Context, databaseConn *sql.DB, sqlTimeout time.Duration, rhcClient domain.ConnectorClientState) { log := logger.Log.WithFields(logrus.Fields{"account": rhcClient.Account, "org_id": rhcClient.OrgID, "client_id": rhcClient.ClientID}) @@ -86,9 +86,7 @@ func UpdateStaleTimestampInDB(ctx context.Context, databaseConn *sql.DB, sqlTime ctx, cancel := context.WithTimeout(ctx, sqlTimeout) defer cancel() - // FIXME: set the tenant failure count to zero - - update := "UPDATE connections SET stale_timestamp = NOW() WHERE org_id=$1 AND client_id=$2" + update := "UPDATE connections SET stale_timestamp = NOW(), tenant_lookup_failure_count = 0 WHERE org_id=$1 AND client_id=$2" statement, err := databaseConn.Prepare(update) if err != nil { @@ -118,9 +116,7 @@ func RecordFailedTenantLookup(ctx context.Context, databaseConn *sql.DB, sqlTime ctx, cancel := context.WithTimeout(ctx, sqlTimeout) defer cancel() - // FIXME: set the tenant failure count to zero - - update := "UPDATE connections SET failed_tentant_lookups = $1 WHERE org_id=$2 AND client_id=$3" + update := "UPDATE connections SET failed_tenant_lookup_count = failed_tenant_lookup_count + 1 WHERE org_id=$1 AND client_id=$2" statement, err := databaseConn.Prepare(update) if err != nil { @@ -128,7 +124,7 @@ func RecordFailedTenantLookup(ctx context.Context, databaseConn *sql.DB, sqlTime } defer statement.Close() - results, err := statement.ExecContext(ctx, rhcClient.TenantLookupFailureCount, rhcClient.OrgID, rhcClient.ClientID) + results, err := statement.ExecContext(ctx, rhcClient.OrgID, rhcClient.ClientID) if err != nil { return err } diff --git a/internal/controller/account_resolver.go b/internal/controller/account_resolver.go index 8a45d947..d69fd66c 100644 --- a/internal/controller/account_resolver.go +++ b/internal/controller/account_resolver.go @@ -74,7 +74,7 @@ func NewAccountIdResolver(accountIdResolverImpl string, cfg *config.Config) (Acc wrappedResolver := &BOPAccountIdResolver{cfg} return NewExpirableCachedAccountIdResolver(wrappedResolver, cfg.ClientIdToAccountIdCacheSize, cfg.ClientIdToAccountIdCacheValidRespTTL, cfg.ClientIdToAccountIdCacheErrorRespTTL) case "errant": - return &ErrantAccountIdResolver{}, nil + return &ErrantAccountIdResolver{}, nil default: return nil, errors.New("Invalid AccountIdResolver impl requested") } @@ -303,6 +303,5 @@ type ErrantAccountIdResolver struct { } func (this *ErrantAccountIdResolver) MapClientIdToAccountId(ctx context.Context, clientID domain.ClientID) (domain.Identity, domain.AccountID, domain.OrgID, error) { - return "", "", "", fmt.Errorf("Ugh...not found! Go away!") + return "", "", "", fmt.Errorf("Ugh...not found! Go away!") } -