Skip to content

Commit

Permalink
♻️ Move ConsumerForm to its own LiveView
Browse files Browse the repository at this point in the history
  • Loading branch information
acco committed Dec 21, 2024
1 parent c64b41a commit 7bb7c6f
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 189 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule SequinWeb.Components.ConsumerForm do
defmodule SequinWeb.SinkConsumersLive.Form do
@moduledoc false
use SequinWeb, :live_component
use SequinWeb, :live_view

alias Sequin.Consumers
alias Sequin.Consumers.Backfill
Expand Down Expand Up @@ -38,7 +38,7 @@ defmodule SequinWeb.Components.ConsumerForm do

require Logger

@impl Phoenix.LiveComponent
@impl Phoenix.LiveView
def render(assigns) do
encoded_errors =
if assigns.show_errors? do
Expand Down Expand Up @@ -80,53 +80,89 @@ defmodule SequinWeb.Components.ConsumerForm do
"""
end

@impl Phoenix.LiveComponent
def update(%{event: :database_tables_updated}, socket) do
socket = assign_databases(socket)
{:ok, socket}
@impl Phoenix.LiveView
def mount(params, _session, socket) do
case load_or_init_consumer(params, socket) do
{:ok, consumer} ->
is_create = is_nil(consumer.id)

if is_create do
# Refresh tables for all databases in the account for creates
account = current_account(socket)
databases = Databases.list_dbs_for_account(account.id)
Enum.each(databases, &DatabaseUpdateWorker.enqueue(&1.id))
end

socket =
socket
|> assign(
consumer: Repo.preload(consumer, [:sequence, :postgres_database]),
show_errors?: false,
submit_error: nil,
changeset: nil,
sequence_changeset: nil,
prev_params: %{},
page_title: if(is_create, do: "New Sink", else: "Edit Sink")
)
|> assign_databases()
|> assign_http_endpoints()
|> reset_changeset()

:syn.join(:account, {:database_tables_updated, current_account_id(socket)}, self())

{:ok, socket}

{:error, _error} ->
{:ok,
socket
|> put_flash(:error, "Consumer not found")
|> push_navigate(to: ~p"/sinks")}
end
end

# If the changeset is not nil, we avoid re-updating the assigns
# the parent will cause update to fire if the props change
# if you need to handle props changing, refactor - be sure to preserve the changeset
@impl Phoenix.LiveComponent
def update(_assigns, %{assigns: %{changeset: %Ecto.Changeset{}}} = socket) do
{:ok, socket}
defp load_or_init_consumer(%{"id" => id}, socket) do
with {:ok, consumer} <- Consumers.get_sink_consumer_for_account(current_account_id(socket), id) do
consumer =
Repo.preload(consumer, [:postgres_database, :sequence])

{:ok, consumer}
end
end

@impl Phoenix.LiveComponent
def update(assigns, socket) do
consumer = assigns[:consumer]
defp load_or_init_consumer(%{"kind" => kind}, _socket) do
case kind do
"http_push" ->
{:ok, %SinkConsumer{type: :http_push, sink: %HttpPushSink{}}}

component = "consumers/SinkConsumerForm"
"sqs" ->
{:ok, %SinkConsumer{type: :sqs, sink: %SqsSink{}, batch_size: 10}}

socket =
socket
|> assign(assigns)
|> assign(
consumer: Repo.preload(consumer, [:sequence, :postgres_database]),
show_errors?: false,
submit_error: nil,
changeset: nil,
sequence_changeset: nil,
component: component,
prev_params: %{}
)
|> assign_databases()
|> assign_http_endpoints()
|> reset_changeset()
"kafka" ->
{:ok, %SinkConsumer{type: :kafka, sink: %KafkaSink{tls: false}}}

"redis" ->
{:ok, %SinkConsumer{type: :redis, batch_size: 100, sink: %RedisSink{}}}

"sequin_stream" ->
{:ok, %SinkConsumer{type: :sequin_stream, sink: %SequinStreamSink{}}}

:syn.join(:account, {:database_tables_updated, current_account_id(socket)}, self())
"gcp_pubsub" ->
{:ok, %SinkConsumer{type: :gcp_pubsub, sink: %GcpPubsubSink{}}}

{:ok, socket}
"nats" ->
{:ok, %SinkConsumer{type: :nats, sink: %NatsSink{}}}

"rabbitmq" ->
{:ok, %SinkConsumer{type: :rabbitmq, sink: %RabbitMqSink{virtual_host: "/"}}}
end
end

@impl Phoenix.LiveComponent
@impl Phoenix.LiveView
def handle_event("validate", _params, socket) do
{:noreply, socket}
end

@impl Phoenix.LiveComponent
@impl Phoenix.LiveView
def handle_event("form_updated", %{"form" => form}, socket) do
params = form |> decode_params(socket) |> maybe_put_replication_slot_id(socket)

Expand Down Expand Up @@ -177,12 +213,12 @@ defmodule SequinWeb.Components.ConsumerForm do
{:noreply, socket}
end

@impl Phoenix.LiveComponent
@impl Phoenix.LiveView
def handle_event("refresh_databases", _params, socket) do
{:noreply, assign_databases(socket)}
end

@impl Phoenix.LiveComponent
@impl Phoenix.LiveView
def handle_event("refresh_tables", %{"database_id" => database_id}, socket) do
with index when not is_nil(index) <- Enum.find_index(socket.assigns.databases, &(&1.id == database_id)),
database = Enum.at(socket.assigns.databases, index),
Expand All @@ -195,7 +231,7 @@ defmodule SequinWeb.Components.ConsumerForm do
end
end

@impl Phoenix.LiveComponent
@impl Phoenix.LiveView
def handle_event("refresh_sequences", %{"database_id" => database_id}, socket) do
with {:ok, database} <- Databases.get_db(database_id),
{:ok, _updated_database} <- Databases.update_tables(database) do
Expand Down Expand Up @@ -263,6 +299,11 @@ defmodule SequinWeb.Components.ConsumerForm do
end
end

@impl Phoenix.LiveView
def handle_info({:database_tables_updated, _updated_database}, socket) do
{:noreply, assign_databases(socket)}
end

defp test_sqs_connection(socket) do
sink_changeset =
socket.assigns.changeset
Expand Down
138 changes: 0 additions & 138 deletions lib/sequin_web/live/sink_consumers/index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,9 @@ defmodule SequinWeb.SinkConsumersLive.Index do
use SequinWeb, :live_view

alias Sequin.Consumers
alias Sequin.Consumers.GcpPubsubSink
alias Sequin.Consumers.HttpPushSink
alias Sequin.Consumers.KafkaSink
alias Sequin.Consumers.NatsSink
alias Sequin.Consumers.RabbitMqSink
alias Sequin.Consumers.RedisSink
alias Sequin.Consumers.SequinStreamSink
alias Sequin.Consumers.SinkConsumer
alias Sequin.Consumers.SqsSink
alias Sequin.Databases
alias Sequin.Databases.DatabaseUpdateWorker
alias Sequin.Health
alias SequinWeb.Components.ConsumerForm
alias SequinWeb.RouteHelpers

@impl Phoenix.LiveView
Expand Down Expand Up @@ -59,14 +49,6 @@ defmodule SequinWeb.SinkConsumersLive.Index do
end

@impl Phoenix.LiveView
def render(%{live_action: :new} = assigns) do
~H"""
<div id="consumers-index">
<%= render_consumer_form(assigns) %>
</div>
"""
end

def render(assigns) do
consumers = Enum.filter(assigns.consumers, &is_struct(&1, SinkConsumer))

Expand Down Expand Up @@ -103,132 +85,12 @@ defmodule SequinWeb.SinkConsumersLive.Index do
|> assign(:live_action, :index)
end

defp apply_action(socket, :new, %{"kind" => kind}) do
# Refresh tables for all databases in the account
account = current_account(socket)
databases = Databases.list_dbs_for_account(account.id)
Enum.each(databases, &DatabaseUpdateWorker.enqueue(&1.id))

socket
|> assign(:page_title, "New Sink")
|> assign(:live_action, :new)
|> assign(:form_kind, kind)
end

defp render_consumer_form(%{form_kind: "http_push"} = assigns) do
~H"""
<.live_component
current_user={@current_user}
module={ConsumerForm}
id="new-consumer"
action={:new}
consumer={%SinkConsumer{type: :http_push, sink: %HttpPushSink{}}}
/>
"""
end

defp render_consumer_form(%{form_kind: "sqs"} = assigns) do
~H"""
<.live_component
current_user={@current_user}
module={ConsumerForm}
id="new-consumer"
action={:new}
consumer={%SinkConsumer{type: :sqs, sink: %SqsSink{}, batch_size: 10}}
/>
"""
end

defp render_consumer_form(%{form_kind: "kafka"} = assigns) do
~H"""
<.live_component
current_user={@current_user}
module={ConsumerForm}
id="new-consumer"
action={:new}
consumer={%SinkConsumer{type: :kafka, sink: %KafkaSink{tls: false}}}
/>
"""
end

defp render_consumer_form(%{form_kind: "redis"} = assigns) do
~H"""
<.live_component
current_user={@current_user}
module={ConsumerForm}
id="new-consumer"
action={:new}
consumer={%SinkConsumer{type: :redis, batch_size: 100, sink: %RedisSink{}}}
/>
"""
end

defp render_consumer_form(%{form_kind: "sequin_stream"} = assigns) do
~H"""
<.live_component
current_user={@current_user}
module={ConsumerForm}
id="new-consumer"
action={:new}
consumer={%SinkConsumer{type: :sequin_stream, sink: %SequinStreamSink{}}}
/>
"""
end

defp render_consumer_form(%{form_kind: "gcp_pubsub"} = assigns) do
~H"""
<.live_component
current_user={@current_user}
module={ConsumerForm}
id="new-consumer"
action={:new}
consumer={
%SinkConsumer{
type: :gcp_pubsub,
sink: %GcpPubsubSink{}
}
}
/>
"""
end

defp render_consumer_form(%{form_kind: "nats"} = assigns) do
~H"""
<.live_component
current_user={@current_user}
module={ConsumerForm}
id="new-consumer"
action={:new}
consumer={%SinkConsumer{type: :nats, sink: %NatsSink{}}}
/>
"""
end

defp render_consumer_form(%{form_kind: "rabbitmq"} = assigns) do
~H"""
<.live_component
current_user={@current_user}
module={ConsumerForm}
id="new-consumer"
action={:new}
consumer={%SinkConsumer{type: :rabbitmq, sink: %RabbitMqSink{virtual_host: "/"}}}
/>
"""
end

@impl Phoenix.LiveView
def handle_info(:update_health, socket) do
Process.send_after(self(), :update_health, 1000)
{:noreply, assign(socket, :consumers, load_consumer_health(socket.assigns.consumers))}
end

def handle_info({:database_tables_updated, _updated_database}, socket) do
# Proxy down to ConsumerForm
send_update(ConsumerForm, id: "new-consumer", event: :database_tables_updated)

{:noreply, socket}
end

defp load_consumer_health(consumers) do
Enum.map(consumers, fn consumer ->
case Health.get(consumer) do
Expand Down
10 changes: 0 additions & 10 deletions lib/sequin_web/live/sink_consumers/show.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ defmodule SequinWeb.SinkConsumersLive.Show do
alias Sequin.Health
alias Sequin.Metrics
alias Sequin.Repo
alias SequinWeb.Components.ConsumerForm
alias SequinWeb.RouteHelpers

require Logger
Expand Down Expand Up @@ -140,15 +139,6 @@ defmodule SequinWeb.SinkConsumersLive.Show do
<!-- Main content area that fills the remaining space -->
<div class="flex-1 overflow-auto">
<%= case {@live_action, @consumer} do %>
<% {:edit, _consumer} -> %>
<!-- Edit component -->
<.live_component
module={ConsumerForm}
id="edit-consumer"
consumer={@consumer}
on_finish={&handle_edit_finish/1}
current_user={@current_user}
/>
<% {:show, %SinkConsumer{}} -> %>
<!-- ShowHttpPush component -->
<.svelte
Expand Down
4 changes: 2 additions & 2 deletions lib/sequin_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ defmodule SequinWeb.Router do

live_session :default, on_mount: [{SequinWeb.UserAuth, :ensure_authenticated}, {SequinWeb.LiveHooks, :global}] do
live "/sinks", SinkConsumersLive.Index, :index
live "/sinks/new", SinkConsumersLive.Index, :new
live "/sinks/new/:kind", SinkConsumersLive.Form, :new
live "/sinks/:type/:id/edit", SinkConsumersLive.Form, :edit
live "/sinks/:type/:id", SinkConsumersLive.Show, :show
live "/sinks/:type/:id/messages", SinkConsumersLive.Show, :messages
live "/sinks/:type/:id/edit", SinkConsumersLive.Show, :edit

live "/databases", DatabasesLive.Index, :index
live "/databases/new", DatabasesLive.Form, :new
Expand Down

0 comments on commit 7bb7c6f

Please sign in to comment.