diff --git a/lib/floe/cli.rb b/lib/floe/cli.rb index d8709017..a21a672c 100644 --- a/lib/floe/cli.rb +++ b/lib/floe/cli.rb @@ -1,9 +1,12 @@ +require "floe" +require "floe/container_runner" + module Floe class CLI + include Logging + def initialize require "optimist" - require "floe" - require "floe/container_runner" require "logger" Floe.logger = Logger.new($stdout) @@ -20,12 +23,22 @@ def run(args = ARGV) create_workflow(workflow, opts[:context], input, credentials) end - Floe::Workflow.wait(workflows, &:run_nonblock) + output_streams = create_loggers(workflows, opts[:segment_output]) + + logger.info("Checking #{workflows.count} workflows...") + ready = Floe::Workflow.wait(workflows, &:run_nonblock) + logger.info("Checking #{workflows.count} workflows...Complete - #{ready.count} ready") # Display status workflows.each do |workflow| - puts "", "#{workflow.name}#{" (#{workflow.status})" unless workflow.context.success?}", "===" if workflows.size > 1 - puts workflow.output + if workflows.size > 1 + logger.info("") + logger.info("#{workflow.name}#{" (#{workflow.status})" unless workflow.context.success?}") + logger.info("===") + end + + logger.info(output_streams[workflow].string) if output_streams[workflow] + logger.info(workflow.output) end workflows.all? { |workflow| workflow.context.success? } @@ -49,6 +62,7 @@ def parse_options!(args) opt :context, "JSON payload of the Context", :type => :string opt :credentials, "JSON payload with Credentials", :type => :string opt :credentials_file, "Path to a file with Credentials", :type => :string + opt :segment_output, "Segment output by each worker", :default => false Floe::ContainerRunner.cli_options(self) @@ -89,5 +103,17 @@ def create_workflow(workflow, context_payload, input, credentials) context = Floe::Workflow::Context.new(context_payload, :input => input, :credentials => credentials) Floe::Workflow.load(workflow, context) end + + def create_loggers(workflows, segment_output) + if workflows.size == 1 || !segment_output + # no extra work necessary + {} + else + workflows.each_with_object({}) do |workflow, h| + workflow.context.logger = Logger.new(output = StringIO.new) + h[workflow] = output + end + end + end end end diff --git a/lib/floe/container_runner/docker.rb b/lib/floe/container_runner/docker.rb index 93aa7597..091b249a 100644 --- a/lib/floe/container_runner/docker.rb +++ b/lib/floe/container_runner/docker.rb @@ -30,7 +30,7 @@ def run_async!(resource, env, secrets, context) end begin - runner_context["container_ref"] = run_container(image, env, execution_id, runner_context["secrets_ref"]) + runner_context["container_ref"] = run_container(image, env, execution_id, runner_context["secrets_ref"], context.logger) runner_context rescue AwesomeSpawn::CommandResultError => err cleanup(runner_context) @@ -123,7 +123,7 @@ def output(runner_context) attr_reader :network - def run_container(image, env, execution_id, secrets_file) + def run_container(image, env, execution_id, secrets_file, logger) params = run_container_params(image, env, execution_id, secrets_file) logger.debug("Running #{AwesomeSpawn.build_command_line(self.class::DOCKER_COMMAND, params)}") diff --git a/lib/floe/container_runner/kubernetes.rb b/lib/floe/container_runner/kubernetes.rb index b783e518..cbe5112d 100644 --- a/lib/floe/container_runner/kubernetes.rb +++ b/lib/floe/container_runner/kubernetes.rb @@ -285,7 +285,8 @@ def error_notice?(notice) code = notice.object&.code reason = notice.object&.reason - logger.warn("Received [#{code} #{reason}], [#{message}]") + # This feels like a global concern and not an end user's concern + Floe.logger.warn("Received [#{code} #{reason}], [#{message}]") true end diff --git a/lib/floe/logging.rb b/lib/floe/logging.rb index be66cb4c..bc9c9b04 100644 --- a/lib/floe/logging.rb +++ b/lib/floe/logging.rb @@ -7,7 +7,11 @@ def self.included(base) end def logger - Floe.logger + @logger || Floe.logger + end + + def logger=(logger) + @logger = logger end end end diff --git a/lib/floe/runner.rb b/lib/floe/runner.rb index 4fb89b88..3e014bcd 100644 --- a/lib/floe/runner.rb +++ b/lib/floe/runner.rb @@ -2,8 +2,6 @@ module Floe class Runner - include Logging - OUTPUT_MARKER = "__FLOE_OUTPUT__\n" def initialize(_options = {}) # rubocop:disable Style/RedundantInitialize diff --git a/lib/floe/workflow.rb b/lib/floe/workflow.rb index 452f7513..26ef68cf 100644 --- a/lib/floe/workflow.rb +++ b/lib/floe/workflow.rb @@ -18,7 +18,6 @@ def load(path_or_io, context = nil, credentials = {}, name = nil) def wait(workflows, timeout: nil, &block) workflows = [workflows] if workflows.kind_of?(self) - logger.info("Checking #{workflows.count} workflows...") run_until = Time.now.utc + timeout if timeout.to_i > 0 ready = [] @@ -72,7 +71,6 @@ def wait(workflows, timeout: nil, &block) sleep_thread&.kill end - logger.info("Checking #{workflows.count} workflows...Complete - #{ready.count} ready") ready ensure wait_thread&.kill diff --git a/lib/floe/workflow/context.rb b/lib/floe/workflow/context.rb index 0564202d..3e4e1d56 100644 --- a/lib/floe/workflow/context.rb +++ b/lib/floe/workflow/context.rb @@ -3,11 +3,13 @@ module Floe class Workflow class Context + include Logging + attr_accessor :credentials # @param context [Json|Hash] (default, create another with input and execution params) # @param input [Hash] (default: {}) - def initialize(context = nil, input: nil, credentials: {}) + def initialize(context = nil, input: nil, credentials: {}, logger: nil) context = JSON.parse(context) if context.kind_of?(String) input = JSON.parse(input || "{}") @@ -20,6 +22,8 @@ def initialize(context = nil, input: nil, credentials: {}) self["Task"] ||= {} @credentials = credentials || {} + + self.logger = logger if logger rescue JSON::ParserError => err raise Floe::InvalidExecutionInput, "Invalid State Machine Execution Input: #{err}: was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')" end diff --git a/lib/floe/workflow/state.rb b/lib/floe/workflow/state.rb index 5c972455..02664668 100644 --- a/lib/floe/workflow/state.rb +++ b/lib/floe/workflow/state.rb @@ -3,7 +3,6 @@ module Floe class Workflow class State - include Logging include ValidationMixin class << self @@ -63,7 +62,7 @@ def finish(context) def mark_started(context) context.state["EnteredTime"] = Time.now.utc.iso8601 - logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...") + context.logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...") end def mark_finished(context) @@ -74,7 +73,7 @@ def mark_finished(context) context.state["Duration"] = finished_time - entered_time level = context.failed? ? :error : :info - logger.public_send(level, "Running state: [#{long_name}] with input [#{context.json_input}]...Complete #{context.next_state ? "- next state [#{context.next_state}]" : "workflow -"} output: [#{context.json_output}]") + context.logger.public_send(level, "Running state: [#{long_name}] with input [#{context.json_input}]...Complete #{context.next_state ? "- next state [#{context.next_state}]" : "workflow -"} output: [#{context.json_output}]") 0 end diff --git a/lib/floe/workflow/states/retry_catch_mixin.rb b/lib/floe/workflow/states/retry_catch_mixin.rb index f19ac075..f7b20ddb 100644 --- a/lib/floe/workflow/states/retry_catch_mixin.rb +++ b/lib/floe/workflow/states/retry_catch_mixin.rb @@ -29,7 +29,7 @@ def retry_state!(context, error) wait_until!(context, :seconds => retrier.sleep_duration(context["State"]["RetryCount"])) context.next_state = context.state_name context.output = error - logger.info("Running state: [#{long_name}] with input [#{context.json_input}] got error[#{context.json_output}]...Retry - delay: #{wait_until(context)}") + context.logger.info("Running state: [#{long_name}] with input [#{context.json_input}] got error[#{context.json_output}]...Retry - delay: #{wait_until(context)}") true end @@ -39,7 +39,7 @@ def catch_error!(context, error) context.next_state = catcher.next context.output = catcher.result_path.set(context.input, error) - logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...CatchError - next state: [#{context.next_state}] output: [#{context.json_output}]") + context.logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...CatchError - next state: [#{context.next_state}] output: [#{context.json_output}]") true end @@ -49,7 +49,7 @@ def fail_workflow!(context, error) # keeping in here for completeness context.next_state = nil context.output = error - logger.error("Running state: [#{long_name}] with input [#{context.json_input}]...Complete workflow - output: [#{context.json_output}]") + context.logger.error("Running state: [#{long_name}] with input [#{context.json_input}]...Complete workflow - output: [#{context.json_output}]") end end end diff --git a/spec/cli_spec.rb b/spec/cli_spec.rb index da4191a9..8cafc98a 100644 --- a/spec/cli_spec.rb +++ b/spec/cli_spec.rb @@ -30,7 +30,7 @@ lines = output.lines(:chomp => true) expect(lines.first).to include("Checking 1 workflows...") - expect(lines.last).to eq("{}") + expect(lines.last).to include("{}") end it "with a bare workflow and input" do @@ -39,7 +39,7 @@ lines = output.lines(:chomp => true) expect(lines.first).to include("Checking 1 workflows...") - expect(lines.last).to eq('{"foo":1}') + expect(lines.last).to include('{"foo":1}') end it "with a bare workflow and --input" do @@ -48,7 +48,7 @@ lines = output.lines(:chomp => true) expect(lines.first).to include("Checking 1 workflows...") - expect(lines.last).to eq('{"foo":1}') + expect(lines.last).to include('{"foo":1}') end it "with --workflow and no input" do @@ -57,7 +57,7 @@ lines = output.lines(:chomp => true) expect(lines.first).to include("Checking 1 workflows...") - expect(lines.last).to eq("{}") + expect(lines.last).to include("{}") end it "with --workflow and --input" do @@ -66,7 +66,7 @@ lines = output.lines(:chomp => true) expect(lines.first).to include("Checking 1 workflows...") - expect(lines.last).to eq('{"foo":1}') + expect(lines.last).to include('{"foo":1}') end it "with a bare workflow and --workflow" do @@ -74,7 +74,7 @@ expect(result).to be false lines = error.lines(:chomp => true) - expect(lines.first).to eq("Error: cannot specify both --workflow and bare workflows.") + expect(lines.first).to include("Error: cannot specify both --workflow and bare workflows.") end it "with --input but no workflows" do @@ -82,7 +82,7 @@ expect(result).to be false lines = error.lines(:chomp => true) - expect(lines.first).to eq("Error: workflow(s) must be specified.") + expect(lines.first).to include("Error: workflow(s) must be specified.") end it "with multiple bare workflow/input pairs" do @@ -91,7 +91,7 @@ lines = output.lines(:chomp => true) expect(lines.first).to include("Checking 2 workflows...") - expect(lines.last(7).join("\n")).to eq(<<~OUTPUT.chomp) + expect(lines.last(7).map { |line| line.gsub(/^.* INFO -- : /, "") }.join("\n")).to eq(<<~OUTPUT.chomp) workflow === {"foo":1} @@ -108,7 +108,7 @@ lines = output.lines(:chomp => true) expect(lines.first).to include("Checking 2 workflows...") - expect(lines.last(7).join("\n")).to eq(<<~OUTPUT.chomp) + expect(lines.last(7).map { |line| line.gsub(/^.* INFO -- : /, "") }.join("\n")).to eq(<<~OUTPUT.chomp) workflow === {"foo":1} @@ -124,7 +124,7 @@ expect(result).to be false lines = error.lines(:chomp => true) - expect(lines.first).to eq("Error: workflow/input pairs must be specified.") + expect(lines.first).to include("Error: workflow/input pairs must be specified.") end def run_cli(*args)