diff --git a/components/services/app.rb b/components/services/app.rb index aee83a3e2..fa98ec975 100644 --- a/components/services/app.rb +++ b/components/services/app.rb @@ -30,8 +30,6 @@ def process HandleSQSMessageEvent.call(event:) when :service_action event.service_action.call(**event.parameters) - when :cloudwatch_log_event - HandleLogEvents.call(event:) end end diff --git a/components/services/app/parsers/cloudwatch_log_event_parser.rb b/components/services/app/parsers/cloudwatch_log_event_parser.rb deleted file mode 100644 index 32bf8d285..000000000 --- a/components/services/app/parsers/cloudwatch_log_event_parser.rb +++ /dev/null @@ -1,55 +0,0 @@ - -require "zlib" -require "base64" -require "stringio" - -class CloudWatchLogEventParser - LogEvent = Struct.new( - :timestamp, - :message, - ) - - Event = Struct.new( - :event_type, - :log_group, - :log_events, - keyword_init: true - ) - - attr_reader :event - - def initialize(event) - @event = event - end - - def parse_event - Event.new( - event_type: :cloudwatch_log_event, - log_group:, - log_events: - ) - end - - private - - def raw_data - event.dig("awslogs", "data") - end - - def data_message - @data_message ||= JSON.parse(Zlib::GzipReader.new(StringIO.new(Base64.decode64(raw_data))).read) - end - - def log_group - data_message.fetch("logGroup") - end - - def log_events - data_message.fetch("logEvents").map do |log_event_data| - LogEvent.new( - timestamp: Time.at(log_event_data.fetch("timestamp") / 1000), - message: log_event_data.fetch("message") - ) - end - end -end diff --git a/components/services/app/parsers/event_parser.rb b/components/services/app/parsers/event_parser.rb index 3f2e3ade3..093d22d4a 100644 --- a/components/services/app/parsers/event_parser.rb +++ b/components/services/app/parsers/event_parser.rb @@ -12,8 +12,6 @@ def parse_event SQSMessageEventParser elsif service_action? ServiceActionParser - elsif cloudwatch_log_event? - CloudWatchLogEventParser end parser.new(event).parse_event @@ -32,8 +30,4 @@ def ecs_event? def service_action? event.key?("serviceAction") end - - def cloudwatch_log_event? - event.keys.size == 1 && event.key?("awslogs") - end end diff --git a/components/services/app/parsers/opensips_log_event_parser.rb b/components/services/app/parsers/opensips_log_event_parser.rb deleted file mode 100644 index 33b27622d..000000000 --- a/components/services/app/parsers/opensips_log_event_parser.rb +++ /dev/null @@ -1,17 +0,0 @@ -class OpenSIPSLogEventParser - attr_reader :event - - Event = Struct.new(:level, :message, keyword_init: true) - - def initialize(event) - @event = event - end - - def parse_event - event.log_events.map do |log_event| - log_data = JSON.parse(log_event.message) - - Event.new(level: log_data.fetch("level"), message: log_data.fetch("message")) - end - end -end diff --git a/components/services/app/workflows/find_ecs_task.rb b/components/services/app/workflows/find_ecs_task.rb deleted file mode 100644 index 547202706..000000000 --- a/components/services/app/workflows/find_ecs_task.rb +++ /dev/null @@ -1,63 +0,0 @@ -class FindECSTask < ApplicationWorkflow - attr_reader :task_ip, :service_name, :region, :ecs_client - - Task = Struct.new(:arn, :private_ip, :region, :cluster, keyword_init: true) - - def initialize(**options) - @task_ip = options.fetch(:task_ip) - @service_name = options.fetch(:service_name) - @service_name = @service_name.delete_prefix("service:") - @region = options[:region] - @ecs_client = options.fetch(:ecs_client) { default_ecs_client } - end - - def call - find_task_by_private_ip - end - - private - - def find_task_by_private_ip - clusters.each do |cluster_arn| - task_arns = list_tasks(cluster_arn:) - - next if task_arns.empty? - - describe_tasks(cluster_arn:, task_arns:).each do |task| - next unless private_ip(task) == task_ip - - return Task.new( - region: ecs_client.config.region, - private_ip: task_ip, - arn: task.task_arn, - cluster: task.cluster_arn - ) - end - end - - nil - end - - def clusters - @clusters ||= ecs_client.list_clusters.cluster_arns - end - - def list_tasks(cluster_arn:) - ecs_client.list_tasks(cluster: cluster_arn, service_name:).task_arns - end - - def describe_tasks(task_arns:, cluster_arn:) - ecs_client.describe_tasks(tasks: task_arns, cluster: cluster_arn).tasks - end - - def private_ip(task) - eni = task.attachments.find { |attachment| attachment.type == "ElasticNetworkInterface" } - return if eni.nil? - - IPAddr.new(eni.details.find { |detail| detail.name == "privateIPv4Address" }.value) - end - - def default_ecs_client - region ? Aws::ECS::Client.new(region:) : Aws::ECS::Client.new - end -end diff --git a/components/services/app/workflows/handle_log_events.rb b/components/services/app/workflows/handle_log_events.rb deleted file mode 100644 index a8f2724fb..000000000 --- a/components/services/app/workflows/handle_log_events.rb +++ /dev/null @@ -1,19 +0,0 @@ -class HandleLogEvents < ApplicationWorkflow - attr_reader :event - - def initialize(event:) - @event = event - end - - def call - if opensips_log_groups.include?(event.log_group) - HandleOpenSIPSLogEvent.call(event: OpenSIPSLogEventParser.new(event).parse_event) - end - end - - private - - def opensips_log_groups - [ ENV.fetch("PUBLIC_GATEWAY_LOG_GROUP"), ENV.fetch("CLIENT_GATEWAY_LOG_GROUP") ] - end -end diff --git a/components/services/app/workflows/handle_opensips_log_event.rb b/components/services/app/workflows/handle_opensips_log_event.rb deleted file mode 100644 index a1ceb73d7..000000000 --- a/components/services/app/workflows/handle_opensips_log_event.rb +++ /dev/null @@ -1,65 +0,0 @@ -class HandleOpenSIPSLogEvent < ApplicationWorkflow - LOAD_BALANCER_RESPONSE_ERROR_PATTERN = %r{\A(\d{3})-lb-response-error-(((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\b){4})\z} - TIMEOUT_ERROR_CODE = "408".freeze - - attr_reader :events, :error_tracking_client, :regions, :switch_group, :ecs_client, :ecs_task_finder - - LoadBalancerError = Struct.new(:target_ip, :code, keyword_init: true) - - def initialize(**options) - @events = options.fetch(:event) - @error_tracking_client = options.fetch(:error_tracking_client) { Sentry } - @regions = options.fetch(:regions) { SomlengRegion::Region } - @switch_group = options.fetch(:switch_group) { ENV.fetch("SWITCH_GROUP") } - @ecs_client = options.fetch(:ecs_client) { Aws::ECS::Client.new } - @ecs_task_finder = options.fetch(:ecs_task_finder) { FindECSTask } - end - - def call - load_balancer_timeout_errors = load_balancer_response_errors.select { |error| error.code == TIMEOUT_ERROR_CODE } - affected_tasks = fetch_tasks_by_private_ip(load_balancer_timeout_errors.map(&:target_ip)) - stop_tasks(affected_tasks, reason: "Load balancer timeout detected") - - notify_errors - end - - private - - def notify_errors - error_tracking_client.capture_message(events.map(&:message).uniq.join("\n")) - end - - def load_balancer_response_errors - @load_balancer_response_errors ||= begin - errors = events.select do |log| - log.message.match?(LOAD_BALANCER_RESPONSE_ERROR_PATTERN) - end - - errors.map do |log| - code, ip, = LOAD_BALANCER_RESPONSE_ERROR_PATTERN.match(log.message).captures - LoadBalancerError.new(target_ip: IPAddr.new(ip), code:) - end - end - end - - def fetch_tasks_by_private_ip(target_ips) - Array(target_ips).uniq.map do |target_ip| - region = regions.all.find { |r| IPAddr.new(r.vpc_cidr).include?(target_ip) } - - ecs_task_finder.call(task_ip: target_ip, service_name: switch_group, region: region.identifier) - end - end - - def stop_tasks(tasks, reason:) - Array(tasks).each do |task| - with_aws_client(ecs_client, region: task.region) do |client| - client.stop_task(cluster: task.cluster, task: task.arn, reason:) - end - end - end - - def with_aws_client(client, region:) - client.config.region = region - yield(client) - end -end diff --git a/components/services/config/initializers/sentry.rb b/components/services/config/initializers/sentry.rb index b7a2d118c..7fef2a8e0 100644 --- a/components/services/config/initializers/sentry.rb +++ b/components/services/config/initializers/sentry.rb @@ -3,6 +3,5 @@ Sentry.init do |config| config.dsn = AppSettings[:sentry_dsn] config.environment = AppSettings.env - config.debug = true config.background_worker_threads = 0 end diff --git a/components/services/lib/somleng_region/region.rb b/components/services/lib/somleng_region/region.rb index cb5afa8e6..ac4d0c473 100644 --- a/components/services/lib/somleng_region/region.rb +++ b/components/services/lib/somleng_region/region.rb @@ -10,16 +10,14 @@ class RegionNotFound < StandardError; end alias: "hydrogen", group_id: 1, human_name: "South East Asia (Singapore)", - nat_ip: "13.250.230.15", - vpc_cidr: "10.10.0.0/22" + nat_ip: "13.250.230.15" ), new( identifier: "us-east-1", alias: "helium", group_id: 2, human_name: "North America (N. Virginia, USA)", - nat_ip: "52.4.242.134", - vpc_cidr: "10.20.0.0/20" + nat_ip: "52.4.242.134" ) ] diff --git a/components/services/spec/fixtures/files/cloudwatch_log_event.json b/components/services/spec/fixtures/files/cloudwatch_log_event.json deleted file mode 100644 index dd3b7e710..000000000 --- a/components/services/spec/fixtures/files/cloudwatch_log_event.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "awslogs": { - "data": "H4sIAAAAAAAA/3WQy2rDMBBFf8XM2g4aWYoeO0PdbNpNnV1dipMMRuCHkJSEEPLvxXG77HLuPRwuc4eRYux62t88gYWXal99v9dNU+1qyGG+ThTAQskFV2ZbbiVTkMMw97swnz1Y8OfD4I5F3yW6drcipq53U78yTQrUjWAhUUxrGs+HeAzOJzdPr25IFCLYz/8sX09NfaEpLdgd3GkZoyWWaBBRCKkkMmUUU9wIUyqGUkpWasMEcimYlqhRK8MMcsghuZFi6kYPFhXfGoMcNVM6/3sDWLi3T6wFm7XQkM84z5iyXFi+bSHPWvDutLSaL8dAFxpWuHqrP/Yr8qtbc8F0MRyKQNHPU6SCQphDgYpvUG/YRrbwgMfX4wcsVdvmjAEAAA==" - } -} diff --git a/components/services/spec/requests/cloudwatch_log_events_spec.rb b/components/services/spec/requests/cloudwatch_log_events_spec.rb deleted file mode 100644 index 2e9c19e0c..000000000 --- a/components/services/spec/requests/cloudwatch_log_events_spec.rb +++ /dev/null @@ -1,43 +0,0 @@ -require_relative "../spec_helper" - -RSpec.describe "Handle CloudWatch Log Events" do - it "handles public gateway alerts" do - stub_env("PUBLIC_GATEWAY_LOG_GROUP" => "public-gateway") - payload = build_cloudwatch_log_event_payload( - log_group: "public-gateway", - log_events: [ - build_cloudwatch_log_event( - message: build_opensips_message(message: "408-lb-response-error-10.10.1.180") - ) - ] - ) - - invoke_lambda(payload:) - - stop_task_requests = aws_requests(:stop_task) - expect(stop_task_requests.size).to eq(1) - stop_task_request = stop_task_requests[0] - expect(stop_task_request.context.params.keys).to contain_exactly(:cluster, :task, :reason) - expect(stop_task_request.context.params.fetch(:task)).to include("ap-southeast-1") - end - - def build_opensips_message(data = {}) - data = { - time: "Sep 22 07:24:26", - pid: 82, - level: "CRITICAL", - message: "error" - }.merge(data) - - { - "time" => data.fetch(:time), - "pid" => data.fetch(:pid), - "level" => data.fetch(:level), - "message" => data.fetch(:message) - }.to_json - end - - def aws_requests(operation_name) - AWSRequests.select { |request| request[:operation_name] == operation_name } - end -end diff --git a/components/services/spec/spec_helper.rb b/components/services/spec/spec_helper.rb index f70cfccbf..43de3ef3d 100644 --- a/components/services/spec/spec_helper.rb +++ b/components/services/spec/spec_helper.rb @@ -24,8 +24,6 @@ SimpleCov.formatter = SimpleCov::Formatter::CoberturaFormatter end -ENV["PUBLIC_GATEWAY_LOG_GROUP"] = "public-gateway" -ENV["CLIENT_GATEWAY_LOG_GROUP"] = "client-gateway" ENV["SWITCH_GROUP"] = "service:somleng-switch" ENV["MEDIA_PROXY_GROUP"] = "service:media-proxy" ENV["CLIENT_GATEWAY_GROUP"] = "service:client-gateway" diff --git a/components/services/spec/support/aws_stubs.rb b/components/services/spec/support/aws_stubs.rb index 0c36a10e7..fcfe0689b 100644 --- a/components/services/spec/support/aws_stubs.rb +++ b/components/services/spec/support/aws_stubs.rb @@ -1,6 +1,3 @@ -AWSRequests = Array.new -AWSRequest = Struct.new(:context, :operation_name, keyword_init: true) - Aws.config[:ecs] = { stub_responses: { describe_container_instances: lambda { |_context| @@ -9,40 +6,6 @@ ec2_instance_id: "ec2-instance-id" ] } - }, - list_clusters: ->(context) { - { - cluster_arns: [ "arn:aws:ecs:#{context.client.config.region}:123456789012:cluster/cluster-1" ] - } - }, - list_tasks: ->(context) { - { - task_arns: [ "arn:aws:ecs:#{context.client.config.region}:123456789012:task/cluster-1/#{SecureRandom.uuid.gsub('-', '')}" ] - } - }, - describe_tasks: ->(context) { - { - tasks: [ - { - attachments: [ - { - type: "ElasticNetworkInterface", - details: [ - { - name: "privateIPv4Address", - value: "10.10.1.180" - } - ] - } - ], - task_arn: "arn:aws:ecs:#{context.client.config.region}:123456789012:task/cluster-1/#{SecureRandom.uuid.gsub('-', '')}", - cluster_arn: "arn:aws:ecs:#{context.client.config.region}:123456789012:cluster/cluster-1" - } - ] - } - }, - stop_task: ->(context) { - AWSRequests << AWSRequest.new(context:, operation_name: :stop_task) } } } diff --git a/components/services/spec/support/event_helpers.rb b/components/services/spec/support/event_helpers.rb index cdfa2bcb4..07ffb1088 100644 --- a/components/services/spec/support/event_helpers.rb +++ b/components/services/spec/support/event_helpers.rb @@ -1,48 +1,6 @@ require "json" module EventHelpers - def build_cloudwatch_log_event(data = {}) - data = { - id: SecureRandom.random_number(10**56).to_s.rjust(56, "0"), - timestamp: Time.now.to_i * 1000, - message: "log-message" - }.merge(data) - - { - "id" => data.fetch(:id), - "timestamp" => data.fetch(:timestamp), - "message" => data.fetch(:message) - } - end - - def build_cloudwatch_log_event_payload(data = {}) - data = { - log_group: "testing", - log_events: [ - build_cloudwatch_log_event - ] - }.merge(data) - - payload = JSON.parse(file_fixture("cloudwatch_log_event.json").read) - data_message = JSON.parse(Zlib::GzipReader.new(StringIO.new(Base64.decode64(payload.dig("awslogs", "data")))).read) - - overrides = { - "logGroup" => data.fetch(:log_group), - "logEvents" => data.fetch(:log_events) - } - - compressed_data_message = StringIO.new - gz = Zlib::GzipWriter.new(compressed_data_message) - gz.write(data_message.merge(overrides).to_json) - gz.close - - { - "awslogs" => { - "data" => Base64.encode64(compressed_data_message.string) - } - } - end - def build_sqs_message_event_payload(data = {}) data = { event_source_arn: "arn:aws:sqs:us-east-2:123456789012:somleng-switch-permissions", diff --git a/components/services/spec/workflows/handle_opensips_log_event_spec.rb b/components/services/spec/workflows/handle_opensips_log_event_spec.rb deleted file mode 100644 index c2ed7a9ba..000000000 --- a/components/services/spec/workflows/handle_opensips_log_event_spec.rb +++ /dev/null @@ -1,82 +0,0 @@ -require_relative "../spec_helper" - -RSpec.describe HandleOpenSIPSLogEvent do - it "handles OpenSIPS log events" do - events = [ - build_event(message: "408-lb-response-error-10.10.1.180"), - build_event(message: "408-lb-response-error-10.10.1.180"), - build_event(message: "408-lb-response-error-10.20.1.150"), - build_event(message: "500-lb-response-error-10.10.1.181"), - build_event(message: "Some other error") - ] - - fake_task_finder = stub_task_finder( - { arn: "task-1-arn", region: "ap-southeast-1", cluster: "cluster-1-arn" }, - { arn: "task-2-arn", region: "us-east-1", cluster: "cluster-2-arn" } - ) - - ecs_client, requested_regions = stub_ecs_client - error_tracking_client = class_spy(Sentry) - - HandleOpenSIPSLogEvent.call(event: events, error_tracking_client:, ecs_client:, ecs_task_finder: fake_task_finder) - - stop_task_requests = aws_requests(ecs_client, :stop_task) - expect(stop_task_requests.size).to eq(2) - expect(stop_task_requests[0].fetch(:params)).to eq( - cluster: "cluster-1-arn", - task: "task-1-arn", - reason: "Load balancer timeout detected" - ) - expect(stop_task_requests[1].fetch(:params)).to include( - cluster: "cluster-2-arn", - task: "task-2-arn" - ) - expect(requested_regions).to contain_exactly("ap-southeast-1", "us-east-1") - expect(error_tracking_client).to have_received(:capture_message).with( - [ - "408-lb-response-error-10.10.1.180", - "408-lb-response-error-10.20.1.150", - "500-lb-response-error-10.10.1.181", - "Some other error" - ].join("\n") - ) - end - - def build_event(message:) - OpenSIPSLogEventParser::Event.new(message:) - end - - def stub_task_finder(*task_results) - fake_task_finder = class_double(FindECSTask) - task_results = task_results.map { |data| build_task_result(**data) } - allow(fake_task_finder).to receive(:call).and_return(*task_results) - fake_task_finder - end - - def stub_ecs_client - requested_regions = [] - client = Aws::ECS::Client.new( - stub_responses: { - stop_task: ->(context) { - requested_regions << context.client.config.region - } - } - ) - - [ client, requested_regions ] - end - - def build_task_result(region: "us-east-1", **options) - FindECSTask::Task.new( - region:, - arn: "arn:aws:ecs:#{region}:123456789012:task/cluster-1/#{SecureRandom.uuid.gsub('-', '')}", - cluster: "arn:aws:ecs:#{region}:123456789012:cluster/cluster-1", - private_ip: "10.0.0.1", - **options - ) - end - - def aws_requests(client, operation_name) - client.api_requests.select { |request| request[:operation_name] == operation_name } - end -end