Skip to content

Commit

Permalink
simplify download/update cohorts for deployment_runner, fix desc to c…
Browse files Browse the repository at this point in the history
…ohort config, add consts for cohort polling and timeout
  • Loading branch information
tyiuhc committed Aug 5, 2024
1 parent 1146fa3 commit 53eb235
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 73 deletions.
3 changes: 2 additions & 1 deletion lib/experiment/cohort/cohort_download_api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
module AmplitudeExperiment
# CohortDownloadApi
class CohortDownloadApi
COHORT_REQUEST_TIMEOUT_MILLIS = 5000
def get_cohort(cohort_id, cohort = nil)
raise NotImplementedError
end
Expand Down Expand Up @@ -75,7 +76,7 @@ def get_cohort_members_request(cohort_id, last_modified)
url += "&lastModified=#{last_modified}" if last_modified

request = Net::HTTP::Get.new(URI(url), headers)
http = PersistentHttpClient.get(@server_url, { read_timeout: @cohort_request_delay_millis }, basic_auth)
http = PersistentHttpClient.get(@server_url, { read_timeout: COHORT_REQUEST_TIMEOUT_MILLIS }, basic_auth)
http.request(request)
end

Expand Down
27 changes: 0 additions & 27 deletions lib/experiment/cohort/cohort_loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ def initialize(cohort_download_api, cohort_storage)
@cohort_storage = cohort_storage
@jobs = {}
@lock_jobs = Mutex.new
@executor = Concurrent::ThreadPoolExecutor.new(
max_threads: 32,
name: 'CohortLoaderExecutor'
)
end

def load_cohort(cohort_id)
Expand All @@ -26,29 +22,6 @@ def load_cohort(cohort_id)
end
end

def update_stored_cohorts
errors = []

Concurrent::Promises.future_on(@executor) do
futures = @cohort_storage.cohort_ids.map do |cohort_id|
Concurrent::Promises.future_on(@executor) do
load_cohort_internal(cohort_id)
rescue StandardError => e
[cohort_id, e] # Return the cohort_id and the error
end
end

results = Concurrent::Promises.zip(*futures).value!

# Collect errors from the results
results.each do |result|
errors << result if result.is_a?(Array) && result[1].is_a?(StandardError)
end

raise CohortUpdateError, errors unless errors.empty?
end
end

private

def load_cohort_internal(cohort_id)
Expand Down
3 changes: 2 additions & 1 deletion lib/experiment/cohort/cohort_sync_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ class CohortSyncConfig
# api_key (str): The project API Key
# secret_key (str): The project Secret Key
# max_cohort_size (int): The maximum cohort size that can be downloaded
# cohort_request_delay_millis (int): The delay in milliseconds between cohort download requests
# cohort_request_delay_millis (int): The delay in milliseconds between each cohort download request,
# applied upon failure
# cohort_server_url (str): The server endpoint from which to request cohorts

attr_accessor :api_key, :secret_key, :max_cohort_size, :cohort_request_delay_millis, :cohort_server_url
Expand Down
49 changes: 27 additions & 22 deletions lib/experiment/deployment/deployment_runner.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'set'

module AmplitudeExperiment
COHORT_POLLING_INTERVAL_MILLIS = 60_000
# DeploymentRunner
class DeploymentRunner
def initialize(
Expand All @@ -18,6 +19,10 @@ def initialize(
@cohort_loader = cohort_loader
@lock = Mutex.new
@logger = logger
@executor = Concurrent::ThreadPoolExecutor.new(
max_threads: 10,
name: 'DeploymentRunnerExecutor'
)
end

def start
Expand All @@ -30,7 +35,7 @@ def start
@flag_poller.start
if @cohort_loader
@cohort_poller = Poller.new(
@config.flag_config_polling_interval_millis / 1000.0,
COHORT_POLLING_INTERVAL_MILLIS / 1000.0,
method(:update_cohorts)
)
@cohort_poller.start
Expand Down Expand Up @@ -60,36 +65,22 @@ def update_flag_configs
@flag_config_storage.remove_if { |f| !flag_keys.include?(f['key']) }

unless @cohort_loader
flag_configs.each do |flag_config|
flag_config = flag_config[1]
@logger.debug("Putting non-cohort flag #{flag_config['key']}")
flag_configs.each do |flag_key, flag_config|
@logger.debug("Putting non-cohort flag #{flag_key}")
@flag_config_storage.put_flag_config(flag_config)
end
return
end

new_cohort_ids = Set.new
flag_configs.each do |flag_config|
flag_config = flag_config[1]
flag_configs.each do |_, flag_config|
new_cohort_ids.merge(AmplitudeExperiment.get_all_cohort_ids_from_flag(flag_config))
end

existing_cohort_ids = @cohort_storage.cohort_ids
cohort_ids_to_download = new_cohort_ids - existing_cohort_ids

futures = cohort_ids_to_download.map do |cohort_id|
future = @cohort_loader.load_cohort(cohort_id)
future.on_rejection do |reason|
@logger.warn("Download cohort #{cohort_id} failed: #{reason}")
end
future
end

begin
Concurrent::Promises.zip(*futures).value!
rescue StandardError => e
@logger.error("Failed to download cohorts: #{e}")
end
download_cohorts(cohort_ids_to_download)

updated_cohort_ids = @cohort_storage.cohort_ids

Expand All @@ -106,10 +97,24 @@ def update_flag_configs
@logger.debug("Refreshed #{flag_configs.size} flag configs.")
end

def download_cohorts(cohort_ids)
futures = cohort_ids.map do |cohort_id|
Concurrent::Promises.future_on(@executor) do
future = @cohort_loader.load_cohort(cohort_id)
future.value!
rescue StandardError => e
@logger.error("Failed to download cohort #{cohort_id}: #{e.message}")
nil

end
end

Concurrent::Promises.zip(*futures).value!
end

def update_cohorts
@cohort_loader.update_stored_cohorts.value!
rescue StandardError => e
@logger.error("Error while updating cohorts: #{e}")
@logger.debug('Updating cohorts in storage')
download_cohorts(@cohort_storage.cohort_ids)
end

def delete_unused_cohorts
Expand Down
22 changes: 0 additions & 22 deletions lib/experiment/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,6 @@ def initialize(status_code, cohort_id, message)
end
end

# CohortUpdateError
class CohortUpdateError < StandardError
attr_reader :errors

def initialize(errors)
@errors = errors
super(to_s)
end

def to_s
error_messages = @errors.map do |item|
if item.is_a?(Array) && item.size == 2
cohort_id, error = item
"Cohort #{cohort_id}: #{error}"
else
item.to_s
end
end
"#{@errors.length} cohort(s) failed to update:\n#{error_messages.join("\n")}"
end
end

class CycleError < StandardError
# Raised when topological sorting encounters a cycle between flag dependencies.
attr_reader :path
Expand Down

0 comments on commit 53eb235

Please sign in to comment.