Skip to content

Commit

Permalink
Move docker scan into GenStage (#707)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljguarino authored Nov 10, 2022
1 parent 8bc9cb5 commit a04adbb
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 10 deletions.
2 changes: 0 additions & 2 deletions apps/core/lib/core/pubsub/protocols/fanout.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ defimpl Core.PubSub.Fanout, for: Core.PubSub.DockerImageCreated do
require Logger

def fanout(%{item: img}) do
Logger.info "scheduling scan for image #{img.id}"
Core.Buffer.Orchestrator.submit(Core.Buffers.Docker, img.docker_repository.repository_id, img)
Core.Conduit.Broker.publish(%Conduit.Message{body: img}, :dkr)
end
end

Expand Down
1 change: 0 additions & 1 deletion apps/core/test/pubsub/fanout/docker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ defmodule Core.PubSub.Fanout.DockerTest do
describe "DockerImageCreated" do
test "it will send to rabbit" do
img = insert(:docker_image)
expect(Core.Conduit.Broker, :publish, fn %Conduit.Message{body: ^img}, :dkr -> :ok end)
expect(Core.Buffers.Docker, :submit, fn _, ^img -> :ok end)
event = %PubSub.DockerImageCreated{item: img}
Core.PubSub.Fanout.fanout(event)
Expand Down
6 changes: 2 additions & 4 deletions apps/worker/lib/worker/docker/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ defmodule Worker.Docker.Pipeline do
require Logger

def start_link(producer) do
Flow.from_stages([producer], stages: 1, max_demand: 10)
Flow.from_stages([producer], stages: 1, max_demand: 5)
|> Flow.map(fn img ->
Logger.info "Scheduling docker scan for #{img.id}"
img
end)
|> Flow.map(&defer_scan/1)
|> Flow.map(&Worker.Conduit.Subscribers.Docker.scan_image/1)
|> Flow.start_link()
end

defp defer_scan(img), do: Worker.Conduit.Broker.publish(%Conduit.Message{body: img}, :dkr)
end
11 changes: 9 additions & 2 deletions apps/worker/lib/worker/docker/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ defmodule Worker.Docker.Producer do

@max 20
@scan_interval 7
@poll :timer.seconds(60)

defmodule State, do: defstruct [:demand, :draining]

Expand All @@ -15,7 +14,7 @@ defmodule Worker.Docker.Producer do
end

def init(_) do
:timer.send_interval(@poll, :poll)
:timer.send_interval(poll_interval(), :poll)

{:producer, %State{demand: 0}}
end
Expand All @@ -40,4 +39,12 @@ defmodule Worker.Docker.Producer do
_ -> empty(%{state | demand: demand})
end
end

defp poll_interval() do
case System.get_env("DOCKER_SCAN_POLL_INTERVAL") do
v when is_binary(v) -> String.to_integer(v)
_ -> 60
end
|> :timer.seconds()
end
end
2 changes: 1 addition & 1 deletion apps/worker/test/docker/pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Worker.Docker.PipelineTest do
insert(:docker_image, scanned_at: Timex.now(), scan_completed_at: Timex.now())

me = self()
expect(Worker.Conduit.Broker, :publish, 3, fn %{body: img}, :dkr -> send me, {:dkr, img} end)
expect(Worker.Conduit.Subscribers.Docker, :scan_image, 3, fn img -> send me, {:dkr, img} end)

{:ok, producer} = Docker.Producer.start_link()
{:ok, _} = Docker.Pipeline.start_link(producer)
Expand Down
1 change: 1 addition & 0 deletions apps/worker/test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ Mimic.copy(Goth.Token)
Mimic.copy(GoogleApi.CloudResourceManager.V3.Api.Projects)
Mimic.copy(Cloudflare.DnsRecord)
Mimic.copy(Worker.Conduit.Broker)
Mimic.copy(Worker.Conduit.Subscribers.Docker)

ExUnit.start()

0 comments on commit a04adbb

Please sign in to comment.