diff --git a/lib/plausible/data_migration/backfill_teams.ex b/lib/plausible/data_migration/backfill_teams.ex index af93f1e17a54..bf98cbec7e2b 100644 --- a/lib/plausible/data_migration/backfill_teams.ex +++ b/lib/plausible/data_migration/backfill_teams.ex @@ -5,10 +5,7 @@ defmodule Plausible.DataMigration.BackfillTeams do import Ecto.Query - alias Plausible.Teams - - @repo Plausible.DataMigration.PostgresRepo - @max_concurrency 12 + alias Plausible.{Repo, Teams} defmacrop is_distinct(f1, f2) do quote do @@ -18,17 +15,7 @@ defmodule Plausible.DataMigration.BackfillTeams do def run(opts \\ []) do dry_run? = Keyword.get(opts, :dry_run?, true) - - # Teams backfill - db_url = - System.get_env( - "TEAMS_MIGRATION_DB_URL", - Application.get_env(:plausible, Plausible.Repo)[:url] - ) - - @repo.start(db_url, pool_size: 2 * @max_concurrency) - - backfill(dry_run?) + Repo.transaction(fn -> backfill(dry_run?) end, timeout: :infinity) end defp backfill(dry_run?) do @@ -42,7 +29,7 @@ defmodule Plausible.DataMigration.BackfillTeams do left_join: s in assoc(t, :sites), where: is_nil(s.id) ) - |> @repo.all(timeout: :infinity) + |> Repo.all(timeout: :infinity) log("Found #{length(orphaned_teams)} orphaned teams...") @@ -65,7 +52,7 @@ defmodule Plausible.DataMigration.BackfillTeams do where: is_nil(s.team_id), select: %{s | memberships: [%{user: o, role: :owner}]} ) - |> @repo.all(timeout: :infinity) + |> Repo.all(timeout: :infinity) log("Found #{length(sites_without_teams)} sites without teams...") @@ -89,7 +76,7 @@ defmodule Plausible.DataMigration.BackfillTeams do where: tm.user_id == parent_as(:user).id ) ) - |> @repo.all(timeout: :infinity) + |> Repo.all(timeout: :infinity) log("Found #{length(users_on_trial_without_team)} users on trial without team...") @@ -110,7 +97,7 @@ defmodule Plausible.DataMigration.BackfillTeams do inner_join: s in assoc(gm, :site), where: tm.team_id != s.team_id ) - |> @repo.all() + |> Repo.all() log( "Found #{length(mismatched_guest_memberships_to_remove)} guest memberships with mismatched team to remove..." @@ -122,7 +109,7 @@ defmodule Plausible.DataMigration.BackfillTeams do log("Pruning guest team memberships for #{length(team_ids_to_prune)} teams...") from(t in Teams.Team, where: t.id in ^team_ids_to_prune) - |> @repo.all(timeout: :infinity) + |> Repo.all(timeout: :infinity) |> Enum.each(fn team -> Plausible.Teams.Memberships.prune_guests(team) end) @@ -149,7 +136,7 @@ defmodule Plausible.DataMigration.BackfillTeams do as: :team_membership, where: not exists(site_memberships_query) ) - |> @repo.all(timeout: :infinity) + |> Repo.all(timeout: :infinity) log("Found #{length(guest_memberships_to_remove)} guest memberships to remove...") @@ -159,7 +146,7 @@ defmodule Plausible.DataMigration.BackfillTeams do log("Pruning guest team memberships for #{length(team_ids_to_prune)} teams...") from(t in Teams.Team, where: t.id in ^team_ids_to_prune) - |> @repo.all(timeout: :infinity) + |> Repo.all(timeout: :infinity) |> Enum.each(fn team -> Plausible.Teams.Memberships.prune_guests(team) end) @@ -198,7 +185,7 @@ defmodule Plausible.DataMigration.BackfillTeams do role: sm.role } ) - |> @repo.all(timeout: :infinity) + |> Repo.all(timeout: :infinity) log( "Found #{length(site_memberships_to_backfill)} site memberships without guest membership..." @@ -225,7 +212,7 @@ defmodule Plausible.DataMigration.BackfillTeams do (gm.role == :editor and sm.role == "viewer"), select: {gm, sm.role} ) - |> @repo.all(timeout: :infinity) + |> Repo.all(timeout: :infinity) log("Found #{length(stale_guest_memberships)} guest memberships with role out of sync...") @@ -256,7 +243,7 @@ defmodule Plausible.DataMigration.BackfillTeams do as: :team_invitation, where: not exists(site_invitations_query) ) - |> @repo.all(timeout: :infinity) + |> Repo.all(timeout: :infinity) log("Found #{length(guest_invitations_to_remove)} guest invitations to remove...") @@ -266,7 +253,7 @@ defmodule Plausible.DataMigration.BackfillTeams do log("Pruning guest team invitations for #{length(team_ids_to_prune)} teams...") from(t in Teams.Team, where: t.id in ^team_ids_to_prune) - |> @repo.all(timeout: :infinity) + |> Repo.all(timeout: :infinity) |> Enum.each(fn team -> Plausible.Teams.Invitations.prune_guest_invitations(team) end) @@ -307,7 +294,7 @@ defmodule Plausible.DataMigration.BackfillTeams do inviter: inv } ) - |> @repo.all(timeout: :infinity) + |> Repo.all(timeout: :infinity) log( "Found #{length(site_invitations_to_backfill)} site invitations without guest invitation..." @@ -335,7 +322,7 @@ defmodule Plausible.DataMigration.BackfillTeams do is_distinct(gi.invitation_id, si.invitation_id), select: {gi, %{role: si.role, invitation_id: si.invitation_id}} ) - |> @repo.all(timeout: :infinity) + |> Repo.all(timeout: :infinity) log("Found #{length(stale_guest_invitations)} guest invitations with role out of sync...") @@ -362,7 +349,7 @@ defmodule Plausible.DataMigration.BackfillTeams do as: :site_transfer, where: not exists(site_invitations_query) ) - |> @repo.all(timeout: :infinity) + |> Repo.all(timeout: :infinity) log("Found #{length(site_transfers_to_remove)} site transfers to remove...") @@ -402,7 +389,7 @@ defmodule Plausible.DataMigration.BackfillTeams do inviter: inv } ) - |> @repo.all(timeout: :infinity) + |> Repo.all(timeout: :infinity) log( "Found #{length(site_invitations_to_backfill)} ownership transfers without site transfer..." @@ -418,7 +405,7 @@ defmodule Plausible.DataMigration.BackfillTeams do end def delete_orphaned_teams(teams) do - Enum.each(teams, &@repo.delete!(&1)) + Enum.each(teams, &Repo.delete!/1) end defp backfill_teams(sites) do @@ -438,35 +425,27 @@ defmodule Plausible.DataMigration.BackfillTeams do _ -> :pass end) - |> Enum.with_index() - |> Task.async_stream( - fn {{owner, site_ids}, idx} -> - @repo.transaction( - fn -> - {:ok, team} = Teams.get_or_create(owner) - - team = - team - |> Ecto.Changeset.change() - |> Ecto.Changeset.put_change(:trial_expiry_date, owner.trial_expiry_date) - |> Ecto.Changeset.force_change(:updated_at, owner.updated_at) - |> @repo.update!() - - @repo.update_all(from(s in Plausible.Site, where: s.id in ^site_ids), - set: [team_id: team.id] - ) - end, - timeout: :infinity, - max_concurrency: @max_concurrency - ) + |> Enum.map(fn {owner, site_ids} -> + Repo.transaction( + fn -> + {:ok, team} = Teams.get_or_create(owner) + + team = + team + |> Ecto.Changeset.change() + |> Ecto.Changeset.put_change(:trial_expiry_date, owner.trial_expiry_date) + |> Ecto.Changeset.force_change(:updated_at, owner.updated_at) + |> Repo.update!() + + Repo.update_all(from(s in Plausible.Site, where: s.id in ^site_ids), + set: [team_id: team.id] + ) + end, + timeout: :infinity + ) - if rem(idx, 10) == 0 do - IO.write(".") - end - end, - timeout: :infinity - ) - |> Enum.to_list() + IO.write(".") + end) |> length() end @@ -474,7 +453,7 @@ defmodule Plausible.DataMigration.BackfillTeams do ids = Enum.map(guest_memberships, & &1.id) {_, team_ids} = - @repo.delete_all( + Repo.delete_all( from( gm in Teams.GuestMembership, inner_join: tm in assoc(gm, :team_membership), @@ -500,55 +479,43 @@ defmodule Plausible.DataMigration.BackfillTeams do _ -> :pass end) - |> Enum.with_index() - |> Task.async_stream( - fn {{{team, user}, site_memberships}, idx} -> - first_site_membership = - Enum.min_by(site_memberships, & &1.inserted_at) - - team_membership = - team - |> Teams.Membership.changeset(user, :guest) - |> Ecto.Changeset.put_change(:inserted_at, first_site_membership.inserted_at) - |> Ecto.Changeset.put_change(:updated_at, first_site_membership.updated_at) - |> @repo.insert!( - on_conflict: [set: [updated_at: first_site_membership.updated_at]], - conflict_target: [:team_id, :user_id] - ) + |> Enum.each(fn {{team, user}, site_memberships} -> + first_site_membership = + Enum.min_by(site_memberships, & &1.inserted_at) - Enum.each(site_memberships, fn site_membership -> - team_membership - |> Teams.GuestMembership.changeset( - site_membership.site, - translate_role(site_membership.role) - ) - |> Ecto.Changeset.put_change(:inserted_at, site_membership.inserted_at) - |> Ecto.Changeset.put_change(:updated_at, site_membership.updated_at) - |> @repo.insert!() - end) - - if rem(idx, 1000) == 0 do - IO.write(".") - end - end, - timeout: :infinity, - max_concurrency: @max_concurrency - ) - |> Stream.run() + team_membership = + team + |> Teams.Membership.changeset(user, :guest) + |> Ecto.Changeset.put_change(:inserted_at, first_site_membership.inserted_at) + |> Ecto.Changeset.put_change(:updated_at, first_site_membership.updated_at) + |> Repo.insert!( + on_conflict: [set: [updated_at: first_site_membership.updated_at]], + conflict_target: [:team_id, :user_id] + ) + + Enum.each(site_memberships, fn site_membership -> + team_membership + |> Teams.GuestMembership.changeset( + site_membership.site, + translate_role(site_membership.role) + ) + |> Ecto.Changeset.put_change(:inserted_at, site_membership.inserted_at) + |> Ecto.Changeset.put_change(:updated_at, site_membership.updated_at) + |> Repo.insert!() + end) + + IO.write(".") + end) end defp sync_guest_memberships(guest_memberships_and_roles) do - guest_memberships_and_roles - |> Enum.with_index() - |> Enum.each(fn {{guest_membership, role}, idx} -> + Enum.each(guest_memberships_and_roles, fn {guest_membership, role} -> guest_membership |> Ecto.Changeset.change(role: translate_role(role)) |> Ecto.Changeset.put_change(:updated_at, guest_membership.updated_at) - |> @repo.update!() + |> Repo.update!() - if rem(idx, 1000) == 0 do - IO.write(".") - end + IO.write(".") end) end @@ -556,7 +523,7 @@ defmodule Plausible.DataMigration.BackfillTeams do ids = Enum.map(guest_invitations, & &1.id) {_, team_ids} = - @repo.delete_all( + Repo.delete_all( from( gi in Teams.GuestInvitation, inner_join: ti in assoc(gi, :team_invitation), @@ -571,8 +538,7 @@ defmodule Plausible.DataMigration.BackfillTeams do defp backfill_guest_invitations(site_invitations) do site_invitations |> Enum.group_by(&{&1.site.team, &1.email}, & &1) - |> Enum.with_index() - |> Enum.each(fn {{{team, email}, site_invitations}, idx} -> + |> Enum.each(fn {{team, email}, site_invitations} -> first_site_invitation = List.first(site_invitations) team_invitation = @@ -585,7 +551,7 @@ defmodule Plausible.DataMigration.BackfillTeams do ) |> Ecto.Changeset.put_change(:inserted_at, first_site_invitation.inserted_at) |> Ecto.Changeset.put_change(:updated_at, first_site_invitation.updated_at) - |> @repo.insert!( + |> Repo.insert!( on_conflict: [set: [updated_at: first_site_invitation.updated_at]], conflict_target: [:team_id, :email] ) @@ -599,42 +565,34 @@ defmodule Plausible.DataMigration.BackfillTeams do |> Ecto.Changeset.put_change(:invitation_id, site_invitation.invitation_id) |> Ecto.Changeset.put_change(:inserted_at, site_invitation.inserted_at) |> Ecto.Changeset.put_change(:updated_at, site_invitation.updated_at) - |> @repo.insert!() + |> Repo.insert!() end) - if rem(idx, 1000) == 0 do - IO.write(".") - end + IO.write(".") end) end defp sync_guest_invitations(guest_and_site_invitations) do - guest_and_site_invitations - |> Enum.with_index() - |> Enum.each(fn {{guest_invitation, site_invitation}, idx} -> + Enum.each(guest_and_site_invitations, fn {guest_invitation, site_invitation} -> guest_invitation |> Ecto.Changeset.change() |> Ecto.Changeset.put_change(:role, translate_role(site_invitation.role)) |> Ecto.Changeset.put_change(:invitation_id, site_invitation.invitation_id) |> Ecto.Changeset.put_change(:updated_at, guest_invitation.updated_at) - |> @repo.update!() + |> Repo.update!() - if rem(idx, 1000) == 0 do - IO.write(".") - end + IO.write(".") end) end defp remove_site_transfers(site_transfers) do ids = Enum.map(site_transfers, & &1.id) - @repo.delete_all(from(st in Teams.SiteTransfer, where: st.id in ^ids)) + Repo.delete_all(from(st in Teams.SiteTransfer, where: st.id in ^ids)) end defp backfill_site_transfers(site_invitations) do - site_invitations - |> Enum.with_index() - |> Enum.each(fn {site_invitation, idx} -> + Enum.each(site_invitations, fn site_invitation -> site_invitation.site |> Teams.SiteTransfer.changeset( initiator: site_invitation.inviter, @@ -643,11 +601,9 @@ defmodule Plausible.DataMigration.BackfillTeams do |> Ecto.Changeset.put_change(:transfer_id, site_invitation.invitation_id) |> Ecto.Changeset.put_change(:inserted_at, site_invitation.inserted_at) |> Ecto.Changeset.put_change(:updated_at, site_invitation.updated_at) - |> @repo.insert!() + |> Repo.insert!() - if rem(idx, 1000) == 0 do - IO.write(".") - end + IO.write(".") end) end @@ -655,6 +611,6 @@ defmodule Plausible.DataMigration.BackfillTeams do defp translate_role("viewer"), do: :viewer defp log(msg) do - IO.puts("[#{NaiveDateTime.utc_now(:second)}] #{msg}") + IO.puts("[#{DateTime.utc_now(:second)}] #{msg}") end end diff --git a/priv/repo/migrations/20250117122435_backfill_teams.exs b/priv/repo/migrations/20250117122435_backfill_teams.exs new file mode 100644 index 000000000000..bdcc227bd6fd --- /dev/null +++ b/priv/repo/migrations/20250117122435_backfill_teams.exs @@ -0,0 +1,13 @@ +defmodule Plausible.Repo.Migrations.BackfillTeams do + use Ecto.Migration + + def up do + if Plausible.ce?() do + Plausible.DataMigration.BackfillTeams.run(dry_run?: false) + end + end + + def down do + raise "Irreversible" + end +end