Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Account for transaction ID wraparound when comparing incoming transaction xid to the snapshot xmin #2217

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions packages/sync-service/lib/electric/postgres/xid.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
defmodule Electric.Postgres.Xid do
import Bitwise

@int32_max 0xFFFFFFFF
@int32_half_max 0x7FFFFFFF

@type anyxid :: pos_integer
@type cmp_result :: :lt | :eq | :gt

defguardp int32?(int) when abs(int) <= @int32_max

# This is a specialized guard for that specifically determines whether the 32-bit first
# argument is less than the xid8 argument. For the general principle this is based on, look
# at the implementation of `compare/2` below.
defguard xid_lt_xid8(xid, xid8)
when int32?(xid) and
((xid - band(xid8, @int32_max) <= @int32_half_max and
xid < band(xid8, @int32_max)) or
(xid - band(xid8, @int32_max) > @int32_half_max and
xid > band(xid8, @int32_max)))

@spec compare(anyxid, anyxid) :: cmp_result

def compare(xid, xid), do: :eq

# When both arguments are 32-bit integers or both have values that don't fit in 32 bits, use the
# direct comparison.
def compare(xid_l, xid_r)
when (int32?(xid_l) and int32?(xid_r)) or not (int32?(xid_l) or int32?(xid_r)),
do: direct_cmp(xid_l, xid_r)

# When one of the arguments is 32-bit and the other one has a value that doesn't fit in 32 bits,
# perform the comparison on masked values.
#
# In Postgres, any xid has ~2 billion values preceding it and ~2 billion values following it.
# Regular autovacuuming maintains this invariant. So when we see a difference between two
# xids that is larger than 2^31, it means the 32-bit argument is a wrapped value, so it
# must be the most recent one.
def compare(xid8, xid) when int32?(xid) do
compare(xid, xid8)
|> reverse_cmp_result()
end

def compare(xid, xid8) when int32?(xid) do
xid8_masked = band(xid8, @int32_max)

diff = xid - xid8_masked
wrapped? = diff > @int32_half_max

diff_to_cmp_result(wrapped?, diff)
end

@spec diff_to_cmp_result(wrapped? :: boolean, diff :: integer) :: cmp_result
defp diff_to_cmp_result(false, diff) when diff > 0, do: :gt
defp diff_to_cmp_result(false, diff) when diff < 0, do: :lt
defp diff_to_cmp_result(true, diff) when diff > 0, do: :lt
defp diff_to_cmp_result(true, diff) when diff < 0, do: :gt

###

defp direct_cmp(xid_l, xid_r) when xid_l < xid_r, do: :lt
defp direct_cmp(xid_l, xid_r) when xid_l > xid_r, do: :gt

defp reverse_cmp_result(:lt), do: :gt
defp reverse_cmp_result(:gt), do: :lt
end
5 changes: 4 additions & 1 deletion packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ defmodule Electric.Shapes.Consumer do
restart: :temporary,
significant: true

import Electric.Postgres.Xid, only: [xid_lt_xid8: 2]

alias Electric.ShapeCache.LogChunker
alias Electric.LogItems
alias Electric.Replication.Changes
Expand Down Expand Up @@ -231,7 +233,8 @@ defmodule Electric.Shapes.Consumer do
end
end

defp handle_txn(%Transaction{xid: xid}, %{snapshot_xmin: xmin} = state) when xid < xmin do
defp handle_txn(%Transaction{xid: xid}, %{snapshot_xmin: xmin} = state)
when xid_lt_xid8(xid, xmin) do
{:cont, state}
end

Expand Down
Loading