
Commit

addressed rob's feedback
icehaunter committed Jan 20, 2025
1 parent bbb3f41 commit 673ae82
Showing 16 changed files with 1,102 additions and 922 deletions.
107 changes: 48 additions & 59 deletions packages/sync-service/lib/electric/shape_cache/file_storage.ex
@@ -2,10 +2,7 @@ defmodule Electric.ShapeCache.FileStorage do
use Retry
require Logger

alias Electric.LogItems
alias Electric.ShapeCache.LogChunker
alias Electric.Utils
alias Electric.ShapeCache.FileStorage.OnDisk
alias Electric.Telemetry.OpenTelemetry
alias Electric.Replication.LogOffset
import Electric.Replication.LogOffset, only: :macros
@@ -409,68 +406,52 @@ defmodule Electric.ShapeCache.FileStorage do
{:ok, {_, ^upper_bound}} ->
:ok

{:ok, {log_file_path, _}} ->
{:ok, {old_log, _}} ->
# compact further
new_log_file_path =
Path.join(
opts.log_dir,
"compact_log_#{DateTime.utc_now() |> DateTime.to_unix(:millisecond)}.electric"
)

CubDB.select(opts.db,
min_key: log_start(),
max_key: log_key(upper_bound),
max_key_inclusive: true
)
|> Stream.map(fn {key, {op_key, type, json}} -> {offset(key), op_key, type, json} end)
|> OnDisk.write_log_file(new_log_file_path)

Utils.concat_files([log_file_path, new_log_file_path], new_log_file_path <> ".merged")

key_index_path = OnDisk.create_sorted_key_index(new_log_file_path <> ".merged")

action_file_path =
OnDisk.create_action_file(new_log_file_path <> ".merged", key_index_path)

OnDisk.apply_actions(
new_log_file_path <> ".merged",
action_file_path,
&LogItems.merge_updates/2,
opts.chunk_bytes_threshold
)
new_log =
CubDB.select(opts.db,
min_key: log_start(),
max_key: log_key(upper_bound),
max_key_inclusive: true
)
|> Stream.map(fn {key, {op_key, type, json}} -> {offset(key), op_key, type, json} end)
|> FS.LogFile.write_log_file(new_log_file_path <> ".new")

merged_log =
FS.Compaction.merge_and_compact(
old_log,
new_log,
new_log_file_path,
opts.chunk_bytes_threshold
)

File.rename!(new_log_file_path <> ".merged", new_log_file_path)
CubDB.put(opts.db, @compaction_info_key, {new_log_file_path, upper_bound})
CubDB.put(opts.db, @compaction_info_key, {merged_log, upper_bound})
delete_compacted_keys(opts, upper_bound)
File.rm!(action_file_path)
File.rm!(key_index_path)
FS.Compaction.rm_log(new_log)
FS.Compaction.rm_log(old_log)
:ok

:error ->
log_file_path = Path.join(opts.log_dir, "compact_log.electric")

CubDB.select(opts.db,
min_key: log_start(),
max_key: log_key(upper_bound),
max_key_inclusive: true
)
|> Stream.map(fn {key, {op_key, type, json}} -> {offset(key), op_key, type, json} end)
|> OnDisk.write_log_file(log_file_path)

key_index_path = OnDisk.create_sorted_key_index(log_file_path)
action_file_path = OnDisk.create_action_file(log_file_path, key_index_path)

OnDisk.apply_actions(
log_file_path,
action_file_path,
&LogItems.merge_updates/2,
opts.chunk_bytes_threshold
)
log =
CubDB.select(opts.db,
min_key: log_start(),
max_key: log_key(upper_bound),
max_key_inclusive: true
)
|> Stream.map(fn {key, {op_key, type, json}} -> {offset(key), op_key, type, json} end)
|> FS.LogFile.write_log_file(log_file_path)
|> FS.Compaction.compact_in_place(opts.chunk_bytes_threshold)

CubDB.put(opts.db, @compaction_info_key, {log_file_path, upper_bound})
CubDB.put(opts.db, @compaction_info_key, {log, upper_bound})
delete_compacted_keys(opts, upper_bound)
File.rm!(action_file_path)
File.rm!(key_index_path)
:ok
end
end
@@ -551,8 +532,9 @@ defmodule Electric.ShapeCache.FileStorage do

defp stream_log_chunk(%LogOffset{} = offset, max_offset, %FS{} = opts) do
case CubDB.fetch(opts.db, @compaction_info_key) do
{:ok, {log_file_path, upper_bound}} when is_log_offset_lt(offset, upper_bound) ->
OnDisk.read_json_chunk(log_file_path, offset)
{:ok, {log, upper_bound}} when is_log_offset_lt(offset, upper_bound) ->
FS.ChunkIndex.fetch_chunk(elem(log, 1), offset)
FS.LogFile.read_chunk(log, offset)

_ ->
opts.db
@@ -624,14 +606,21 @@ defmodule Electric.ShapeCache.FileStorage do
def get_chunk_end_log_offset(offset, %FS{} = opts), do: get_chunk_end_for_log(offset, opts)

defp get_chunk_end_for_log(offset, %FS{} = opts) do
CubDB.select(opts.db,
min_key: chunk_checkpoint_key(offset),
max_key: chunk_checkpoint_end(),
min_key_inclusive: false
)
|> Stream.map(fn {key, _} -> offset(key) end)
|> Enum.take(1)
|> Enum.at(0)
case CubDB.fetch(opts.db, @compaction_info_key) do
{:ok, {log, upper_bound}} when is_log_offset_lt(offset, upper_bound) ->
{:ok, max_offset, _} = FS.ChunkIndex.fetch_chunk(elem(log, 1), offset)
max_offset

:error ->
CubDB.select(opts.db,
min_key: chunk_checkpoint_key(offset),
max_key: chunk_checkpoint_end(),
min_key_inclusive: false
)
|> Stream.map(fn {key, _} -> offset(key) end)
|> Enum.take(1)
|> Enum.at(0)
end
end

defp get_last_snapshot_offset(%FS{} = opts) do
@@ -0,0 +1,143 @@
defmodule Electric.ShapeCache.FileStorage.ActionFile do
@moduledoc false
alias Electric.Utils
alias Electric.ShapeCache.FileStorage.LogFile
alias Electric.ShapeCache.FileStorage.KeyIndex
import KeyIndex, only: :macros

@doc """
Convert a sorted key index to a sorted action file.
The action file is a line-for-line mapping of log file offsets to actions of "keep", "skip" or "compact".
Its ordering should be the same as the log file's, to allow for sequential reads of both.
For "keep" lines we keep the original, for "skip" lines we skip the original, and for "compact" lines
we read all specified JSONs from the log file and merge them into one. Multiple updates to the same
key are mapped to "skip" for all but the last one, which is then mapped to "compact".
The action file format, as an Elixir binary, is:
<<operation_offset::128, operation_type::binary>>
Where `operation_type` is one of:
<<?k::8>> #- Keep
<<?s::8>> #- Skip
<<?c::8, json_offsets_count::16, json_offsets::binary>> #- Compact
And `json_offsets` is `json_offsets_count` repetitions of `<<json_start_position::64, json_size::64>>`.
"""
def create_from_key_index(key_index_path, action_file_path) do
KeyIndex.stream(key_index_path)
|> Stream.chunk_by(&key_index_item(&1, :key))
|> Stream.flat_map(fn chunk ->
# Chunk contains all operations for a given key in order

chunk
|> Enum.chunk_by(&key_index_item(&1, :op_type))
|> Enum.flat_map(fn
# Keep any single operation, since inserts/deletes won't be duplicated, and one update can't be compacted
[key_index_item(offset: offset)] -> [<<LogFile.offset(offset)::binary, ?k::8>>]
# If more than one, then it's definitely an update
updates -> updates_to_actions(updates)
end)
end)
|> Stream.into(File.stream!(action_file_path))
|> Stream.run()

Utils.external_merge_sort(action_file_path, &stream_for_sorting/1)
end

@doc """
Read the action file and return a stream of tuples `{offset, action}`.
"""
@spec stream(path :: String.t()) ::
Enumerable.t(
{LogFile.offset(),
:keep | :skip | {:compact, [{non_neg_integer(), non_neg_integer()}, ...]}}
)
def stream(action_file_path) do
Stream.resource(
fn -> File.open!(action_file_path, [:read, :raw, :read_ahead]) end,
fn file ->
case IO.binread(file, 17) do
:eof ->
{:halt, file}

<<tx_offset::64, op_offset::64, ?c::8>> ->
<<count::16>> = IO.binread(file, 2)
offsets = for <<pos::64, size::64 <- IO.binread(file, 16 * count)>>, do: {pos, size}
{[{{tx_offset, op_offset}, {:compact, offsets}}], file}

<<tx_offset::64, op_offset::64, ?k::8>> ->
{[{{tx_offset, op_offset}, :keep}], file}

<<tx_offset::64, op_offset::64, ?s::8>> ->
{[{{tx_offset, op_offset}, :skip}], file}
end
end,
&File.close/1
)
end

# acc format: {positions_len, positions, actions}
defp updates_to_actions(updates, acc \\ {0, [], []})
# We don't care about order being reversed because it's going to be sorted.
defp updates_to_actions([], {_, _, acc}), do: acc

# The compaction target is either the last update, or the point at which we hit 65535 accumulated positions.
# Technically this makes the result suboptimal, but it saves a lot of memory: the position list takes up at most
# 65535 * 16 = 1048560 bytes ~ 1MB of memory, as opposed to 65536MB if we allowed an int32 position count.
defp updates_to_actions(
[key_index_item(offset: offset, json: last) | rest],
{total_positions, positions, actions}
)
when rest == []
when total_positions > 65534 do
actions =
[
[
<<LogFile.offset(offset)::binary, ?c::8, length(positions) + 1::16>>,
Utils.list_reverse_map([last | positions], fn {pos, size} -> <<pos::64, size::64>> end)
]
| actions
]

updates_to_actions(rest, {0, [], actions})
end

defp updates_to_actions(
[key_index_item(offset: offset, json: position) | rest],
{total_positions, all_positions, actions}
) do
updates_to_actions(
rest,
{total_positions + 1, [position | all_positions],
[[<<LogFile.offset(offset)::binary, ?s::8>>] | actions]}
)
end

@spec stream_for_sorting(String.t()) ::
Enumerable.t(Utils.sortable_binary({non_neg_integer(), non_neg_integer()}))
defp stream_for_sorting(action_file_path) do
Stream.resource(
fn -> File.open!(action_file_path, [:read, :raw, :read_ahead]) end,
fn file ->
case IO.binread(file, 17) do
:eof ->
{:halt, file}

<<tx_offset::64, op_offset::64, ?c::8>> = line ->
<<count::16>> = IO.binread(file, 2)

{[{{tx_offset, op_offset}, line <> <<count::16>> <> IO.binread(file, count * 16)}],
file}

<<tx_offset::64, op_offset::64, _::8>> = line ->
{[{{tx_offset, op_offset}, line}], file}
end
end,
&File.close/1
)
end
end
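
The action file format described in the moduledoc above is easiest to see with a decoder. The following is a minimal sketch, not part of this commit: ActionDecodeSketch and decode_action/1 are hypothetical names, but the binaries follow the <<operation_offset::128, operation_type::binary>> layout that ActionFile.stream/1 reads.

# Hypothetical helper for illustration only; mirrors the entry layout read by ActionFile.stream/1.
defmodule ActionDecodeSketch do
  # Keep: 16-byte offset followed by ?k
  def decode_action(<<tx_offset::64, op_offset::64, ?k::8, rest::binary>>),
    do: {{{tx_offset, op_offset}, :keep}, rest}

  # Skip: 16-byte offset followed by ?s
  def decode_action(<<tx_offset::64, op_offset::64, ?s::8, rest::binary>>),
    do: {{{tx_offset, op_offset}, :skip}, rest}

  # Compact: 16-byte offset, ?c, a 16-bit count, then `count` {json_start, json_size} pairs
  def decode_action(<<tx_offset::64, op_offset::64, ?c::8, count::16, rest::binary>>) do
    {pairs, rest} = :erlang.split_binary(rest, count * 16)
    offsets = for <<pos::64, size::64 <- pairs>>, do: {pos, size}
    {{{tx_offset, op_offset}, {:compact, offsets}}, rest}
  end
end

# A "compact" action pointing at two JSON payloads stored in the log file
binary = <<3::64, 1::64, ?c::8, 2::16, 0::64, 10::64, 10::64, 12::64>>
{{offset, action}, <<>>} = ActionDecodeSketch.decode_action(binary)
# offset == {3, 1}; action == {:compact, [{0, 10}, {10, 12}]}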
@@ -0,0 +1,118 @@
defmodule Electric.ShapeCache.FileStorage.ChunkIndex do
@moduledoc false

alias Electric.Replication.LogOffset
alias Electric.ShapeCache.LogChunker
alias Electric.Utils
alias Electric.ShapeCache.FileStorage.LogFile

# 16 bytes offset + 8 bytes position + 16 bytes offset + 8 bytes position = 48 bytes
@chunk_entry_size 48

@doc """
Write a chunk index from the stream of log items to the given path.
A chunk index serves two purposes: it acts as a sparse index for the log file,
and the chunks are used to align client reads so they benefit from CDN cache hits.
The format of the file is:
<<first_offset::128, first_position::64, last_offset::128, last_position::64>>
Fixed-width entries allow us to binary-search the file.
"""
@spec write_from_stream(
Enumerable.t(LogFile.log_item_with_sizes()),
path :: String.t(),
chunk_size :: non_neg_integer
) :: Enumerable.t(LogFile.log_item_with_sizes())
def write_from_stream(stream, path, chunk_size) do
Utils.stream_add_side_effect(
stream,
# agg is {file, write_position, byte_count, last_seen_offset}
fn -> {File.open!(path, [:write, :raw]), 0, 0, nil} end,
fn {offset, _, _, _, json_size, _} = line,
{file, write_position, byte_count, last_seen_offset} ->
# Start the chunk if there's no last offset
if is_nil(last_seen_offset),
do: IO.binwrite(file, <<LogFile.offset(offset)::binary, write_position::64>>)

position_after_write = LogFile.expected_position(write_position, line)

# We're counting bytes only on JSON payloads that are actually sent to the client
case LogChunker.fit_into_chunk(json_size, byte_count, chunk_size) do
{:ok, new_size} ->
{file, position_after_write, new_size, offset}

{:threshold_exceeded, 0} ->
# Chunk ended, finish writing the entry
IO.binwrite(file, <<LogFile.offset(offset)::binary, position_after_write::64>>)

{file, position_after_write, 0, nil}
end
end,
fn {file, pos, _, last_offset} = acc ->
# Finish writing the last entry if there is one
if not is_nil(last_offset),
do: IO.binwrite(file, <<LogFile.offset(last_offset)::binary, pos::64>>)

acc
end,
&File.close(elem(&1, 0))
)
end

@doc """
For a given chunk index, find the chunk that contains the first
offset greater than the given one.
Returns the max offset of the found chunk and reading boundaries for the log file.
"""
@spec fetch_chunk(path :: String.t(), LogOffset.t()) ::
{:ok, max_offset :: LogOffset.t(),
{start_position :: non_neg_integer, end_position :: non_neg_integer}}
| :error
def fetch_chunk(chunk_file_path, %LogOffset{} = exclusive_min_offset) do
file = File.open!(chunk_file_path, [:read, :raw])
{:ok, size} = :file.position(file, :eof)

try do
case do_binary_search(file, 0, div(size, @chunk_entry_size) - 1, exclusive_min_offset) do
{:ok, max_offset, start_pos, end_pos} -> {:ok, max_offset, {start_pos, end_pos}}
nil -> :error
end
after
File.close(file)
end
end

defp do_binary_search(file, left, right, %LogOffset{} = target)
when left <= right do
mid = div(left + right, 2)

{:ok, <<_min_tx::64, _min_op::64, start_pos::64, max_tx::64, max_op::64, end_pos::64>>} =
:file.pread(file, mid * @chunk_entry_size, @chunk_entry_size)

max_offset = LogOffset.new(max_tx, max_op)

case {LogOffset.compare(target, max_offset), mid} do
{:lt, mid} when mid > 0 ->
# Target is less than max_offset, this chunk might be the answer
# but let's check if there's a better one in the left half
do_binary_search(file, left, mid - 1, target) || {:ok, max_offset, start_pos, end_pos}

{:lt, _} ->
{:ok, max_offset, start_pos, end_pos}

{_, mid} when mid < right ->
# Target is equal to / greater than max_offset, need to look in right half
do_binary_search(file, mid + 1, right, target)

_ ->
# Target is greater than max_offset but we're at the end
nil
end
end

defp do_binary_search(_file, _left, _right, _target), do: nil
end
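
For reference, each chunk index entry is a fixed 48-byte record, which is what makes the binary search in fetch_chunk/2 possible. Below is a minimal decoding sketch, not part of this commit (ChunkEntrySketch is a hypothetical name; the layout mirrors the moduledoc above).

defmodule ChunkEntrySketch do
  # <<first_offset::128, first_position::64, last_offset::128, last_position::64>> = 48 bytes
  def decode_entry(
        <<min_tx::64, min_op::64, start_pos::64, max_tx::64, max_op::64, end_pos::64>>
      ) do
    %{
      min_offset: {min_tx, min_op},
      max_offset: {max_tx, max_op},
      # byte range of this chunk within the log file
      byte_range: {start_pos, end_pos}
    }
  end
end

ChunkEntrySketch.decode_entry(<<1::64, 1::64, 0::64, 2::64, 5::64, 4096::64>>)
# => %{min_offset: {1, 1}, max_offset: {2, 5}, byte_range: {0, 4096}}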
@@ -0,0 +1,61 @@
defmodule Electric.ShapeCache.FileStorage.Compaction do
alias Electric.LogItems
alias Electric.Utils
alias Electric.ShapeCache.LogChunker
alias Electric.ShapeCache.FileStorage.LogFile
alias Electric.ShapeCache.FileStorage.KeyIndex
alias Electric.ShapeCache.FileStorage.ActionFile

@spec compact_in_place({String.t(), String.t(), String.t()}, non_neg_integer(), (any(), any() ->
any())) ::
{String.t(), String.t(), String.t()}
def compact_in_place(
{log_file_path, chunk_index_path, key_index_path},
chunk_size \\ LogChunker.default_chunk_size_threshold(),
merge_fun \\ &LogItems.merge_updates/2
) do
KeyIndex.sort(key_index_path)
ActionFile.create_from_key_index(key_index_path, log_file_path <> ".actions")

{new_log, new_chunk_index, new_key_index} =
LogFile.apply_actions(log_file_path, log_file_path <> ".actions", chunk_size, merge_fun)

File.rm!(log_file_path <> ".actions")
File.rename!(new_log, log_file_path)
File.rename!(new_chunk_index, chunk_index_path)
File.rename!(new_key_index, key_index_path)

{log_file_path, chunk_index_path, key_index_path}
end

def merge_and_compact(
log1,
log2,
merged_log_path,
chunk_size \\ LogChunker.default_chunk_size_threshold()
) do
{log_file_path1, _, key_index_path1} = log1
{log_file_path2, _, key_index_path2} = log2

second_part_start = File.stat!(log_file_path1).size
Utils.concat_files([log_file_path1, log_file_path2], merged_log_path)

KeyIndex.merge_with_offset(
key_index_path1,
key_index_path2,
merged_log_path <> ".key_index",
second_part_start
)

compact_in_place(
{merged_log_path, merged_log_path <> ".chunk_index", merged_log_path <> ".key_index"},
chunk_size
)
end

def rm_log({log, chunk_index, key_index}) do
File.rm!(log)
File.rm!(chunk_index)
File.rm!(key_index)
end
end
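
For orientation, here is a hedged usage sketch of the Compaction API as added in this commit; the paths and log items are illustrative. Each "log" is the {log_path, chunk_index_path, key_index_path} triple returned by LogFile.write_log_file/3, which is also the shape FileStorage now stores under @compaction_info_key.

alias Electric.Replication.LogOffset
alias Electric.ShapeCache.FileStorage.{Compaction, LogFile}

dir = System.tmp_dir!()

old_log =
  [{%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, ~S|{"value":"a"}|}]
  |> LogFile.write_log_file(Path.join(dir, "old.log"))

new_log =
  [{%LogOffset{tx_offset: 2, op_offset: 1}, "key1", :update, ~S|{"value":"b"}|}]
  |> LogFile.write_log_file(Path.join(dir, "new.log"))

# Concatenates the two logs, merges their key indexes with adjusted positions,
# and compacts the result into a new {log, chunk_index, key_index} triple.
merged_log = Compaction.merge_and_compact(old_log, new_log, Path.join(dir, "merged.log"))

# The inputs can then be removed, mirroring what FileStorage does after a compaction run.
Compaction.rm_log(old_log)
Compaction.rm_log(new_log)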
@@ -0,0 +1,144 @@
defmodule Electric.ShapeCache.FileStorage.KeyIndex do
@moduledoc false
alias Electric.Replication.LogOffset
alias Electric.Utils
alias Electric.ShapeCache.FileStorage.LogFile

require Record

@doc """
Write an unsorted key index from the stream of log items to the given path.
The key index maps operation keys to their offsets for further processing.
We ultimately need a sorted key index, but it's easier to generate it unsorted
on the fly and sort it later.
The key index format, as an Elixir binary, is:
<<key_size::32, key::binary, operation_offset::128, operation_type::8, json_start_position::64, json_size::64>>
"""
@spec write_from_stream(Enumerable.t(LogFile.log_item_with_sizes()), path :: String.t()) ::
Enumerable.t(LogFile.log_item_with_sizes())
def write_from_stream(stream, path) do
Utils.stream_add_side_effect(
stream,
# We're using delayed writes to avoid interfering with writing the log: the buffer here is 64KB with a 1s flush delay.
# Since we write one key index entry per log line, this reduces disk contention.
fn -> {File.open!(path, [:write, :raw, {:delayed_write, 64 * 1024, 1000}]), 0} end,
fn {log_offset, key_size, key, op_type, json_size, _} = line, {file, write_position} ->
IO.binwrite(
file,
<<key_size::32, key::binary, LogFile.offset(log_offset)::binary, op_type::8,
LogFile.expected_json_position(write_position, line)::64, json_size::64>>
)

{file, LogFile.expected_position(write_position, line)}
end,
&File.close(elem(&1, 0))
)
end

Record.defrecord(:key_index_item, key: nil, op_type: nil, offset: nil, json: nil)

@type key_index_item() ::
record(:key_index_item,
key: binary(),
op_type: LogFile.op_type(),
offset: LogOffset.t(),
json: {json_start_position :: non_neg_integer, json_size :: non_neg_integer}
)

@doc """
Read a key index from the given path.
"""
@spec stream(path :: String.t()) :: Enumerable.t(key_index_item())
def stream(path) do
Stream.resource(
fn -> File.open!(path, [:read, :raw, :read_ahead]) end,
fn file ->
with <<key_size::32>> <- IO.binread(file, 4),
<<key::binary-size(key_size)>> <- IO.binread(file, key_size),
<<tx_offset::64, op_offset::64, op_type::8, json_start_position::64, json_size::64>> <-
IO.binread(file, 8 * 4 + 1) do
item =
key_index_item(
key: key,
op_type: op_type,
offset: LogOffset.new(tx_offset, op_offset),
json: {json_start_position, json_size}
)

{[item], file}
else
:eof -> {:halt, file}
end
end,
&File.close/1
)
end

@doc """
Sort the key index file.
Sorts by key first and offset second, so within each key the operations
are ordered by offset.
Uses an external merge sort to support large files, at the cost of extra
storage overhead while the sort is in progress. Rewrites the original
file once the sort is complete.
"""
def sort(path) do
Utils.external_merge_sort(path, &stream_for_sorting/1, &<=/2)
end

@spec stream_for_sorting(path :: String.t()) ::
Enumerable.t(Utils.sortable_binary({binary(), non_neg_integer(), non_neg_integer()}))
defp stream_for_sorting(path) do
Stream.resource(
fn -> File.open!(path, [:read, :raw, :read_ahead]) end,
fn file ->
with <<key_size::32>> <- IO.binread(file, 4),
<<key::binary-size(key_size)>> <- IO.binread(file, key_size),
<<tx_offset::64, op_offset::64, op_type::8, json_start_position::64, json_size::64>> <-
IO.binread(file, 17 + 8 + 8) do
full_line =
<<key_size::32, key::binary, tx_offset::64, op_offset::64, op_type::8,
json_start_position::64, json_size::64>>

{[{{key, tx_offset, op_offset}, full_line}], file}
else
:eof -> {:halt, file}
end
end,
&File.close/1
)
end

@doc """
Merge two sorted key index files into a third file, adjusting the JSON positions of entries from the second file by the given offset.
"""
@spec merge_with_offset(
path1 :: String.t(),
path2 :: String.t(),
output_path :: String.t(),
offset :: non_neg_integer()
) :: :ok
def merge_with_offset(path1, path2, output_path, offset) do
File.cp!(path1, output_path)

stream(path2)
|> Stream.map(fn key_index_item(json: {start_position, json_size}) = item ->
key_index_item(item, json: {start_position + offset, json_size})
end)
|> Stream.map(&serialize_key_index_item/1)
|> Stream.into(File.stream!(output_path, [:append]))
|> Stream.run()
end

defp serialize_key_index_item(
key_index_item(offset: offset, key: key, op_type: op_type, json: {pos, size})
) do
<<byte_size(key)::32, key::binary, LogFile.offset(offset)::binary, op_type::8, pos::64,
size::64>>
end
end
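
A minimal sketch, not part of this commit, showing one key index line in the binary layout documented above (all values are illustrative):

key = ~S|"public"."items"/"1"|
{tx_offset, op_offset} = {1, 2}
{json_start, json_size} = {120, 42}

line =
  <<byte_size(key)::32, key::binary, tx_offset::64, op_offset::64, ?u::8,
    json_start::64, json_size::64>>

# Decoding reverses the same layout: key size prefix, key, 128-bit offset,
# op type byte, then the JSON position and size used to read back the log file.
<<key_size::32, decoded_key::binary-size(key_size), tx::64, op::64, op_type::8,
  pos::64, size::64>> = line

{decoded_key, {tx, op}, op_type, {pos, size}}
# => {~S|"public"."items"/"1"|, {1, 2}, ?u, {120, 42}}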
@@ -0,0 +1,274 @@
defmodule Electric.ShapeCache.FileStorage.LogFile do
@moduledoc false
alias Electric.ShapeCache.FileStorage.KeyIndex
alias Electric.LogItems
alias Electric.ShapeCache.FileStorage.ActionFile
alias Electric.ShapeCache.FileStorage.ChunkIndex
alias Electric.Replication.LogOffset
alias Electric.ShapeCache.LogChunker

# 16 bytes offset + 4 bytes key size + 1 byte op type + 8 bytes json size = 29 bytes
@line_overhead 16 + 4 + 1 + 8

@type operation_type() :: :insert | :update | :delete
@type op_type() :: ?u | ?i | ?d
# We're allowing tuple offsets to avoid struct creation in the hot path
@type offset() ::
{tx_offset :: non_neg_integer(), op_offset :: non_neg_integer()} | LogOffset.t()

@typedoc "Log item that can be written to the log file"
@type normal_log_item() ::
{offset(), key :: String.t(), op_type :: operation_type(), json :: String.t()}
@typedoc """
Log item that can be read from the log file, but with precomputed
`byte_size(key)` and `byte_size(json)` values, and with `op_type` as a byte
"""
@type log_item_with_sizes() ::
{offset(), key_size :: non_neg_integer(), key :: String.t(), op_type :: op_type(),
json_size :: non_neg_integer(), json :: String.t()}
@type log_item() :: normal_log_item() | log_item_with_sizes()

@typedoc """
Paths to the log file, chunk index, and key index files, used in conjunction.
"""
@type log_and_supporting() ::
{log_file_path :: String.t(), chunk_index_path :: String.t(),
key_index_path :: String.t()}

@doc """
Write a log file based on the stream of log items.
Writes 3 files: the log file itself, plus the chunk index and key index alongside it.
The log file structure, as an Elixir binary, is:
<<tx_offset::64, op_offset::64,
key_size::32, key::binary-size(key_size),
op_type::binary-size(1),
json_size::64, json::binary-size(json_size)>>
"""
@spec write_log_file(
log_stream :: Enumerable.t(log_item()),
log_file_path :: String.t(),
chunk_size :: non_neg_integer()
) :: log_and_supporting()
def write_log_file(
log_stream,
log_file_path,
chunk_size \\ LogChunker.default_chunk_size_threshold()
) do
log_stream
|> normalize_log_stream()
|> ChunkIndex.write_from_stream(log_file_path <> ".chunk_index", chunk_size)
|> KeyIndex.write_from_stream(log_file_path <> ".key_index")
|> Stream.map(fn
{log_offset, key_size, key, op_type, json_size, json} ->
# avoid constructing a binary that includes the json
[
<<offset(log_offset)::binary, key_size::32, key::binary, op_type::8, json_size::64>>,
json
]
end)
|> Stream.into(File.stream!(log_file_path))
|> Stream.run()

{log_file_path, log_file_path <> ".chunk_index", log_file_path <> ".key_index"}
end

@doc """
Apply the compaction actions to the log file
"""
def apply_actions(
log_file_path,
action_file_path,
chunk_size \\ LogChunker.default_chunk_size_threshold(),
merge_updates_fun \\ &LogItems.merge_updates/2
) do
compacted_log_file_path = log_file_path <> ".compacted"

ActionFile.stream(action_file_path)
|> Stream.transform(
fn -> File.open!(log_file_path, [:read, :raw, :read_ahead]) end,
fn
{_, :skip}, file ->
_ = read_line(file)
{[], file}

{_, :keep}, file ->
{[read_line(file)], file}

{_, {:compact, offsets}}, file ->
{[compact_log_file_lines(file, offsets, merge_updates_fun)], file}
end,
&File.close(&1)
)
|> write_log_file(compacted_log_file_path, chunk_size)
end

defp read_line(file) do
with <<tx_offset::64, op_offset::64, key_size::32>> <- IO.binread(file, 20),
<<key::binary-size(key_size)>> <- IO.binread(file, key_size),
<<op_type::8, json_size::64>> <- IO.binread(file, 9),
<<json::binary-size(json_size)>> <- IO.binread(file, json_size) do
{{tx_offset, op_offset}, key_size, key, op_type, json_size, json}
end
end

@spec compact_log_file_lines(
:file.io_device(),
[{position :: non_neg_integer(), size :: non_neg_integer()}],
(elem, elem -> elem)
) :: log_item_with_sizes()
when elem: var
defp compact_log_file_lines(file, file_offsets, merge_updates_fun) do
# The line to be replaced by the compacted result keeps its offset & key
{offset, key_size, key, op_type, _, _} = read_line(file)

# Save position
{:ok, current_position} = :file.position(file, :cur)

merged_json =
file_offsets
# Group reads to be efficient, but try to limit loading the JSONs to 10MB at a time.
# In the worst case when JSONs exceed 10MB, we'll just read one at a time.
|> chunk_expected_reads(bytes: 1024 * 1024 * 10)
|> Stream.flat_map(fn offsets ->
case :file.pread(file, offsets) do
{:ok, results} -> results
{:error, reason} -> raise inspect(reason)
:eof -> raise "unexpected end of file while reading back jsons from the log"
end
end)
|> Stream.map(&Jason.decode!/1)
|> Enum.reduce(fn new, acc -> merge_updates_fun.(acc, new) end)
|> Jason.encode!()

# Restore position to continue reading in the outer loop
{:ok, _} = :file.position(file, {:bof, current_position})

{offset, key_size, key, op_type, byte_size(merged_json), merged_json}
end

@spec normalize_log_stream(Enumerable.t(log_item())) :: Enumerable.t(log_item_with_sizes())
defp normalize_log_stream(stream) do
Stream.map(stream, fn
{log_offset, key, op_type, json} ->
{log_offset, byte_size(key), key, get_op_type(op_type), byte_size(json), json}

{_, _, _, _, _, _} = formed_line ->
formed_line
end)
end

@spec chunk_expected_reads(
Enumerable.t({position :: non_neg_integer(), size :: non_neg_integer()}),
bytes: non_neg_integer()
) :: Enumerable.t(list({position :: non_neg_integer(), size :: non_neg_integer()}))
defp chunk_expected_reads(stream, bytes: chunk_size) do
Stream.chunk_while(
stream,
{0, []},
fn
{_, size} = item, {total_size, acc} when total_size > chunk_size ->
{:cont, Enum.reverse(acc), {size, [item]}}

{_, size} = item, {total_size, acc} ->
{:cont, {total_size + size, [item | acc]}}
end,
fn
{_, []} -> {:cont, []}
{_, acc} -> {:cont, Enum.reverse(acc), []}
end
)
end

@doc """
Get the expected byte position in the file after the given log item is written.
Used by other modules that know the log file structure.
"""
@spec expected_position(non_neg_integer(), log_item_with_sizes()) :: non_neg_integer()
def expected_position(
current_position,
{_log_offset, key_size, _key, _op_type, json_size, _json}
) do
current_position + key_size + json_size + @line_overhead
end

@doc """
Get the expected byte position of the JSON for the given log item after it's written.
Used by other modules that know the log file structure.
"""
@spec expected_json_position(non_neg_integer(), log_item_with_sizes()) :: non_neg_integer()
def expected_json_position(current_position, {_, key_size, _, _, _, _}) do
current_position + key_size + @line_overhead
end

@doc """
Read a chunk of the log file from the given offset.
Returns a stream of json strings.
"""
@spec read_chunk(log :: log_and_supporting(), LogOffset.t()) :: Enumerable.t(String.t())
def read_chunk({log_file_path, chunk_index_path, _key_index_path}, %LogOffset{} = offset) do
case ChunkIndex.fetch_chunk(chunk_index_path, offset) do
{:ok, _max_offset, {start_position, end_position}} ->
stream_jsons(log_file_path, start_position, end_position, offset)

:error ->
[]
end
end

defp stream_jsons(log_file_path, start_position, end_position, exclusive_min_offset) do
# We can read the entire chunk into memory, since chunk sizes are expected to be ~10MB by default.
file = File.open!(log_file_path, [:read, :raw])

try do
with {:ok, data} <- :file.pread(file, start_position, end_position - start_position) do
extract_jsons_from_binary(data, exclusive_min_offset)
else
:eof -> raise "unexpected end of file"
{:error, reason} -> raise "error reading file: #{inspect(reason)}"
end
after
File.close(file)
end
end

@spec extract_jsons_from_binary(binary(), LogOffset.t()) :: Enumerable.t(String.t())
defp extract_jsons_from_binary(binary, exclusive_min_offset, acc \\ [])
defp extract_jsons_from_binary(<<>>, _, acc), do: Enum.reverse(acc)

defp extract_jsons_from_binary(
<<tx_offset1::64, op_offset1::64, key_size::32, _::binary-size(key_size),
_::binary-size(1), json_size::64, _::binary-size(json_size), rest::binary>>,
%LogOffset{
tx_offset: tx_offset2,
op_offset: op_offset2
} = log_offset,
acc
)
when tx_offset1 < tx_offset2 or (tx_offset1 == tx_offset2 and op_offset1 <= op_offset2),
do: extract_jsons_from_binary(rest, log_offset, acc)

defp extract_jsons_from_binary(
<<_::128, key_size::32, _::binary-size(key_size), _::binary-size(1), json_size::64,
json::binary-size(json_size), rest::binary>>,
log_offset,
acc
),
do: extract_jsons_from_binary(rest, log_offset, [json | acc])

defp get_op_type(:insert), do: ?i
defp get_op_type(:update), do: ?u
defp get_op_type(:delete), do: ?d

@doc "Serialize a non-infinite non-negative offset to a 16-byte binary"
@spec offset(offset()) :: binary
def offset(%LogOffset{tx_offset: tx_offset, op_offset: op_offset}),
do: <<tx_offset::64, op_offset::64>>

def offset({tx_offset, op_offset}), do: <<tx_offset::64, op_offset::64>>
end
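
A hedged end-to-end sketch of the LogFile API added in this commit (the path and items are illustrative): write_log_file/3 produces the log together with its chunk and key indexes, and read_chunk/2 streams the JSONs stored after a given offset.

alias Electric.Replication.LogOffset
alias Electric.ShapeCache.FileStorage.LogFile

log_path = Path.join(System.tmp_dir!(), "sketch_log.electric")

paths =
  [
    {%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, ~S|{"a":1}|},
    {%LogOffset{tx_offset: 2, op_offset: 1}, "key1", :update, ~S|{"a":2}|}
  ]
  |> LogFile.write_log_file(log_path)

# The offset is exclusive: reading from {1, 1} skips the first item.
LogFile.read_chunk(paths, %LogOffset{tx_offset: 1, op_offset: 1})
|> Enum.to_list()
# => [~S|{"a":2}|]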
535 changes: 0 additions & 535 deletions packages/sync-service/lib/electric/shape_cache/file_storage/on_disk.ex

This file was deleted.

6 changes: 5 additions & 1 deletion packages/sync-service/lib/electric/stack_supervisor.ex
@@ -28,6 +28,7 @@ defmodule Electric.StackSupervisor do
2. `Electric.Replication.ShapeLogCollector` collects transactions from the replication connection, fanning them out to `Electric.Shapes.Consumer` (4.1.1.2)
3. `Electric.ShapeCache` coordinates shape creation and handle allocation, shape metadata
"""
alias Electric.ShapeCache.LogChunker
use Supervisor, restart: :transient

@opts_schema NimbleOptions.new!(
@@ -175,7 +176,10 @@ defmodule Electric.StackSupervisor do
{mod,
arg
|> Keyword.put(:stack_id, stack_id)
|> Keyword.put(:chunk_bytes_threshold, opts[:chunk_bytes_threshold])
|> Keyword.put(
:chunk_bytes_threshold,
opts[:chunk_bytes_threshold] || LogChunker.default_chunk_size_threshold()
)
|> mod.shared_opts()}
end

26 changes: 23 additions & 3 deletions packages/sync-service/lib/electric/utils.ex
@@ -394,6 +394,8 @@ defmodule Electric.Utils do
end
end

@type sortable_binary(key) :: {key :: key, data :: binary()}

@doc """
Performs external merge sort on a file.
@@ -402,19 +404,19 @@ defmodule Electric.Utils do
* `reader` - Function that takes a file path and returns a stream of records. Records should be
in the form of `{key, binary}`, where `binary` will be written to the file sorted by `key`.
* `sorter` - Function that compares two keys, should return true if first argument is less than or equal to second
* `chunk_size` - Byte size of each chunk (i.e. how much is sorted in memory at once)
* `chunk_size` - Byte size of each chunk (i.e. how much is sorted in memory at once). Uses 50 MB by default.
The function will:
1. Split the input file into sorted temporary chunks
2. Merge the sorted chunks back into the original file
"""
@spec external_merge_sort(
path :: String.t(),
reader :: (path :: String.t() -> Enumerable.t({elem, binary()})),
reader :: (path :: String.t() -> Enumerable.t(sortable_binary(elem))),
sorter :: (elem, elem -> boolean())
) :: :ok
when elem: var
def external_merge_sort(path, reader, sorter, chunk_size \\ 50_000) do
def external_merge_sort(path, reader, sorter \\ &<=/2, chunk_size \\ 50 * 1024 * 1024) do
tmp_dir = Path.join(System.tmp_dir!(), "external_sort_#{:erlang.system_time()}")
File.mkdir_p!(tmp_dir)

@@ -496,4 +498,22 @@ defmodule Electric.Utils do
|> Stream.into(File.stream!(into))
|> Stream.run()
end

@doc """
Transform the stream to call a side-effect function for each element before continuing.
Acts like `Stream.each/2` but with an accumulator. `start_fun`, `last_fun`, and `after_fun`
have the same semantics as in `Stream.transform/5`.
"""
def stream_add_side_effect(stream, start_fun, reducer, last_fun \\ & &1, after_fun \\ & &1) do
Stream.transform(
stream,
start_fun,
fn elem, acc ->
{[elem], reducer.(elem, acc)}
end,
fn acc -> {[], last_fun.(acc)} end,
after_fun
)
end
end
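
A hedged usage sketch of Electric.Utils.stream_add_side_effect/5 as added in this commit: the stream passes through unchanged while a running byte count is kept as a side-effect accumulator and handed to last_fun once the stream is done (names and values are illustrative).

alias Electric.Utils

["a", "bb", "ccc"]
|> Utils.stream_add_side_effect(
  # start_fun: builds the initial accumulator
  fn -> 0 end,
  # reducer: called for every element; its return value becomes the new accumulator
  fn elem, total -> total + byte_size(elem) end,
  # last_fun: sees the final accumulator when the stream is exhausted
  fn total ->
    IO.puts("streamed #{total} bytes")
    total
  end
)
|> Enum.to_list()
# => ["a", "bb", "ccc"], after printing "streamed 6 bytes"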
39 changes: 39 additions & 0 deletions packages/sync-service/test/electric/plug/router_test.exs
@@ -124,6 +124,45 @@ defmodule Electric.Plug.RouterTest do
assert [%{"value" => %{"num" => "2"}}, _] = Jason.decode!(conn.resp_body)
end

@tag with_sql: [
"INSERT INTO items VALUES ('00000000-0000-0000-0000-000000000001', 'test value 1')"
]
test "GET after a compaction proceeds correctly",
%{opts: opts, db_conn: db_conn} do
conn = conn("GET", "/v1/shape?table=items&offset=-1") |> Router.call(opts)
assert [_] = Jason.decode!(conn.resp_body)

for x <- 1..10 do
Postgrex.query!(
db_conn,
"UPDATE items SET value = 'test value #{x}' WHERE id = '00000000-0000-0000-0000-000000000001'",
[]
)
end

shape_handle = get_resp_shape_handle(conn)

Process.sleep(500)

conn =
conn("GET", "/v1/shape?table=items&handle=#{shape_handle}&offset=0_0&live")
|> Router.call(opts)

assert length(Jason.decode!(conn.resp_body)) == 11
{:ok, offset} = LogOffset.from_string(get_resp_header(conn, "electric-offset"))

# Force compaction
Electric.ShapeCache.Storage.for_shape(shape_handle, opts[:storage])
|> Electric.ShapeCache.Storage.compact(offset)

conn =
conn("GET", "/v1/shape?table=items&handle=#{shape_handle}&offset=0_0")
|> Router.call(opts)

assert [%{"value" => %{"value" => "test value 10"}}, _] = Jason.decode!(conn.resp_body)
assert LogOffset.from_string(get_resp_header(conn, "electric-offset")) == {:ok, offset}
end

@tag with_sql: [
"INSERT INTO items VALUES (gen_random_uuid(), 'test value 1')"
]
@@ -0,0 +1,36 @@
defmodule Electric.ShapeCache.FileStorage.ActionFileTest do
use ExUnit.Case, async: true
alias Electric.ShapeCache.FileStorage.ActionFile
alias Electric.Replication.LogOffset
alias Electric.ShapeCache.FileStorage.LogFile

@moduletag :tmp_dir

describe "stream/1" do
test "streams actions from file", %{tmp_dir: tmp_dir} do
action_file_path = Path.join(tmp_dir, "action_file")

# Write test data in the correct binary format
actions = [
# Keep action
<<1::64, 1::64, ?k::8>>,
# Skip action
<<2::64, 1::64, ?s::8>>,
# Compact action with one position
<<3::64, 1::64, ?c::8, 1::16, 0::64, 10::64>>
]

File.write!(action_file_path, Enum.join(actions))

# Test streaming
result = ActionFile.stream(action_file_path) |> Enum.to_list()
assert length(result) == 3

assert [
{{1, 1}, :keep},
{{2, 1}, :skip},
{{3, 1}, {:compact, [{0, 10}]}}
] = result
end
end
end
@@ -0,0 +1,49 @@
defmodule Electric.ShapeCache.FileStorage.ChunkIndexTest do
use ExUnit.Case, async: true
alias Electric.ShapeCache.FileStorage.LogFile
alias Electric.ShapeCache.FileStorage.ChunkIndex
alias Electric.Replication.LogOffset

@moduletag :tmp_dir

describe "write_from_stream/3" do
test "writes a chunk index", %{tmp_dir: tmp_dir} do
chunk_index_path = Path.join(tmp_dir, "chunk_index")

log_stream =
[
{%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, "value1"},
{%LogOffset{tx_offset: 2, op_offset: 2}, "key2", :insert, "value2"},
{%LogOffset{tx_offset: 3, op_offset: 3}, "key3", :insert, "value3"}
]
|> LogFile.normalize_log_stream()

refute File.exists?(chunk_index_path)

ChunkIndex.write_from_stream(log_stream, chunk_index_path, 10)
|> Stream.run()

assert File.exists?(chunk_index_path)
end
end

describe "fetch_chunk/2" do
test "fetches a chunk by offset", %{tmp_dir: tmp_dir} do
chunk_index_path = Path.join(tmp_dir, "chunk_index")

log_stream =
[
{%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, "value1"},
{%LogOffset{tx_offset: 2, op_offset: 2}, "key2", :insert, "value2"}
]
|> LogFile.normalize_log_stream()

result_stream = ChunkIndex.write_from_stream(log_stream, chunk_index_path, 10)
# consume the stream to write the file
Enum.to_list(result_stream)

result = ChunkIndex.fetch_chunk(chunk_index_path, %LogOffset{tx_offset: 0, op_offset: 0})
assert match?({:ok, %LogOffset{}, {_, _}}, result)
end
end
end
@@ -0,0 +1,44 @@
defmodule Electric.ShapeCache.FileStorage.CompactionTest do
use ExUnit.Case, async: true
alias Electric.ShapeCache.FileStorage.Compaction
alias Electric.ShapeCache.FileStorage.LogFile
alias Electric.Replication.LogOffset

@moduletag :tmp_dir

describe "compact_in_place/2" do
test "compacts a log file", %{tmp_dir: tmp_dir} do
log_file_path = Path.join(tmp_dir, "log_file")

# Write initial log file with supporting files
log_stream = [
{%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, ~S|"value1"|},
{%LogOffset{tx_offset: 2, op_offset: 1}, "key1", :update, ~S|"value2"|},
{%LogOffset{tx_offset: 3, op_offset: 1}, "key2", :insert, ~S|"value3"|},
{%LogOffset{tx_offset: 4, op_offset: 1}, "key1", :update, ~S|"value new 1"|},
{%LogOffset{tx_offset: 5, op_offset: 1}, "key1", :update, ~S|"value new 2"|},
{%LogOffset{tx_offset: 6, op_offset: 1}, "key1", :update, ~S|"value new 3"|},
{%LogOffset{tx_offset: 7, op_offset: 1}, "key1", :update, ~S|"value new 4"|},
{%LogOffset{tx_offset: 8, op_offset: 1}, "key1", :update, ~S|"value new 5"|},
{%LogOffset{tx_offset: 9, op_offset: 1}, "key2", :delete, ~S|"value"|}
]

paths = LogFile.write_log_file(log_stream, log_file_path)

assert LogFile.read_chunk(paths, %LogOffset{tx_offset: 0, op_offset: 0})
|> Enum.to_list()
|> length == 9

assert {log_file_path, chunk_index_path, key_index_path} =
Compaction.compact_in_place(paths, 1_000_000, &(&1 <> &2))

assert File.exists?(log_file_path)
assert File.exists?(chunk_index_path)
assert File.exists?(key_index_path)

assert LogFile.read_chunk(paths, %LogOffset{tx_offset: 0, op_offset: 0})
|> Enum.to_list()
|> length == 4
end
end
end
@@ -0,0 +1,59 @@
defmodule Electric.ShapeCache.FileStorage.KeyIndexTest do
use ExUnit.Case, async: true
alias Electric.ShapeCache.FileStorage.KeyIndex
alias Electric.Replication.LogOffset

@moduletag :tmp_dir

describe "write_from_stream/2" do
test "writes key index from stream", %{tmp_dir: tmp_dir} do
key_index_path = Path.join(tmp_dir, "key_index")

log_stream =
Stream.map(
[
{%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, "value1"},
{%LogOffset{tx_offset: 2, op_offset: 1}, "key2", :insert, "value2"}
],
fn {offset, key, op, json} ->
{offset, byte_size(key), key, if(op == :insert, do: ?i, else: ?u), byte_size(json),
json}
end
)

refute File.exists?(key_index_path)
result_stream = KeyIndex.write_from_stream(log_stream, key_index_path)
assert is_function(result_stream)

# Consume the stream to write the file
Enum.to_list(result_stream)
assert File.exists?(key_index_path)
end
end

describe "stream/1" do
test "streams key index entries", %{tmp_dir: tmp_dir} do
key_index_path = Path.join(tmp_dir, "key_index")

log_stream =
Stream.map(
[
{%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, "value1"},
{%LogOffset{tx_offset: 2, op_offset: 1}, "key2", :insert, "value2"}
],
fn {offset, key, op, json} ->
{offset, byte_size(key), key, if(op == :insert, do: ?i, else: ?u), byte_size(json),
json}
end
)

result_stream = KeyIndex.write_from_stream(log_stream, key_index_path)
# consume the stream to write the file
Enum.to_list(result_stream)

# Test streaming
result = KeyIndex.stream(key_index_path) |> Enum.to_list()
assert length(result) > 0
end
end
end
@@ -0,0 +1,59 @@
defmodule Electric.ShapeCache.FileStorage.LogFileTest do
use ExUnit.Case, async: true
alias Electric.ShapeCache.FileStorage.LogFile
alias Electric.Replication.LogOffset
alias Electric.ShapeCache.LogChunker

@moduletag :tmp_dir

describe "write_log_file/2" do
test "writes a log file to disk", %{tmp_dir: tmp_dir} do
log_file_path = Path.join(tmp_dir, "log_file")

log_stream = [
{%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, "value1"},
{%LogOffset{tx_offset: 2, op_offset: 2}, "key2", :insert, "value2"},
{%LogOffset{tx_offset: 3, op_offset: 3}, "key3", :insert, "value3"}
]

refute File.exists?(log_file_path)

assert {^log_file_path, chunk_index_path, key_index_path} =
LogFile.write_log_file(log_stream, log_file_path)

assert File.exists?(log_file_path)
assert File.exists?(chunk_index_path)
assert File.exists?(key_index_path)

assert File.read!(log_file_path) =~ "value1"
assert File.read!(log_file_path) =~ "value2"
assert File.read!(log_file_path) =~ "value3"
end
end

describe "read_chunk/2" do
test "reads a chunk from disk according to the log offset", %{tmp_dir: tmp_dir} do
log_file_path = Path.join(tmp_dir, "log_file")

log_stream = [
# Will be in chunk 1
{%LogOffset{tx_offset: 1, op_offset: 1}, "key1", :insert, "value1"},
{%LogOffset{tx_offset: 1, op_offset: 2}, "key2", :insert, "value2"},
# Will be in chunk 2
{%LogOffset{tx_offset: 2, op_offset: 1}, "key3", :insert, "value3"},
{%LogOffset{tx_offset: 2, op_offset: 2}, "key4", :insert, "value4"},
# Will be in chunk 3
{%LogOffset{tx_offset: 3, op_offset: 1}, "key5", :insert, "value5"},
{%LogOffset{tx_offset: 3, op_offset: 2}, "key6", :insert, "value6"}
]

refute File.exists?(log_file_path)
# 10-byte chunks
assert {^log_file_path, _, _} =
paths = LogFile.write_log_file(log_stream, log_file_path, 10)

chunk = LogFile.read_chunk(paths, %LogOffset{tx_offset: 0, op_offset: 0})
assert length(chunk) > 0
end
end
end

This file was deleted.
