Merge pull request #223 from kbrock/logging
Separate workflow/context with different loggers
agrare authored Oct 31, 2024
2 parents ba16eb9 + 94cc497 commit 1e674ae
Showing 10 changed files with 60 additions and 30 deletions.
36 changes: 31 additions & 5 deletions lib/floe/cli.rb
@@ -1,9 +1,12 @@
require "floe"
require "floe/container_runner"

module Floe
class CLI
include Logging

def initialize
require "optimist"
require "floe"
require "floe/container_runner"
require "logger"

Floe.logger = Logger.new($stdout)
@@ -20,12 +23,22 @@ def run(args = ARGV)
create_workflow(workflow, opts[:context], input, credentials)
end

Floe::Workflow.wait(workflows, &:run_nonblock)
output_streams = create_loggers(workflows, opts[:segment_output])

logger.info("Checking #{workflows.count} workflows...")
ready = Floe::Workflow.wait(workflows, &:run_nonblock)
logger.info("Checking #{workflows.count} workflows...Complete - #{ready.count} ready")

# Display status
workflows.each do |workflow|
puts "", "#{workflow.name}#{" (#{workflow.status})" unless workflow.context.success?}", "===" if workflows.size > 1
puts workflow.output
if workflows.size > 1
logger.info("")
logger.info("#{workflow.name}#{" (#{workflow.status})" unless workflow.context.success?}")
logger.info("===")
end

logger.info(output_streams[workflow].string) if output_streams[workflow]
logger.info(workflow.output)
end

workflows.all? { |workflow| workflow.context.success? }
@@ -49,6 +62,7 @@ def parse_options!(args)
opt :context, "JSON payload of the Context", :type => :string
opt :credentials, "JSON payload with Credentials", :type => :string
opt :credentials_file, "Path to a file with Credentials", :type => :string
opt :segment_output, "Segment output by each worker", :default => false

Floe::ContainerRunner.cli_options(self)

@@ -89,5 +103,17 @@ def create_workflow(workflow, context_payload, input, credentials)
context = Floe::Workflow::Context.new(context_payload, :input => input, :credentials => credentials)
Floe::Workflow.load(workflow, context)
end

def create_loggers(workflows, segment_output)
if workflows.size == 1 || !segment_output
# no extra work necessary
{}
else
workflows.each_with_object({}) do |workflow, h|
workflow.context.logger = Logger.new(output = StringIO.new)
h[workflow] = output
end
end
end
end
end
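(Not part of the diff.) A minimal standalone sketch of what --segment_output buys: each workflow gets a Logger backed by its own StringIO, so interleaved log lines can be replayed per workflow once everything finishes. The workflow names below are illustrative.

    require "logger"
    require "stringio"

    workflows = %w[payments reports]                  # stand-ins for Floe workflows

    # one in-memory stream (and logger) per workflow
    streams = workflows.to_h { |wf| [wf, StringIO.new] }
    loggers = streams.transform_values { |io| Logger.new(io) }

    # work runs interleaved, but each line lands in its workflow's stream
    loggers["payments"].info("state CheckBalance complete")
    loggers["reports"].info("state BuildReport complete")
    loggers["payments"].info("state Charge complete")

    # replay each workflow's output as one contiguous block
    workflows.each do |wf|
      puts "=== #{wf} ==="
      puts streams[wf].string
    end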
4 changes: 2 additions & 2 deletions lib/floe/container_runner/docker.rb
@@ -30,7 +30,7 @@ def run_async!(resource, env, secrets, context)
end

begin
runner_context["container_ref"] = run_container(image, env, execution_id, runner_context["secrets_ref"])
runner_context["container_ref"] = run_container(image, env, execution_id, runner_context["secrets_ref"], context.logger)
runner_context
rescue AwesomeSpawn::CommandResultError => err
cleanup(runner_context)
@@ -123,7 +123,7 @@ def output(runner_context)

attr_reader :network

def run_container(image, env, execution_id, secrets_file)
def run_container(image, env, execution_id, secrets_file, logger)
params = run_container_params(image, env, execution_id, secrets_file)

logger.debug("Running #{AwesomeSpawn.build_command_line(self.class::DOCKER_COMMAND, params)}")
3 changes: 2 additions & 1 deletion lib/floe/container_runner/kubernetes.rb
@@ -285,7 +285,8 @@ def error_notice?(notice)
code = notice.object&.code
reason = notice.object&.reason

logger.warn("Received [#{code} #{reason}], [#{message}]")
# This feels like a global concern and not an end user's concern
Floe.logger.warn("Received [#{code} #{reason}], [#{message}]")

true
end
6 changes: 5 additions & 1 deletion lib/floe/logging.rb
@@ -7,7 +7,11 @@ def self.included(base)
end

def logger
Floe.logger
@logger || Floe.logger
end

def logger=(logger)
@logger = logger
end
end
end
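(Not from the diff.) The fallback pattern above in isolation: an includer logs through its own logger once one has been assigned and otherwise falls through to a shared default. FALLBACK, PerInstanceLogging, and Demo are illustrative stand-ins for Floe.logger, Floe::Logging, and a Floe class.

    require "logger"

    FALLBACK = Logger.new($stdout)      # stand-in for Floe.logger

    module PerInstanceLogging
      def logger
        @logger || FALLBACK
      end

      def logger=(logger)
        @logger = logger
      end
    end

    class Demo
      include PerInstanceLogging
    end

    d = Demo.new
    d.logger.info("goes to the shared fallback logger")
    d.logger = Logger.new($stderr)
    d.logger.info("now goes to this instance's own logger")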
2 changes: 0 additions & 2 deletions lib/floe/runner.rb
@@ -2,8 +2,6 @@

module Floe
class Runner
include Logging

OUTPUT_MARKER = "__FLOE_OUTPUT__\n"

def initialize(_options = {}) # rubocop:disable Style/RedundantInitialize
2 changes: 0 additions & 2 deletions lib/floe/workflow.rb
@@ -18,7 +18,6 @@ def load(path_or_io, context = nil, credentials = {}, name = nil)

def wait(workflows, timeout: nil, &block)
workflows = [workflows] if workflows.kind_of?(self)
logger.info("Checking #{workflows.count} workflows...")

run_until = Time.now.utc + timeout if timeout.to_i > 0
ready = []
@@ -72,7 +71,6 @@ def wait(workflows, timeout: nil, &block)
sleep_thread&.kill
end

logger.info("Checking #{workflows.count} workflows...Complete - #{ready.count} ready")
ready
ensure
wait_thread&.kill
6 changes: 5 additions & 1 deletion lib/floe/workflow/context.rb
@@ -3,11 +3,13 @@
module Floe
class Workflow
class Context
include Logging

attr_accessor :credentials

# @param context [Json|Hash] (default, create another with input and execution params)
# @param input [Hash] (default: {})
def initialize(context = nil, input: nil, credentials: {})
def initialize(context = nil, input: nil, credentials: {}, logger: nil)
context = JSON.parse(context) if context.kind_of?(String)
input = JSON.parse(input || "{}")

@@ -20,6 +22,8 @@ def initialize(context = nil, input: nil, credentials: {})
self["Task"] ||= {}

@credentials = credentials || {}

self.logger = logger if logger
rescue JSON::ParserError => err
raise Floe::InvalidExecutionInput, "Invalid State Machine Execution Input: #{err}: was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')"
end
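(Not from the diff.) A usage sketch of the new keyword, assuming a state machine definition at the illustrative path workflow.asl: the Context carries its own logger, and the states (see the state.rb and retry_catch_mixin.rb hunks below) write their transition messages through context.logger, so this workflow's log lines end up in log_io rather than on the global logger.

    require "floe"
    require "logger"
    require "stringio"

    log_io   = StringIO.new
    context  = Floe::Workflow::Context.new(nil, :input => '{"foo":1}', :logger => Logger.new(log_io))
    workflow = Floe::Workflow.load("workflow.asl", context)

    Floe::Workflow.wait([workflow], &:run_nonblock)

    puts log_io.string    # only this workflow's state-transition log lines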
5 changes: 2 additions & 3 deletions lib/floe/workflow/state.rb
@@ -3,7 +3,6 @@
module Floe
class Workflow
class State
include Logging
include ValidationMixin

class << self
@@ -63,7 +62,7 @@ def finish(context)
def mark_started(context)
context.state["EnteredTime"] = Time.now.utc.iso8601

logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...")
context.logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...")
end

def mark_finished(context)
@@ -74,7 +73,7 @@ def mark_finished(context)
context.state["Duration"] = finished_time - entered_time

level = context.failed? ? :error : :info
logger.public_send(level, "Running state: [#{long_name}] with input [#{context.json_input}]...Complete #{context.next_state ? "- next state [#{context.next_state}]" : "workflow -"} output: [#{context.json_output}]")
context.logger.public_send(level, "Running state: [#{long_name}] with input [#{context.json_input}]...Complete #{context.next_state ? "- next state [#{context.next_state}]" : "workflow -"} output: [#{context.json_output}]")

0
end
6 changes: 3 additions & 3 deletions lib/floe/workflow/states/retry_catch_mixin.rb
@@ -29,7 +29,7 @@ def retry_state!(context, error)
wait_until!(context, :seconds => retrier.sleep_duration(context["State"]["RetryCount"]))
context.next_state = context.state_name
context.output = error
logger.info("Running state: [#{long_name}] with input [#{context.json_input}] got error[#{context.json_output}]...Retry - delay: #{wait_until(context)}")
context.logger.info("Running state: [#{long_name}] with input [#{context.json_input}] got error[#{context.json_output}]...Retry - delay: #{wait_until(context)}")
true
end

@@ -39,7 +39,7 @@ def catch_error!(context, error)

context.next_state = catcher.next
context.output = catcher.result_path.set(context.input, error)
logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...CatchError - next state: [#{context.next_state}] output: [#{context.json_output}]")
context.logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...CatchError - next state: [#{context.next_state}] output: [#{context.json_output}]")

true
end
@@ -49,7 +49,7 @@ def fail_workflow!(context, error)
# keeping in here for completeness
context.next_state = nil
context.output = error
logger.error("Running state: [#{long_name}] with input [#{context.json_input}]...Complete workflow - output: [#{context.json_output}]")
context.logger.error("Running state: [#{long_name}] with input [#{context.json_input}]...Complete workflow - output: [#{context.json_output}]")
end
end
end
20 changes: 10 additions & 10 deletions spec/cli_spec.rb
@@ -30,7 +30,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("Checking 1 workflows...")
expect(lines.last).to eq("{}")
expect(lines.last).to include("{}")
end

it "with a bare workflow and input" do
@@ -39,7 +39,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("Checking 1 workflows...")
expect(lines.last).to eq('{"foo":1}')
expect(lines.last).to include('{"foo":1}')
end

it "with a bare workflow and --input" do
@@ -48,7 +48,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("Checking 1 workflows...")
expect(lines.last).to eq('{"foo":1}')
expect(lines.last).to include('{"foo":1}')
end

it "with --workflow and no input" do
@@ -57,7 +57,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("Checking 1 workflows...")
expect(lines.last).to eq("{}")
expect(lines.last).to include("{}")
end

it "with --workflow and --input" do
@@ -66,23 +66,23 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("Checking 1 workflows...")
expect(lines.last).to eq('{"foo":1}')
expect(lines.last).to include('{"foo":1}')
end

it "with a bare workflow and --workflow" do
_output, error, result = run_cli(workflow, "--workflow", workflow)
expect(result).to be false

lines = error.lines(:chomp => true)
expect(lines.first).to eq("Error: cannot specify both --workflow and bare workflows.")
expect(lines.first).to include("Error: cannot specify both --workflow and bare workflows.")
end

it "with --input but no workflows" do
_output, error, result = run_cli("--input", '{"foo":1}')
expect(result).to be false

lines = error.lines(:chomp => true)
expect(lines.first).to eq("Error: workflow(s) must be specified.")
expect(lines.first).to include("Error: workflow(s) must be specified.")
end

it "with multiple bare workflow/input pairs" do
@@ -91,7 +91,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("Checking 2 workflows...")
expect(lines.last(7).join("\n")).to eq(<<~OUTPUT.chomp)
expect(lines.last(7).map { |line| line.gsub(/^.* INFO -- : /, "") }.join("\n")).to eq(<<~OUTPUT.chomp)
workflow
===
{"foo":1}
@@ -108,7 +108,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("Checking 2 workflows...")
expect(lines.last(7).join("\n")).to eq(<<~OUTPUT.chomp)
expect(lines.last(7).map { |line| line.gsub(/^.* INFO -- : /, "") }.join("\n")).to eq(<<~OUTPUT.chomp)
workflow
===
{"foo":1}
@@ -124,7 +124,7 @@
expect(result).to be false

lines = error.lines(:chomp => true)
expect(lines.first).to eq("Error: workflow/input pairs must be specified.")
expect(lines.first).to include("Error: workflow/input pairs must be specified.")
end

def run_cli(*args)
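(Not part of the diff.) Why the spec assertions loosen from eq to include, and why the gsub strips a prefix: Ruby's Logger prepends a severity/timestamp header to every line, roughly as sketched here.

    require "logger"
    require "stringio"

    io = StringIO.new
    Logger.new(io).info("{}")

    line = io.string.chomp
    # e.g. I, [2024-10-31T12:00:00.000000 #1234]  INFO -- : {}
    puts line.include?("{}")                     # true, so `include` still matches
    puts line.gsub(/^.* INFO -- : /, "")         # header stripped, leaving just {}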
