From a04adbb99fd2fd7d7aa7f88bc362e5e89431a148 Mon Sep 17 00:00:00 2001 From: michaeljguarino Date: Thu, 10 Nov 2022 14:45:09 -0500 Subject: [PATCH] Move docker scan into GenStage (#707) --- apps/core/lib/core/pubsub/protocols/fanout.ex | 2 -- apps/core/test/pubsub/fanout/docker_test.exs | 1 - apps/worker/lib/worker/docker/pipeline.ex | 6 ++---- apps/worker/lib/worker/docker/producer.ex | 11 +++++++++-- apps/worker/test/docker/pipeline_test.exs | 2 +- apps/worker/test/test_helper.exs | 1 + 6 files changed, 13 insertions(+), 10 deletions(-) diff --git a/apps/core/lib/core/pubsub/protocols/fanout.ex b/apps/core/lib/core/pubsub/protocols/fanout.ex index 207762f4e..6bbb46aa4 100644 --- a/apps/core/lib/core/pubsub/protocols/fanout.ex +++ b/apps/core/lib/core/pubsub/protocols/fanout.ex @@ -69,9 +69,7 @@ defimpl Core.PubSub.Fanout, for: Core.PubSub.DockerImageCreated do require Logger def fanout(%{item: img}) do - Logger.info "scheduling scan for image #{img.id}" Core.Buffer.Orchestrator.submit(Core.Buffers.Docker, img.docker_repository.repository_id, img) - Core.Conduit.Broker.publish(%Conduit.Message{body: img}, :dkr) end end diff --git a/apps/core/test/pubsub/fanout/docker_test.exs b/apps/core/test/pubsub/fanout/docker_test.exs index 008abb5b8..f9cab58cd 100644 --- a/apps/core/test/pubsub/fanout/docker_test.exs +++ b/apps/core/test/pubsub/fanout/docker_test.exs @@ -36,7 +36,6 @@ defmodule Core.PubSub.Fanout.DockerTest do describe "DockerImageCreated" do test "it will send to rabbit" do img = insert(:docker_image) - expect(Core.Conduit.Broker, :publish, fn %Conduit.Message{body: ^img}, :dkr -> :ok end) expect(Core.Buffers.Docker, :submit, fn _, ^img -> :ok end) event = %PubSub.DockerImageCreated{item: img} Core.PubSub.Fanout.fanout(event) diff --git a/apps/worker/lib/worker/docker/pipeline.ex b/apps/worker/lib/worker/docker/pipeline.ex index aa84b1084..5d0405b0b 100644 --- a/apps/worker/lib/worker/docker/pipeline.ex +++ b/apps/worker/lib/worker/docker/pipeline.ex @@ -3,14 +3,12 @@ defmodule Worker.Docker.Pipeline do require Logger def start_link(producer) do - Flow.from_stages([producer], stages: 1, max_demand: 10) + Flow.from_stages([producer], stages: 1, max_demand: 5) |> Flow.map(fn img -> Logger.info "Scheduling docker scan for #{img.id}" img end) - |> Flow.map(&defer_scan/1) + |> Flow.map(&Worker.Conduit.Subscribers.Docker.scan_image/1) |> Flow.start_link() end - - defp defer_scan(img), do: Worker.Conduit.Broker.publish(%Conduit.Message{body: img}, :dkr) end diff --git a/apps/worker/lib/worker/docker/producer.ex b/apps/worker/lib/worker/docker/producer.ex index 9faec38c2..8948373d0 100644 --- a/apps/worker/lib/worker/docker/producer.ex +++ b/apps/worker/lib/worker/docker/producer.ex @@ -6,7 +6,6 @@ defmodule Worker.Docker.Producer do @max 20 @scan_interval 7 - @poll :timer.seconds(60) defmodule State, do: defstruct [:demand, :draining] @@ -15,7 +14,7 @@ defmodule Worker.Docker.Producer do end def init(_) do - :timer.send_interval(@poll, :poll) + :timer.send_interval(poll_interval(), :poll) {:producer, %State{demand: 0}} end @@ -40,4 +39,12 @@ defmodule Worker.Docker.Producer do _ -> empty(%{state | demand: demand}) end end + + defp poll_interval() do + case System.get_env("DOCKER_SCAN_POLL_INTERVAL") do + v when is_binary(v) -> String.to_integer(v) + _ -> 60 + end + |> :timer.seconds() + end end diff --git a/apps/worker/test/docker/pipeline_test.exs b/apps/worker/test/docker/pipeline_test.exs index c3c1c45d9..05351024c 100644 --- a/apps/worker/test/docker/pipeline_test.exs +++ b/apps/worker/test/docker/pipeline_test.exs @@ -12,7 +12,7 @@ defmodule Worker.Docker.PipelineTest do insert(:docker_image, scanned_at: Timex.now(), scan_completed_at: Timex.now()) me = self() - expect(Worker.Conduit.Broker, :publish, 3, fn %{body: img}, :dkr -> send me, {:dkr, img} end) + expect(Worker.Conduit.Subscribers.Docker, :scan_image, 3, fn img -> send me, {:dkr, img} end) {:ok, producer} = Docker.Producer.start_link() {:ok, _} = Docker.Pipeline.start_link(producer) diff --git a/apps/worker/test/test_helper.exs b/apps/worker/test/test_helper.exs index e34eea8ff..3325c3013 100644 --- a/apps/worker/test/test_helper.exs +++ b/apps/worker/test/test_helper.exs @@ -3,5 +3,6 @@ Mimic.copy(Goth.Token) Mimic.copy(GoogleApi.CloudResourceManager.V3.Api.Projects) Mimic.copy(Cloudflare.DnsRecord) Mimic.copy(Worker.Conduit.Broker) +Mimic.copy(Worker.Conduit.Subscribers.Docker) ExUnit.start()