diff --git a/lib/sequin_web/live/components/consumer_form.ex b/lib/sequin_web/live/sink_consumers/form.ex similarity index 91% rename from lib/sequin_web/live/components/consumer_form.ex rename to lib/sequin_web/live/sink_consumers/form.ex index 5dcf785b..7906160b 100644 --- a/lib/sequin_web/live/components/consumer_form.ex +++ b/lib/sequin_web/live/sink_consumers/form.ex @@ -1,6 +1,6 @@ -defmodule SequinWeb.Components.ConsumerForm do +defmodule SequinWeb.SinkConsumersLive.Form do @moduledoc false - use SequinWeb, :live_component + use SequinWeb, :live_view alias Sequin.Consumers alias Sequin.Consumers.Backfill @@ -38,7 +38,7 @@ defmodule SequinWeb.Components.ConsumerForm do require Logger - @impl Phoenix.LiveComponent + @impl Phoenix.LiveView def render(assigns) do encoded_errors = if assigns.show_errors? do @@ -80,53 +80,89 @@ defmodule SequinWeb.Components.ConsumerForm do """ end - @impl Phoenix.LiveComponent - def update(%{event: :database_tables_updated}, socket) do - socket = assign_databases(socket) - {:ok, socket} + @impl Phoenix.LiveView + def mount(params, _session, socket) do + case load_or_init_consumer(params, socket) do + {:ok, consumer} -> + is_create = is_nil(consumer.id) + + if is_create do + # Refresh tables for all databases in the account for creates + account = current_account(socket) + databases = Databases.list_dbs_for_account(account.id) + Enum.each(databases, &DatabaseUpdateWorker.enqueue(&1.id)) + end + + socket = + socket + |> assign( + consumer: Repo.preload(consumer, [:sequence, :postgres_database]), + show_errors?: false, + submit_error: nil, + changeset: nil, + sequence_changeset: nil, + prev_params: %{}, + page_title: if(is_create, do: "New Sink", else: "Edit Sink") + ) + |> assign_databases() + |> assign_http_endpoints() + |> reset_changeset() + + :syn.join(:account, {:database_tables_updated, current_account_id(socket)}, self()) + + {:ok, socket} + + {:error, _error} -> + {:ok, + socket + |> put_flash(:error, "Consumer not found") + |> push_navigate(to: ~p"/sinks")} + end end - # If the changeset is not nil, we avoid re-updating the assigns - # the parent will cause update to fire if the props change - # if you need to handle props changing, refactor - be sure to preserve the changeset - @impl Phoenix.LiveComponent - def update(_assigns, %{assigns: %{changeset: %Ecto.Changeset{}}} = socket) do - {:ok, socket} + defp load_or_init_consumer(%{"id" => id}, socket) do + with {:ok, consumer} <- Consumers.get_sink_consumer_for_account(current_account_id(socket), id) do + consumer = + Repo.preload(consumer, [:postgres_database, :sequence]) + + {:ok, consumer} + end end - @impl Phoenix.LiveComponent - def update(assigns, socket) do - consumer = assigns[:consumer] + defp load_or_init_consumer(%{"kind" => kind}, _socket) do + case kind do + "http_push" -> + {:ok, %SinkConsumer{type: :http_push, sink: %HttpPushSink{}}} - component = "consumers/SinkConsumerForm" + "sqs" -> + {:ok, %SinkConsumer{type: :sqs, sink: %SqsSink{}, batch_size: 10}} - socket = - socket - |> assign(assigns) - |> assign( - consumer: Repo.preload(consumer, [:sequence, :postgres_database]), - show_errors?: false, - submit_error: nil, - changeset: nil, - sequence_changeset: nil, - component: component, - prev_params: %{} - ) - |> assign_databases() - |> assign_http_endpoints() - |> reset_changeset() + "kafka" -> + {:ok, %SinkConsumer{type: :kafka, sink: %KafkaSink{tls: false}}} + + "redis" -> + {:ok, %SinkConsumer{type: :redis, batch_size: 100, sink: %RedisSink{}}} + + "sequin_stream" -> + {:ok, %SinkConsumer{type: :sequin_stream, sink: %SequinStreamSink{}}} - :syn.join(:account, {:database_tables_updated, current_account_id(socket)}, self()) + "gcp_pubsub" -> + {:ok, %SinkConsumer{type: :gcp_pubsub, sink: %GcpPubsubSink{}}} - {:ok, socket} + "nats" -> + {:ok, %SinkConsumer{type: :nats, sink: %NatsSink{}}} + + "rabbitmq" -> + {:ok, %SinkConsumer{type: :rabbitmq, sink: %RabbitMqSink{virtual_host: "/"}}} + end end - @impl Phoenix.LiveComponent + @impl Phoenix.LiveView def handle_event("validate", _params, socket) do {:noreply, socket} end - @impl Phoenix.LiveComponent + @impl Phoenix.LiveView def handle_event("form_updated", %{"form" => form}, socket) do params = form |> decode_params(socket) |> maybe_put_replication_slot_id(socket) @@ -177,12 +213,12 @@ defmodule SequinWeb.Components.ConsumerForm do {:noreply, socket} end - @impl Phoenix.LiveComponent + @impl Phoenix.LiveView def handle_event("refresh_databases", _params, socket) do {:noreply, assign_databases(socket)} end - @impl Phoenix.LiveComponent + @impl Phoenix.LiveView def handle_event("refresh_tables", %{"database_id" => database_id}, socket) do with index when not is_nil(index) <- Enum.find_index(socket.assigns.databases, &(&1.id == database_id)), database = Enum.at(socket.assigns.databases, index), @@ -195,7 +231,7 @@ defmodule SequinWeb.Components.ConsumerForm do end end - @impl Phoenix.LiveComponent + @impl Phoenix.LiveView def handle_event("refresh_sequences", %{"database_id" => database_id}, socket) do with {:ok, database} <- Databases.get_db(database_id), {:ok, _updated_database} <- Databases.update_tables(database) do @@ -263,6 +299,11 @@ defmodule SequinWeb.Components.ConsumerForm do end end + @impl Phoenix.LiveView + def handle_info({:database_tables_updated, _updated_database}, socket) do + {:noreply, assign_databases(socket)} + end + defp test_sqs_connection(socket) do sink_changeset = socket.assigns.changeset diff --git a/lib/sequin_web/live/sink_consumers/index.ex b/lib/sequin_web/live/sink_consumers/index.ex index b1ebc31c..c0475c48 100644 --- a/lib/sequin_web/live/sink_consumers/index.ex +++ b/lib/sequin_web/live/sink_consumers/index.ex @@ -3,19 +3,9 @@ defmodule SequinWeb.SinkConsumersLive.Index do use SequinWeb, :live_view alias Sequin.Consumers - alias Sequin.Consumers.GcpPubsubSink - alias Sequin.Consumers.HttpPushSink - alias Sequin.Consumers.KafkaSink - alias Sequin.Consumers.NatsSink - alias Sequin.Consumers.RabbitMqSink - alias Sequin.Consumers.RedisSink - alias Sequin.Consumers.SequinStreamSink alias Sequin.Consumers.SinkConsumer - alias Sequin.Consumers.SqsSink alias Sequin.Databases - alias Sequin.Databases.DatabaseUpdateWorker alias Sequin.Health - alias SequinWeb.Components.ConsumerForm alias SequinWeb.RouteHelpers @impl Phoenix.LiveView @@ -59,14 +49,6 @@ defmodule SequinWeb.SinkConsumersLive.Index do end @impl Phoenix.LiveView - def render(%{live_action: :new} = assigns) do - ~H""" -